### Eclipse Workspace Patch 1.0 #P ehcache-jgroupsreplication Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProvider.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProvider.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProvider.java (copia de trabajo) @@ -27,6 +27,9 @@ import org.jgroups.Channel; import org.jgroups.ChannelException; import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.blocks.MessageDispatcher; +import org.jgroups.blocks.RequestHandler; import org.jgroups.jmx.JmxConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +62,11 @@ private final CacheManager cacheManager; private final String groupProperties; private final URL groupUrl; + private final int syncMode; + private final long syncTimeout; private JChannel channel; + private MessageDispatcher messageDispatcher; private JGroupsCachePeer cachePeer; private JGroupsCacheReceiver cacheReceiver; private List cachePeersListCache; @@ -76,10 +82,12 @@ * @param cacheManager the cache manager * @param properties the JGroups connection String */ - public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, String properties) { + public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, String properties, int syncMode, long syncTimeout) { this.cacheManager = cacheManager; this.groupProperties = properties; this.groupUrl = null; + this.syncMode = syncMode; + this.syncTimeout = syncTimeout; } /** @@ -88,10 +96,12 @@ * @param cacheManager the cache manager * @param configUrl the JGroups configuration file */ - public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, URL configUrl) { + public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, URL configUrl, int syncMode, long syncTimeout) { this.cacheManager = cacheManager; this.groupProperties = null; this.groupUrl = configUrl; + this.syncMode = syncMode; + this.syncTimeout = syncTimeout; } /** @@ -136,12 +146,21 @@ return; } + this.messageDispatcher = new MessageDispatcher(this.channel, null, null, new RequestHandler() { + @Override + public Object handle(Message msg) { + if (cacheReceiver != null) { + cacheReceiver.receive(msg); + } + return null; + } + }); + final String clusterName = this.getClusterName(); - this.cachePeer = new JGroupsCachePeer(this.channel, clusterName); + this.cachePeer = new JGroupsCachePeer(this.channel, this.messageDispatcher, clusterName, this.syncMode, this.syncTimeout); this.bootstrapManager = new JGroupsBootstrapManager(clusterName, this.cachePeer, this.cacheManager); this.cacheReceiver = new JGroupsCacheReceiver(this.cacheManager, this.bootstrapManager); - this.channel.setReceiver(this.cacheReceiver); this.channel.setOpt(Channel.LOCAL, Boolean.FALSE); try { @@ -221,7 +240,14 @@ } } + try { + this.messageDispatcher.stop(); + } catch (Exception e) { + LOG.error("Error stopping MessageDispatcher for cluster " + clusterName, e); + } + this.channel = null; + this.messageDispatcher = null; } } Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCachePeer.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCachePeer.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCachePeer.java (copia de trabajo) @@ -21,10 +21,11 @@ import net.sf.ehcache.store.chm.ConcurrentHashMap; import org.jgroups.Address; import org.jgroups.Channel; -import org.jgroups.ChannelClosedException; -import org.jgroups.ChannelNotConnectedException; import org.jgroups.Message; import org.jgroups.View; +import org.jgroups.blocks.MessageDispatcher; +import org.jgroups.blocks.Request; +import org.jgroups.blocks.RequestOptions; import org.jgroups.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ import java.io.Serializable; import java.rmi.RemoteException; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -53,18 +55,24 @@ private static final int CHUNK_SIZE = 100; private final Channel channel; + private final MessageDispatcher messageDispatcher; private final ConcurrentMap> asyncReplicationQueues = new ConcurrentHashMap>(); private final Timer timer; private volatile boolean alive; + private final int syncMode; + private final long syncTimeout; /** * Create a new {@link CachePeer} */ - public JGroupsCachePeer(Channel channel, String clusterName) { + public JGroupsCachePeer(Channel channel, MessageDispatcher messageDispatcher, String clusterName, int syncMode, long syncTimeout) { this.channel = channel; + this.messageDispatcher = messageDispatcher; this.alive = true; this.timer = new Timer(clusterName + " Async Replication Thread", true); + this.syncMode = syncMode; + this.syncTimeout = syncTimeout; } /** @@ -163,7 +171,7 @@ } LOG.debug("Sending {} JGroupEventMessages synchronously.", synchronousEventMessages.size()); - this.sendData(dest, synchronousEventMessages); + this.sendData(dest, synchronousEventMessages, true); } private Queue getMessageQueue(long asyncTime) { @@ -188,7 +196,7 @@ * Sends a serializable object to the specified address. If no address is specified it is sent to the * entire group. */ - private void sendData(Address dest, List dataList) { + private void sendData(Address dest, List dataList, boolean sync) { //Remove the list wrapper if only a single event is being sent final Serializable toSend; if (dataList.size() == 1) { @@ -208,12 +216,23 @@ //Send it off to the group final Message msg = new Message(dest, null, data); + List
dests = null; + if (dest != null) { + dests = Collections.singletonList(dest); + } + + RequestOptions options = new RequestOptions(); + if (sync) { + options.setMode(this.syncMode); + options.setTimeout(this.syncTimeout); + } else { + options.setMode(Request.GET_NONE); + } + try { - this.channel.send(msg); - } catch (ChannelNotConnectedException e) { - LOG.error("Failed to send message(s) due to the channel being disconnected: " + toSend, e); - } catch (ChannelClosedException e) { - LOG.error("Failed to send message(s) due to the channel being closed: " + toSend, e); + this.messageDispatcher.castMessage(dests, msg, options); + } catch (Exception e) { + LOG.error("Failed to send message(s): " + toSend, e); } } @@ -243,7 +262,7 @@ } LOG.debug("Sending {} JGroupEventMessages from the asynchronous queue.", events.size()); - this.sendData(null, events); + this.sendData(null, events, false); } } Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProviderFactory.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProviderFactory.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProviderFactory.java (copia de trabajo) @@ -22,6 +22,8 @@ import net.sf.ehcache.distribution.CacheManagerPeerProviderFactory; import net.sf.ehcache.util.ClassLoaderUtil; import net.sf.ehcache.util.PropertyUtil; + +import org.jgroups.blocks.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,8 @@ private static final String CONNECT = "connect"; private static final String FILE = "file"; + private static final String SYNC_MODE = "syncMode"; + private static final String SYNC_TIMEOUT = "syncTimeout"; /** * {@inheritDoc} @@ -49,7 +53,37 @@ final String connect = this.getProperty(CONNECT, properties); final String file = this.getProperty(FILE, properties); + final String syncModeStr = this.getProperty(SYNC_MODE, properties); + final String syncTimeoutStr = this.getProperty(SYNC_TIMEOUT, properties); + int syncMode = Request.GET_NONE; + if (syncModeStr != null) { + if ("GET_ALL".equals(syncModeStr)) { + syncMode = Request.GET_ALL; + } else if ("GET_FIRST".equals(syncModeStr)) { + syncMode = Request.GET_FIRST; + } else if ("GET_MAJORITY".equals(syncModeStr)) { + syncMode = Request.GET_MAJORITY; + } else if ("GET_ABS_MAJORITY".equals(syncModeStr)) { + syncMode = Request.GET_ABS_MAJORITY; + } else if ("GET_NONE".equals(syncModeStr)) { + syncMode = Request.GET_NONE; + } else { + LOG.warn("Unknown sync mode, defaulting to GET_NONE"); + syncMode = Request.GET_NONE; + } + } + + long syncTimeout = 0L; + if (syncTimeoutStr != null) { + try { + syncTimeout = Long.valueOf(syncTimeoutStr); + } catch (NumberFormatException e) { + LOG.warn("Invalid sync timeout, defaulting to 0 (no timeout)"); + syncTimeout = 0L; + } + } + final JGroupsCacheManagerPeerProvider peerProvider; if (file != null) { if (connect != null) { @@ -60,25 +94,25 @@ final URL configUrl = contextClassLoader.getResource(file); LOG.debug("Creating JGroups CacheManagerPeerProvider for {} with configuration file: {}", cacheManager.getName(), configUrl); - peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, configUrl); + peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, configUrl, syncMode, syncTimeout); } else { LOG.debug("Creating JGroups CacheManagerPeerProvider for {} with configuration:\n{}", cacheManager.getName(), connect); - peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, connect); + peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, connect, syncMode, syncTimeout); } return peerProvider; } private String getProperty(final String name, Properties properties) { - String connect = PropertyUtil.extractAndLogProperty(name, properties); - if (connect != null) { - connect = connect.trim(); - connect = connect.replaceAll(" ", ""); - if (connect.equals("")) { - connect = null; + String value = PropertyUtil.extractAndLogProperty(name, properties); + if (value != null) { + value = value.trim(); + value = value.replaceAll(" ", ""); + if (value.equals("")) { + value = null; } } - return connect; + return value; } }