浏览代码

1.修改核心的重连逻辑,重连线程由通道本身的netty循环线程池转换为由工具统一的重连线程池管理。
2.104master连接成功时打印本机使用端口
3.modbus 线圈拆分请求时 最大值最小值 原本取反了 进行修正

weiyigulu 3 年之前
父节点
当前提交
599c6b4770

+ 4 - 4
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>wei.yigulu</groupId>
     <artifactId>protocol</artifactId>
-    <version>2.2.12</version>
+    <version>2.3.12</version>
     <packaging>pom</packaging>
     <modules>
         <module>protocol-core</module>
@@ -32,9 +32,9 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.8</java.version>
-        <protocol.version>2.2.12</protocol.version>
-        <iec104.version>2.4.17</iec104.version>
-        <modbus.version>2.2.5</modbus.version>
+        <protocol.version>2.3.12</protocol.version>
+        <iec104.version>2.4.18</iec104.version>
+        <modbus.version>2.2.6</modbus.version>
         <cdt.version>1.0.0</cdt.version>
     </properties>
     <distributionManagement>

+ 1 - 1
protocol-all/pom.xml

@@ -6,7 +6,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>2.2.12</version>
+        <version>2.3.12</version>
     </parent>
 
 

+ 1 - 1
protocol-cdt/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>2.2.12</version>
+        <version>2.3.12</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
protocol-core/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>2.2.12</version>
+        <version>2.3.12</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 4 - 4
protocol-core/src/main/java/wei/yigulu/netty/HSConnectionListener.java

@@ -7,6 +7,7 @@ import io.netty.util.internal.StringUtil;
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import org.slf4j.Logger;
+import wei.yigulu.utils.FutureListenerReconnectThreadPool;
 
 import java.util.concurrent.TimeUnit;
 
@@ -18,12 +19,11 @@ import java.util.concurrent.TimeUnit;
  * @version 3.0
  */
 @AllArgsConstructor
-@NoArgsConstructor
 public class HSConnectionListener implements ChannelFutureListener {
 
 	private Logger log;
 
-	private AbstractHSTcpMasterBuilder masterBuilder;
+	private final AbstractHSTcpMasterBuilder masterBuilder;
 
 	private int retryTimes;
 
@@ -40,7 +40,7 @@ public class HSConnectionListener implements ChannelFutureListener {
 	@Override
 	public void operationComplete(ChannelFuture channelFuture) throws Exception {
 		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
-			this.masterBuilder.getOrCreateWorkGroup().schedule(() -> {
+			FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder,(() -> {
 				try {
 					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
 						if (this.retryTimes < 10) {
@@ -71,7 +71,7 @@ public class HSConnectionListener implements ChannelFutureListener {
 						ex.printStackTrace();
 					}
 				}
-			}, 3L, TimeUnit.SECONDS);
+			}));
 		} else {
 			log.info("服务端{}:{}链接成功...", this.masterBuilder.getIp(), this.masterBuilder.getPort());
 		}

+ 3 - 2
protocol-core/src/main/java/wei/yigulu/netty/SimpleTcpConnectionListener.java

@@ -5,6 +5,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.concurrent.ScheduledFuture;
 import org.slf4j.Logger;
+import wei.yigulu.utils.FutureListenerReconnectThreadPool;
 
 import java.util.concurrent.TimeUnit;
 
@@ -38,7 +39,7 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 	@Override
 	public void operationComplete(ChannelFuture channelFuture) throws Exception {
 		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
-			this.future = this.masterBuilder.getOrCreateWorkGroup().schedule(() -> {
+			FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder,() -> {
 				try {
 					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
 						log.error("服务端{}:{}链接不上,开始重连操作", this.masterBuilder.getIp(), this.masterBuilder.getPort());
@@ -54,7 +55,7 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 						ex.printStackTrace();
 					}
 				}
-			}, 6L, TimeUnit.SECONDS);
+			});
 		} else {
 			log.warn("masterBuilder已经连接成功,不进行重连操作");
 		}

