Newer
Older
casic-robot-inspection / casic-server / src / main / java / com / casic / missiles / netty / NettyClient.java
liwenhao on 1 Aug 5 KB 1.初始化调用中子源
package com.casic.missiles.netty;

import com.casic.missiles.modular.neutron.service.INeutronOptService;
import com.casic.missiles.modular.robot.model.DetectorInfo;
import com.casic.missiles.modular.robot.service.IDetectorInfoService;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class NettyClient {

    private final EventLoopGroup group = new NioEventLoopGroup();
    private ChannelFuture mChannelFuture = null;
    private final ThreadLocal<Channel> mChannel = new ThreadLocal<>();

    //    @Resource
//    private NettyClientHandler nettyClientHandler;
    @Resource
    private IDetectorInfoService iDetectorInfoService;
    @Resource
    private ChannelCache channelCache;
    @Resource
    private INeutronOptService neutronOptService;

    public void startClient(String host, int port, Long userId) {
        // Configure the client.
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)//无阻塞
                    .option(ChannelOption.SO_KEEPALIVE, true)//长连接
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new HjtDecoder());
                            p.addLast(new NettyClientHandler(userId, channelCache, neutronOptService, NettyClient.this));
                        }
                    });

            mChannelFuture = b.connect(host, port).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        log.info("探测器{}:{}--连接成功!!", host, port);
                    } else {
                        log.error("探测器{}:{}--连接失败!!", host, port);
                        future.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                log.info("探测器{}:{}--重新连接!!", host, port);
                                startClient(host, port, userId);
                            }
                        }, 10, TimeUnit.SECONDS);
                    }
                }
            });
        } catch (Exception e) {
            log.error("启动 netty 客户端出现异常", e);
        }
    }

    /**
     * 客户端通过 Channel 对象向服务器端发送数据
     *
     * @param data 文本数据
     */
    public boolean send1(String data) {
        try {
            if (mChannel.get() == null) {
                mChannel.set(mChannelFuture.channel());
            }
            mChannel.get().writeAndFlush(Unpooled.copiedBuffer(data.getBytes(StandardCharsets.UTF_8)));
            return true;
        } catch (Exception e) {
            log.error(this.getClass().getName().concat(".send has error"), e);
        }
        return false;
    }

    //发送数据
    public boolean send(Long uid, String data) {
        try {
            Channel channel = channelCache.getChannelByUserId(uid + "");
            if (channel != null) {
                channel.writeAndFlush(Unpooled.copiedBuffer(data.getBytes(StandardCharsets.UTF_8)));
                return true;
            } else {
                log.error("探测器id--->{}--未连接,请检查设备", uid);
            }

        } catch (Exception e) {
            log.error(this.getClass().getName().concat(".send has error"), e);
        }
        return false;
    }

    // 客户端启动,并连上服务器端
//    @PostConstruct
    public void init() {
        List<DetectorInfo> detectorInfoList = new ArrayList<>();
        //10秒轮询机器人是否上线
        while (true) {
            try {
                System.out.println("中子等待连接------>");
                detectorInfoList = iDetectorInfoService.getOnlineList();
                if (null != detectorInfoList && detectorInfoList.size() > 0) {
                    break;
                }
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("中子源探测器开始连接了------>");

        detectorInfoList.forEach(detectorInfo -> {
            ForkJoinPool.commonPool().submit(() -> startClient(detectorInfo.getDetectorIp(),
                    detectorInfo.getDetectorPort(), detectorInfo.getRobotId()));
        });
//        ForkJoinPool.commonPool().submit(() -> startClient("192.168.1.50", 52002));
    }

    @PreDestroy
    public void destroy() {
        group.shutdownGracefully();
    }

}