Quellcode durchsuchen

重连机制修改

xiuwei vor 3 Jahren
Ursprung
Commit
e4347a0a28

+ 8 - 5
protocol-core/src/main/java/wei/yigulu/netty/AbstractClientBuilder.java

@@ -9,6 +9,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
 import lombok.experimental.Accessors;
 import wei.yigulu.threadpool.LocalThreadPool;
 import wei.yigulu.threadpool.LocalThreadPool;
+import wei.yigulu.utils.FutureListenerReconnectThreadPool;
 
 
 
 
 /**
 /**
@@ -40,22 +41,24 @@ public abstract class AbstractClientBuilder extends BaseProtocolBuilder {
 	/**
 	/**
 	 * Connection listener
 	 * Connection listener
 	 */
 	 */
-	protected ChannelFutureListener connectionListener = null;
+	protected ProtocolConnectionListener connectionListener = null;
 
 
 
 
 	protected ProtocolChannelInitializer channelInitializer = null;
 	protected ProtocolChannelInitializer channelInitializer = null;
 
 
 
 
 	public void stop() {
 	public void stop() {
+		log.info("关闭通道{}", this.builderId);
 		if (this.future != null) {
 		if (this.future != null) {
 			this.future.removeListener(getOrCreateConnectionListener());
 			this.future.removeListener(getOrCreateConnectionListener());
-			if (!this.future.channel().eventLoop().isShutdown()) {
-				this.future.channel().close();
-			}
+			this.future.addListener(ChannelFutureListener.CLOSE);
 		}
 		}
+		getOrCreateConnectionListener().stop();
+		this.connectionListener = null;
 		if (this.workGroup != null) {
 		if (this.workGroup != null) {
 			this.workGroup.shutdownGracefully();
 			this.workGroup.shutdownGracefully();
 		}
 		}
+		FutureListenerReconnectThreadPool.getInstance().remove(this);
 		this.bootstrap = null;
 		this.bootstrap = null;
 		this.workGroup = null;
 		this.workGroup = null;
 	}
 	}
@@ -95,7 +98,7 @@ public abstract class AbstractClientBuilder extends BaseProtocolBuilder {
 	 *
 	 *
 	 * @return or create connection listener
 	 * @return or create connection listener
 	 */
 	 */
-	public abstract ChannelFutureListener getOrCreateConnectionListener();
+	public abstract ProtocolConnectionListener getOrCreateConnectionListener();
 
 
 	/**
 	/**
 	 * null则创建,有则获取获取ChannelInitializer
 	 * null则创建,有则获取获取ChannelInitializer

+ 58 - 59
protocol-core/src/main/java/wei/yigulu/netty/AbstractHSTcpMasterBuilder.java

@@ -1,7 +1,6 @@
 package wei.yigulu.netty;
 package wei.yigulu.netty;
 
 
 
 
-import io.netty.channel.ChannelFutureListener;
 import lombok.EqualsAndHashCode;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.Setter;
@@ -21,70 +20,70 @@ import lombok.experimental.Accessors;
 public abstract class AbstractHSTcpMasterBuilder extends AbstractTcpMasterBuilder {
 public abstract class AbstractHSTcpMasterBuilder extends AbstractTcpMasterBuilder {
 
 
 
 
-    /**
-     * Hs master builder
-     *
-     * @param ip   ip
-     * @param port port
-     */
-    public AbstractHSTcpMasterBuilder(String ip, Integer port) {
-        super(ip, port);
-    }
+	/**
+	 * Hs master builder
+	 *
+	 * @param ip   ip
+	 * @param port port
+	 */
+	public AbstractHSTcpMasterBuilder(String ip, Integer port) {
+		super(ip, port);
+	}
 
 
-    /**
-     * Hs master builder
-     *
-     * @param ip        ip
-     * @param port      port
-     * @param spareIp   备 ip
-     * @param sparePort 备 port
-     */
-    public AbstractHSTcpMasterBuilder(String ip, Integer port, String spareIp, Integer sparePort) {
-        super(ip, port);
-        this.spareIp = spareIp;
-        this.sparePort = sparePort;
-    }
+	/**
+	 * Hs master builder
+	 *
+	 * @param ip        ip
+	 * @param port      port
+	 * @param spareIp   备 ip
+	 * @param sparePort 备 port
+	 */
+	public AbstractHSTcpMasterBuilder(String ip, Integer port, String spareIp, Integer sparePort) {
+		super(ip, port);
+		this.spareIp = spareIp;
+		this.sparePort = sparePort;
+	}
 
 
-    /**
-     * 备对端ip
-     */
-    @Getter
-    @Setter
-    private String spareIp;
+	/**
+	 * 备对端ip
+	 */
+	@Getter
+	@Setter
+	private String spareIp;
 
 
-    /**
-     * 备对端port
-     */
-    @Getter
-    @Setter
-    private Integer sparePort;
+	/**
+	 * 备对端port
+	 */
+	@Getter
+	@Setter
+	private Integer sparePort;
 
 
 
 
-    @Override
-    public ChannelFutureListener getOrCreateConnectionListener() {
-        if (this.connectionListener == null) {
-            this.connectionListener = new HSConnectionListener(this);
-        }
-        return this.connectionListener;
-    }
+	@Override
+	public ProtocolConnectionListener getOrCreateConnectionListener() {
+		if (this.connectionListener == null) {
+			this.connectionListener = new HSConnectionListener(this);
+		}
+		return this.connectionListener;
+	}
 
 
-    /**
-     * 切换主备机
-     */
-    public void switchover() {
-        String temporaryIp;
-        int temporaryPort;
-        if (spareIp != null && !"".equals(spareIp)) {
-            temporaryIp = this.ip;
-            this.ip = this.spareIp;
-            this.spareIp = temporaryIp;
-        }
-        if (sparePort != null && sparePort != 0) {
-            temporaryPort = this.port;
-            this.port = this.sparePort;
-            this.sparePort = temporaryPort;
-        }
-    }
+	/**
+	 * 切换主备机
+	 */
+	public void switchover() {
+		String temporaryIp;
+		int temporaryPort;
+		if (spareIp != null && !"".equals(spareIp)) {
+			temporaryIp = this.ip;
+			this.ip = this.spareIp;
+			this.spareIp = temporaryIp;
+		}
+		if (this.sparePort != null && sparePort != 0) {
+			temporaryPort = this.port;
+			this.port = this.sparePort;
+			this.sparePort = temporaryPort;
+		}
+	}
 
 
 
 
 }
 }

