package com.szpg.plc.server; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import com.szpg.db.dao.PgAcuCmdDao; import com.szpg.db.dao.impl.PgAcuCmdDaoImpl; import com.szpg.db.data.PgAcuCmd; import com.szpg.plc.message.AppMessage; import com.szpg.plc.message.command.LinkCommand; import com.szpg.plc.message.response.LinkCommandResponse; import com.szpg.plc.message.response.ReadMemoryCommandResponse; import com.szpg.plc.message.response.WriteMemoryCommandResponse; import com.szpg.plc.protocol.DTProtocolInterface; import com.szpg.plc.protocol.ProtocolFactory; import com.szpg.plc.util.ByteUtil; import com.szpg.task.ACULinkTask; import com.szpg.util.Configure; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ACUClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = Logger.getLogger(this.getClass().getName()); private final DTProtocolInterface finspi = ProtocolFactory.getDefaultDTProtocol(); private PgAcuCmdDao cmdDao = new PgAcuCmdDaoImpl(); @Override public void channelActive(ChannelHandlerContext ctx) { logger.info(ctx.channel().remoteAddress() + "连接建立成功"); // 1更新map中的client的channel对象 ACUClient client = ACUClientUtil.getInstance().getClient(ctx.channel().remoteAddress().toString().substring(1)); if (null != client) { client.setChannel(ctx.channel()); // 2生成LinkCommand对象并解析成byte数组 LinkCommand lc = new LinkCommand(); // lc.setMessageProducerId(ByteUtil.binToHexString(ByteUtil.hexStringToBytes(client.getNode(), 4))); lc.setMessageProducerId(ByteUtil.binToHexString(ByteUtil.hexStringToBytes(Configure.getProperty("sys", "LOCALHOST.NODE"), 4))); byte[] lcBytes = finspi.messageToBytes(lc); // 3延迟5秒发送握手命令 ScheduledExecutorService linkSche = new ScheduledThreadPoolExecutor(1); linkSche.schedule(new ACULinkTask(client, lcBytes), 5, TimeUnit.SECONDS); linkSche.shutdown(); //执行完任务之后关闭线程 } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // channel置空 ACUClientUtil.getInstance().getClient(ctx.channel().remoteAddress().toString().substring(1)).setChannel(null); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { byte[] byteResponse = ByteBufUtil.getBytes((ByteBuf) msg); // 记录日志 ACUClientUtil.recv.info(ByteUtil.binToHexString(byteResponse)); String remoteAddress = ctx.channel().remoteAddress().toString(); // /172.20.10.5:9600 String host = ctx.channel().remoteAddress().toString().substring(1, remoteAddress.indexOf(":")); // 根据协议解析byte数组 List<byte[]> bytesList = finspi.extractByteMessage(byteResponse); for (int i = 0; i < bytesList.size(); i++) { // 解析消息 AppMessage message = finspi.bytesToMessage((byte[]) bytesList.get(i)); if (null != message) { message.setMessageProducerHost(host); if (message instanceof LinkCommandResponse) { LinkCommandResponse lcr = (LinkCommandResponse) message; // 解析字节流 lcr.parseData(); } else if (message instanceof ReadMemoryCommandResponse) { ReadMemoryCommandResponse rmcr = (ReadMemoryCommandResponse) message; // 查询数据中最近的有效的读内存命令,获取其读取的参数类型 PgAcuCmd cmd = cmdDao.findLatestCmdByHostAndType(host, rmcr.getCommandType()); if (null != cmd) { rmcr.setAcucode(cmd.getDest_acu_code()); rmcr.setCmdId(cmd.getId()); // 解析字节流 rmcr.parseData(); // 将已响应的命令删除 cmdDao.deleteCmdRecord(cmd.getId()); } } else if (message instanceof WriteMemoryCommandResponse) { WriteMemoryCommandResponse rmcr = (WriteMemoryCommandResponse) message; // 查询数据中最近的有效的写内存命令,获取其读取的参数类型 PgAcuCmd cmd = cmdDao.findLatestCmdByHostAndType(host, rmcr.getCommandType()); if (null != cmd) { rmcr.setAcucode(cmd.getDest_acu_code()); rmcr.setCmdId(cmd.getId()); rmcr.setDestinationId(cmd.getDevice_code()); // 用设备资产编号赋值给目标地址 // 字节流已解析 // 将已响应的命令删除 cmdDao.deleteCmdRecord(cmd.getId()); } } // 命令解析后处理 message.afterAction(); } } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { logger.debug("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); // channel置空 ACUClientUtil.getInstance().getClient(ctx.channel().remoteAddress().toString().substring(1)).setChannel(null); } }