package com.szpg.plc.server; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import com.szpg.plc.message.AppMessage; import com.szpg.plc.message.command.LinkCommand; import com.szpg.plc.protocol.DTProtocolInterface; import com.szpg.plc.protocol.ProtocolFactory; import com.szpg.plc.util.ByteUtil; import com.szpg.task.ACULinkTask; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ACUClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = Logger.getLogger(this.getClass().getName()); private final DTProtocolInterface finspi = ProtocolFactory.getDefaultDTProtocol(); @Override public void channelActive(ChannelHandlerContext ctx) { logger.info(ctx.channel().remoteAddress() + "连接建立成功"); // 1更新map中的client的channel对象 ACUClient client = ACUClientUtil.getInstance().getClient(ctx.channel().remoteAddress().toString().substring(1)); if (null != client) { client.setChannel(ctx.channel()); // 2生成LinkCommand对象并解析成byte数组 LinkCommand lc = new LinkCommand(); lc.setMessageProducerId(ByteUtil.binToHexString(ByteUtil.hexStringToBytes(client.getNode(), 4))); byte[] lcBytes = finspi.messageToBytes(lc); // 3延迟10秒发送握手命令 ScheduledExecutorService linkSche = new ScheduledThreadPoolExecutor(1); linkSche.schedule(new ACULinkTask(client, lcBytes), 10, TimeUnit.SECONDS); linkSche.shutdown(); //执行完任务之后关闭线程 } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // channel置空 ACUClientUtil.getInstance().getClient(ctx.channel().remoteAddress().toString().substring(1)).setChannel(null); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { byte[] byteResponse = ByteBufUtil.getBytes((ByteBuf) msg); // 记录日志 ACUClientUtil.recv.info(ByteUtil.binToHexString(byteResponse)); // 根据协议解析byte数组 List<byte[]> bytesList = finspi.extractByteMessage(byteResponse); for (int i = 0; i < bytesList.size(); i++) { // 解析消息 AppMessage message = finspi.bytesToMessage((byte[]) bytesList.get(i)); // 命令解析后处理 message.afterAction(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { logger.debug("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); // channel置空 ACUClientUtil.getInstance().getClient(ctx.channel().remoteAddress().toString().substring(1)).setChannel(null); } }