+ 2 - 2
protocol-core/src/main/java/wei/yigulu/netty/AbstractMasterBuilder.java

@@ -28,7 +28,7 @@ public abstract class AbstractMasterBuilder extends AbstractClientBuilder implem
 		if (getFuture() != null && getFuture().channel().isActive()) {
 		if (getFuture() != null && getFuture().channel().isActive()) {
 			getLog().info("se ==> " + DataConvertor.Byte2String(bytes));
 			getLog().info("se ==> " + DataConvertor.Byte2String(bytes));
 			getFuture().channel().writeAndFlush(Unpooled.copiedBuffer(bytes));
 			getFuture().channel().writeAndFlush(Unpooled.copiedBuffer(bytes));
-		}else{
+		} else {
 			throw new RuntimeException("无客户端连接");
 			throw new RuntimeException("无客户端连接");
 		}
 		}
 	}
 	}
@@ -43,7 +43,7 @@ public abstract class AbstractMasterBuilder extends AbstractClientBuilder implem
 		if (getFuture() != null && getFuture().channel().isActive()) {
 		if (getFuture() != null && getFuture().channel().isActive()) {
 			getLog().info("se ==> " + DataConvertor.ByteBuf2String(byteBuf));
 			getLog().info("se ==> " + DataConvertor.ByteBuf2String(byteBuf));
 			getFuture().channel().writeAndFlush(Unpooled.copiedBuffer(byteBuf));
 			getFuture().channel().writeAndFlush(Unpooled.copiedBuffer(byteBuf));
-		}else{
+		} else {
 			throw new RuntimeException("无客户端连接");
 			throw new RuntimeException("无客户端连接");
 		}
 		}
 	}
 	}

+ 1 - 2
protocol-core/src/main/java/wei/yigulu/netty/AbstractRtuModeBuilder.java

@@ -1,7 +1,6 @@
 package wei.yigulu.netty;
 package wei.yigulu.netty;
 
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import lombok.Getter;
 import lombok.Getter;
@@ -96,7 +95,7 @@ public abstract class AbstractRtuModeBuilder extends AbstractMasterBuilder {
 	}
 	}
 
 
 	@Override
 	@Override
