Quellcode durchsuchen

新增网口传RTU 协议的客户端

xiuwei vor 3 Jahren
Ursprung
Commit
a8f91ab86d
18 geänderte Dateien mit 601 neuen und 327 gelöschten Zeilen
  1. 122 0
      protocol-core/src/main/java/wei/yigulu/netty/AbstractClientBuilder.java
  2. 8 11
      protocol-core/src/main/java/wei/yigulu/netty/AbstractDelimiterHandler.java
  3. 0 2
      protocol-core/src/main/java/wei/yigulu/netty/AbstractHSTcpMasterBuilder.java
  4. 7 104
      protocol-core/src/main/java/wei/yigulu/netty/AbstractMasterBuilder.java
  5. 5 6
      protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpMasterBuilder.java
  6. 186 0
      protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpServerBuilder.java
  7. 2 165
      protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpSlaverBuilder.java
  8. 7 4
      protocol-core/src/main/java/wei/yigulu/netty/BaseProtocolBuilder.java
  9. 2 2
      protocol-core/src/main/java/wei/yigulu/netty/HSConnectionListener.java
  10. 26 0
      protocol-core/src/main/java/wei/yigulu/netty/MasterInterface.java
  11. 3 3
      protocol-core/src/main/java/wei/yigulu/netty/SimpleTcpConnectionListener.java
  12. 10 0
      protocol-core/src/main/java/wei/yigulu/netty/SlaverInterface.java
  13. 3 3
      protocol-core/src/main/java/wei/yigulu/utils/DataConvertor.java
  14. 71 0
      protocol-modbus/src/main/java/wei/yigulu/modbus/netty/ModbusRtuMasterWithTcpServer.java
  15. 77 0
      protocol-modbus/src/main/java/wei/yigulu/modbus/netty/ModbusRtuMasterWithTcpServerHandler.java
  16. 25 26
      protocol-modbus/src/main/java/wei/yigulu/modbus/utils/ModbusRequestDataUtils.java
  17. 46 0
      protocol-modbus/src/test/java/TestModbusRtuMasterWithTcpServer.java
  18. 1 1
      protocol-modbus/src/test/java/TestRtuMaster.java

+ 122 - 0
protocol-core/src/main/java/wei/yigulu/netty/AbstractClientBuilder.java

@@ -0,0 +1,122 @@
+package wei.yigulu.netty;
+
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import wei.yigulu.threadpool.LocalThreadPool;
+
+
+/**
+ * 网络传输层的client
+ *
+ * @author 修唯xiuwei
+ * @version 3.0
+ */
+
+@Accessors(chain = true)
+public abstract class AbstractClientBuilder extends BaseProtocolBuilder {
+
+
+	/**
+	 * Work group
+	 */
+	protected EventLoopGroup workGroup = null;
+	/**
+	 * Bootstrap
+	 */
+	protected Bootstrap bootstrap = null;
+
+	/**
+	 * Future
+	 */
+	@Getter
+	@Setter
+	protected ChannelFuture future;
+	/**
+	 * Connection listener
+	 */
+	protected ChannelFutureListener connectionListener = null;
+
+
+	protected ProtocolChannelInitializer channelInitializer = null;
+
+
+	public void stop() {
+		if (this.future != null) {
+			this.future.removeListener(getOrCreateConnectionListener());
+			if (!this.future.channel().eventLoop().isShutdown()) {
+				this.future.channel().close();
+			}
+		}
+		if (this.workGroup != null) {
+			this.workGroup.shutdownGracefully();
+		}
+		this.bootstrap = null;
+		this.workGroup = null;
+	}
+
+
+	/**
+	 * 创建Master 连接
+	 */
+	public abstract void create();
+
+	/**
+	 * Create by un block
+	 */
+	public void createByUnBlock() {
+		LocalThreadPool.getInstance().getLocalPool().execute(this::create);
+	}
+
+
+	/**
+	 * null则创建,有则获取获取EventLoopGroup 用与bootstrap的绑定
+	 *
+	 * @return or create work group
+	 */
+	public abstract EventLoopGroup getOrCreateWorkGroup();
+
+
+	/**
+	 * null则创建,有则获取获取bootstrap
+	 *
+	 * @return or create bootstrap
+	 */
+	public abstract Bootstrap getOrCreateBootstrap();
+
+
+	/**
+	 * null则创建,有则获取获取ConnectionListener
+	 *
+	 * @return or create connection listener
+	 */
+	public abstract ChannelFutureListener getOrCreateConnectionListener();
+
+	/**
+	 * null则创建,有则获取获取ChannelInitializer
+	 *
+	 * @return or create ChannelInitializer
+	 */
+	protected abstract ProtocolChannelInitializer getOrCreateChannelInitializer();
+
+	/**
+	 * 通道连接成功
+	 */
+	public void connected() {
+
+	}
+
+	/**
+	 * 通道断开连接
+	 */
+	public void disconnected() {
+
+	}
+
+
+}

+ 8 - 11
protocol-core/src/main/java/wei/yigulu/netty/AbstractDelimiterHandler.java

@@ -43,7 +43,7 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	@Setter
 	@Getter
 	@Accessors(chain = true)
