package com.casic.missiles.netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.concurrent.ForkJoinPool; @Slf4j @Component public class NettyClient { // static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); private final EventLoopGroup group = new NioEventLoopGroup(); private ChannelFuture mChannelFuture = null; private final ThreadLocal<Channel> mChannel = new ThreadLocal<>(); @Resource private NettyClientHandler nettyClientHandler; public void startClient(String host, int port) { // Configure the client. try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)//无阻塞 .option(ChannelOption.SO_KEEPALIVE, true)//长连接 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new HjtDecoder()); p.addLast(nettyClientHandler); } }); mChannelFuture = b.connect(host, port).addListener(future -> { if (future.isSuccess()) log.info(String.format("客户端启动成功,并监听端口:%s ", port)); log.info(String.format("客户端启动失败,并监听端口:%s ", port)); }); mChannelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { //重连服务端 startClient("192.168.1.50", 52002); } }); } catch (Exception e) { log.error("启动 netty 客户端出现异常", e); } } /** * 客户端通过 Channel 对象向服务器端发送数据 * * @param data 文本数据 */ public void send(String data) { try { if (mChannel.get() == null) { mChannel.set(mChannelFuture.channel()); } mChannel.get().writeAndFlush(Unpooled.copiedBuffer(data.getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { log.error(this.getClass().getName().concat(".send has error"), e); } } // 客户端启动,并连上服务器端 @PostConstruct public void init() { ForkJoinPool.commonPool().submit(() -> startClient("192.168.1.50", 52002)); // ForkJoinPool.commonPool().submit(() -> startClient("127.0.0.1", 52002)); } @PreDestroy public void destroy() { group.shutdownGracefully(); } }