1) CoherenceSessionHelper adds the transport manager to the current session. Both server and database sessions are supported.
package com.oocl.isdc.sha.frm.tts.remotecommand;

import oracle.toplink.remotecommand.CommandProcessor;
import oracle.toplink.remotecommand.RemoteCommandManager;
import oracle.toplink.sessions.DatabaseSession;
import oracle.toplink.sessions.Session;
import oracle.toplink.threetier.Server;

import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
import com.oocl.isdc.sha.frm.tts.config.TTSConfigParser;

public class CoherenceSessionHelper {
    public static Session addCoherenceTransportManagerToSession(Session session) {
        RemoteCommandManager commandMgr = new RemoteCommandManager((CommandProcessor) session);
        commandMgr.setChannel((String) TTSConfigParser.getInstance().get(
                TTSConfigConstants.TOPLINK_COMMAND_CHANNEL_KEY));
        CoherenceTransportManager tm = new CoherenceTransportManager(commandMgr);
        tm.setInitialContextFactoryName(TTSConfigConstants.TTS_CONTEXT_FACTOYR_NAME);
        commandMgr.setUrl(tm.getServiceUrl());
        commandMgr.setTransportManager(tm);
        tm.setShouldRemoveConnectionOnError(true);
        if (session instanceof Server) {
            ((Server) session).setCommandManager(commandMgr);
            ((Server) session).setShouldPropagateChanges(true);
            ((Server) session).getCommandManager().initialize();
        } else if (session instanceof DatabaseSession) {
            ((DatabaseSession) session).setCommandManager(commandMgr);
            ((DatabaseSession) session).setShouldPropagateChanges(true);
            ((DatabaseSession) session).getCommandManager().initialize();
        } else {
            throw new IllegalArgumentException("Session must be a server or database session");
        }
        return session;
    }
}
2) CoherenceCache: a custom wrapper around a Coherence NamedCache
package com.oocl.isdc.sha.frm.tts.cohererence.cache;

import java.net.InetAddress;
import java.util.Collection;

import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;
import com.tangosol.util.MapListener;

public class CoherenceCache {
    private NamedCache namedCache;

    public CoherenceCache() {
        this.namedCache = CacheFactory.getCache("toplinkSyn");
    }

    public void putCache(Object key, Object value) {
        namedCache.put(key, value);
    }

    public Object retrieveCache(Object key) {
        return namedCache.get(key);
    }

    public NamedCache getNamedCache() {
        return namedCache;
    }

    public void addMapListener(MapListener listener) {
        this.namedCache.addMapListener(listener);
    }

    public void removeMapListener(MapListener listener) {
        this.namedCache.removeMapListener(listener);
    }

    @SuppressWarnings("unchecked")
    public Collection retrieveCacheAll() {
        return this.namedCache.values();
    }

    public String getUrl() {
        Member member = getLocalMember();
        InetAddress address = member.getAddress();
        String ipAddress = address.getHostAddress();
        int port = member.getPort();
        StringBuffer sb = new StringBuffer(TTSConfigConstants.TOPLINK_SERVICEID_PREFIX)
                .append(ipAddress).append(":").append(port);
        return sb.toString();
    }

    public Member getLocalMember() {
        Service service = namedCache.getCacheService();
        Cluster cluster = service.getCluster();
        Member member = cluster.getLocalMember();
        return member;
    }
}
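To make the result of getUrl() concrete, here is a minimal standalone sketch of the same string assembly. The value of TTSConfigConstants.TOPLINK_SERVICEID_PREFIX is not shown in this article; the prefix "tcmp://" below is an assumption based on the URLs printed in the TopLink log further down, and the class name ServiceUrlSketch is made up for illustration.

```java
final class ServiceUrlSketch {
    // Assumed prefix: the log in this article prints URLs such as tcmp://146.222.51.20:8089.
    static final String ASSUMED_PREFIX = "tcmp://";

    /** Joins prefix, host address and port in the same order as getUrl(). */
    static String buildUrl(String prefix, String hostAddress, int port) {
        return new StringBuilder(prefix)
                .append(hostAddress)
                .append(":")
                .append(port)
                .toString();
    }

    public static void main(String[] args) {
        // Address and port taken from the build.properties example in this article.
        System.out.println(buildUrl(ASSUMED_PREFIX, "146.222.51.20", 8088));
    }
}
```

Because the URL is derived from the local cluster member's address and port, each node ends up with a distinct service id as long as the members use distinct address/port pairs.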
3) CoherenceDiscoveryManager extends DiscoveryManager and mainly overrides the startDiscovery and stopDiscovery methods.
package com.oocl.isdc.sha.frm.tts.remotecommand;

import oracle.toplink.exceptions.ValidationException;
import oracle.toplink.remotecommand.DiscoveryManager;
import oracle.toplink.remotecommand.RemoteCommandManager;

public class CoherenceDiscoveryManager extends DiscoveryManager {
    public CoherenceDiscoveryManager(RemoteCommandManager rcm) {
        super(rcm);
    }

    public RemoteCommandManager getRemoteCommandManager() {
        return this.rcm;
    }

    public boolean isDiscoveryStopped() {
        throw ValidationException.operationNotSupported("isDiscoveryStopped");
    }

    public void startDiscovery() {
        ((CoherenceTransportManager)rcm.getTransportManager()).createLocalConnection();
    }

    /**
     * We must implement this method, and we keep it blank
     */
    public void stopDiscovery() {
    }

    public void setAnnouncementDelay(int millisecondsToDelay) {
        throw ValidationException.operationNotSupported("setAnnouncementDelay");
    }

    public int getAnnouncementDelay() {
        throw ValidationException.operationNotSupported("getAnnouncementDelay");
    }

    public String getMulticastGroupAddress() {
        throw ValidationException.operationNotSupported("getMulticastGroupAddress");
    }

    public void setMulticastGroupAddress(String address) {
        throw ValidationException.operationNotSupported("setMulticastGroupAddress");
    }

    public void setMulticastPort(int port) {
        throw ValidationException.operationNotSupported("setMulticastPort");
    }

    public int getMulticastPort() {
        throw ValidationException.operationNotSupported("getMulticastPort");
    }
}
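The class above rejects every multicast-related setting, which suggests that peers find each other through the shared cache rather than through multicast announcements. CoherenceTransportManager is not listed in this article, so what createLocalConnection() actually does is not shown. The following is only a sketch of one way such discovery could work, using plain JDK collections in place of the Coherence NamedCache and its MapListener. All class names, the member name tts-server2 and the "tcmp://" URLs are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Stand-in for the clustered cache: a map that notifies listeners on every put. */
final class SharedRegistrySketch {
    private final Map<String, String> entries = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(BiConsumer<String, String> listener) {
        listeners.add(listener);
    }

    void put(String memberName, String serviceUrl) {
        entries.put(memberName, serviceUrl);
        for (BiConsumer<String, String> listener : listeners) {
            listener.accept(memberName, serviceUrl);
        }
    }

    Map<String, String> snapshot() {
        return new HashMap<>(entries);
    }
}

/** Hypothetical node that discovers its peers through the shared registry. */
final class DiscoveryNodeSketch {
    private final String memberName;
    private final String serviceUrl;
    private final SharedRegistrySketch registry;
    private final Set<String> peerUrls = ConcurrentHashMap.newKeySet();

    DiscoveryNodeSketch(String memberName, String serviceUrl, SharedRegistrySketch registry) {
        this.memberName = memberName;
        this.serviceUrl = serviceUrl;
        this.registry = registry;
    }

    void startDiscovery() {
        // Hear about peers that register later.
        registry.addListener(this::rememberPeer);
        // Pick up peers that registered before this node started.
        registry.snapshot().forEach(this::rememberPeer);
        // Announce this node by writing its own entry.
        registry.put(memberName, serviceUrl);
    }

    private void rememberPeer(String peerName, String peerUrl) {
        // Ignore the notification caused by this node's own entry.
        if (!peerName.equals(memberName)) {
            peerUrls.add(peerUrl);
        }
    }

    Set<String> peerUrls() {
        return peerUrls;
    }
}
```

In this sketch, starting two nodes against the same registry leaves each node holding the other's URL, regardless of which one starts first. No periodic announcement is involved, which fits the multicast-related methods being unsupported.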
3 Run mvn clean install from the command line to deploy the demo. The details of the target oc4j instance must be specified in the pom.xml under ear.
<properties>
    <home.j2ee>D:/oc4j_extended_101310/j2ee/home</home.j2ee>
    <oc4j.host>localhost</oc4j.host>
    <rmi.port>23791</rmi.port>
    <deploy.username>oc4jadmin</deploy.username>
    <deploy.password>welcome</deploy.password>
</properties>
To verify cache synchronization, the demo must also be deployed to a second oc4j instance. For that deployment, change build.properties to:
coherence.member.name = tts-server1
coherence.local.address = 146.222.51.20
coherence.local.port = 8088
The pom.xml under ear becomes:
<properties>
    <home.j2ee>C:/oc4j_extended_101310/j2ee/home</home.j2ee>
    <oc4j.host>localhost</oc4j.host>
    <rmi.port>23792</rmi.port>
    <deploy.username>oc4jadmin</deploy.username>
    <deploy.password>welcome</deploy.password>
</properties>
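4 Open the demo in a browser and check the cache behavior. When we modify an employee's data on one oc4j instance, the change shows up immediately on the other oc4j instance, and the TopLink log shows the corresponding merge of the employee. (The demo uses essentially TopLink's default configuration for both the cache and the cache synchronization policy.) Without cache synchronization, because Employee uses optimistic locking, modifying the same employee's data on another oc4j instance (that is, another server) raises an optimistic lock exception and the change is rolled back. With cache synchronization, that server already holds the latest data, consistent with the db, without having to access the db directly, which greatly reduces how often optimistic lock failures occur. To get the most out of cache synchronization, a cache invalidation mechanism should also be defined, and there are many other cache strategies for avoiding stale data.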
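The interaction between optimistic locking and cache synchronization can be illustrated with a small standalone simulation. This is not TopLink code: the classes Db, Server and Row are hypothetical stand-ins for the shared database, a server's session cache and a versioned Employee row, and the employee names are made up. Only the id 558 is taken from the log below.

```java
import java.util.HashMap;
import java.util.Map;

final class OptimisticLockSketch {
    /** A row with a version column, as used for optimistic locking. */
    static final class Row {
        final String name;
        final int version;

        Row(String name, int version) {
            this.name = name;
            this.version = version;
        }
    }

    /** Stand-in for the shared database. */
    static final class Db {
        private final Map<Integer, Row> rows = new HashMap<>();

        void insert(int id, String name) {
            rows.put(id, new Row(name, 1));
        }

        Row read(int id) {
            return rows.get(id);
        }

        /** Succeeds only if the caller's version matches the stored one. */
        Row update(int id, String newName, int expectedVersion) {
            Row current = rows.get(id);
            if (current.version != expectedVersion) {
                throw new IllegalStateException("optimistic lock failure for id " + id);
            }
            Row next = new Row(newName, expectedVersion + 1);
            rows.put(id, next);
            return next;
        }
    }

    /** Stand-in for one server and its local cache. */
    static final class Server {
        private final Db db;
        private final Map<Integer, Row> cache = new HashMap<>();
        private Server peer; // set only when cache synchronization is enabled

        Server(Db db) {
            this.db = db;
        }

        void syncWith(Server other) {
            this.peer = other;
        }

        void load(int id) {
            cache.put(id, db.read(id));
        }

        /** Writes with the cached version, then pushes the new row to the peer, if any. */
        void rename(int id, String newName) {
            Row updated = db.update(id, newName, cache.get(id).version);
            cache.put(id, updated);
            if (peer != null) {
                peer.cache.put(id, updated);
            }
        }
    }

    /** Returns true if server 2 can still update after server 1 has changed the row. */
    static boolean secondUpdateSucceeds(boolean cacheSync) {
        Db db = new Db();
        db.insert(558, "Alice");
        Server s1 = new Server(db);
        Server s2 = new Server(db);
        s1.load(558);
        s2.load(558);
        if (cacheSync) {
            s1.syncWith(s2);
        }
        s1.rename(558, "Bob");
        try {
            s2.rename(558, "Carol");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without sync: " + secondUpdateSucceeds(false));
        System.out.println("with sync: " + secondUpdateSucceeds(true));
    }
}
```

Without synchronization, server 2 still holds version 1 in its cache, so its update is rejected. With synchronization, server 1's change is merged into server 2's cache first, so server 2 writes with the current version. Truly concurrent edits can still collide, which is why the paragraph above speaks of reducing the frequency of optimistic lock failures rather than eliminating them.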
Toplink Log:
[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received remote command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]
[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Executing command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]
[TopLink Finer]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received updates from Remote Server
[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Merging com.oocl.isdc.sha.frm.tts.model.Employee: [558] from remote server
Page demonstration:
![]()
![]()
The URLs in the two screenshots above show that these are two different oc4j servers. Both use the same db and run the same application. Modify the data on server1:
![]()
![]()
This article comes from the ChinaUnix blog. To view the original, see: http://blog.chinaunix.net/u/19897/showart_702911.html