Kaynağa Gözat

更新onlydata分支

weiyigulu 3 yıl önce
ebeveyn
işleme
af91dee51e

+ 3 - 3
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>wei.yigulu</groupId>
     <artifactId>protocol</artifactId>
-    <version>1.2.10</version>
+    <version>1.2.20</version>
     <packaging>pom</packaging>
     <modules>
         <module>protocol-core</module>
@@ -32,8 +32,8 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.8</java.version>
-        <protocol.version>1.2.10</protocol.version>
-        <iec104.version>1.4.15</iec104.version>
+        <protocol.version>1.2.20</protocol.version>
+        <iec104.version>1.4.25</iec104.version>
         <modbus.version>1.2.5</modbus.version>
         <cdt.version>1.0.0</cdt.version>
     </properties>

+ 2 - 2
protocol-all/pom.xml

@@ -6,14 +6,14 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>1.2.10</version>
+        <version>1.2.20</version>
     </parent>
 
 
 
     <packaging>jar</packaging>
     <artifactId>protocol-all</artifactId>
-    <version>1.2.10</version>
+    <version>1.2.20</version>
 
     <dependencies>
         <dependency>

+ 1 - 1
protocol-cdt/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>1.2.10</version>
+        <version>1.2.20</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
protocol-core/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>1.2.10</version>
+        <version>1.2.20</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
protocol-core/src/main/java/wei/yigulu/netty/AbstractHSTcpMasterBuilder.java

@@ -79,7 +79,7 @@ public abstract class AbstractHSTcpMasterBuilder extends AbstractTcpMasterBuilde
 			this.ip = this.spareIp;
 			this.spareIp = temporaryIp;
 		}
-		if (sparePort != 0) {
+		if (this.sparePort!=null && sparePort != 0) {
 			temporaryPort = this.port;
 			this.port = this.sparePort;
 			this.sparePort = temporaryPort;

+ 7 - 6
protocol-core/src/main/java/wei/yigulu/netty/HSConnectionListener.java

@@ -7,6 +7,7 @@ import io.netty.util.internal.StringUtil;
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import org.slf4j.Logger;
+import wei.yigulu.utils.FutureListenerReconnectThreadPool;
 
 import java.util.concurrent.TimeUnit;
 
@@ -18,12 +19,11 @@ import java.util.concurrent.TimeUnit;
  * @version 3.0
  */
 @AllArgsConstructor
-@NoArgsConstructor
 public class HSConnectionListener implements ChannelFutureListener {
 
 	private Logger log;
 
-	private AbstractHSTcpMasterBuilder masterBuilder;
+	private  final AbstractHSTcpMasterBuilder masterBuilder;
 
 	private int retryTimes;
 
@@ -40,12 +40,13 @@ public class HSConnectionListener implements ChannelFutureListener {
 	@Override
 	public void operationComplete(ChannelFuture channelFuture) throws Exception {
 		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
-			this.masterBuilder.getOrCreateWorkGroup().schedule(() -> {
+			FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder,(() -> {
 				try {
 					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
+						log.warn("开始执行重连任务");
 						if (this.retryTimes < 10) {
 							log.error("服务端{}:{}链接不上,开始重连操作,第{}次尝试", this.masterBuilder.getIp(), this.masterBuilder.getPort(), retryTimes);
-							masterBuilder.create();
+							masterBuilder.createByUnBlock();
 							log.warn("重试连接失败");
 							this.retryTimes++;
 						} else {
@@ -55,7 +56,7 @@ public class HSConnectionListener implements ChannelFutureListener {
 							}
 							this.masterBuilder.refreshLoopGroup();
 							this.retryTimes = 0;
-							masterBuilder.create();
+							masterBuilder.createByUnBlock();
 							log.info("重置重试次数=0");
 						}
 					} else {
@@ -71,7 +72,7 @@ public class HSConnectionListener implements ChannelFutureListener {
 						ex.printStackTrace();
 					}
 				}
-			}, 3L, TimeUnit.SECONDS);
+			}));
 		} else {
 			log.info("服务端{}:{}链接成功...", this.masterBuilder.getIp(), this.masterBuilder.getPort());
 		}

+ 4 - 3
protocol-core/src/main/java/wei/yigulu/netty/SimpleTcpConnectionListener.java

@@ -5,6 +5,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.concurrent.ScheduledFuture;
 import org.slf4j.Logger;
+import wei.yigulu.utils.FutureListenerReconnectThreadPool;
 
 import java.util.concurrent.TimeUnit;
 
@@ -38,11 +39,11 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 	@Override
 	public void operationComplete(ChannelFuture channelFuture) throws Exception {
 		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
-			this.future = this.masterBuilder.getOrCreateWorkGroup().schedule(() -> {
+			FutureListenerReconnectThreadPool.getInstance().submitReconnectJob(masterBuilder,() -> {
 				try {
 					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
 						log.error("服务端{}:{}链接不上,开始重连操作", this.masterBuilder.getIp(), this.masterBuilder.getPort());
-						masterBuilder.create();
+						masterBuilder.createByUnBlock();
 					} else {
 						log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
 					}
@@ -54,7 +55,7 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 						ex.printStackTrace();
 					}
 				}
-			}, 6L, TimeUnit.SECONDS);
+			});
 		} else {
 			log.warn("masterBuilder已经连接成功,不进行重连操作");
 		}

+ 51 - 0
protocol-core/src/main/java/wei/yigulu/utils/FutureListenerReconnectThreadPool.java

@@ -0,0 +1,51 @@
+package wei.yigulu.utils;
+
+import wei.yigulu.netty.BaseProtocolBuilder;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Netty future的侦听器连接线程池(定时任务池)
+ *
+ * @author xiuwei
+ * @date 2021/11/22
+ */
+public class FutureListenerReconnectThreadPool {
+
+    private static class LazyHolder {
+        private static final FutureListenerReconnectThreadPool INSTANCE = new FutureListenerReconnectThreadPool();
+    }
+
+    private FutureListenerReconnectThreadPool() {
+    }
+
+    private Map<BaseProtocolBuilder, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
+
+    public static final FutureListenerReconnectThreadPool getInstance() {
+        return LazyHolder.INSTANCE;
+    }
+
+    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
+
+
+    public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command) {
+        return submitReconnectJob(protocolBuilder, command, 5);
+    }
+
+    public ScheduledFuture submitReconnectJob(BaseProtocolBuilder protocolBuilder, Runnable command, int delaySecond) {
+        synchronized (protocolBuilder) {
+            protocolBuilder.getLog().info("{},添加延时重连任务",protocolBuilder.getBuilderId());
+            if (this.scheduledFutureMap.containsKey(protocolBuilder)) {
+                ScheduledFuture f = this.scheduledFutureMap.get(protocolBuilder);
+                //线程池内有客户端对应的定时任务线程
+                if (!f.isDone() || !f.isCancelled()) {
+                    //如果之前提交的定时任务未执行完毕
+                    f.cancel(true);
+                }
+            }
+            this.scheduledFutureMap.put(protocolBuilder, pool.schedule(command, delaySecond, TimeUnit.SECONDS));
+        }
+        return this.scheduledFutureMap.get(protocolBuilder);
+    }
+}

