Newer
Older
pgdsc / src / com / szpg / plc / server / ACUClientHandler.java
package com.szpg.plc.server;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.szpg.db.dao.PgAcuCmdDao;
import com.szpg.db.dao.impl.PgAcuCmdDaoImpl;
import com.szpg.db.data.PgAcuCmd;
import com.szpg.plc.message.AppMessage;
import com.szpg.plc.message.command.LinkCommand;
import com.szpg.plc.message.response.LinkCommandResponse;
import com.szpg.plc.message.response.ReadMemoryCommandResponse;
import com.szpg.plc.message.response.WriteMemoryCommandResponse;
import com.szpg.plc.protocol.DTProtocolInterface;
import com.szpg.plc.protocol.ProtocolFactory;
import com.szpg.plc.util.ByteUtil;
import com.szpg.task.ACULinkTask;
import com.szpg.util.Configure;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Netty inbound handler for a single ACU (PLC) connection.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>on connect: bind the channel to the matching {@link ACUClient} and
 *       schedule the handshake ({@link LinkCommand}) to be sent after a delay;</li>
 *   <li>on read: split the raw bytes into protocol frames, decode each frame
 *       into an {@link AppMessage}, enrich it with data from the pending
 *       command record, and run its post-processing;</li>
 *   <li>on disconnect / error: unbind the channel from the client.</li>
 * </ul>
 */
public class ACUClientHandler extends ChannelInboundHandlerAdapter {

    /** Delay, in seconds, between connection establishment and the handshake command. */
    private static final long LINK_DELAY_SECONDS = 5;

    private final Logger logger = Logger.getLogger(this.getClass().getName());

    private final DTProtocolInterface finspi = ProtocolFactory.getDefaultDTProtocol();

    private final PgAcuCmdDao cmdDao = new PgAcuCmdDaoImpl();

    /**
     * Binds the newly active channel to its registered client and schedules
     * the handshake command. Peers without a registered client are ignored.
     *
     * @param ctx the channel context of the connection that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info(ctx.channel().remoteAddress() + "连接建立成功");

        // 1. Update the channel held by the client registered for this peer.
        ACUClient client = findClient(ctx);
        if (null == client) {
            return;
        }
        client.setChannel(ctx.channel());

        // 2. Build the LinkCommand and serialize it to bytes. The producer id is
        //    taken from the local node configured under "sys" / "LOCALHOST.NODE".
        LinkCommand lc = new LinkCommand();
        String localNode = Configure.getProperty("sys", "LOCALHOST.NODE");
        lc.setMessageProducerId(ByteUtil.binToHexString(ByteUtil.hexStringToBytes(localNode, 4)));
        byte[] lcBytes = finspi.messageToBytes(lc);

        // 3. Send the handshake after a delay. shutdown() does not cancel the
        //    already scheduled task under the executor's default policy; it lets
        //    the worker thread terminate once the task has run.
        ScheduledExecutorService linkSche = new ScheduledThreadPoolExecutor(1);
        linkSche.schedule(new ACULinkTask(client, lcBytes), LINK_DELAY_SECONDS, TimeUnit.SECONDS);
        linkSche.shutdown();
    }

    /**
     * Unbinds the channel from its client when the connection goes away.
     *
     * @param ctx the channel context of the connection that became inactive
     * @throws Exception declared by the overridden method; not thrown here
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        clearClientChannel(ctx);
    }

    /**
     * Decodes every protocol frame contained in the received buffer and
     * dispatches it by response type. The buffer is always released, because
     * this handler consumes the message and does not pass it on.
     *
     * @param ctx the channel context the data arrived on
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] byteResponse = ByteBufUtil.getBytes(buf);

            // Log the raw frame as hex.
            ACUClientUtil.recv.info(ByteUtil.binToHexString(byteResponse));

            // The address prints like "/172.20.10.5:9600"; the host is the part
            // between the leading '/' and the first ':'.
            String remoteAddress = ctx.channel().remoteAddress().toString();
            String host = remoteAddress.substring(1, remoteAddress.indexOf(":"));

            // Split the byte stream into individual messages according to the protocol.
            List<byte[]> bytesList = finspi.extractByteMessage(byteResponse);
            for (byte[] frame : bytesList) {
                AppMessage message = finspi.bytesToMessage(frame);
                if (null == message) {
                    continue;
                }
                message.setMessageProducerHost(host);
                handleMessage(host, message);

                // Post-processing after the message has been parsed.
                message.afterAction();
            }
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages.
            buf.release();
        }
    }

    /**
     * Logs completion of a read batch at debug level.
     *
     * @param ctx the channel context whose read batch completed
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        logger.debug("channelReadComplete");
    }

    /**
     * Logs the failure, unbinds the channel from its client and closes the
     * connection.
     *
     * @param ctx   the channel context on which the error occurred
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("Exception on channel " + ctx.channel().remoteAddress(), cause);
        try {
            // Resolve the client while the remote address is still available.
            clearClientChannel(ctx);
        } finally {
            ctx.close();
        }
    }

    /**
     * Applies the type-specific handling for a decoded message.
     *
     * @param host    host part of the peer address, used to look up the pending command
     * @param message the decoded, non-null message
     */
    private void handleMessage(String host, AppMessage message) {
        if (message instanceof LinkCommandResponse) {
            // Handshake response: only needs its payload parsed.
            ((LinkCommandResponse) message).parseData();
        } else if (message instanceof ReadMemoryCommandResponse) {
            ReadMemoryCommandResponse rmcr = (ReadMemoryCommandResponse) message;

            // Look up the latest pending read command for this host and type;
            // it supplies the target ACU code and command id for the response.
            PgAcuCmd cmd = cmdDao.findLatestCmdByHostAndType(host, rmcr.getCommandType());
            if (null != cmd) {
                rmcr.setAcucode(cmd.getDest_acu_code());
                rmcr.setCmdId(cmd.getId());

                // Parse the payload now that the command context is known.
                rmcr.parseData();

                // The command has been answered; remove its record.
                cmdDao.deleteCmdRecord(cmd.getId());
            }
        } else if (message instanceof WriteMemoryCommandResponse) {
            WriteMemoryCommandResponse wmcr = (WriteMemoryCommandResponse) message;

            // Look up the latest pending write command for this host and type.
            PgAcuCmd cmd = cmdDao.findLatestCmdByHostAndType(host, wmcr.getCommandType());
            if (null != cmd) {
                wmcr.setAcucode(cmd.getDest_acu_code());
                wmcr.setCmdId(cmd.getId());

                // No parseData() call here: the payload is already parsed.

                // The command has been answered; remove its record.
                cmdDao.deleteCmdRecord(cmd.getId());
            }
        }
    }

    /**
     * Looks up the client registered for the peer of the given context. The
     * lookup key is the remote address string without its leading '/'.
     *
     * @param ctx the channel context whose peer should be resolved
     * @return the registered client, or {@code null} if the address is
     *         unavailable or no client is registered for it
     */
    private ACUClient findClient(ChannelHandlerContext ctx) {
        Object address = ctx.channel().remoteAddress();
        if (null == address) {
            return null;
        }
        return ACUClientUtil.getInstance().getClient(address.toString().substring(1));
    }

    /**
     * Sets the channel of the client registered for this peer to {@code null},
     * if such a client exists.
     *
     * @param ctx the channel context whose client should be unbound
     */
    private void clearClientChannel(ChannelHandlerContext ctx) {
        ACUClient client = findClient(ctx);
        if (null != client) {
            client.setChannel(null);
        }
    }
}