Synchronizing Data Between TopLink Session Caches with Oracle Coherence (2)

Posted 2008-05-23 20:14
1)        CoherenceSessionHelper adds the transport manager to the current session. Both server and database sessions are supported.
package com.oocl.isdc.sha.frm.tts.remotecommand;

import oracle.toplink.remotecommand.CommandProcessor;
import oracle.toplink.remotecommand.RemoteCommandManager;
import oracle.toplink.sessions.DatabaseSession;
import oracle.toplink.sessions.Session;
import oracle.toplink.threetier.Server;

import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
import com.oocl.isdc.sha.frm.tts.config.TTSConfigParser;

public class CoherenceSessionHelper {
    /**
     * Attaches a RemoteCommandManager backed by CoherenceTransportManager to the
     * given session and turns on change propagation.
     */
    public static Session addCoherenceTransportManagerToSession(Session session) {
        RemoteCommandManager commandMgr = new RemoteCommandManager((CommandProcessor) session);
        // The channel name is read from the TTS configuration.
        commandMgr.setChannel((String) TTSConfigParser.getInstance().get(
                TTSConfigConstants.TOPLINK_COMMAND_CHANNEL_KEY));
        CoherenceTransportManager tm = new CoherenceTransportManager(commandMgr);
        tm.setInitialContextFactoryName(TTSConfigConstants.TTS_CONTEXT_FACTOYR_NAME);
        commandMgr.setUrl(tm.getServiceUrl());
        commandMgr.setTransportManager(tm);
        tm.setShouldRemoveConnectionOnError(true);
        // Server is checked first; any other DatabaseSession takes the second branch.
        if (session instanceof Server) {
            ((Server) session).setCommandManager(commandMgr);
            ((Server) session).setShouldPropagateChanges(true);
            ((Server) session).getCommandManager().initialize();
        } else if (session instanceof DatabaseSession) {
            ((DatabaseSession) session).setCommandManager(commandMgr);
            ((DatabaseSession) session).setShouldPropagateChanges(true);
            ((DatabaseSession) session).getCommandManager().initialize();
        } else {
            throw new IllegalArgumentException("Session must be a server or database session");
        }
        return session;
    }
}
2)        CoherenceCache: a custom wrapper around a Coherence NamedCache

package com.oocl.isdc.sha.frm.tts.cohererence.cache;

import java.net.InetAddress;
import java.util.Collection;

import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;
import com.tangosol.util.MapListener;

public class CoherenceCache {
    private NamedCache namedCache;

    public CoherenceCache() {
        this.namedCache = CacheFactory.getCache("toplinkSyn");
    }

    public void putCache(Object key, Object value) {
        namedCache.put(key, value);
    }

    public Object retrieveCache(Object key) {
        return namedCache.get(key);
    }

    public NamedCache getNamedCache() {
        return namedCache;
    }
   
    public void addMapListener(MapListener listener) {
        this.namedCache.addMapListener(listener);
    }
   
    public void removeMapListener(MapListener listener) {
        this.namedCache.removeMapListener(listener);
    }
   
    @SuppressWarnings("unchecked")
    public Collection retrieveCacheAll() {
       return  this.namedCache.values();
    }
   
    public String getUrl() {
        Member member = getLocalMember();
        InetAddress address = member.getAddress();
        String ipAddress = address.getHostAddress();
        int port = member.getPort();
        StringBuilder sb = new StringBuilder(TTSConfigConstants.TOPLINK_SERVICEID_PREFIX)
            .append(ipAddress).append(":").append(port);
        return sb.toString();
    }
   
    public Member getLocalMember() {
        Service service = namedCache.getCacheService();
        Cluster cluster = service.getCluster();   
        Member member = cluster.getLocalMember();
        return member;
    }
}
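
The getUrl() method above builds the service URL from a prefix, the local member's IP address and its port. The following is a minimal, self-contained sketch of that string format only. It does not use Coherence. The class name ServiceUrlSketch and the prefix value "tcmp://" are assumptions: the real value of TTSConfigConstants.TOPLINK_SERVICEID_PREFIX is not shown in this post, and "tcmp://" is only inferred from the URL that appears in the TopLink log further down.

```java
// Hypothetical stand-in for CoherenceCache.getUrl(); no Coherence classes are involved.
class ServiceUrlSketch {
    // ASSUMPTION: the real TOPLINK_SERVICEID_PREFIX is not shown in the post.
    static final String ASSUMED_PREFIX = "tcmp://";

    // Concatenates prefix + ip + ":" + port, the same shape as getUrl() above.
    static String buildServiceUrl(String prefix, String ipAddress, int port) {
        return new StringBuilder(prefix)
                .append(ipAddress)
                .append(':')
                .append(port)
                .toString();
    }

    public static void main(String[] args) {
        // Address and port taken from the log lines quoted later in the post.
        System.out.println(buildServiceUrl(ASSUMED_PREFIX, "146.222.51.20", 8089));
    }
}
```

Because the address and port come from the local cluster member, each oc4j instance ends up with a distinct URL as long as the instances use different ports or hosts.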

3)        CoherenceDiscoveryManager extends DiscoveryManager and mainly overrides the startDiscovery and stopDiscovery methods

package com.oocl.isdc.sha.frm.tts.remotecommand;

import oracle.toplink.exceptions.ValidationException;
import oracle.toplink.remotecommand.DiscoveryManager;
import oracle.toplink.remotecommand.RemoteCommandManager;

public class CoherenceDiscoveryManager extends DiscoveryManager {
   
    public CoherenceDiscoveryManager(RemoteCommandManager rcm) {
        super(rcm);
    }

    public RemoteCommandManager getRemoteCommandManager() {
        return this.rcm;
    }
   
    public boolean isDiscoveryStopped() {
        throw ValidationException.operationNotSupported("isDiscoveryStopped");
    }

    public void startDiscovery() {
        ((CoherenceTransportManager)rcm.getTransportManager()).createLocalConnection();
    }
   
    /**
     * We must implement this method, and we keep it blank
     */
    public void stopDiscovery() {
      
    }

    public void setAnnouncementDelay(int millisecondsToDelay) {
        throw ValidationException.operationNotSupported("setAnnouncementDelay");
    }

    public int getAnnouncementDelay() {
        throw ValidationException.operationNotSupported("getAnnouncementDelay");
    }

    public String getMulticastGroupAddress() {
        throw ValidationException.operationNotSupported("getMulticastGroupAddress");
    }

    public void setMulticastGroupAddress(String address) {
        throw ValidationException.operationNotSupported("setMulticastGroupAddress");
    }


    public void setMulticastPort(int port) {
        throw ValidationException.operationNotSupported("setMulticastPort");
    }

    public int getMulticastPort() {
        throw ValidationException.operationNotSupported("getMulticastPort");
    }
}
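
The class above rejects every multicast-related setting, so member discovery is evidently not done through multicast announcements. The body of createLocalConnection() is not shown in this part of the post, so the following is only an illustration of the general idea of announcing a member through a shared map, not the author's implementation. A plain ConcurrentHashMap stands in for the clustered cache, and all names (CacheDiscoverySketch, peers, the URLs and channel names) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: members register themselves in a shared map instead of
// announcing themselves over multicast. The map stands in for a clustered cache.
class CacheDiscoverySketch {
    private final Map<String, String> registry; // service URL -> channel name
    private final String serviceUrl;
    private final String channel;

    CacheDiscoverySketch(Map<String, String> registry, String serviceUrl, String channel) {
        this.registry = registry;
        this.serviceUrl = serviceUrl;
        this.channel = channel;
    }

    // "Discovery" here is just publishing our own entry to the shared map.
    void startDiscovery() {
        registry.put(serviceUrl, channel);
    }

    // Other members on the same channel, sorted for stable output.
    List<String> peers() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : registry.entrySet()) {
            if (entry.getValue().equals(channel) && !entry.getKey().equals(serviceUrl)) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> shared = new ConcurrentHashMap<>();
        CacheDiscoverySketch a = new CacheDiscoverySketch(shared, "tcmp://10.0.0.1:8088", "demo");
        CacheDiscoverySketch b = new CacheDiscoverySketch(shared, "tcmp://10.0.0.2:8089", "demo");
        CacheDiscoverySketch c = new CacheDiscoverySketch(shared, "tcmp://10.0.0.3:8090", "other");
        a.startDiscovery();
        b.startDiscovery();
        c.startDiscovery();
        System.out.println(a.peers()); // members that share a's channel
    }
}
```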

3           Run mvn clean install from the command line to deploy the demo. The details of the target oc4j instance must be given in the pom.xml under the ear module.
    <properties>
       <home.j2ee>D:/oc4j_extended_101310/j2ee/home</home.j2ee>
       <oc4j.host>localhost</oc4j.host>
       <rmi.port>23791</rmi.port>
       <deploy.username>oc4jadmin</deploy.username>
       <deploy.password>welcome</deploy.password>
    </properties>
To test cache synchronization, the demo also has to be deployed to a second oc4j instance. For that deployment, change build.properties to:
coherence.member.name = tts-server1
coherence.local.address = 146.222.51.20
coherence.local.port = 8088

and the pom.xml under the ear module to:
    <properties>
       <home.j2ee>C:/oc4j_extended_101310/j2ee/home</home.j2ee>
       <oc4j.host>localhost</oc4j.host>
       <rmi.port>23792</rmi.port>
       <deploy.username>oc4jadmin</deploy.username>
       <deploy.password>welcome</deploy.password>
    </properties>

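4           Open the demo in a browser and check the cache behaviour. When an employee's data is modified on one oc4j instance, the change shows up immediately on the other oc4j instance, and the TopLink log shows the corresponding merge of the Employee object. (The demo's cache and cache synchronization policies mostly use TopLink's default configuration.) Without cache synchronization, because Employee uses optimistic locking, modifying the same employee on the other oc4j instance (that is, the other server) raises an optimistic lock exception and the change is rolled back. With cache synchronization, the other instance gets the latest data without querying the database directly, and that data matches the database, which greatly reduces how often optimistic lock failures occur. To get the most out of cache synchronization, a cache invalidation mechanism should also be defined, and there are many other cache strategies for avoiding stale data.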
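
The optimistic locking argument above can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not TopLink code: a single in-memory row with a version number stands in for the Employee table, each "server" keeps one cached copy, and the names OptimisticLockSketch, Database, Server and merge are all hypothetical. An update succeeds only when the version in the server's cache matches the version in the database.

```java
// Hypothetical simulation of version-based optimistic locking with two server caches.
class OptimisticLockSketch {
    static final class Row {
        final String name;
        final int version;
        Row(String name, int version) { this.name = name; this.version = version; }
    }

    static final class Database {
        private Row row = new Row("Alice", 1);
        Row read() { return row; }
        // The write is accepted only if the caller saw the current version.
        boolean update(String newName, int expectedVersion) {
            if (row.version != expectedVersion) {
                return false; // optimistic lock failure, caller must roll back
            }
            row = new Row(newName, row.version + 1);
            return true;
        }
    }

    static final class Server {
        private final Database db;
        Row cached;
        Server(Database db) { this.db = db; this.cached = db.read(); }
        boolean rename(String newName) {
            boolean ok = db.update(newName, cached.version);
            if (ok) {
                cached = db.read();
            }
            return ok;
        }
        // Stand-in for merging a change set received from another server.
        void merge(Row latest) { cached = latest; }
    }

    // Server 1 writes first; returns whether server 2's later write succeeds.
    static boolean secondWriteSucceeds(boolean syncCaches) {
        Database db = new Database();
        Server s1 = new Server(db);
        Server s2 = new Server(db);
        s1.rename("Bob");
        if (syncCaches) {
            s2.merge(s1.cached);
        }
        return s2.rename("Carol");
    }

    public static void main(String[] args) {
        System.out.println("without sync: " + secondWriteSucceeds(false)); // false
        System.out.println("with sync:    " + secondWriteSucceeds(true));  // true
    }
}
```

In the unsynchronized run, server 2 still holds version 1 while the database is at version 2, so its write is rejected. In the synchronized run its cache already carries version 2 and the write goes through.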
Toplink Log:
[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received remote command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]
[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Executing command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]
[TopLink Finer]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received updates from Remote Server
[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Merging com.oocl.isdc.sha.frm.tts.model.Employee: [558] from remote server
Page demo:





The URLs in the two screenshots above show that these are two different oc4j servers. Both use the same database and run the same application. Modify the data on server1:






This article comes from the ChinaUnix blog. To view the original, see: http://blog.chinaunix.net/u/19897/showart_702911.html