-	protected  int maxLength = 10240;
+	protected int maxLength = 10240;
 
 
 	/**
@@ -52,16 +52,13 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	@Setter
 	@Getter
 	@Accessors(chain = true)
-	protected  int maxTimeSpace=200;
-
-
+	protected int maxTimeSpace = 200;
 
 
 	/**
 	 * 拓展寄居 ByteBuf
 	 * 拓展规则是: 初始容量为 两个ByteBuf的长度和,内容是byteBuf1未读部分+ byteBuf2未读部分。
 	 *
-	 *
 	 * @param byteBuf1
 	 * @param byteBuf2
 	 * @return {@link ByteBuf}
@@ -94,7 +91,7 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	 * @param head    头字节数组
 	 * @return int  头位置
 	 */
-	protected int getHeadIndex(int from, int end, ByteBuf byteBuf,byte[] head) {
+	protected int getHeadIndex(int from, int end, ByteBuf byteBuf, byte[] head) {
 		if (byteBuf.readableBytes() < head.length) {
 			return -1;
 		}
@@ -109,7 +106,7 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	/**
 	 * 清除寄存ByteBuf的指向和内容
 	 */
-	protected void clearCumulation(){
+	protected void clearCumulation() {
 		while (!cumulation.release()) {
 		}
 		cumulation = null;
@@ -118,7 +115,7 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	/**
 	 * 清除寄存ByteBuf的指向  并提供新的指向内容重新赋值
 	 */
-	protected void setCumulation(ByteBuf byteBuf){
+	protected void setCumulation(ByteBuf byteBuf) {
 		while (!cumulation.release()) {
 		}
 		cumulation = byteBuf;
@@ -129,7 +126,7 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	 *
 	 * @param byteBuf 字节缓冲区
 	 */
-	protected void mergeOrFlushByTimeSpan(ByteBuf byteBuf){
+	protected void mergeOrFlushByTimeSpan(ByteBuf byteBuf) {
 		if (timeMark.plusMillis(getMaxTimeSpace()).isBeforeNow()) {
 			log.warn("上一帧数据长度不足,但两帧时间间隔较长上一帧被舍弃 舍弃的数据帧为:" + DataConvertor.ByteBuf2String(cumulation));
 			while (!cumulation.release()) {
@@ -149,14 +146,14 @@ public abstract class AbstractDelimiterHandler extends ChannelInboundHandlerAdap
 	 * @param byteBuf 字节缓冲区
 	 * @return boolean
 	 */
-	protected boolean isOverMaxLength(ByteBuf byteBuf){
+	protected boolean isOverMaxLength(ByteBuf byteBuf) {
 		if (byteBuf.readableBytes() > getMaxLength()) {
 			while (!cumulation.release()) {
 			}
 			cumulation = null;
 			log.warn("报文超长舍弃");
 			return true;
-		}else{
+		} else {
 			if (cumulation == null) {
 				cumulation = byteBuf;
 			} else {

+ 0 - 2
protocol-core/src/main/java/wei/yigulu/netty/AbstractHSTcpMasterBuilder.java

@@ -60,8 +60,6 @@ public abstract class AbstractHSTcpMasterBuilder extends AbstractTcpMasterBuilde
 	private Integer sparePort;
 
 
-
-
 	@Override
 	public ChannelFutureListener getOrCreateConnectionListener() {
 		if (this.connectionListener == null) {

+ 7 - 104
protocol-core/src/main/java/wei/yigulu/netty/AbstractMasterBuilder.java

@@ -1,16 +1,9 @@
 package wei.yigulu.netty;
 
 
-import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoopGroup;
-import lombok.Getter;
-import lombok.Setter;
 import lombok.experimental.Accessors;
-import wei.yigulu.threadpool.LocalThreadPool;
 import wei.yigulu.utils.DataConvertor;
 
 
@@ -22,46 +15,7 @@ import wei.yigulu.utils.DataConvertor;
  */
 
 @Accessors(chain = true)
-public abstract class AbstractMasterBuilder extends BaseProtocolBuilder {
-
-
-	/**
-	 * Work group
-	 */
-	protected EventLoopGroup workGroup = null;
-	/**
-	 * Bootstrap
-	 */
-	protected Bootstrap bootstrap = null;
-
-	/**
-	 * Future
-	 */
-	@Getter
-	@Setter
-	protected ChannelFuture future;
-	/**
-	 * Connection listener
-	 */
-	protected ChannelFutureListener connectionListener = null;
-
-
-	protected ProtocolChannelInitializer channelInitializer = null;
-
-
-	public void stop() {
-		if (this.future != null) {
-			this.future.removeListener(getOrCreateConnectionListener());
-			if (!this.future.channel().eventLoop().isShutdown()) {
-				this.future.channel().close();
-			}
-		}
-		if (this.workGroup != null) {
-			this.workGroup.shutdownGracefully();
-		}
-		this.bootstrap=null;
-		this.workGroup=null;
-	}
+public abstract class AbstractMasterBuilder extends AbstractClientBuilder implements MasterInterface {
 
 
 	/**
@@ -69,10 +23,13 @@ public abstract class AbstractMasterBuilder extends BaseProtocolBuilder {
 	 *
 	 * @param bytes
 	 */
+	@Override
 	public void sendFrameToOpposite(byte[] bytes) {
 		if (getFuture() != null && getFuture().channel().isActive()) {
 			getLog().info("se ==> " + DataConvertor.Byte2String(bytes));
 			getFuture().channel().writeAndFlush(Unpooled.copiedBuffer(bytes));
+		}else{
+			throw new RuntimeException("无客户端连接");
 		}
 	}
 
@@ -81,69 +38,15 @@ public abstract class AbstractMasterBuilder extends BaseProtocolBuilder {
 	 *
 	 * @param byteBuf
 	 */
+	@Override
 	public void sendFrameToOpposite(ByteBuf byteBuf) {
 		if (getFuture() != null && getFuture().channel().isActive()) {
 			getLog().info("se ==> " + DataConvertor.ByteBuf2String(byteBuf));
 			getFuture().channel().writeAndFlush(Unpooled.copiedBuffer(byteBuf));
+		}else{
+			throw new RuntimeException("无客户端连接");
 		}
 	}
 
-	/**
-	 * 创建Master 连接
-	 */
-	public  abstract void create();
-
-	/**
-	 * Create by un block
-	 */
-	public void createByUnBlock() {
-		LocalThreadPool.getInstance().getLocalPool().execute(this::create);
-	}
-
-
-	/**
-	 * null则创建,有则获取获取EventLoopGroup 用与bootstrap的绑定
-	 *
-	 * @return or create work group
-	 */
-	public abstract EventLoopGroup getOrCreateWorkGroup();
-
-
-	/**
-	 * null则创建,有则获取获取bootstrap
-	 *
-	 * @return or create bootstrap
-	 */
-	public abstract Bootstrap getOrCreateBootstrap();
-
-
-	/**
-	 * null则创建,有则获取获取ConnectionListener
-	 *
-	 * @return or create connection listener
-	 */
-	public abstract ChannelFutureListener getOrCreateConnectionListener();
-
-	/**
-	 * null则创建,有则获取获取ChannelInitializer
-	 *
-	 * @return or create ChannelInitializer
-	 */
-	protected abstract ProtocolChannelInitializer getOrCreateChannelInitializer();
-
-	/**
-	 * 通道连接成功
-	 */
-	public void connected() {
-
-	}
-
-	/**
-	 * 通道断开连接
-	 */
-	public void disconnected() {
-
-	}
-
 
 }

+ 5 - 6
protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpMasterBuilder.java

@@ -78,11 +78,11 @@ public abstract class AbstractTcpMasterBuilder extends AbstractMasterBuilder {
 			}
 			log.debug("创建连接");
 			try {
-				SocketAddress remoteAddress= new InetSocketAddress(getIp(),getPort());
-				if(!StringUtil.isNullOrEmpty(getSelfIp()) && getSelfPort()!=null){
-					SocketAddress localAddress= new InetSocketAddress(getSelfIp(),getSelfPort());
-					future = getOrCreateBootstrap().connect(remoteAddress,localAddress);
-				}else{
+				SocketAddress remoteAddress = new InetSocketAddress(getIp(), getPort());
+				if (!StringUtil.isNullOrEmpty(getSelfIp()) && getSelfPort() != null) {
+					SocketAddress localAddress = new InetSocketAddress(getSelfIp(), getSelfPort());
+					future = getOrCreateBootstrap().connect(remoteAddress, localAddress);
+				} else {
 					future = getOrCreateBootstrap().connect(remoteAddress);
 				}
 				log.debug("为连接添加监听");
@@ -170,5 +170,4 @@ public abstract class AbstractTcpMasterBuilder extends AbstractMasterBuilder {
 	}
 
 
-
 }

+ 186 - 0
protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpServerBuilder.java

@@ -0,0 +1,186 @@
+package wei.yigulu.netty;
+
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import wei.yigulu.connectfilterofslave.ConnectFilterManager;
+import wei.yigulu.threadpool.LocalThreadPool;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * TCP网络传输层的server
+ * 向主站上送数据
+ *
+ * @author 修唯xiuwei
+ * @version 3.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Accessors(chain = true)
+public abstract class AbstractTcpServerBuilder extends BaseProtocolBuilder {
+
+
+	private EventLoopGroup group;
+
+	private ServerBootstrap serverBootstrap;
+
+	@Setter
+	@Getter
+	private int port = 2404;
+
+	@Setter
+	@Getter
+	private String ip = null;
+
+
+	/**
+	 * 连接过滤器管理器
+	 */
+	@Setter
+	@Getter
+	private ConnectFilterManager connectFilterManager = new ConnectFilterManager();
+
+	protected ProtocolChannelInitializer channelInitializer = null;
+
+	/**
+	 * 子channel集合
+	 */
+	@Getter
+	private List<Channel> channels = new ArrayList<>();
+	/**
+	 * 父channel
+	 */
+	@Getter
+	private Channel fatherChannel;
+
+
+	public AbstractTcpServerBuilder(int port) {
+		this.port = port;
+	}
+
+	/**
+	 * 创建104 slave 监听
+	 *
+	 * @throws Exception 异常
+	 */
+	public void create() throws Exception {
+		// 服务器异步创建绑定
+		ChannelFuture cf = getOrCrateServerBootstrap().bind().sync();
+		this.fatherChannel = cf.channel();
+		log.info("Slaver端启动成功;端口" + port);
+		// 关闭服务器通道
+		cf.channel().closeFuture().sync();
+		// 释放线程池资源
+		group.shutdownGracefully().sync();
+	}
+
+
+	/**
+	 * null则创建,有则获取获取ChannelInitializer
+	 *
+	 * @return or create ChannelInitializer
+	 */
+	protected abstract ProtocolChannelInitializer getOrCreateChannelInitializer();
+
+	/**
+	 * 获取ServerBootstrap
+	 * 如果==null 那么就创建
+	 *
+	 * @return
+	 */
+	protected ServerBootstrap getOrCrateServerBootstrap() {
+		if (this.serverBootstrap == null) {
+			AbstractTcpServerBuilder slaverBuilder = this;
+			this.serverBootstrap = new ServerBootstrap();
+			// 绑定线程池
+			this.serverBootstrap.group(getOrCrateLoopGroup())
+					// 指定使用的channel
+					.channel(NioServerSocketChannel.class)
+					// 绑定客户端连接时候触发操作
+					.childHandler(getOrCreateChannelInitializer());
+			// 绑定监听端口
+			if (this.ip != null) {
+				this.serverBootstrap.localAddress(this.ip, this.port);
+			} else {
+				this.serverBootstrap.localAddress(this.port);
+			}
+		}
+		return this.serverBootstrap;
+	}
+
+	/**
+	 * 获取或创建 循环任务组
+	 *
+	 * @return {@link EventLoopGroup}
+	 */
+	protected EventLoopGroup getOrCrateLoopGroup() {
+		if (this.group == null) {
+			this.group = new NioEventLoopGroup();
+		}
+		return this.group;
+	}
+
+
+	/**
+	 * 停止通道监听
+	 */
+	public void stop() {
+		if (this.fatherChannel != null) {
+			this.fatherChannel.close();
+			fatherChannel = null;
+		}
+		for (Channel c : this.channels) {
+			c.close();
+		}
+		this.channels = new ArrayList<>();
+		if (this.group != null) {
+			this.group.shutdownGracefully();
+		}
+
+	}
+
+	/**
+	 * 以非阻塞的方式启动
+	 */
+	public void createByUnBlock() {
+		AbstractTcpServerBuilder s = this;
+		LocalThreadPool.getInstance().getLocalPool().execute(() -> {
+			try {
+				s.create();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		});
+	}
+
+
+	/**
+	 * 当有连接接入时触发
+	 *
+	 * @param ipSocket ip套接字
+	 */
+	public void connected(InetSocketAddress ipSocket) {
+
+	}
+
+	/**
+	 * 当有连接断开时触发
+	 *
+	 * @param ipSocket ip套接字
+	 */
+	public void disconnected(InetSocketAddress ipSocket) {
+
+	}
+
+}

+ 2 - 165
protocol-core/src/main/java/wei/yigulu/netty/AbstractTcpSlaverBuilder.java

@@ -1,22 +1,8 @@
 package wei.yigulu.netty;
 
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
 import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
 import lombok.experimental.Accessors;
-import wei.yigulu.connectfilterofslave.ConnectFilterManager;
-import wei.yigulu.threadpool.LocalThreadPool;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 
 /**
@@ -28,159 +14,10 @@ import java.util.List;
  */
 @EqualsAndHashCode(callSuper = true)
 @Accessors(chain = true)
-public abstract class AbstractTcpSlaverBuilder extends BaseProtocolBuilder {
-
-
-	private EventLoopGroup group;
-
-	private ServerBootstrap serverBootstrap;
-
-	@Setter
-	@Getter
-	private int port = 2404;
-
-	@Setter
-	@Getter
-	private String ip = null;
-
-
-	/**
-	 * 连接过滤器管理器
-	 */
-	@Setter
-	@Getter
-	private ConnectFilterManager connectFilterManager = new ConnectFilterManager();
-
-	protected ProtocolChannelInitializer channelInitializer = null;
-
-	/**
-	 * 子channel集合
-	 */
-	@Getter
-	private List<Channel> channels = new ArrayList<>();
-	/**
-	 * 父channel
-	 */
-	@Getter
-	private Channel fatherChannel;
+public abstract class AbstractTcpSlaverBuilder extends AbstractTcpServerBuilder implements SlaverInterface {
 
 
 	public AbstractTcpSlaverBuilder(int port) {
-		this.port = port;
-	}
-
-	/**
-	 * 创建104 slave 监听
-	 *
-	 * @throws Exception 异常
-	 */
-	public void create() throws Exception {
-		// 服务器异步创建绑定
-		ChannelFuture cf = getOrCrateServerBootstrap().bind().sync();
-		this.fatherChannel = cf.channel();
-		log.info("Slaver端启动成功;端口" + port);
-		// 关闭服务器通道
-		cf.channel().closeFuture().sync();
-	// 释放线程池资源
-		group.shutdownGracefully().sync();
-}
-
-
-	/**
-	 * null则创建,有则获取获取ChannelInitializer
-	 *
-	 * @return or create ChannelInitializer
-	 */
-	protected abstract ProtocolChannelInitializer getOrCreateChannelInitializer();
-
-	/**
-	 * 获取ServerBootstrap
-	 * 如果==null 那么就创建
-	 *
-	 * @return
-	 */
-	protected ServerBootstrap getOrCrateServerBootstrap() {
-		if (this.serverBootstrap == null) {
-			AbstractTcpSlaverBuilder slaverBuilder = this;
-			this.serverBootstrap = new ServerBootstrap();
-			// 绑定线程池
-			this.serverBootstrap.group(getOrCrateLoopGroup())
-					// 指定使用的channel
-					.channel(NioServerSocketChannel.class)
-					// 绑定客户端连接时候触发操作
-					.childHandler(getOrCreateChannelInitializer());
-			// 绑定监听端口
-			if (this.ip != null) {
-				this.serverBootstrap.localAddress(this.ip, this.port);
-			} else {
-				this.serverBootstrap.localAddress(this.port);
-			}
-		}
-		return this.serverBootstrap;
+		super(port);
 	}
-
-	/**
-	 * 获取或创建 循环任务组
-	 *
-	 * @return {@link EventLoopGroup}
-	 */
-	protected EventLoopGroup getOrCrateLoopGroup() {
-		if (this.group == null) {
-			this.group = new NioEventLoopGroup();
-		}
-		return this.group;
-	}
-
-
-	/**
-	 * 停止通道监听
-	 */
-	public void stop() {
-		if (this.fatherChannel != null) {
-			this.fatherChannel.close();
-			fatherChannel = null;
-		}
-		for (Channel c : this.channels) {
-			c.close();
-		}
-		this.channels = new ArrayList<>();
-		if (this.group != null) {
-			this.group.shutdownGracefully();
-		}
-
-	}
-
-	/**
-	 * 以非阻塞的方式启动
-	 */
-	public void createByUnBlock() {
-		AbstractTcpSlaverBuilder s = this;
-		LocalThreadPool.getInstance().getLocalPool().execute(() -> {
-			try {
-				s.create();
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		});
-	}
-
-
-	/**
-	 * 当有连接接入时触发
-	 *
-	 * @param ipSocket ip套接字
-	 */
-	public void connected(InetSocketAddress ipSocket) {
-
-	}
-
-	/**
-	 * 当有连接断开时触发
-	 *
-	 * @param ipSocket ip套接字
-	 */
-	public void disconnected(InetSocketAddress ipSocket) {
-
-	}
-
 }

+ 7 - 4
protocol-core/src/main/java/wei/yigulu/netty/BaseProtocolBuilder.java

@@ -1,6 +1,5 @@
 package wei.yigulu.netty;
 
-import io.netty.util.ResourceLeakDetector;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
@@ -19,7 +18,7 @@ import java.util.UUID;
  */
 public class BaseProtocolBuilder {
 
-	public  BaseProtocolBuilder(){
+	public BaseProtocolBuilder() {
 		//ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
 	}
 
@@ -42,8 +41,12 @@ public class BaseProtocolBuilder {
 
 	@Override
 	public boolean equals(Object o) {
-		if (this == o) {return true;}
-		if (o == null || getClass() != o.getClass()) {return false;}
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
 		BaseProtocolBuilder that = (BaseProtocolBuilder) o;
 		return builderId.equals(that.builderId);
 	}

+ 2 - 2
protocol-core/src/main/java/wei/yigulu/netty/HSConnectionListener.java

@@ -42,7 +42,7 @@ public class HSConnectionListener implements ChannelFutureListener {
 		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
 			this.masterBuilder.getOrCreateWorkGroup().schedule(() -> {
 				try {
-					if(masterBuilder.future==null ||!masterBuilder.future.channel().isActive()) {
+					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
 						if (this.retryTimes < 10) {
 							log.error("服务端{}:{}链接不上,开始重连操作,第{}次尝试", this.masterBuilder.getIp(), this.masterBuilder.getPort(), retryTimes);
 							masterBuilder.create();
@@ -58,7 +58,7 @@ public class HSConnectionListener implements ChannelFutureListener {
 							masterBuilder.create();
 							log.info("重置重试次数=0");
 						}
-					}else {
+					} else {
 						log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
 					}
 				} catch (Exception e) {

+ 26 - 0
protocol-core/src/main/java/wei/yigulu/netty/MasterInterface.java

@@ -0,0 +1,26 @@
+package wei.yigulu.netty;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * 协议层的master
+ *
+ * @author: xiuwei
+ * @version:
+ */
+public interface MasterInterface {
+	/**
+	 * 向对端 发送数据帧
+	 *
+	 * @param bytes
+	 */
+	public void sendFrameToOpposite(byte[] bytes);
+
+	/**
+	 * 向对端 发送数据帧
+	 *
+	 * @param byteBuf
+	 */
+	public void sendFrameToOpposite(ByteBuf byteBuf);
+
+}

+ 3 - 3
protocol-core/src/main/java/wei/yigulu/netty/SimpleTcpConnectionListener.java

@@ -40,10 +40,10 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 		if (channelFuture == null || channelFuture.channel() == null || !channelFuture.channel().isActive()) {
 			this.future = this.masterBuilder.getOrCreateWorkGroup().schedule(() -> {
 				try {
-					if(masterBuilder.future==null ||!masterBuilder.future.channel().isActive()) {
+					if (masterBuilder.future == null || !masterBuilder.future.channel().isActive()) {
 						log.error("服务端{}:{}链接不上,开始重连操作", this.masterBuilder.getIp(), this.masterBuilder.getPort());
 						masterBuilder.create();
-					}else{
+					} else {
 						log.warn("masterBuilder在延迟过程中已由其他线程连接成功,此处略过重连");
 					}
 				} catch (Exception e) {
@@ -55,7 +55,7 @@ public class SimpleTcpConnectionListener implements ChannelFutureListener {
 					}
 				}
 			}, 6L, TimeUnit.SECONDS);
-		}else{
+		} else {
 			log.warn("masterBuilder已经连接成功,不进行重连操作");
 		}
 	}

+ 10 - 0
protocol-core/src/main/java/wei/yigulu/netty/SlaverInterface.java

@@ -0,0 +1,10 @@
+package wei.yigulu.netty;
+
+/**
+ * @program: protocol
+ * @description: 协议层的slaver 子站
+ * @author: xiuwei
+ * @create: 2021-06-11 10:39
+ */
+public interface SlaverInterface {
+}

+ 3 - 3
protocol-core/src/main/java/wei/yigulu/utils/DataConvertor.java

@@ -58,7 +58,7 @@ public class DataConvertor {
 		ByteBuf b1 = buf.copy();
 		byte[] bs = new byte[b1.readableBytes()];
 		b1.readBytes(bs);
-		ReferenceCountUtil.release( b1);
+		ReferenceCountUtil.release(b1);
 		return Byte2String(bs);
 	}
 
@@ -70,8 +70,8 @@ public class DataConvertor {
 	 * @return string
 	 */
 	public static String ByteBuf2StringAndRelease(ByteBuf buf) {
-		String s= ByteBuf2String(buf);
-		ReferenceCountUtil.release( buf);
+		String s = ByteBuf2String(buf);
+		ReferenceCountUtil.release(buf);
 		return s;
 
 	}

+ 71 - 0
protocol-modbus/src/main/java/wei/yigulu/modbus/netty/ModbusRtuMasterWithTcpServer.java

@@ -0,0 +1,71 @@
+package wei.yigulu.modbus.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import wei.yigulu.modbus.domain.synchronouswaitingroom.RtuSynchronousWaitingRoom;
+import wei.yigulu.modbus.domain.synchronouswaitingroom.SynchronousWaitingRoom;
+import wei.yigulu.modbus.exceptiom.ModbusException;
+import wei.yigulu.netty.AbstractTcpServerBuilder;
+import wei.yigulu.netty.AbstractTcpSlaverBuilder;
+import wei.yigulu.netty.MasterInterface;
+import wei.yigulu.netty.ProtocolChannelInitializer;
+import wei.yigulu.utils.DataConvertor;
+
+/**
+ * 以TCPserver的通讯方式  传输RTU协议  Master角色
+ *
+ * @author: xiuwei
+ * @version:
+ */
+public class ModbusRtuMasterWithTcpServer  extends AbstractTcpServerBuilder implements ModbusMasterBuilderInterface, MasterInterface {
+
+	private SynchronousWaitingRoom synchronousWaitingRoom;
+
+	public ModbusRtuMasterWithTcpServer(int port) {
+		super(port);
+	}
+
+	@Override
+	protected ProtocolChannelInitializer getOrCreateChannelInitializer() {
+		return new ProtocolChannelInitializer(this) {
+			@Override
+			protected void initChannel(Channel ch) throws Exception {
+				ch.pipeline().addLast(new ModbusRtuMasterDelimiterHandler().setLog(getLog()));
+				ch.pipeline().addLast(new ModbusRtuMasterWithTcpServerHandler((ModbusRtuMasterWithTcpServer) builder));
+			}
+		};
+	}
+
+	@Override
+	public SynchronousWaitingRoom getOrCreateSynchronousWaitingRoom() throws ModbusException {
+		if (this.synchronousWaitingRoom == null) {
+			this.synchronousWaitingRoom = new RtuSynchronousWaitingRoom();
+		}
+		return this.synchronousWaitingRoom;
+	}
+
+	@Override
+	public void sendFrameToOpposite(byte[] bytes) {
+		if(getChannels().size()>0){
+			getChannels().forEach(c->{
+				getLog().info("se ==> "+c.remoteAddress()+" :" + DataConvertor.Byte2String(bytes));
+				c.writeAndFlush(Unpooled.copiedBuffer(bytes));
+			});
+		}else{
+			throw new RuntimeException("无客户端连接");
+		}
+	}
+
+	@Override
+	public void sendFrameToOpposite(ByteBuf byteBuf) {
+		if(getChannels().size()>0){
+			getChannels().forEach(c->{
+				getLog().info("se ==> "+c.remoteAddress()+" :" + DataConvertor.ByteBuf2String(byteBuf));
+				c.writeAndFlush(Unpooled.copiedBuffer(byteBuf));
+			});
+		}else{
+			throw new RuntimeException("无客户端连接");
+		}
+	}
+}

+ 77 - 0
protocol-modbus/src/main/java/wei/yigulu/modbus/netty/ModbusRtuMasterWithTcpServerHandler.java

@@ -0,0 +1,77 @@
+package wei.yigulu.modbus.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import wei.yigulu.modbus.domain.request.TcpModbusRequest;
+import wei.yigulu.modbus.domain.response.TcpModbusResponse;
+import wei.yigulu.modbus.exceptiom.ModbusException;
+import wei.yigulu.modbus.utils.ModbusResponseDataUtils;
+import wei.yigulu.utils.DataConvertor;
+
+import java.net.InetSocketAddress;
+
+/**
+ * 以TCPserver的通讯方式  传输RTU协议  Master角色 处理器
+ * @author: xiuwei
+ * @version:
+ */
+public class ModbusRtuMasterWithTcpServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+	protected Logger log;
+
+	/**
+	 * Slave 104 handle
+	 *
+	 * @param builder slaver builder
+	 */
+	public ModbusRtuMasterWithTcpServerHandler(ModbusRtuMasterWithTcpServer builder) {
+		this.builder = builder;
+		this.log = builder.getLog();
+	}
+
+	protected ModbusRtuMasterWithTcpServer builder;
+
+
+	@Override
+	public void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+		//收数据
+		log.debug("re <== " + DataConvertor.ByteBuf2String(byteBuf));
+		((ModbusMasterBuilderInterface) this.builder).getOrCreateSynchronousWaitingRoom().setData(byteBuf.nioBuffer());
+	}
+
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		log.error("ModbusSlave交互时发生异常", cause);
+	}
+
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+		String clientIp = ipSocket.getAddress().getHostAddress();
+		Integer clientPort = ipSocket.getPort();
+		if (!this.builder.getConnectFilterManager().verdict(ctx.channel())) {
+			ctx.channel().close();
+			log.info(clientIp + ":" + clientPort + "客户端被过滤链拦截,已关闭通道");
+			return;
+		}
+		log.info(clientIp + ":" + clientPort + "客户端连接");
+		this.builder.connected(ipSocket);
+		this.builder.getChannels().add(ctx.channel());
+	}
+
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+		String clientIp = ipSocket.getAddress().getHostAddress();
+		Integer clientPort = ipSocket.getPort();
+		log.info(clientIp + ":" + clientPort + "客户端断开连接");
+		this.builder.getChannels().remove(ctx.channel());
+		this.builder.disconnected(ipSocket);
+	}
+}

+ 25 - 26
protocol-modbus/src/main/java/wei/yigulu/modbus/utils/ModbusRequestDataUtils.java

@@ -18,6 +18,7 @@ import wei.yigulu.modbus.exceptiom.ModbusException;
 import wei.yigulu.modbus.netty.ModbusMasterBuilderInterface;
 import wei.yigulu.netty.AbstractMasterBuilder;
 import wei.yigulu.netty.AbstractTcpMasterBuilder;
+import wei.yigulu.netty.MasterInterface;
 import wei.yigulu.utils.PCON;
 
 import java.nio.ByteBuffer;
@@ -181,28 +182,28 @@ public class ModbusRequestDataUtils {
 	 * @return
 	 * @throws ModbusException
 	 */
-	public static <T extends AbstractModbusResponse> T requestData(AbstractMasterBuilder masterBuilder, AbstractModbusRequest modbusRequest, T response) throws ModbusException {
+	public static <T extends AbstractModbusResponse> T requestData(MasterInterface masterBuilder, AbstractModbusRequest modbusRequest, T response) throws ModbusException {
 		if (!(masterBuilder instanceof ModbusMasterBuilderInterface)) {
 			throw new ModbusException("请传人实现了<ModbusMasterBuilderInterface>的Master");
 		}
-		if (masterBuilder.getFuture() != null && masterBuilder.getFuture().channel().isActive()) {
-			List<Byte> byteList = new ArrayList<>();
-			modbusRequest.encode(byteList);
-			byte[] bb = Bytes.toArray(byteList);
+		List<Byte> byteList = new ArrayList<>();
+		modbusRequest.encode(byteList);
+		byte[] bb = Bytes.toArray(byteList);
+		try {
 			masterBuilder.sendFrameToOpposite(bb);
-			ByteBuffer buffer;
-			if (modbusRequest instanceof TcpModbusRequest) {
-				buffer = ((ModbusMasterBuilderInterface) masterBuilder).getOrCreateSynchronousWaitingRoom().getData(((TcpModbusRequest) modbusRequest).getTcpExtraCode().getTransactionIdentifier().getSeq());
-			} else {
-				buffer = ((ModbusMasterBuilderInterface) masterBuilder).getOrCreateSynchronousWaitingRoom().getData(0);
-			}
-			if (buffer == null) {
-				throw new ModbusException("Slave端响应超时");
-			}
-			response.decode(buffer);
+		} catch (RuntimeException e) {
+			throw new ModbusException(e.getMessage());
+		}
+		ByteBuffer buffer;
+		if (modbusRequest instanceof TcpModbusRequest) {
+			buffer = ((ModbusMasterBuilderInterface) masterBuilder).getOrCreateSynchronousWaitingRoom().getData(((TcpModbusRequest) modbusRequest).getTcpExtraCode().getTransactionIdentifier().getSeq());
 		} else {
-			throw new ModbusException("当前并Master未链接到Salve端");
+			buffer = ((ModbusMasterBuilderInterface) masterBuilder).getOrCreateSynchronousWaitingRoom().getData(0);
+		}
+		if (buffer == null) {
+			throw new ModbusException("Slave端响应超时");
 		}
+		response.decode(buffer);
 		return response;
 	}
 
@@ -219,7 +220,7 @@ public class ModbusRequestDataUtils {
 	 * @return
 	 * @throws ModbusException
 	 */
-	public static Map<Integer, IModbusDataType> getData(AbstractMasterBuilder masterBuilder, Map<Integer, ModbusDataTypeEnum> locator, Integer slaveId, FunctionCode functionCode) throws ModbusException {
+	public static Map<Integer, IModbusDataType> getData(MasterInterface masterBuilder, Map<Integer, ModbusDataTypeEnum> locator, Integer slaveId, FunctionCode functionCode) throws ModbusException {
 		List<Obj4RequestRegister> list = splitModbusRequest(locator, slaveId, functionCode);
 		return getRegisterData(masterBuilder, list);
 	}
@@ -237,7 +238,7 @@ public class ModbusRequestDataUtils {
 	 * @return
 	 * @throws ModbusException
 	 */
-	public static Map<Integer, Boolean> getData(AbstractMasterBuilder masterBuilder, List<Integer> locator, Integer slaveId, FunctionCode functionCode) throws ModbusException {
+	public static Map<Integer, Boolean> getData(MasterInterface masterBuilder, List<Integer> locator, Integer slaveId, FunctionCode functionCode) throws ModbusException {
 		List<Obj4RequestCoil> list = splitModbusRequest(locator, slaveId, functionCode);
 		return getCoilData(masterBuilder, list);
 	}
@@ -251,7 +252,7 @@ public class ModbusRequestDataUtils {
 	 * @param locators
 	 * @return
 	 */
-	public static Map<Integer, IModbusDataType> getRegisterData(AbstractMasterBuilder masterBuilder, List<Obj4RequestRegister> locators) throws ModbusException {
+	public static Map<Integer, IModbusDataType> getRegisterData(MasterInterface masterBuilder, List<Obj4RequestRegister> locators) throws ModbusException {
 		Map<Integer, IModbusDataType> map = new HashMap<>();
 		Map<Integer, IModbusDataType> map1 = null;
 		for (Obj4RequestRegister m : locators) {
@@ -261,7 +262,7 @@ public class ModbusRequestDataUtils {
 					map.putAll(map1);
 				}
 			} catch (ModbusException e) {
-				if ("当前并Master未链接到Salve端".equals(e.getMsg())) {
+				if ("无客户端连接".equals(e.getMsg())) {
 					throw e;
 				}
 			} catch (Exception e) {
@@ -272,7 +273,7 @@ public class ModbusRequestDataUtils {
 	}
 
 
-	public static Map<Integer, IModbusDataType> getRegisterData(AbstractMasterBuilder masterBuilder, Obj4RequestRegister locator) throws ModbusException {
+	public static Map<Integer, IModbusDataType> getRegisterData(MasterInterface masterBuilder, Obj4RequestRegister locator) throws ModbusException {
 		Map<Integer, IModbusDataType> map = null;
 		AbstractModbusResponse response;
 		try {
@@ -292,10 +293,9 @@ public class ModbusRequestDataUtils {
 				}
 			}
 		} catch (ModbusException e) {
-			if ("当前并Master未链接到Salve端".equals(e.getMsg())) {
+			if ("无客户端连接".equals(e.getMsg())) {
 				throw e;
 			}
-			masterBuilder.getLog().error(e.getMsg());
 		} catch (Exception e) {
 			e.printStackTrace();
 		}
@@ -311,7 +311,7 @@ public class ModbusRequestDataUtils {
 	 * @param locators
 	 * @return
 	 */
-	public static Map<Integer, Boolean> getCoilData(AbstractMasterBuilder masterBuilder, List<Obj4RequestCoil> locators) throws ModbusException {
+	public static Map<Integer, Boolean> getCoilData(MasterInterface masterBuilder, List<Obj4RequestCoil> locators) throws ModbusException {
 		Map<Integer, Boolean> map = new HashMap<>();
 		Map<Integer, Boolean> map1 = null;
 		AbstractModbusResponse requestData;
@@ -343,10 +343,9 @@ public class ModbusRequestDataUtils {
 					map.putAll(map1);
 				}
 			} catch (ModbusException e) {
-				if ("当前并Master未链接到Salve端".equals(e.getMsg())) {
+				if ("无客户端连接".equals(e.getMsg())) {
 					throw e;
 				}
-				masterBuilder.getLog().error(e.getMsg());
 			} catch (Exception e) {
 				e.printStackTrace();
 			}

+ 46 - 0
protocol-modbus/src/test/java/TestModbusRtuMasterWithTcpServer.java

@@ -0,0 +1,46 @@
+import wei.yigulu.modbus.domain.FunctionCode;
+import wei.yigulu.modbus.domain.Obj4RequestRegister;
+import wei.yigulu.modbus.domain.datatype.IModbusDataType;
+import wei.yigulu.modbus.domain.datatype.ModbusDataTypeEnum;
+import wei.yigulu.modbus.domain.datatype.NumericModbusData;
+import wei.yigulu.modbus.exceptiom.ModbusException;
+import wei.yigulu.modbus.netty.ModbusRtuMasterWithTcpServer;
+import wei.yigulu.modbus.utils.ModbusRequestDataUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author: xiuwei
+ * @version:
+ */
+public class TestModbusRtuMasterWithTcpServer {
+	public static void main(String[] args)  {
+		try {
+			ModbusRtuMasterWithTcpServer ss = new ModbusRtuMasterWithTcpServer(502);
+			ss.createByUnBlock();
+			Thread.sleep(5000L);
+			Map<Integer, ModbusDataTypeEnum> map = new HashMap<>();
+			for (int i = 0; i < 10; i += 2) {
+				map.put(i, ModbusDataTypeEnum.CDAB);
+			}
+			List<Obj4RequestRegister> ll = ModbusRequestDataUtils.splitModbusRequest(map, 1, FunctionCode.READ_HOLDING_REGISTERS);
+
+			for (; ; ) {
+				try {
+					Map<Integer, IModbusDataType> map1 = ModbusRequestDataUtils.getRegisterData(ss, ll);
+					for (Integer i : map1.keySet()) {
+						System.out.println(i + " ============ " + ((NumericModbusData) map1.get(i)).getValue());
+					}
+				} catch (ModbusException e) {
+					System.out.println(e.getMsg());
+				}
+				Thread.sleep(1000L);
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+	}
+}

+ 1 - 1
protocol-modbus/src/test/java/TestRtuMaster.java

@@ -31,7 +31,7 @@ public class TestRtuMaster {
 		master3.createByUnBlock();*/
 		Thread.sleep(5000L);
 		Map<Integer, ModbusDataTypeEnum> map = new HashMap<>();
-		for (int i = 0; i < 60; i+=2) {
+		for (int i = 0; i < 10; i+=2) {
 			map.put(i , ModbusDataTypeEnum.CDAB);
 		}
 		List<Obj4RequestRegister> ll = ModbusRequestDataUtils.splitModbusRequest(map, 1, FunctionCode.READ_HOLDING_REGISTERS);