关注“腾飞开源”,一起成长! 1、更新内容 iot-modbus本次发布的V3.2.8版本主要优化了服务端对客户端上线、掉线的监听处理,并增加了对客户端的心跳检测(超时未收到客户端上传的心跳则自动断开连接),请看下面的源码解读。 2、控制台日志输出效果
周飒博客-ZhouSa.com 编辑切换为居中
添加图片注释,不超过 140 字(可选)
3、服务端连接管理器 (1)服务端增加连接管理器MiiServerConnect,继承ChannelInboundHandlerAdapter并重写channelActive和channelInactive方法,监听Channel的活跃状态并进行处理,如下图所示: 编辑切换为居中
添加图片注释,不超过 140 字(可选)
(2)发布连接监听事件,主要通过Spring的事件发布与监听机制来处理,增加连接监听器ChannelConnectListener。 编辑切换为居中
添加图片注释,不超过 140 字(可选)
4、对客户端心跳检测 (1)增加心跳检测超时时间配置,如下图所示: 编辑切换为居中
添加图片注释,不超过 140 字(可选)
(2)服务端检测心跳是否超时,超时则主动断开连接。 编辑切换为居中
添加图片注释,不超过 140 字(可选)
5、源码解读 (1)服务端连接管理器源码
package com.takeoff.iot.modbus.server.connect;

import com.takeoff.iot.modbus.common.entity.ChannelConnectData;
import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum;
import com.takeoff.iot.modbus.common.utils.CacheUtils;
import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;
import com.takeoff.iot.modbus.common.utils.SpringContextUtil;
import com.takeoff.iot.modbus.netty.channel.MiiChannel;
import com.takeoff.iot.modbus.netty.device.MiiDeviceChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ObjectUtils;