-	public ChannelFutureListener getOrCreateConnectionListener() {
+	public ProtocolConnectionListener getOrCreateConnectionListener() {
 		if (this.connectionListener == null) {
 		if (this.connectionListener == null) {
 			this.connectionListener = new RtuModeConnectionListener(this);
 			this.connectionListener = new RtuModeConnectionListener(this);
 		}
 		}

+ 8 - 5
protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpMasterBuilder.java

@@ -70,12 +70,15 @@ public abstract class AbstractTcpMasterBuilder extends AbstractMasterBuilder {
 	public void create() {
 	public void create() {
 		synchronized (this) {
 		synchronized (this) {
 			if (future != null) {
 			if (future != null) {
-				future.removeListener(getOrCreateConnectionListener());
-				if (!future.channel().eventLoop().isShutdown()) {
-					future.channel().close();
-				}
+				this.future.removeListener(getOrCreateConnectionListener());
+				this.future.addListener(ChannelFutureListener.CLOSE);
 				future = null;
 				future = null;
 			}
 			}
+			try {
+				Thread.sleep(5000L);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
 			log.debug("创建连接");
 			log.debug("创建连接");
 			try {
 			try {
 				SocketAddress remoteAddress = new InetSocketAddress(getIp(), getPort());
 				SocketAddress remoteAddress = new InetSocketAddress(getIp(), getPort());
@@ -127,7 +130,7 @@ public abstract class AbstractTcpMasterBuilder extends AbstractMasterBuilder {
 	}
 	}
 
 
 	@Override
 	@Override
-	public ChannelFutureListener getOrCreateConnectionListener() {
+	public ProtocolConnectionListener getOrCreateConnectionListener() {
 		if (this.connectionListener == null) {
 		if (this.connectionListener == null) {
 			this.connectionListener = new SimpleTcpConnectionListener(this);
 			this.connectionListener = new SimpleTcpConnectionListener(this);
 		}
 		}

+ 31 - 47
protocol-core/src/main/java/wei/yigulu/netty/HSConnectionListener.java

@@ -2,14 +2,7 @@ package wei.yigulu.netty;
 
 
 
 
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.util.internal.StringUtil;
 import io.netty.util.internal.StringUtil;
-import lombok.AllArgsConstructor;
-import lombok.NoArgsConstructor;
-import org.slf4j.Logger;
-import wei.yigulu.utils.FutureListenerReconnectThreadPool;
-
-import java.util.concurrent.TimeUnit;
 
 
 /**
 /**
  * 负责监听启动时连接失败,重新连接功能
  * 负责监听启动时连接失败,重新连接功能
@@ -18,62 +11,53 @@ import java.util.concurrent.TimeUnit;
  * @author 修唯xiuwei
  * @author 修唯xiuwei
  * @version 3.0
  * @version 3.0
  */
  */
-@AllArgsConstructor
-public class HSConnectionListener implements ChannelFutureListener {
-
-	private Logger log;
+public class HSConnectionListener extends ProtocolConnectionListener<AbstractHSTcpMasterBuilder> {
 
 
-	private final AbstractHSTcpMasterBuilder masterBuilder;
 
 
 	private int retryTimes;
 	private int retryTimes;
 
 
 	/**
 	/**
-	 * Hs connection listener
+	 * Only host connection listener
 	 *
 	 *
 	 * @param masterBuilder master builder
 	 * @param masterBuilder master builder
 	 */
 	 */
 	public HSConnectionListener(AbstractHSTcpMasterBuilder masterBuilder) {
 	public HSConnectionListener(AbstractHSTcpMasterBuilder masterBuilder) {
-		this.masterBuilder = masterBuilder;
-		this.log = masterBuilder.getLog();
+		super(masterBuilder);
 	}
 	}
 
 
+
 	@Override
 	@Override
-	public void operationComplete(ChannelFuture channelFuture) throws Exception {
-		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
-			FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder,(() -> {
-				try {
-					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
-						if (this.retryTimes < 10) {
-							log.error("服务端{}:{}链接不上,开始重连操作,第{}次尝试", this.masterBuilder.getIp(), this.masterBuilder.getPort(), retryTimes);
-							masterBuilder.create();
-							log.warn("重试连接失败");
-							this.retryTimes++;
-						} else {
-							if (!StringUtil.isNullOrEmpty(this.masterBuilder.getSpareIp()) || (this.masterBuilder.getSparePort() != null && this.masterBuilder.getSparePort() != 0)) {
-								log.info("服务端{}:{}链接不上,切换主备机{}:{}", this.masterBuilder.getIp(), this.masterBuilder.getPort(), this.masterBuilder.getSpareIp(), this.masterBuilder.getSparePort());
-								this.masterBuilder.switchover();
-							}
-							this.masterBuilder.refreshLoopGroup();
-							this.retryTimes = 0;
-							masterBuilder.create();
-							log.info("重置重试次数=0");
-						}
-					} else {
-						log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
+	protected void reconnectFuture(ChannelFuture channelFuture) {
+		try {
+			if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
+				log.warn("开始执行{}重连任务", masterBuilder.builderId);
+				if (this.retryTimes < 10) {
+					log.error("服务端{}:{}链接不上,开始重连操作,第{}次尝试", this.masterBuilder.getIp(), this.masterBuilder.getPort(), retryTimes);
+					masterBuilder.createByUnBlock();
+					log.warn("重试连接失败");
+					this.retryTimes++;
+				} else {
+					if (!StringUtil.isNullOrEmpty(this.masterBuilder.getSpareIp()) || (this.masterBuilder.getSparePort() != null && this.masterBuilder.getSparePort() != 0)) {
+						log.info("服务端{}:{}链接不上,切换主备机{}:{}", this.masterBuilder.getIp(), this.masterBuilder.getPort(), this.masterBuilder.getSpareIp(), this.masterBuilder.getSparePort());
+						this.masterBuilder.switchover();
 					}
 					}
-				} catch (Exception e) {
-					log.error("ModbusMaster重试连接时发生异常", e);
 					this.masterBuilder.refreshLoopGroup();
 					this.masterBuilder.refreshLoopGroup();
 					this.retryTimes = 0;
 					this.retryTimes = 0;
-					try {
-						operationComplete(channelFuture);
-					} catch (Exception ex) {
-						ex.printStackTrace();
-					}
+					masterBuilder.createByUnBlock();
+					log.info("重置重试次数=0");
 				}
 				}
-			}));
-		} else {
-			log.info("服务端{}:{}链接成功...", this.masterBuilder.getIp(), this.masterBuilder.getPort());
+			} else {
+				log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
+			}
+		} catch (Exception e) {
+			log.error("ModbusMaster重试连接时发生异常", e);
+			this.masterBuilder.refreshLoopGroup();
+			this.retryTimes = 0;
+			try {
+				operationComplete(channelFuture);
+			} catch (Exception ex) {
+				ex.printStackTrace();
+			}
 		}
 		}
 	}
 	}
 }
 }

+ 2 - 2
protocol-core/src/main/java/wei/yigulu/netty/MasterInterface.java

@@ -14,13 +14,13 @@ public interface MasterInterface {
 	 *
 	 *
 	 * @param bytes
 	 * @param bytes
 	 */
 	 */
-	public void sendFrameToOpposite(byte[] bytes);
+	void sendFrameToOpposite(byte[] bytes);
 
 
 	/**
 	/**
 	 * 向对端 发送数据帧
 	 * 向对端 发送数据帧
 	 *
 	 *
 	 * @param byteBuf
 	 * @param byteBuf
 	 */
 	 */
-	public void sendFrameToOpposite(ByteBuf byteBuf);
+	void sendFrameToOpposite(ByteBuf byteBuf);
 
 
 }
 }

+ 59 - 0
protocol-core/src/main/java/wei/yigulu/netty/ProtocolConnectionListener.java

@@ -0,0 +1,59 @@
+package wei.yigulu.netty;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.slf4j.Logger;
+import wei.yigulu.utils.FutureListenerReconnectThreadPool;
+
+/**
+ * 框架只用的连接监听
+ *
+ * @author: xiuwei
+ * @version:
+ */
+public abstract class ProtocolConnectionListener<T extends AbstractClientBuilder> implements ChannelFutureListener {
+
+	protected ScheduledFuture<?> future;
+	protected Logger log;
+	protected T masterBuilder;
+	protected boolean isStop = false;
+
+	/**
+	 * Only host connection listener
+	 *
+	 * @param masterBuilder master builder
+	 */
+	public ProtocolConnectionListener(T masterBuilder) {
+		this.masterBuilder = masterBuilder;
+		this.log = masterBuilder.getLog();
+	}
+
+
+	@Override
+	public void operationComplete(ChannelFuture future) throws Exception {
+		if (isStop) {
+			log.info("通道已经停止,不再重连");
+			return;
+		} else {
+			if (future == null || future.channel() == null || !future.channel().isActive()) {
+				FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder, () -> {
+					reconnectFuture(future);
+				});
+			} else {
+				log.warn("masterBuilder已经连接成功,不进行重连操作");
+			}
+		}
+	}
+
+	/**
+	 * 连接任务
+	 *
+	 * @param channelFuture 频道任务
+	 */
+	protected abstract void reconnectFuture(ChannelFuture channelFuture);
+
+	public void stop() {
+		this.isStop = true;
+	}
+}

+ 15 - 21
protocol-core/src/main/java/wei/yigulu/netty/RtuModeConnectionListener.java

@@ -2,14 +2,8 @@ package wei.yigulu.netty;
 
 
 
 
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-import lombok.AllArgsConstructor;
-import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 
 
-import java.util.concurrent.TimeUnit;
-
 /**
 /**
  * 负责监听启动时连接失败,重新连接功能
  * 负责监听启动时连接失败,重新连接功能
  *
  *
@@ -17,25 +11,25 @@ import java.util.concurrent.TimeUnit;
  * @create 2019-03-13 14:15
  * @create 2019-03-13 14:15
  * @Email 524710549@qq.com
  * @Email 524710549@qq.com
  **/
  **/
-@AllArgsConstructor
-@NoArgsConstructor
+
 @Slf4j
 @Slf4j
-public class RtuModeConnectionListener implements ChannelFutureListener {
+public class RtuModeConnectionListener extends ProtocolConnectionListener<AbstractRtuModeBuilder> {
 
 
-	private AbstractRtuModeBuilder abstractRtuClient;
+
+	/**
+	 * Only host connection listener
+	 *
+	 * @param masterBuilder master builder
+	 */
+	public RtuModeConnectionListener(AbstractRtuModeBuilder masterBuilder) {
+		super(masterBuilder);
+	}
 
 
 	@Override
 	@Override
-	public void operationComplete(ChannelFuture channelFuture) throws Exception {
-		if (!channelFuture.channel().isActive()) {
-			final EventLoop loop = channelFuture.channel().eventLoop();
-			loop.schedule(() -> {
-				abstractRtuClient.getLog().error("RTU:{}端链接不上,开始重连操作...", abstractRtuClient.getCommPortId());
-				channelFuture.channel().closeFuture();
-				abstractRtuClient.create();
-			}, 4L, TimeUnit.SECONDS);
-		} else {
-			abstractRtuClient.getLog().info("RTU:{}端链接成功...", abstractRtuClient.getCommPortId());
-		}
+	protected void reconnectFuture(ChannelFuture channelFuture) {
+		log.error("RTU:{}端链接不上,开始重连操作...", masterBuilder.getCommPortId());
+		channelFuture.channel().closeFuture();
+		masterBuilder.create();
 	}
 	}
 }
 }
 
 

+ 17 - 36
protocol-core/src/main/java/wei/yigulu/netty/SimpleTcpConnectionListener.java

@@ -2,12 +2,6 @@ package wei.yigulu.netty;
 
 
 
 
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.util.concurrent.ScheduledFuture;
-import org.slf4j.Logger;
-import wei.yigulu.utils.FutureListenerReconnectThreadPool;
-
-import java.util.concurrent.TimeUnit;
 
 
 /**
 /**
  * 负责监听启动时连接失败,重新连接功能
  * 负责监听启动时连接失败,重新连接功能
@@ -17,13 +11,7 @@ import java.util.concurrent.TimeUnit;
  * @version 3.0
  * @version 3.0
  */
  */
 
 
-public class SimpleTcpConnectionListener implements ChannelFutureListener {
-
-	private Logger log;
-
-	private AbstractTcpMasterBuilder masterBuilder;
-
-	ScheduledFuture<?> future;
+public class SimpleTcpConnectionListener extends ProtocolConnectionListener<AbstractTcpMasterBuilder> {
 
 
 	/**
 	/**
 	 * Only host connection listener
 	 * Only host connection listener
@@ -31,33 +19,26 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 	 * @param masterBuilder master builder
 	 * @param masterBuilder master builder
 	 */
 	 */
 	public SimpleTcpConnectionListener(AbstractTcpMasterBuilder masterBuilder) {
 	public SimpleTcpConnectionListener(AbstractTcpMasterBuilder masterBuilder) {
-		this.masterBuilder = masterBuilder;
-		this.log = masterBuilder.getLog();
+		super(masterBuilder);
 	}
 	}
 
 
 
 
 	@Override
 	@Override
-	public void operationComplete(ChannelFuture channelFuture) throws Exception {
-		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
-			FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder,() -> {
-				try {
-					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
-						log.error("服务端{}:{}链接不上,开始重连操作", this.masterBuilder.getIp(), this.masterBuilder.getPort());
-						masterBuilder.create();
-					} else {
-						log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
-					}
-				} catch (Exception e) {
-					log.error("TcpMaster重试连接时发生异常", e);
-					try {
-						operationComplete(channelFuture);
-					} catch (Exception ex) {
-						ex.printStackTrace();
-					}
-				}
-			});
-		} else {
-			log.warn("masterBuilder已经连接成功,不进行重连操作");
+	protected void reconnectFuture(ChannelFuture channelFuture) {
+		try {
+			if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
+				log.error("服务端{}:{}链接不上,开始重连操作", this.masterBuilder.getIp(), this.masterBuilder.getPort());
+				masterBuilder.createByUnBlock();
+			} else {
+				log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
+			}
+		} catch (Exception e) {
+			log.error("TcpMaster重试连接时发生异常", e);
+			try {
+				operationComplete(channelFuture);
+			} catch (Exception ex) {
+				ex.printStackTrace();
+			}
 		}
 		}
 	}
 	}
 }
 }

+ 43 - 35
protocol-core/src/main/java/wei/yigulu/utils/FutureListenerReconnectThreadPool.java

@@ -13,39 +13,47 @@ import java.util.concurrent.*;
  */
  */
 public class FutureListenerReconnectThreadPool {
 public class FutureListenerReconnectThreadPool {
 
 
-    private static class LazyHolder {
-        private static final FutureListenerReconnectThreadPool INSTANCE = new FutureListenerReconnectThreadPool();
-    }
-
-    private FutureListenerReconnectThreadPool() {
-    }
-
-    private Map<BaseProtocolBuilder, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
-
-    public static final FutureListenerReconnectThreadPool getInstance() {
-        return LazyHolder.INSTANCE;
-    }
-
-    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
-
-
-    public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command) {
-        return submitReconnectJob(protocolBuilder, command, 5);
-    }
-
-    public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command, int delaySecond) {
-        synchronized (protocolBuilder) {
-            protocolBuilder.getLog().info("{},添加延时重连任务",protocolBuilder.getBuilderId());
-            if (this.scheduledFutureMap.containsKey(protocolBuilder)) {
-                ScheduledFuture f = this.scheduledFutureMap.get(protocolBuilder);
-                //线程池内有客户端对应的定时任务线程
-                if (!f.isDone() || !f.isCancelled()) {
-                    //如果之前提交的定时任务未执行完毕
-                    f.cancel(true);
-                }
-            }
-            this.scheduledFutureMap.put(protocolBuilder, pool.schedule(command, delaySecond, TimeUnit.SECONDS));
-        }
-        return this.scheduledFutureMap.get(protocolBuilder);
-    }
+	ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
+	private Map<BaseProtocolBuilder, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
+
+	private FutureListenerReconnectThreadPool() {
+	}
+
+	public static final FutureListenerReconnectThreadPool getInstance() {
+		return LazyHolder.INSTANCE;
+	}
+
+	public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command) {
+		return submitReconnectJob(protocolBuilder, command, 30);
+	}
+
+	public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command, int delaySecond) {
+		synchronized (protocolBuilder) {
+            protocolBuilder.getLog().info("{},添加延时重连任务", protocolBuilder.getBuilderId());
+			if (this.scheduledFutureMap.containsKey(protocolBuilder)) {
+				ScheduledFuture f = this.scheduledFutureMap.get(protocolBuilder);
+				//线程池内有客户端对应的定时任务线程
+				if (!f.isDone() ) {
+					//如果之前提交的定时任务未执行完毕
+                    protocolBuilder.getLog().info("重连任务中已经包含未执行的重连任务", protocolBuilder.getBuilderId());
+					f.cancel(true);
+				}
+			}
+			this.scheduledFutureMap.put(protocolBuilder, pool.schedule(command, delaySecond, TimeUnit.SECONDS));
+		}
+		return this.scheduledFutureMap.get(protocolBuilder);
+	}
+
+	public void remove(BaseProtocolBuilder protocolBuilder) {
+		if (this.scheduledFutureMap.containsKey(protocolBuilder)) {
+			if (!this.scheduledFutureMap.get(protocolBuilder).isDone()) {
+				this.scheduledFutureMap.get(protocolBuilder).cancel(true);
+			}
+			this.scheduledFutureMap.remove(protocolBuilder);
+		}
+	}
+
+	private static class LazyHolder {
+		private static final FutureListenerReconnectThreadPool INSTANCE = new FutureListenerReconnectThreadPool();
+	}
 }
 }