+ 51 - 0
protocol-core/src/main/java/wei/yigulu/utils/FutureListenerReconnectThreadPool.java

@@ -0,0 +1,51 @@
+package wei.yigulu.utils;
+
+import wei.yigulu.netty.BaseProtocolBuilder;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Netty future的侦听器连接线程池(定时任务池)
+ *
+ * @author xiuwei
+ * @date 2021/11/22
+ */
+public class FutureListenerReconnectThreadPool {
+
+    private static class LazyHolder {
+        private static final FutureListenerReconnectThreadPool INSTANCE = new FutureListenerReconnectThreadPool();
+    }
+
+    private FutureListenerReconnectThreadPool() {
+    }
+
+    private Map<BaseProtocolBuilder, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
+
+    public static final FutureListenerReconnectThreadPool getInstance() {
+        return LazyHolder.INSTANCE;
+    }
+
+    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
+
+
+    public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command) {
+        return submitReconnectJob(protocolBuilder, command, 5);
+    }
+
+    public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command, int delaySecond) {
+        synchronized (protocolBuilder) {
+            protocolBuilder.getLog().info("{},添加延时重连任务",protocolBuilder.getBuilderId());
+            if (this.scheduledFutureMap.containsKey(protocolBuilder)) {
+                ScheduledFuture f = this.scheduledFutureMap.get(protocolBuilder);
+                //线程池内有客户端对应的定时任务线程
+                if (!f.isDone() || !f.isCancelled()) {
+                    //如果之前提交的定时任务未执行完毕
+                    f.cancel(true);
+                }
+            }
+            this.scheduledFutureMap.put(protocolBuilder, pool.schedule(command, delaySecond, TimeUnit.SECONDS));
+        }
+        return this.scheduledFutureMap.get(protocolBuilder);
+    }
+}

+ 1 - 1
protocol-iec104/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>2.2.12</version>
+        <version>2.3.12</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 2 - 1
protocol-iec104/src/main/java/wei/yigulu/iec104/nettyconfig/Master104Handle.java

@@ -89,9 +89,10 @@ public class Master104Handle extends SimpleChannelInboundHandler<ByteBuf> {
 		log.debug("发出U帧,启动命令");
 		this.isInitiative = false;
 		InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+		InetSocketAddress localIpSocket = (InetSocketAddress) ctx.channel().localAddress();
 		String clientIp = ipSocket.getAddress().getHostAddress();
 		Integer clientPort = ipSocket.getPort();
-		log.info("连接" + clientIp + ":" + clientPort + "服务端成功");
+		log.info("连接" + clientIp + ":" + clientPort + "服务端成功,本地端口:"+localIpSocket.getPort());
 		LinkContainer.getInstance().getLinks().put(ctx.channel().id(), new Iec104Link(ctx.channel(), clientIp, clientPort, Iec104Link.Role.SLAVER, masterBuilder.getLog()));
 		ctx.writeAndFlush(Unpooled.copiedBuffer(TechnicalTerm.START));
 		this.masterBuilder.connected();

+ 1 - 1
protocol-modbus/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>2.2.12</version>
+        <version>2.3.12</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <version>${modbus.version}</version>

+ 2 - 2
protocol-modbus/src/main/java/wei/yigulu/modbus/utils/ModbusRequestDataUtils.java

@@ -89,8 +89,8 @@ public class ModbusRequestDataUtils {
 	public static List<Obj4RequestCoil> splitModbusRequest(List<Integer> locator, int slave, FunctionCode functionCode) throws ModbusException {
 		List<Obj4RequestCoil> list = new ArrayList<>();
 		Collections.sort(locator);
-		Integer max = locator.get(0);
-		Integer min = locator.get(locator.size() - 1);
+		Integer min = locator.get(0);
+		Integer max = locator.get(locator.size() - 1);
 		List<Integer> ls = new ArrayList<>();
 		if (max - min < MAXLENGTH) {
 			list.add(new Obj4RequestCoil(slave, functionCode, locator));