package com.casic.missiles.netty; import com.casic.missiles.modular.neutron.service.INeutronOptService; import com.casic.missiles.modular.robot.model.DetectorInfo; import com.casic.missiles.modular.robot.service.IDetectorInfoService; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @Slf4j @Component public class NettyClient { private final EventLoopGroup group = new NioEventLoopGroup(); private ChannelFuture mChannelFuture = null; private final ThreadLocal<Channel> mChannel = new ThreadLocal<>(); // @Resource // private NettyClientHandler nettyClientHandler; @Resource private IDetectorInfoService iDetectorInfoService; @Resource private ChannelCache channelCache; @Resource private INeutronOptService neutronOptService; public void startClient(String host, int port, Long userId) { // Configure the client. try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)//无阻塞 .option(ChannelOption.SO_KEEPALIVE, true)//长连接 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new HjtDecoder()); p.addLast(new NettyClientHandler(userId, channelCache, neutronOptService, NettyClient.this)); } }); mChannelFuture = b.connect(host, port).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { log.info("探测器{}:{}--连接成功!!", host, port); } else { log.error("探测器{}:{}--连接失败!!", host, port); future.channel().eventLoop().schedule(new Runnable() { @Override public void run() { log.info("探测器{}:{}--重新连接!!", host, port); startClient(host, port, userId); } }, 10, TimeUnit.SECONDS); } } }); } catch (Exception e) { log.error("启动 netty 客户端出现异常", e); } } /** * 客户端通过 Channel 对象向服务器端发送数据 * * @param data 文本数据 */ public boolean send1(String data) { try { if (mChannel.get() == null) { mChannel.set(mChannelFuture.channel()); } mChannel.get().writeAndFlush(Unpooled.copiedBuffer(data.getBytes(StandardCharsets.UTF_8))); return true; } catch (Exception e) { log.error(this.getClass().getName().concat(".send has error"), e); } return false; } //发送数据 public boolean send(Long uid, String data) { try { Channel channel = channelCache.getChannelByUserId(uid + ""); if (channel != null) { channel.writeAndFlush(Unpooled.copiedBuffer(data.getBytes(StandardCharsets.UTF_8))); return true; } else { log.error("探测器id--->{}--未连接,请检查设备", uid); } } catch (Exception e) { log.error(this.getClass().getName().concat(".send has error"), e); } return false; } // 客户端启动,并连上服务器端 // @PostConstruct public void init() { List<DetectorInfo> detectorInfoList = new ArrayList<>(); //10秒轮询机器人是否上线 while (true) { try { System.out.println("中子等待连接------>"); detectorInfoList = iDetectorInfoService.getOnlineList(); if (null != detectorInfoList && detectorInfoList.size() > 0) { break; } Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("中子源探测器开始连接了------>"); detectorInfoList.forEach(detectorInfo -> { ForkJoinPool.commonPool().submit(() -> startClient(detectorInfo.getDetectorIp(), detectorInfo.getDetectorPort(), detectorInfo.getRobotId())); }); // ForkJoinPool.commonPool().submit(() -> startClient("192.168.1.50", 52002)); } @PreDestroy public void destroy() { group.shutdownGracefully(); } }