package com.casic.missiles.netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @Slf4j @Component public class NettyClient { // static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); private final EventLoopGroup group = new NioEventLoopGroup(); private ChannelFuture mChannelFuture = null; private final ThreadLocal<Channel> mChannel = new ThreadLocal<>(); // private final ThreadLocalMap<String,Channel> threadLocalMap = new ThreadLocalMap; // ThreadLocalMap @Resource private NettyClientHandler nettyClientHandler; public void startClient(String host, int port) { // Configure the client. try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)//无阻塞 .option(ChannelOption.SO_KEEPALIVE, true)//长连接 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new HjtDecoder()); // p.addLast(new NettyClientHandler(NettyClient.this)); p.addLast(nettyClientHandler); } }); mChannelFuture = b.connect(host, port).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future){ if (future.isSuccess()) { log.info("探测器{}--连接成功!!", host); } else { log.error("探测器{}--连接失败!!", host); future.channel().eventLoop().schedule(new Runnable() { @Override public void run() { log.info("探测器{}--重新连接!!", host); startClient(host, port); } }, 10, TimeUnit.SECONDS); } } }); } catch (Exception e) { log.error("启动 netty 客户端出现异常", e); } } /** * 客户端通过 Channel 对象向服务器端发送数据 * * @param data 文本数据 */ public boolean send(String data) { try { if (mChannel.get() == null) { mChannel.set(mChannelFuture.channel()); } mChannel.get().writeAndFlush(Unpooled.copiedBuffer(data.getBytes(StandardCharsets.UTF_8))); return true; } catch (Exception e) { log.error(this.getClass().getName().concat(".send has error"), e); } return false; } // 客户端启动,并连上服务器端 @PostConstruct public void init() { System.out.println("开始连接了------>"); ForkJoinPool.commonPool().submit(() -> startClient("192.168.1.50", 52002)); // ForkJoinPool.commonPool().submit(() -> startClient("127.0.0.1", 52002)); } @PreDestroy public void destroy() { group.shutdownGracefully(); } }