diff --git a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt index cf8feaf..980950b 100644 --- a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt +++ b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt @@ -22,6 +22,7 @@ import com.casic.br.operationsite.utils.LocaleConstant import com.casic.br.operationsite.utils.OnTcpConnectStateListener import com.casic.br.operationsite.utils.TcpClient +import com.pengxh.kt.lite.extensions.show import com.pengxh.kt.lite.extensions.toAsciiCode import com.pengxh.kt.lite.utils.SaveKeyValues import com.pengxh.kt.lite.utils.WeakReferenceHandler @@ -48,6 +49,10 @@ override fun handleMessage(msg: Message): Boolean { when (msg.what) { + LocaleConstant.CONNECT_TCP_CODE->{ + tcpClient.start() + } + LocaleConstant.QUERY_METHANE_STATE_CODE -> { tcpClient.sendMessage(CommandCreator.queryMethaneState()) } @@ -209,6 +214,7 @@ notificationBuilder?.setContentTitle("通讯服务已连接") val notification = notificationBuilder?.build() notificationManager.notify(notificationId, notification) + "通讯服务已连接".show(this) //定时查询甲烷浓度 weakReferenceHandler?.post(methaneRunnable) } diff --git a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt index cf8feaf..980950b 100644 --- a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt +++ b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt @@ -22,6 +22,7 @@ import com.casic.br.operationsite.utils.LocaleConstant import com.casic.br.operationsite.utils.OnTcpConnectStateListener import com.casic.br.operationsite.utils.TcpClient +import com.pengxh.kt.lite.extensions.show import com.pengxh.kt.lite.extensions.toAsciiCode import com.pengxh.kt.lite.utils.SaveKeyValues import com.pengxh.kt.lite.utils.WeakReferenceHandler @@ -48,6 +49,10 @@ override fun handleMessage(msg: Message): Boolean { when (msg.what) { + LocaleConstant.CONNECT_TCP_CODE->{ + tcpClient.start() + } + LocaleConstant.QUERY_METHANE_STATE_CODE -> { tcpClient.sendMessage(CommandCreator.queryMethaneState()) } @@ -209,6 +214,7 @@ notificationBuilder?.setContentTitle("通讯服务已连接") val notification = notificationBuilder?.build() notificationManager.notify(notificationId, notification) + "通讯服务已连接".show(this) //定时查询甲烷浓度 weakReferenceHandler?.post(methaneRunnable) } diff --git a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt index 78e3ace..dd89794 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt @@ -64,6 +64,7 @@ const val TCP_PORT = 333 const val PLAY_RTSP_CODE = 20241001 + const val CONNECT_TCP_CODE = 20251001 /** * Handler Request Code diff --git a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt index cf8feaf..980950b 100644 --- a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt +++ b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt @@ -22,6 +22,7 @@ import com.casic.br.operationsite.utils.LocaleConstant import com.casic.br.operationsite.utils.OnTcpConnectStateListener import com.casic.br.operationsite.utils.TcpClient +import com.pengxh.kt.lite.extensions.show import com.pengxh.kt.lite.extensions.toAsciiCode import com.pengxh.kt.lite.utils.SaveKeyValues import com.pengxh.kt.lite.utils.WeakReferenceHandler @@ -48,6 +49,10 @@ override fun handleMessage(msg: Message): Boolean { when (msg.what) { + LocaleConstant.CONNECT_TCP_CODE->{ + tcpClient.start() + } + LocaleConstant.QUERY_METHANE_STATE_CODE -> { tcpClient.sendMessage(CommandCreator.queryMethaneState()) } @@ -209,6 +214,7 @@ notificationBuilder?.setContentTitle("通讯服务已连接") val notification = notificationBuilder?.build() notificationManager.notify(notificationId, notification) + "通讯服务已连接".show(this) //定时查询甲烷浓度 weakReferenceHandler?.post(methaneRunnable) } diff --git a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt index 78e3ace..dd89794 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt @@ -64,6 +64,7 @@ const val TCP_PORT = 333 const val PLAY_RTSP_CODE = 20241001 + const val CONNECT_TCP_CODE = 20251001 /** * Handler Request Code diff --git a/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt b/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt index 91f82bb..00ce86a 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt @@ -21,7 +21,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import java.net.InetSocketAddress import java.util.concurrent.TimeUnit @@ -30,20 +30,22 @@ class TcpClient(private val listener: OnTcpConnectStateListener) { + companion object { + private const val INITIAL_IDLE_TIME = 15L + private const val MAX_RETRY_TIMES = 10 + private const val RECONNECT_DELAY_SECONDS = 15L + private const val RECEIVE_BUFFER_MIN = 5000 + private const val RECEIVE_BUFFER_MAX = 8000 + } + private val kTag = "TcpClient" - private val reconnectDelay = 15L - private val maxRetryTimes = 10 // 设置最大重连次数 + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val loopGroup by lazy { NioEventLoopGroup() } - private var needReconnect = false - private lateinit var host: String + private var host: String = "" private var port: Int = 0 + private var needReconnect = false private var channel: Channel? = null - private var scope: CoroutineScope? = null - - @Volatile private var isRunning = AtomicBoolean(false) - - @Volatile private var retryTimes = AtomicInteger(0) /** @@ -56,21 +58,22 @@ fun start(host: String, port: Int) { this.host = host this.port = port - if (isRunning.get()) { - Log.d(kTag, "start: TcpClient 正在运行") - return - } + connect() + } + + fun start() { connect() } private inner class SimpleChannelInitializer : ChannelInitializer() { override fun initChannel(ch: SocketChannel) { - val channelPipeline = ch.pipeline() - channelPipeline - .addLast(ByteArrayDecoder()) - .addLast(ByteArrayEncoder()) - .addLast(IdleStateHandler(15, 15, 60, TimeUnit.SECONDS))//如果连接没有接收或发送数据超过60秒钟就发送一次心跳 - .addLast(object : SimpleChannelInboundHandler() { + ch.pipeline().apply { + addLast(ByteArrayDecoder()) + addLast(ByteArrayEncoder()) + addLast(//如果连接没有接收或发送数据超过60秒钟就发送一次心跳 + IdleStateHandler(INITIAL_IDLE_TIME, INITIAL_IDLE_TIME, 60, TimeUnit.SECONDS) + ) + addLast(object : SimpleChannelInboundHandler() { override fun channelActive(ctx: ChannelHandlerContext) { val address = ctx.channel().remoteAddress() as InetSocketAddress Log.d(kTag, "${address.address.hostAddress} 已连接") @@ -98,41 +101,31 @@ isRunning.set(false) } }) + } } } @Synchronized private fun connect() { - if (channel != null && channel!!.isActive) { - Log.d(kTag, "connect: TcpClient 正在运行") + if (isRunning()) { + Log.d(kTag, "start: TcpClient 正在运行") return } - scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - scope?.launch(Dispatchers.IO) { + scope.launch(Dispatchers.IO) { try { - Log.d(kTag, "start connect: ${host}:${port}") - var bootStrap = Bootstrap().apply { - group(loopGroup) - channel(NioSocketChannel::class.java) - option(ChannelOption.TCP_NODELAY, true) //无阻塞 - option(ChannelOption.SO_KEEPALIVE, true) //长连接 - option( - ChannelOption.RCVBUF_ALLOCATOR, - AdaptiveRecvByteBufAllocator(5000, 5000, 8000) - ) - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - handler(SimpleChannelInitializer()) - } - val channelFuture = bootStrap.connect(host, port) - .addListener(object : ChannelFutureListener { - override fun operationComplete(channelFuture: ChannelFuture) { - if (channelFuture.isSuccess) { + Log.d(kTag, "开始连接: $host:$port") + val bootstrap = createBootstrap() + val channelFuture = bootstrap.connect(host, port).apply { + addListener(object : ChannelFutureListener { + override fun operationComplete(future: ChannelFuture) { + if (future.isSuccess) { isRunning.set(true) retryTimes.set(0) - channel = channelFuture.channel() + channel = future.channel() } } - }).sync() + }) + }.sync() channelFuture.channel().closeFuture().sync() } catch (e: InterruptedException) { e.printStackTrace() @@ -143,13 +136,34 @@ } } + private fun createBootstrap(): Bootstrap { + return Bootstrap().apply { + group(loopGroup) + channel(NioSocketChannel::class.java) + option(ChannelOption.TCP_NODELAY, true) //无阻塞 + option(ChannelOption.SO_KEEPALIVE, true) //长连接 + option( + ChannelOption.RCVBUF_ALLOCATOR, + AdaptiveRecvByteBufAllocator( + RECEIVE_BUFFER_MIN, RECEIVE_BUFFER_MIN, RECEIVE_BUFFER_MAX + ) + ) + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + handler(SimpleChannelInitializer()) + } + } + private fun reconnect() { val currentRetryTimes = retryTimes.incrementAndGet() - if (currentRetryTimes <= maxRetryTimes) { - scope?.launch(Dispatchers.Main) { + if (currentRetryTimes <= MAX_RETRY_TIMES) { + scope.launch(Dispatchers.Main) { "开始第 $currentRetryTimes 次重连".show(BaseApplication.get()) } - loopGroup.schedule({ connect() }, reconnectDelay, TimeUnit.SECONDS) + //使用协程延迟重连,替代 Netty 的 loopGroup.schedule,防止调度器滥用。 + scope.launch(Dispatchers.Main) { + delay(RECONNECT_DELAY_SECONDS * 1000L) + connect() + } } else { Log.e(kTag, "达到最大重连次数,停止重连") listener.onConnectFailed() @@ -160,13 +174,10 @@ this.needReconnect = needReconnect isRunning.set(false) channel?.close() - scope?.cancel() } fun sendMessage(bytes: ByteArray) { - if (!isRunning.get()) { - return - } + if (!isRunning()) return channel?.writeAndFlush(bytes) } } \ No newline at end of file diff --git a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt index cf8feaf..980950b 100644 --- a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt +++ b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt @@ -22,6 +22,7 @@ import com.casic.br.operationsite.utils.LocaleConstant import com.casic.br.operationsite.utils.OnTcpConnectStateListener import com.casic.br.operationsite.utils.TcpClient +import com.pengxh.kt.lite.extensions.show import com.pengxh.kt.lite.extensions.toAsciiCode import com.pengxh.kt.lite.utils.SaveKeyValues import com.pengxh.kt.lite.utils.WeakReferenceHandler @@ -48,6 +49,10 @@ override fun handleMessage(msg: Message): Boolean { when (msg.what) { + LocaleConstant.CONNECT_TCP_CODE->{ + tcpClient.start() + } + LocaleConstant.QUERY_METHANE_STATE_CODE -> { tcpClient.sendMessage(CommandCreator.queryMethaneState()) } @@ -209,6 +214,7 @@ notificationBuilder?.setContentTitle("通讯服务已连接") val notification = notificationBuilder?.build() notificationManager.notify(notificationId, notification) + "通讯服务已连接".show(this) //定时查询甲烷浓度 weakReferenceHandler?.post(methaneRunnable) } diff --git a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt index 78e3ace..dd89794 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt @@ -64,6 +64,7 @@ const val TCP_PORT = 333 const val PLAY_RTSP_CODE = 20241001 + const val CONNECT_TCP_CODE = 20251001 /** * Handler Request Code diff --git a/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt b/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt index 91f82bb..00ce86a 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt @@ -21,7 +21,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import java.net.InetSocketAddress import java.util.concurrent.TimeUnit @@ -30,20 +30,22 @@ class TcpClient(private val listener: OnTcpConnectStateListener) { + companion object { + private const val INITIAL_IDLE_TIME = 15L + private const val MAX_RETRY_TIMES = 10 + private const val RECONNECT_DELAY_SECONDS = 15L + private const val RECEIVE_BUFFER_MIN = 5000 + private const val RECEIVE_BUFFER_MAX = 8000 + } + private val kTag = "TcpClient" - private val reconnectDelay = 15L - private val maxRetryTimes = 10 // 设置最大重连次数 + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val loopGroup by lazy { NioEventLoopGroup() } - private var needReconnect = false - private lateinit var host: String + private var host: String = "" private var port: Int = 0 + private var needReconnect = false private var channel: Channel? = null - private var scope: CoroutineScope? = null - - @Volatile private var isRunning = AtomicBoolean(false) - - @Volatile private var retryTimes = AtomicInteger(0) /** @@ -56,21 +58,22 @@ fun start(host: String, port: Int) { this.host = host this.port = port - if (isRunning.get()) { - Log.d(kTag, "start: TcpClient 正在运行") - return - } + connect() + } + + fun start() { connect() } private inner class SimpleChannelInitializer : ChannelInitializer() { override fun initChannel(ch: SocketChannel) { - val channelPipeline = ch.pipeline() - channelPipeline - .addLast(ByteArrayDecoder()) - .addLast(ByteArrayEncoder()) - .addLast(IdleStateHandler(15, 15, 60, TimeUnit.SECONDS))//如果连接没有接收或发送数据超过60秒钟就发送一次心跳 - .addLast(object : SimpleChannelInboundHandler() { + ch.pipeline().apply { + addLast(ByteArrayDecoder()) + addLast(ByteArrayEncoder()) + addLast(//如果连接没有接收或发送数据超过60秒钟就发送一次心跳 + IdleStateHandler(INITIAL_IDLE_TIME, INITIAL_IDLE_TIME, 60, TimeUnit.SECONDS) + ) + addLast(object : SimpleChannelInboundHandler() { override fun channelActive(ctx: ChannelHandlerContext) { val address = ctx.channel().remoteAddress() as InetSocketAddress Log.d(kTag, "${address.address.hostAddress} 已连接") @@ -98,41 +101,31 @@ isRunning.set(false) } }) + } } } @Synchronized private fun connect() { - if (channel != null && channel!!.isActive) { - Log.d(kTag, "connect: TcpClient 正在运行") + if (isRunning()) { + Log.d(kTag, "start: TcpClient 正在运行") return } - scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - scope?.launch(Dispatchers.IO) { + scope.launch(Dispatchers.IO) { try { - Log.d(kTag, "start connect: ${host}:${port}") - var bootStrap = Bootstrap().apply { - group(loopGroup) - channel(NioSocketChannel::class.java) - option(ChannelOption.TCP_NODELAY, true) //无阻塞 - option(ChannelOption.SO_KEEPALIVE, true) //长连接 - option( - ChannelOption.RCVBUF_ALLOCATOR, - AdaptiveRecvByteBufAllocator(5000, 5000, 8000) - ) - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - handler(SimpleChannelInitializer()) - } - val channelFuture = bootStrap.connect(host, port) - .addListener(object : ChannelFutureListener { - override fun operationComplete(channelFuture: ChannelFuture) { - if (channelFuture.isSuccess) { + Log.d(kTag, "开始连接: $host:$port") + val bootstrap = createBootstrap() + val channelFuture = bootstrap.connect(host, port).apply { + addListener(object : ChannelFutureListener { + override fun operationComplete(future: ChannelFuture) { + if (future.isSuccess) { isRunning.set(true) retryTimes.set(0) - channel = channelFuture.channel() + channel = future.channel() } } - }).sync() + }) + }.sync() channelFuture.channel().closeFuture().sync() } catch (e: InterruptedException) { e.printStackTrace() @@ -143,13 +136,34 @@ } } + private fun createBootstrap(): Bootstrap { + return Bootstrap().apply { + group(loopGroup) + channel(NioSocketChannel::class.java) + option(ChannelOption.TCP_NODELAY, true) //无阻塞 + option(ChannelOption.SO_KEEPALIVE, true) //长连接 + option( + ChannelOption.RCVBUF_ALLOCATOR, + AdaptiveRecvByteBufAllocator( + RECEIVE_BUFFER_MIN, RECEIVE_BUFFER_MIN, RECEIVE_BUFFER_MAX + ) + ) + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + handler(SimpleChannelInitializer()) + } + } + private fun reconnect() { val currentRetryTimes = retryTimes.incrementAndGet() - if (currentRetryTimes <= maxRetryTimes) { - scope?.launch(Dispatchers.Main) { + if (currentRetryTimes <= MAX_RETRY_TIMES) { + scope.launch(Dispatchers.Main) { "开始第 $currentRetryTimes 次重连".show(BaseApplication.get()) } - loopGroup.schedule({ connect() }, reconnectDelay, TimeUnit.SECONDS) + //使用协程延迟重连,替代 Netty 的 loopGroup.schedule,防止调度器滥用。 + scope.launch(Dispatchers.Main) { + delay(RECONNECT_DELAY_SECONDS * 1000L) + connect() + } } else { Log.e(kTag, "达到最大重连次数,停止重连") listener.onConnectFailed() @@ -160,13 +174,10 @@ this.needReconnect = needReconnect isRunning.set(false) channel?.close() - scope?.cancel() } fun sendMessage(bytes: ByteArray) { - if (!isRunning.get()) { - return - } + if (!isRunning()) return channel?.writeAndFlush(bytes) } } \ No newline at end of file diff --git a/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt b/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt index 440400d..b2c369f 100644 --- a/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt +++ b/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt @@ -107,7 +107,7 @@ } override fun onRightClick() { - + SocketConnectionService.weakReferenceHandler?.sendEmptyMessage(LocaleConstant.CONNECT_TCP_CODE) } }) } diff --git a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt index cf8feaf..980950b 100644 --- a/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt +++ b/app/src/main/java/com/casic/br/operationsite/service/SocketConnectionService.kt @@ -22,6 +22,7 @@ import com.casic.br.operationsite.utils.LocaleConstant import com.casic.br.operationsite.utils.OnTcpConnectStateListener import com.casic.br.operationsite.utils.TcpClient +import com.pengxh.kt.lite.extensions.show import com.pengxh.kt.lite.extensions.toAsciiCode import com.pengxh.kt.lite.utils.SaveKeyValues import com.pengxh.kt.lite.utils.WeakReferenceHandler @@ -48,6 +49,10 @@ override fun handleMessage(msg: Message): Boolean { when (msg.what) { + LocaleConstant.CONNECT_TCP_CODE->{ + tcpClient.start() + } + LocaleConstant.QUERY_METHANE_STATE_CODE -> { tcpClient.sendMessage(CommandCreator.queryMethaneState()) } @@ -209,6 +214,7 @@ notificationBuilder?.setContentTitle("通讯服务已连接") val notification = notificationBuilder?.build() notificationManager.notify(notificationId, notification) + "通讯服务已连接".show(this) //定时查询甲烷浓度 weakReferenceHandler?.post(methaneRunnable) } diff --git a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt index 78e3ace..dd89794 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/LocaleConstant.kt @@ -64,6 +64,7 @@ const val TCP_PORT = 333 const val PLAY_RTSP_CODE = 20241001 + const val CONNECT_TCP_CODE = 20251001 /** * Handler Request Code diff --git a/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt b/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt index 91f82bb..00ce86a 100644 --- a/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt +++ b/app/src/main/java/com/casic/br/operationsite/utils/TcpClient.kt @@ -21,7 +21,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import java.net.InetSocketAddress import java.util.concurrent.TimeUnit @@ -30,20 +30,22 @@ class TcpClient(private val listener: OnTcpConnectStateListener) { + companion object { + private const val INITIAL_IDLE_TIME = 15L + private const val MAX_RETRY_TIMES = 10 + private const val RECONNECT_DELAY_SECONDS = 15L + private const val RECEIVE_BUFFER_MIN = 5000 + private const val RECEIVE_BUFFER_MAX = 8000 + } + private val kTag = "TcpClient" - private val reconnectDelay = 15L - private val maxRetryTimes = 10 // 设置最大重连次数 + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val loopGroup by lazy { NioEventLoopGroup() } - private var needReconnect = false - private lateinit var host: String + private var host: String = "" private var port: Int = 0 + private var needReconnect = false private var channel: Channel? = null - private var scope: CoroutineScope? = null - - @Volatile private var isRunning = AtomicBoolean(false) - - @Volatile private var retryTimes = AtomicInteger(0) /** @@ -56,21 +58,22 @@ fun start(host: String, port: Int) { this.host = host this.port = port - if (isRunning.get()) { - Log.d(kTag, "start: TcpClient 正在运行") - return - } + connect() + } + + fun start() { connect() } private inner class SimpleChannelInitializer : ChannelInitializer() { override fun initChannel(ch: SocketChannel) { - val channelPipeline = ch.pipeline() - channelPipeline - .addLast(ByteArrayDecoder()) - .addLast(ByteArrayEncoder()) - .addLast(IdleStateHandler(15, 15, 60, TimeUnit.SECONDS))//如果连接没有接收或发送数据超过60秒钟就发送一次心跳 - .addLast(object : SimpleChannelInboundHandler() { + ch.pipeline().apply { + addLast(ByteArrayDecoder()) + addLast(ByteArrayEncoder()) + addLast(//如果连接没有接收或发送数据超过60秒钟就发送一次心跳 + IdleStateHandler(INITIAL_IDLE_TIME, INITIAL_IDLE_TIME, 60, TimeUnit.SECONDS) + ) + addLast(object : SimpleChannelInboundHandler() { override fun channelActive(ctx: ChannelHandlerContext) { val address = ctx.channel().remoteAddress() as InetSocketAddress Log.d(kTag, "${address.address.hostAddress} 已连接") @@ -98,41 +101,31 @@ isRunning.set(false) } }) + } } } @Synchronized private fun connect() { - if (channel != null && channel!!.isActive) { - Log.d(kTag, "connect: TcpClient 正在运行") + if (isRunning()) { + Log.d(kTag, "start: TcpClient 正在运行") return } - scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - scope?.launch(Dispatchers.IO) { + scope.launch(Dispatchers.IO) { try { - Log.d(kTag, "start connect: ${host}:${port}") - var bootStrap = Bootstrap().apply { - group(loopGroup) - channel(NioSocketChannel::class.java) - option(ChannelOption.TCP_NODELAY, true) //无阻塞 - option(ChannelOption.SO_KEEPALIVE, true) //长连接 - option( - ChannelOption.RCVBUF_ALLOCATOR, - AdaptiveRecvByteBufAllocator(5000, 5000, 8000) - ) - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - handler(SimpleChannelInitializer()) - } - val channelFuture = bootStrap.connect(host, port) - .addListener(object : ChannelFutureListener { - override fun operationComplete(channelFuture: ChannelFuture) { - if (channelFuture.isSuccess) { + Log.d(kTag, "开始连接: $host:$port") + val bootstrap = createBootstrap() + val channelFuture = bootstrap.connect(host, port).apply { + addListener(object : ChannelFutureListener { + override fun operationComplete(future: ChannelFuture) { + if (future.isSuccess) { isRunning.set(true) retryTimes.set(0) - channel = channelFuture.channel() + channel = future.channel() } } - }).sync() + }) + }.sync() channelFuture.channel().closeFuture().sync() } catch (e: InterruptedException) { e.printStackTrace() @@ -143,13 +136,34 @@ } } + private fun createBootstrap(): Bootstrap { + return Bootstrap().apply { + group(loopGroup) + channel(NioSocketChannel::class.java) + option(ChannelOption.TCP_NODELAY, true) //无阻塞 + option(ChannelOption.SO_KEEPALIVE, true) //长连接 + option( + ChannelOption.RCVBUF_ALLOCATOR, + AdaptiveRecvByteBufAllocator( + RECEIVE_BUFFER_MIN, RECEIVE_BUFFER_MIN, RECEIVE_BUFFER_MAX + ) + ) + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + handler(SimpleChannelInitializer()) + } + } + private fun reconnect() { val currentRetryTimes = retryTimes.incrementAndGet() - if (currentRetryTimes <= maxRetryTimes) { - scope?.launch(Dispatchers.Main) { + if (currentRetryTimes <= MAX_RETRY_TIMES) { + scope.launch(Dispatchers.Main) { "开始第 $currentRetryTimes 次重连".show(BaseApplication.get()) } - loopGroup.schedule({ connect() }, reconnectDelay, TimeUnit.SECONDS) + //使用协程延迟重连,替代 Netty 的 loopGroup.schedule,防止调度器滥用。 + scope.launch(Dispatchers.Main) { + delay(RECONNECT_DELAY_SECONDS * 1000L) + connect() + } } else { Log.e(kTag, "达到最大重连次数,停止重连") listener.onConnectFailed() @@ -160,13 +174,10 @@ this.needReconnect = needReconnect isRunning.set(false) channel?.close() - scope?.cancel() } fun sendMessage(bytes: ByteArray) { - if (!isRunning.get()) { - return - } + if (!isRunning()) return channel?.writeAndFlush(bytes) } } \ No newline at end of file diff --git a/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt b/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt index 440400d..b2c369f 100644 --- a/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt +++ b/app/src/main/java/com/casic/br/operationsite/view/DeviceControlActivity.kt @@ -107,7 +107,7 @@ } override fun onRightClick() { - + SocketConnectionService.weakReferenceHandler?.sendEmptyMessage(LocaleConstant.CONNECT_TCP_CODE) } }) } diff --git a/app/src/main/res/drawable/ic_connection.xml b/app/src/main/res/drawable/ic_connection.xml new file mode 100644 index 0000000..725c077 --- /dev/null +++ b/app/src/main/res/drawable/ic_connection.xml @@ -0,0 +1,12 @@ + + + +