+ 1 - 1
protocol-iec104/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>1.2.10</version>
+        <version>1.2.20</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 2 - 1
protocol-iec104/src/main/java/wei/yigulu/iec104/nettyconfig/Master104Handle.java

@@ -92,9 +92,10 @@ public class Master104Handle extends SimpleChannelInboundHandler<ByteBuf> {
 		log.debug("发出U帧,启动命令");
 		this.isInitiative = false;
 		InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+		InetSocketAddress localIpSocket = (InetSocketAddress) ctx.channel().localAddress();
 		String clientIp = ipSocket.getAddress().getHostAddress();
 		Integer clientPort = ipSocket.getPort();
-		log.info("连接" + clientIp + ":" + clientPort + "服务端成功");
+		log.info("连接" + clientIp + ":" + clientPort + "服务端成功,本地端口:"+localIpSocket.getPort());
 		LinkContainer.getInstance().getLinks().put(ctx.channel().id(), new Iec104Link(ctx.channel(), clientIp, clientPort, Iec104Link.Role.SLAVER,masterBuilder.getLog()));
 		ctx.writeAndFlush(Unpooled.copiedBuffer(TechnicalTerm.START));
 		this.masterBuilder.connected();

+ 1 - 1
protocol-iec104/src/test/java/ClientTest.java

@@ -11,7 +11,7 @@ import wei.yigulu.iec104.nettyconfig.Iec104HSMasterBuilder;
 public class ClientTest {
 
 	public static void main(String[] args) {
-		new Iec104HSMasterBuilder("127.0.0.1", 2404).create();
+		new Iec104HSMasterBuilder("127.0.0.1", 2405).create();
 		System.out.println(123);
 	}
 

+ 9 - 1
protocol-iec104/src/test/java/MasterTest.java

@@ -5,6 +5,9 @@ import wei.yigulu.iec104.asdudataframe.TotalSummonType;
 import wei.yigulu.iec104.nettyconfig.Iec104HSMasterBuilder;
 import wei.yigulu.iec104.nettyconfig.Iec104MasterBuilder;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 /**
  * dad
  *
@@ -19,7 +22,12 @@ public class MasterTest {
 		Iec104HSMasterBuilder masterBuilder = new Iec104HSMasterBuilder("127.0.0.1", 2404);
 
 		masterBuilder.createByUnBlock();
-
+		Executors.newScheduledThreadPool(5).schedule(()->{
+			if (!masterBuilder.getFuture().channel().isActive()){
+				System.out.println("---------------");
+				masterBuilder.create();
+			}
+		},2, TimeUnit.MINUTES);
 	/*	//创建总召唤类型I帧
 		TotalSummonType totalSummonType = new TotalSummonType();
 		//反向生成asdu

+ 6 - 2
protocol-iec104/src/test/java/SlaveTest.java

@@ -9,7 +9,11 @@ import java.net.InetSocketAddress;
 public class SlaveTest {
 
 	public static void main(String[] args) throws Exception {
-		Iec104SlaverBuilder slaverBuilder = new Iec104SlaverBuilder(2404);
+		Integer i=null;
+		System.out.println(i==null);
+
+
+		/*Iec104SlaverBuilder slaverBuilder = new Iec104SlaverBuilder(2404);
 		slaverBuilder.getConnectFilterManager().appendFilter((c)->{
 			if(slaverBuilder.getChannels().size()>=1){
 				return -1;
@@ -21,7 +25,7 @@ public class SlaveTest {
 				return -1;
 			}else {return 1;}
 		});
-		slaverBuilder.create();
+		slaverBuilder.create();*/
 	}
 
 

+ 33 - 0
protocol-iec104/src/test/java/SlaveTest1.java

@@ -0,0 +1,33 @@
+import wei.yigulu.iec104.nettyconfig.Iec104SlaverBuilder;
+
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * @author: xiuwei
+ * @version:
+ */
+public class SlaveTest1 {
+
+
+    public static void main(String[] args) throws Exception {
+        ScheduledExecutorService ex = Executors.newScheduledThreadPool(2);
+        Random r = new Random();
+        Iec104SlaverBuilder slaverBuilder = new Iec104SlaverBuilder(2404);
+        slaverBuilder.createByUnBlock();
+        Thread.sleep(3000);
+        for (; ; ) {
+            System.out.println("关闭通道");
+            slaverBuilder.stop();
+            Thread.sleep(r.nextInt(5)*100000);
+            slaverBuilder = new Iec104SlaverBuilder(2404);
+            slaverBuilder.createByUnBlock();
+            Thread.sleep(r.nextInt(5) * 1000);
+        }
+
+
+    }
+
+
+}

+ 1 - 1
protocol-modbus/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>protocol</artifactId>
         <groupId>wei.yigulu</groupId>
-        <version>1.2.10</version>
+        <version>1.2.20</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <version>${modbus.version}</version>

+ 2 - 3
protocol-modbus/src/main/java/wei/yigulu/modbus/utils/ModbusRequestDataUtils.java

@@ -16,7 +16,6 @@ import wei.yigulu.modbus.domain.response.TcpModbusResponse;
 import wei.yigulu.modbus.domain.tcpextracode.TransactionIdentifier;
 import wei.yigulu.modbus.exceptiom.ModbusException;
 import wei.yigulu.modbus.netty.ModbusMasterBuilderInterface;
-import wei.yigulu.netty.AbstractMasterBuilder;
 import wei.yigulu.netty.AbstractTcpMasterBuilder;
 import wei.yigulu.netty.MasterInterface;
 import wei.yigulu.utils.PCON;
@@ -89,8 +88,8 @@ public class ModbusRequestDataUtils {
 	public static List<Obj4RequestCoil> splitModbusRequest(List<Integer> locator, int slave, FunctionCode functionCode) throws ModbusException {
 		List<Obj4RequestCoil> list = new ArrayList<>();
 		Collections.sort(locator);
-		Integer max = locator.get(0);
-		Integer min = locator.get(locator.size() - 1);
+		Integer min = locator.get(0);
+		Integer max = locator.get(locator.size() - 1);
 		List<Integer> ls = new ArrayList<>();
 		if (max - min < MAXLENGTH) {
 			list.add(new Obj4RequestCoil(slave, functionCode, locator));