import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Connection manager for client channels.
 *
 * <p>Reacts to a channel becoming active or inactive by counting the event per
 * channel name, publishing a {@link ChannelConnectData} event on the Spring
 * application context (when one is available) and maintaining the
 * name-to-channel binding in {@link CacheUtils}.
 *
 * <p>The class is annotated {@code @Sharable}, so one instance may be installed
 * in many pipelines and be invoked from several event-loop threads. The
 * counters are therefore kept in synchronized maps and updated with the
 * atomic {@link Map#merge} operation.
 *
 * <p>Company: TF (TakeOff) open source <br/>
 * Author: luorongxi
 */
@Slf4j
@Sharable
public class MiiServerConnect extends ChannelInboundHandlerAdapter {

    /** Number of successful connections, keyed by channel name. */
    private final Map<String, Integer> onLineMap = Collections.synchronizedMap(new HashMap<>());

    /** Number of disconnections, keyed by channel name. */
    private final Map<String, Integer> breakOffMap = Collections.synchronizedMap(new HashMap<>());

    public MiiServerConnect() {
    }

    /**
     * Forwards the event down the pipeline, then records the new connection:
     * increments the per-channel online counter, publishes an ON_LINE event
     * and stores the channel binding in the cache.
     *
     * @param ctx context of the channel that became active
     * @throws Exception propagated from the pipeline
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        ctx.fireChannelActive();

        SocketAddress remote = channel.remoteAddress();
        if (JudgeEmptyUtils.isEmpty(remote)) {
            return;
        }
        // Drops the first character of the textual address
        // (presumably the leading '/' of "/ip:port" - confirm against the transport in use).
        String address = remote.toString().substring(1);
        MiiChannel miiChannel = new MiiDeviceChannel(channel);

        int onLine = onLineMap.merge(miiChannel.name(), 1, Integer::sum);
        publishConnectEvent(DeviceConnectEnum.ON_LINE.getKey(), address, onLine);

        // Cache the binding regardless of whether a Spring context is present;
        // channelInactive removes it unconditionally, so the two paths must mirror each other.
        CacheUtils.put(miiChannel.name(), miiChannel);
    }

    /**
     * Forwards the event down the pipeline, then records the disconnection:
     * increments the per-channel break-off counter, publishes a BREAK_OFF
     * event, removes the channel binding from the cache and finally detaches
     * this handler and closes the channel.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception propagated from the pipeline
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();

        Channel channel = ctx.channel();
        if (JudgeEmptyUtils.isEmpty(channel)) {
            return;
        }
        SocketAddress remote = channel.remoteAddress();
        if (JudgeEmptyUtils.isEmpty(remote)) {
            return;
        }
        String address = remote.toString().substring(1);
        MiiChannel miiChannel = new MiiDeviceChannel(channel);

        int breakOff = breakOffMap.merge(miiChannel.name(), 1, Integer::sum);
        publishConnectEvent(DeviceConnectEnum.BREAK_OFF.getKey(), address, breakOff);

        // Remove the channel binding from the cache.
        CacheUtils.remove(miiChannel.name());

        // Final clean-up once the connection is gone.
        ctx.pipeline().remove(ctx.handler());
        ctx.deregister();
        ctx.close();
    }

    /**
     * Builds a connection event and publishes it when a Spring context exists.
     *
     * <p>The context is read at publish time rather than cached in a field, so
     * a handler created before the context was initialised still publishes
     * once the context becomes available.
     */
    private void publishConnectEvent(Integer state, String address, int count) {
        ApplicationContext context = SpringContextUtil.applicationContext;
        ChannelConnectData event = new ChannelConnectData(this, state, address, count);
        if (!JudgeEmptyUtils.isEmpty(event) && !JudgeEmptyUtils.isEmpty(context)) {
            context.publishEvent(event);
        }
    }
}
(2)连接监听器发布事件源码
package com.takeoff.iot.modbus.common.entity;

import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum;
import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;
import org.springframework.context.ApplicationEvent;

import lombok.Getter;

/**
 * Spring application event describing a change in a device connection.
 *
 * <p>All three payload fields remain {@code null} when the event is created
 * with an empty device address.
 */
@Getter
public class ChannelConnectData extends ApplicationEvent {

    private static final long serialVersionUID = 2111432846029949421L;

    /** Address of the device the event refers to. */
    private String deviceAddress;

    /** Connection state code passed in by the publisher. */
    private Integer deviceConnect;

    /** Human-readable message built from address, state name and count. */
    private String connectMsg;

    /**
     * Creates the event and, for a non-empty address, fills in the payload.
     *
     * @param source        object on which the event initially occurred
     * @param deviceConnect connection state code
     * @param deviceAddress device address; when empty no payload is set
     * @param count         cumulative number of times this state was reached
     */
    public ChannelConnectData(Object source, Integer deviceConnect, String deviceAddress, int count) {
        super(source);
        if (JudgeEmptyUtils.isEmpty(deviceAddress)) {
            return;
        }
        this.deviceConnect = deviceConnect;
        this.deviceAddress = deviceAddress;
        this.connectMsg = "设备:" + deviceAddress
                + DeviceConnectEnum.getName(deviceConnect)
                + ",累计:" + count + "次";
    }
}
(3)连接监听器源码
package com.takeoff.iot.modbus.test.listener;

import com.takeoff.iot.modbus.common.entity.ChannelConnectData;
import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * Listens for {@link ChannelConnectData} events and logs the connection
 * state code together with its message.
 */
@Slf4j
@Component
public class ChannelConnectListener {

    /**
     * Logs a connection event that carries a state code.
     *
     * <p>Events without a state code are skipped. The check was previously
     * inverted, so only events with an empty state code were logged.
     *
     * @param data the published connection event
     */
    @EventListener
    public void handleReceiveDataEvent(ChannelConnectData data) {
        if (JudgeEmptyUtils.isEmpty(data.getDeviceConnect())) {
            return;
        }
        log.info("设备连接状态码:{} ---> {}", data.getDeviceConnect(), data.getConnectMsg());
    }
}
(4)服务端心跳检测超时时间源码
package com.takeoff.iot.modbus.server; import java.util.List; import java.util.concurrent.TimeUnit; import com.takeoff.iot.modbus.common.utils.CacheUtils; import com.takeoff.iot.modbus.netty.device.MiiDeviceChannel; import com.takeoff.iot.modbus.netty.device.MiiDeviceGroup; import com.takeoff.iot.modbus.netty.device.MiiControlCentre; import com.takeoff.iot.modbus.common.bytes.factory.MiiDataFactory; import com.takeoff.iot.modbus.common.data.MiiHeartBeatData; import com.takeoff.iot.modbus.common.message.MiiMessage; import com.takeoff.iot.modbus.netty.channel.MiiChannel; import com.takeoff.iot.modbus.netty.channel.MiiChannelGroup; import com.takeoff.iot.modbus.netty.data.factory.MiiServerDataFactory; import com.takeoff.iot.modbus.netty.handle.*; import com.takeoff.iot.modbus.netty.listener.MiiListener; import com.takeoff.iot.modbus.server.connect.MiiServerConnect; import com.takeoff.iot.modbus.server.message.sender.MiiServerMessageSender; import com.takeoff.iot.modbus.server.message.sender.ServerMessageSender; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** * 类功能说明:设备通讯服务端<br/> * 公司名称:TF(腾飞)开源 <br/> * 作者:luorongxi <br/> */ @Slf4j public class MiiServer extends ChannelInitializer<SocketChannel> implements MiiControlCentre { private static int IDLE_TIMEOUT = 60000; private EventLoopGroup bossGroup; private EventLoopGroup 
workerGroup; private ChannelFuture future; private int port,nThread; @Getter private MiiChannelGroup groups; private MiiServerConnect connect; private ServerMessageSender sender; private MiiListenerHandler handler; private MiiDataFactory dataFactory; /** * 创建指定服务端口,默认线程数的服务端 * @param port 服务端口 */ public MiiServer(int port){ this(port, 0, IDLE_TIMEOUT); } /** * 创建指定服务端口,指定线程数的服务端 * @param port 服务端口 * @param nThread 执行线程池线程数 * @param heartBeatTime 心跳检测超时时间(单位:毫秒) */ public MiiServer(int port, int nThread, int heartBeatTime){ this.port = port; this.nThread = nThread; this.IDLE_TIMEOUT = heartBeatTime; this.groups = new MiiChannelGroup(); this.connect = new MiiServerConnect(); this.sender = new MiiServerMessageSender(); this.handler = new MiiListenerHandler(this.groups); this.handler.addListener(MiiMessage.HEARTBEAT, new MiiListener() { @Override public void receive(MiiChannel channel, MiiMessage message) { //通讯通道绑定设备IP groups.get(channel.name()).name(message.deviceGroup()); log.info("Netty通讯已绑定设备IP:"+ message.deviceGroup()); } }); this.dataFactory = new MiiServerDataFactory(); } /** * 启动服务 */ public void start(){ bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(nThread); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(this); future = b.bind(port); } /** * 停止服务 */ public void stop(){ future.channel().closeFuture(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } /** * 根据名称/地址找已连接设备组 * 名称/地址不存在或者未连接时返回null值 * @param name 名称/地址 * @return 设备组 */ public MiiChannel group(String name) { return get(name); } /** * 列出所有已连接设备组清单 * @return 所有已连接身边组清单 */ public List<MiiChannel> groups() { return groups.list(); } public ServerMessageSender sender(){ return sender; } /** * 添加接收指定指令的消息监听器 * @param command 指令类型 {@link MiiMessage} * @param listener 消息监听器 * @return 上一个消息监听器,如果没有返回null */ public MiiListener 
addListener(int command, MiiListener listener){ return handler.addListener(command, listener); } /** * 移除接收指定指令的消息监听器 * @param command 指令类型 {@link MiiMessage} * @return 移除消息监听器,如果没有返回null */ public MiiListener removeListener(int command){ return handler.removeListener(command); } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); MiiDeviceGroup group = new MiiDeviceChannel(ch); add(group); //服务端心跳检测超时时间,超时则主动断开链接 p.addLast(new IdleStateHandler(0, 0, IDLE_TIMEOUT, TimeUnit.MILLISECONDS)); p.addLast(new ChannelInboundHandlerAdapter(){ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ ctx.disconnect(); } else { super.userEventTriggered(ctx, evt); } } }); p.addLast(new MiiMessageEncoder()); p.addLast(new MiiBasedFrameDecoder()); p.addLast(new MiiMessageDecoder(dataFactory)); p.addLast(connect); p.addLast(handler); p.addLast(new MiiExceptionHandler()); } @Override public boolean add(MiiChannel channel) { return groups.add(channel); } @Override public MiiChannel remove(String name) { return groups.remove(name); } @Override public MiiChannel get(String name) { return groups.get(name); } }(4)更详细的内容请查看“腾飞开源”物联网通讯协议 iot-modbus V3.2.8版本,gitee地址:https://gitee.com/takeoff/iot-modbus
还没有评论,来说两句吧...