Newer
Older
pgdsc / src / com / szpg / plc / server / ACUClientHandler.java
ty-pc\admin on 19 Jun 2018 3 KB 20180619 修改提交
package com.szpg.plc.server;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.szpg.plc.message.AppMessage;
import com.szpg.plc.message.CommandResponse;
import com.szpg.plc.message.command.LinkCommand;
import com.szpg.plc.protocol.DTProtocolInterface;
import com.szpg.plc.protocol.ProtocolFactory;
import com.szpg.plc.util.ByteUtil;
import com.szpg.task.ACULinkTask;
import com.szpg.util.Configure;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Netty inbound handler for the connection to an ACU client.
 *
 * <p>On connect it binds the channel to the matching {@link ACUClient} and schedules an
 * {@link ACULinkTask} carrying an encoded {@link LinkCommand}. On read it decodes the received
 * bytes into {@link AppMessage}s, stores command responses in the response pool and runs each
 * message's {@code afterAction()}. On disconnect or error it clears the client's channel.
 *
 * <p>Clients are looked up in {@link ACUClientUtil} by the channel's remote address rendered as a
 * string with its first character removed (presumably the leading {@code '/'} that
 * {@code InetSocketAddress.toString()} emits when no host name is known — verify against the
 * code that registers the clients).
 */
public class ACUClientHandler extends ChannelInboundHandlerAdapter {

    /** Delay, in seconds, between the channel becoming active and the scheduled link task. */
    private static final long LINK_DELAY_SECONDS = 5;

    private final Logger logger = Logger.getLogger(this.getClass().getName());

    private final DTProtocolInterface finspi = ProtocolFactory.getDefaultDTProtocol();

    /**
     * Binds the new channel to its {@link ACUClient} and schedules the link (handshake) task.
     * Does nothing beyond logging when no client is registered for the remote address.
     *
     * @param ctx context of the channel that has just become active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info(ctx.channel().remoteAddress() + "连接建立成功");

        // 1. Point the registered client at the freshly connected channel.
        ACUClient client = findClient(ctx);
        if (client == null) {
            return;
        }
        client.setChannel(ctx.channel());

        // 2. Build the LinkCommand and encode it to bytes. The producer id is taken from the
        //    local node configured under "sys" / "LOCALHOST.NODE".
        LinkCommand linkCommand = new LinkCommand();
        String localNode = Configure.getProperty("sys", "LOCALHOST.NODE");
        linkCommand.setMessageProducerId(ByteUtil.binToHexString(ByteUtil.hexStringToBytes(localNode, 4)));
        byte[] linkBytes = finspi.messageToBytes(linkCommand);

        // 3. Run the link task after LINK_DELAY_SECONDS on a dedicated one-shot executor.
        //    shutdown() is called immediately: by default a ScheduledThreadPoolExecutor still
        //    runs already-scheduled delayed tasks after shutdown, then its thread terminates.
        ScheduledExecutorService linkScheduler = new ScheduledThreadPoolExecutor(1);
        linkScheduler.schedule(new ACULinkTask(client, linkBytes), LINK_DELAY_SECONDS, TimeUnit.SECONDS);
        linkScheduler.shutdown();
    }

    /**
     * Clears the channel reference held by the matching {@link ACUClient}, if there is one.
     *
     * @param ctx context of the channel that has become inactive
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        clearClientChannel(ctx);
    }

    /**
     * Decodes the received bytes into application messages and dispatches them.
     *
     * <p>The message is consumed here and not forwarded to later handlers, so the buffer is
     * released once its content has been copied out.
     *
     * @param ctx context of the channel the data arrived on
     * @param msg the inbound message; must be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        byte[] byteResponse;
        try {
            // getBytes returns a copy, so the buffer is not needed afterwards.
            byteResponse = ByteBufUtil.getBytes(buffer);
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages automatically.
            buffer.release();
        }

        // Log the raw frame as hex.
        ACUClientUtil.recv.info(ByteUtil.binToHexString(byteResponse));

        // Split the received bytes into protocol messages and parse each one.
        List<byte[]> rawMessages = finspi.extractByteMessage(byteResponse);
        for (byte[] rawMessage : rawMessages) {
            AppMessage message = finspi.bytesToMessage(rawMessage);
            if (message == null) {
                continue;
            }

            // Command responses are stored in the response pool, keyed by command id.
            if (message instanceof CommandResponse) {
                CommandResponse response = (CommandResponse) message;
                ACUClientUtil.getInstance().responsePool.putResponse(response.getCmdId(), response);
            }

            // Post-parse processing defined by the message itself.
            message.afterAction();
        }
    }

    /**
     * Logs completion of a read batch at debug level.
     *
     * @param ctx context of the channel
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        logger.debug("channelReadComplete");
    }

    /**
     * Logs the failure, closes the channel and clears the channel reference held by the
     * matching {@link ACUClient}, if there is one.
     *
     * @param ctx   context of the channel on which the exception was raised
     * @param cause the exception that was caught
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(ctx.channel().remoteAddress() + " channel error, closing connection", cause);
        ctx.close();

        clearClientChannel(ctx);
    }

    /**
     * Looks up the client registered for the channel's remote address.
     *
     * @param ctx context whose channel identifies the remote peer
     * @return the registered client, or {@code null} if the remote address is unavailable or no
     *         client is registered under it
     */
    private ACUClient findClient(ChannelHandlerContext ctx) {
        java.net.SocketAddress remote = ctx.channel().remoteAddress();
        if (remote == null) {
            return null;
        }
        // Registry key is the address string without its first character.
        return ACUClientUtil.getInstance().getClient(remote.toString().substring(1));
    }

    /** Sets the matching client's channel to {@code null}; no-op when no client is found. */
    private void clearClientChannel(ChannelHandlerContext ctx) {
        ACUClient client = findClient(ctx);
        if (client != null) {
            client.setChannel(null);
        }
    }
}