diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e02d781 --- /dev/null +++ b/config.py @@ -0,0 +1,72 @@ +# config.py +CAMERAS = [ + # { + # "cam_id": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头" + # }, + # { + # "source": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头" + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头" + }, +] + +TCP_SERVER = { + "host": "127.0.0.1", + "port": 9000 +} + +HTTP_SERVER = { + "url": "http://127.0.0.1:8000/alert", + "timeout": 5 # 超时重试 +} + +MODEL_CLASS = { + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', + 20: '风镐' +} + +MODEL = { + "path": "weights/go-v8s-20250117.pt", + "size": 640, + "class_map": MODEL_CLASS, + "batch_size": 1, +} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e02d781 --- /dev/null +++ b/config.py @@ -0,0 +1,72 @@ +# config.py +CAMERAS = [ + # { + # "cam_id": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头" + # }, + # { + # "source": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头" + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头" + }, +] + +TCP_SERVER = { + "host": "127.0.0.1", + "port": 9000 +} + +HTTP_SERVER = { + "url": "http://127.0.0.1:8000/alert", + "timeout": 5 # 超时重试 +} + +MODEL_CLASS = { + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', + 20: '风镐' +} + +MODEL = { + "path": "weights/go-v8s-20250117.pt", + "size": 640, + "class_map": MODEL_CLASS, + "batch_size": 1, +} \ No newline at end of file diff --git a/global_logger.py b/global_logger.py new file mode 100644 index 0000000..e88e989 --- /dev/null +++ b/global_logger.py @@ -0,0 +1,52 @@ +# logger.py +import logging.handlers +import os +import sys +from logging.handlers import TimedRotatingFileHandler + +# 确保日志目录存在 +log_dir = 'logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 实例化并导出全局日志记录器 +logger = logging.getLogger("casic_algo_server_logger") +logger.setLevel(logging.DEBUG) # 设置日志级别 + + +# 创建一个TimedRotatingFileHandler +handler = TimedRotatingFileHandler( + os.path.join(log_dir, 'app.log'), # 日志文件名 + when='midnight', # 每天午夜滚动 + interval=1 # 滚动间隔为1天 +) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# 将handler添加到日志器 +logger.addHandler(handler) + +# 创建控制台处理器 +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +# 将标准输出和标准错误重定向到日志记录器 +class StreamToLogger: + def __init__(self, logger, level): + self.logger = logger + self.level = level + + def write(self, message): + if message.strip(): # 忽略空消息 + self.logger.log(self.level, message) + + def flush(self): + pass # 这里不需要实现 + + +# 捕获所有stdout和stderr的内容到日志 +sys.stdout = StreamToLogger(logger, logging.INFO) +sys.stderr = StreamToLogger(logger, logging.ERROR) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e02d781 --- /dev/null +++ b/config.py @@ -0,0 +1,72 @@ +# config.py +CAMERAS = [ + # { + # "cam_id": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头" + # }, + # { + # "source": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头" + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头" + }, +] + +TCP_SERVER = { + "host": "127.0.0.1", + "port": 9000 +} + +HTTP_SERVER = { + "url": "http://127.0.0.1:8000/alert", + "timeout": 5 # 超时重试 +} + +MODEL_CLASS = { + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', + 20: '风镐' +} + +MODEL = { + "path": "weights/go-v8s-20250117.pt", + "size": 640, + "class_map": MODEL_CLASS, + "batch_size": 1, +} \ No newline at end of file diff --git a/global_logger.py b/global_logger.py new file mode 100644 index 0000000..e88e989 --- /dev/null +++ b/global_logger.py @@ -0,0 +1,52 @@ +# logger.py +import logging.handlers +import os +import sys +from logging.handlers import TimedRotatingFileHandler + +# 确保日志目录存在 +log_dir = 'logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 实例化并导出全局日志记录器 +logger = logging.getLogger("casic_algo_server_logger") +logger.setLevel(logging.DEBUG) # 设置日志级别 + + +# 创建一个TimedRotatingFileHandler +handler = TimedRotatingFileHandler( + os.path.join(log_dir, 'app.log'), # 日志文件名 + when='midnight', # 每天午夜滚动 + interval=1 # 滚动间隔为1天 +) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# 将handler添加到日志器 +logger.addHandler(handler) + +# 创建控制台处理器 +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +# 将标准输出和标准错误重定向到日志记录器 +class StreamToLogger: + def __init__(self, logger, level): + self.logger = logger + self.level = level + + def write(self, message): + if message.strip(): # 忽略空消息 + self.logger.log(self.level, message) + + def flush(self): + pass # 这里不需要实现 + + +# 捕获所有stdout和stderr的内容到日志 +sys.stdout = StreamToLogger(logger, logging.INFO) +sys.stderr = StreamToLogger(logger, logging.ERROR) diff --git a/http_client.py b/http_client.py new file mode 100644 index 0000000..39e9e11 --- /dev/null +++ b/http_client.py @@ -0,0 +1,24 @@ +import aiohttp +import asyncio + +from global_logger import logger + + +class AsyncHTTPClient: + def __init__(self, url, timeout): + self.url = url + self.timeout = timeout + self.queue = asyncio.Queue() + + async def send_loop(self): + async with aiohttp.ClientSession() as session: + while True: + data = await self.queue.get() + try: + async with session.post(self.url, json=data, timeout=self.timeout) as response: + logger.info(f"HTTP 响应: {await response.text()}") + except Exception as e: + logger.exception(f"HTTP 发送失败: {e}, 数据: {data}") + + async def send(self, data: dict): + await self.queue.put(data) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e02d781 --- /dev/null +++ b/config.py @@ -0,0 +1,72 @@ +# config.py +CAMERAS = [ + # { + # "cam_id": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头" + # }, + # { + # "source": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头" + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头" + }, +] + +TCP_SERVER = { + "host": "127.0.0.1", + "port": 9000 +} + +HTTP_SERVER = { + "url": "http://127.0.0.1:8000/alert", + "timeout": 5 # 超时重试 +} + +MODEL_CLASS = { + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', + 20: '风镐' +} + +MODEL = { + "path": "weights/go-v8s-20250117.pt", + "size": 640, + "class_map": MODEL_CLASS, + "batch_size": 1, +} \ No newline at end of file diff --git a/global_logger.py b/global_logger.py new file mode 100644 index 0000000..e88e989 --- /dev/null +++ b/global_logger.py @@ -0,0 +1,52 @@ +# logger.py +import logging.handlers +import os +import sys +from logging.handlers import TimedRotatingFileHandler + +# 确保日志目录存在 +log_dir = 'logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 实例化并导出全局日志记录器 +logger = logging.getLogger("casic_algo_server_logger") +logger.setLevel(logging.DEBUG) # 设置日志级别 + + +# 创建一个TimedRotatingFileHandler +handler = TimedRotatingFileHandler( + os.path.join(log_dir, 'app.log'), # 日志文件名 + when='midnight', # 每天午夜滚动 + interval=1 # 滚动间隔为1天 +) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# 将handler添加到日志器 +logger.addHandler(handler) + +# 创建控制台处理器 +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +# 将标准输出和标准错误重定向到日志记录器 +class StreamToLogger: + def __init__(self, logger, level): + self.logger = logger + self.level = level + + def write(self, message): + if message.strip(): # 忽略空消息 + self.logger.log(self.level, message) + + def flush(self): + pass # 这里不需要实现 + + +# 捕获所有stdout和stderr的内容到日志 +sys.stdout = StreamToLogger(logger, logging.INFO) +sys.stderr = StreamToLogger(logger, logging.ERROR) diff --git a/http_client.py b/http_client.py new file mode 100644 index 0000000..39e9e11 --- /dev/null +++ b/http_client.py @@ -0,0 +1,24 @@ +import aiohttp +import asyncio + +from global_logger import logger + + +class AsyncHTTPClient: + def __init__(self, url, timeout): + self.url = url + self.timeout = timeout + self.queue = asyncio.Queue() + + async def send_loop(self): + async with aiohttp.ClientSession() as session: + while True: + data = await self.queue.get() + try: + async with session.post(self.url, json=data, timeout=self.timeout) as response: + logger.info(f"HTTP 响应: {await response.text()}") + except Exception as e: + logger.exception(f"HTTP 发送失败: {e}, 数据: {data}") + + async def send(self, data: dict): + await self.queue.put(data) diff --git a/model_wrapper.py b/model_wrapper.py new file mode 100644 index 0000000..ecc9c01 --- /dev/null +++ b/model_wrapper.py @@ -0,0 +1,54 @@ +from ultralytics import YOLO +import numpy as np + +from global_logger import logger + + +class ModelWrapper: + def __init__(self, model_path, model_size=640, model_names=None, model_warm_up=5, batch_size=1): + self.model_path = model_path + self.model_size = model_size + self.model_names = model_names + self.model_warm_up = model_warm_up + self.batch_size = batch_size + + logger.info(f'start load model {self.model_path}...') + self.model = YOLO(model_path) + self.__warm_up__() + logger.info(f'load model {self.model_path} success!') + + def __warm_up__(self): + if self.model_warm_up > 0: + logger.info(f'warming up model {self.model_path}') + imgsz = self.model_size + if not isinstance(imgsz, list): + imgsz = [imgsz, imgsz] + dummy_input = np.zeros((imgsz[0], imgsz[1], 3)) + dummy_inputs = [] + for _ in range(self.batch_size): + dummy_inputs.append(dummy_input) + for i in range(self.model_warm_up): + self.model.predict(source=dummy_inputs, imgsz=imgsz, verbose=False, save=False, save_txt=False) + logger.info(f'warm up model {self.model_path} success!') + + def predict(self, frame): + results_generator = self.model.predict(source=frame, imgsz=self.model_size, + save_txt=False, + save=False, + verbose=False, stream=True) + results = list(results_generator) + return results[0].boxes if len(results) > 0 else [] + + def batch_predict(self, frames): + results_generator = self.model.predict(source=frames, imgsz=self.model_size, + save_txt=False, + save=False, + verbose=False, stream=True) + result_boxes = [] + for r in results_generator: + result_boxes.append(r.boxes) + + return result_boxes + + def get_label(self, cls): + return self.model_names.get(cls, str(cls)) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e02d781 --- /dev/null +++ b/config.py @@ -0,0 +1,72 @@ +# config.py +CAMERAS = [ + # { + # "cam_id": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头" + # }, + # { + # "source": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头" + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头" + }, +] + +TCP_SERVER = { + "host": "127.0.0.1", + "port": 9000 +} + +HTTP_SERVER = { + "url": "http://127.0.0.1:8000/alert", + "timeout": 5 # 超时重试 +} + +MODEL_CLASS = { + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', + 20: '风镐' +} + +MODEL = { + "path": "weights/go-v8s-20250117.pt", + "size": 640, + "class_map": MODEL_CLASS, + "batch_size": 1, +} \ No newline at end of file diff --git a/global_logger.py b/global_logger.py new file mode 100644 index 0000000..e88e989 --- /dev/null +++ b/global_logger.py @@ -0,0 +1,52 @@ +# logger.py +import logging.handlers +import os +import sys +from logging.handlers import TimedRotatingFileHandler + +# 确保日志目录存在 +log_dir = 'logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 实例化并导出全局日志记录器 +logger = logging.getLogger("casic_algo_server_logger") +logger.setLevel(logging.DEBUG) # 设置日志级别 + + +# 创建一个TimedRotatingFileHandler +handler = TimedRotatingFileHandler( + os.path.join(log_dir, 'app.log'), # 日志文件名 + when='midnight', # 每天午夜滚动 + interval=1 # 滚动间隔为1天 +) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# 将handler添加到日志器 +logger.addHandler(handler) + +# 创建控制台处理器 +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +# 将标准输出和标准错误重定向到日志记录器 +class StreamToLogger: + def __init__(self, logger, level): + self.logger = logger + self.level = level + + def write(self, message): + if message.strip(): # 忽略空消息 + self.logger.log(self.level, message) + + def flush(self): + pass # 这里不需要实现 + + +# 捕获所有stdout和stderr的内容到日志 +sys.stdout = StreamToLogger(logger, logging.INFO) +sys.stderr = StreamToLogger(logger, logging.ERROR) diff --git a/http_client.py b/http_client.py new file mode 100644 index 0000000..39e9e11 --- /dev/null +++ b/http_client.py @@ -0,0 +1,24 @@ +import aiohttp +import asyncio + +from global_logger import logger + + +class AsyncHTTPClient: + def __init__(self, url, timeout): + self.url = url + self.timeout = timeout + self.queue = asyncio.Queue() + + async def send_loop(self): + async with aiohttp.ClientSession() as session: + while True: + data = await self.queue.get() + try: + async with session.post(self.url, json=data, timeout=self.timeout) as response: + logger.info(f"HTTP 响应: {await response.text()}") + except Exception as e: + logger.exception(f"HTTP 发送失败: {e}, 数据: {data}") + + async def send(self, data: dict): + await self.queue.put(data) diff --git a/model_wrapper.py b/model_wrapper.py new file mode 100644 index 0000000..ecc9c01 --- /dev/null +++ b/model_wrapper.py @@ -0,0 +1,54 @@ +from ultralytics import YOLO +import numpy as np + +from global_logger import logger + + +class ModelWrapper: + def __init__(self, model_path, model_size=640, model_names=None, model_warm_up=5, batch_size=1): + self.model_path = model_path + self.model_size = model_size + self.model_names = model_names + self.model_warm_up = model_warm_up + self.batch_size = batch_size + + logger.info(f'start load model {self.model_path}...') + self.model = YOLO(model_path) + self.__warm_up__() + logger.info(f'load model {self.model_path} success!') + + def __warm_up__(self): + if self.model_warm_up > 0: + logger.info(f'warming up model {self.model_path}') + imgsz = self.model_size + if not isinstance(imgsz, list): + imgsz = [imgsz, imgsz] + dummy_input = np.zeros((imgsz[0], imgsz[1], 3)) + dummy_inputs = [] + for _ in range(self.batch_size): + dummy_inputs.append(dummy_input) + for i in range(self.model_warm_up): + self.model.predict(source=dummy_inputs, imgsz=imgsz, verbose=False, save=False, save_txt=False) + logger.info(f'warm up model {self.model_path} success!') + + def predict(self, frame): + results_generator = self.model.predict(source=frame, imgsz=self.model_size, + save_txt=False, + save=False, + verbose=False, stream=True) + results = list(results_generator) + return results[0].boxes if len(results) > 0 else [] + + def batch_predict(self, frames): + results_generator = self.model.predict(source=frames, imgsz=self.model_size, + save_txt=False, + save=False, + verbose=False, stream=True) + result_boxes = [] + for r in results_generator: + result_boxes.append(r.boxes) + + return result_boxes + + def get_label(self, cls): + return self.model_names.get(cls, str(cls)) diff --git a/tcp_client.py b/tcp_client.py new file mode 100644 index 0000000..ffff57c --- /dev/null +++ b/tcp_client.py @@ -0,0 +1,162 @@ +import threading +import socket +import queue +import time +import asyncio + +from global_logger import logger + + +class AsyncTCPClient: + def __init__(self, server_ip, server_port): + self.server_ip = server_ip + self.server_port = server_port + self.writer = None + self.queue = asyncio.Queue() + self.connected = False + self.running = True + + async def connect(self): + # 先关闭现有的writer + if self.writer is not None: + try: + self.writer.close() + await self.writer.wait_closed() + except Exception as e: + logger.exception(f"关闭现有连接时出错: {e}") + self.writer = None + + while self.running: + try: + logger.info(f"连接 TCP 服务器 {self.server_ip}:{self.server_port}") + _, self.writer = await asyncio.open_connection(self.server_ip, self.server_port) + self.connected = True + logger.info("TCP 连接成功") + return + except Exception as e: + logger.exception(f"TCP 连接失败: {e}, 5 秒后重试") + self.connected = False + await asyncio.sleep(5) + + async def send_loop(self): + while self.running: + if not self.connected: + logger.info(f"TCP 服务器 {self.server_ip}:{self.server_port}连接断开,重新连接") + await self.connect() + try: + message, msg_time, expire_second = await self.queue.get() + # 检查消息是否已过期 + current_time = time.time() + if current_time - msg_time > expire_second: + logger.info(f"丢弃过期消息: {message}, 已过期 {current_time - msg_time:.1f} 秒") + continue + + self.writer.write(message.encode('utf-8')) + await self.writer.drain() + logger.info(f"TCP 发送数据: {message}") + except Exception as e: + logger.exception(f"TCP 发送失败: {e}") + self.connected = False + await asyncio.sleep(5) + + async def send(self, message: str, expire_second=60): + """向队列添加消息,带过期时间 + Args: + message: 要发送的消息 + expire_second: 消息过期时间(秒),默认60秒 + """ + logger.debug(f"添加消息到TCP队列: {message}") + await self.queue.put((message, time.time(), expire_second)) + + +class TCPClient: + """TCP客户端管理""" + + def __init__(self, server_ip, server_port): + self.server_ip = server_ip + self.server_port = server_port + self.sock = None + self.queue = queue.Queue() + self.lock = threading.Lock() + self.connected = False + self.running = True + self.send_thread = None + + def connect(self): + """建立TCP连接""" + while self.running: + try: + logger.info(f"连接 TCP 服务器 {self.server_ip}:{self.server_port}") + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.server_ip, self.server_port)) + self.connected = True + logger.info("TCP 连接成功") + return True + except Exception as e: + logger.exception(f"TCP 连接失败: {e}, 5 秒后重试") + self.connected = False + time.sleep(5) + return False + + def send_data(self, data: str): + """线程安全的数据发送""" + with self.lock: + if not self.connected: + if not self.connect(): + return False + + try: + self.sock.sendall(data.encode('utf-8')) + logger.info(f"TCP 发送数据: {data}") + return True + except Exception as e: + logger.exception(f"TCP 发送失败: {e}") + self.connected = False + self.sock.close() + self.sock = None + return False + + def send_loop(self): + """发送循环处理队列数据""" + while self.running: + try: + if not self.connected: + logger.info(f"TCP 服务器 {self.server_ip}:{self.server_port}连接断开,重新连接") + self.connect() + + message, msg_time, expire_second = self.queue.get(timeout=1) + # 检查消息是否已过期 + current_time = time.time() + if current_time - msg_time > expire_second: + logger.info(f"丢弃过期消息: {message}, 已过期 {current_time - msg_time:.1f} 秒") + continue + + if not self.send_data(message): + self.queue.put((message, msg_time, expire_second)) # 重新放回队列? + time.sleep(5) + except queue.Empty: + continue + except Exception as e: + logger.exception(f"TCP 工作线程错误: {e}") + + def start(self): + """启动发送线程""" + self.send_thread = threading.Thread(target=self.send_loop, daemon=True) + self.send_thread.start() + return self + + def send(self, message: str, expire_second=60): + """向队列添加消息,带过期时间 + + Args: + message: 要发送的消息 + expire_second: 消息过期时间(秒),默认60秒 + """ + logger.debug(f"添加消息到TCP队列: {message}") + self.queue.put((message, time.time(), expire_second)) + + def stop(self): + """停止服务""" + self.running = False + if self.sock: + self.sock.close() diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd24eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +logs +test* \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e32cd3b --- /dev/null +++ b/app.py @@ -0,0 +1,34 @@ +# main.py +import asyncio +from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL +from tcp_client import AsyncTCPClient +from http_client import AsyncHTTPClient +from model_wrapper import ModelWrapper +from camera_processor import CameraProcessor +from global_logger import logger + + +async def main(): + logger.info("开始启动算法分析服务") + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 + tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) + http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + + # 启动 TCP 和 HTTP 的发送任务 + asyncio.create_task(tcp_client.send_loop()) + asyncio.create_task(http_client.send_loop()) + + model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) + + # 为每个摄像头启动一个处理线程,并传入事件循环 + for camera_config in CAMERAS: + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + MODEL["batch_size"]) + camera_thread.start() + + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/camera_processor.py b/camera_processor.py new file mode 100644 index 0000000..3dd5277 --- /dev/null +++ b/camera_processor.py @@ -0,0 +1,212 @@ +# camera_processor.py +import cv2 +import threading +import time +import asyncio +import json + +from global_logger import logger + + +class CameraProcessor(threading.Thread): + def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): + super().__init__() + self.camera_config = camera_config + self.model_wrapper = model_wrapper + self.tcp_client = tcp_client + self.http_client = http_client + self.loop = loop # 主线程中运行的事件循环 + self.daemon = True # 主线程退出时自动关闭 + + # 配置参数 + self.cam_id = self.camera_config.get('cam_id') + self.gst_str = self.camera_config.get('gst_str') + self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + + # 摄像头状态 + self.cap = None + self.running = True + self.reconnect_interval = 1 # 重连间隔(秒) + self.failure_threshold = 5 # 连续失败阈值 + + # 抽帧相关参数 + self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 + self.frame_count = 0 + + # 批处理相关参数 + self.batch_size = batch_size # 默认单张处理 + self.frame_buffer = [] + self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 + + self.frame_width = None + self.frame_height = None + self.frame_fps = None + + def _open_camera(self): + """尝试打开摄像头,返回是否成功""" + try: + if self.cap is not None: + self.cap.release() # 确保释放之前的资源 + + logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + + if not self.cap.isOpened(): + logger.error(f"摄像头 {self.cam_id} 打开失败") + return False + + logger.info(f"摄像头 {self.cam_id} 打开成功") + self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) + logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") + logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") + logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") + + return True + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") + return False + + def _reconnect_camera(self): + """重新连接摄像头,直到成功""" + while self.running: + logger.info(f"摄像头 {self.cam_id} 正在重连...") + if self._open_camera(): + return True + time.sleep(self.reconnect_interval) + return False + + def _process_frame(self, frame): + """处理摄像头读取到的帧""" + # 使用 YOLO 模型进行检测 + boxes = self.model_wrapper.predict(frame) + for result in boxes: + detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] + for detection in detections: + x1, y1, x2, y2, conf, cls = detection + label = self.model_wrapper.get_label(int(cls)) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = { + "camera": self.cam_id, # 使用cam_id而不是name + "label": label, + "confidence": float(conf), + "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 + } + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(json.dumps(data)), + self.loop + ) + + # 如果有额外HTTP处理逻辑,可在此添加 + + def _process_batch(self, frames, frame_infos): + """批量处理多个帧""" + if not frames: + return + + # 批量推理 + batch_boxes = self.model_wrapper.batch_predict(frames) + + # 处理每个帧的结果 + for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): + frame = frames[idx] + timestamp = frame_info["timestamp"] + for box in boxes: + x1, y1, x2, y2 = box.xyxy.cpu().squeeze() + cls = int(box.cls) + conf = float(box.conf) + + label = self.model_wrapper.get_label(cls) + # print(label) + + # 只处理需要通过TCP发送的目标类别 + if label in self.tcp_send_cls: + data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ + f"{self.frame_width},{self.frame_height},FA" + + # 使用主线程的事件循环将 TCP 消息异步发送出去 + asyncio.run_coroutine_threadsafe( + self.tcp_client.send(data), + self.loop + ) + + def _add_to_batch(self, frame): + """将帧添加到批处理缓冲区""" + self.frame_buffer.append(frame) + self.frame_info_buffer.append({ + "timestamp": time.time() + }) + + # 当缓冲区达到批处理大小时处理批次 + if len(self.frame_buffer) >= self.batch_size: + self._process_batch(self.frame_buffer, self.frame_info_buffer) + self.frame_buffer = [] + self.frame_info_buffer = [] + + def run(self): + """摄像头处理主循环""" + logger.info(f"摄像头处理线程 {self.cam_id} 启动") + + # 确保初始化成功 + if not self._open_camera(): + if not self._reconnect_camera(): + logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") + return + + failure_count = 0 # 连续读取失败计数 + + while self.running: + try: + if self.cap is None or not self.cap.isOpened(): + logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") + if not self._reconnect_camera(): + break + failure_count = 0 + continue + + ret, frame = self.cap.read() + + if not ret: + failure_count += 1 + logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") + + # 达到失败阈值,尝试重连摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + else: + time.sleep(0.1) # 短暂等待后继续尝试 + continue + else: + # 读取成功后,重置失败计数 + failure_count = 0 + # 抽帧处理 + self.frame_count += 1 + if self.frame_count % self.frame_interval == 0: + # self._process_frame(frame) + self._add_to_batch(frame) + + except Exception as e: + logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") + failure_count += 1 + + # 发生异常后,可能需要重置摄像头 + if failure_count >= self.failure_threshold: + logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") + if not self._reconnect_camera(): + break + failure_count = 0 + time.sleep(0.5) # 异常发生后等待时间更长 + + # 线程结束前释放资源 + if self.cap is not None: + self.cap.release() + + logger.info(f"摄像头处理线程 {self.cam_id} 已退出") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e02d781 --- /dev/null +++ b/config.py @@ -0,0 +1,72 @@ +# config.py +CAMERAS = [ + # { + # "cam_id": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头" + # }, + # { + # "source": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头" + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头" + }, +] + +TCP_SERVER = { + "host": "127.0.0.1", + "port": 9000 +} + +HTTP_SERVER = { + "url": "http://127.0.0.1:8000/alert", + "timeout": 5 # 超时重试 +} + +MODEL_CLASS = { + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', + 20: '风镐' +} + +MODEL = { + "path": "weights/go-v8s-20250117.pt", + "size": 640, + "class_map": MODEL_CLASS, + "batch_size": 1, +} \ No newline at end of file diff --git a/global_logger.py b/global_logger.py new file mode 100644 index 0000000..e88e989 --- /dev/null +++ b/global_logger.py @@ -0,0 +1,52 @@ +# logger.py +import logging.handlers +import os +import sys +from logging.handlers import TimedRotatingFileHandler + +# 确保日志目录存在 +log_dir = 'logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 实例化并导出全局日志记录器 +logger = logging.getLogger("casic_algo_server_logger") +logger.setLevel(logging.DEBUG) # 设置日志级别 + + +# 创建一个TimedRotatingFileHandler +handler = TimedRotatingFileHandler( + os.path.join(log_dir, 'app.log'), # 日志文件名 + when='midnight', # 每天午夜滚动 + interval=1 # 滚动间隔为1天 +) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# 将handler添加到日志器 +logger.addHandler(handler) + +# 创建控制台处理器 +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +# 将标准输出和标准错误重定向到日志记录器 +class StreamToLogger: + def __init__(self, logger, level): + self.logger = logger + self.level = level + + def write(self, message): + if message.strip(): # 忽略空消息 + self.logger.log(self.level, message) + + def flush(self): + pass # 这里不需要实现 + + +# 捕获所有stdout和stderr的内容到日志 +sys.stdout = StreamToLogger(logger, logging.INFO) +sys.stderr = StreamToLogger(logger, logging.ERROR) diff --git a/http_client.py b/http_client.py new file mode 100644 index 0000000..39e9e11 --- /dev/null +++ b/http_client.py @@ -0,0 +1,24 @@ +import aiohttp +import asyncio + +from global_logger import logger + + +class AsyncHTTPClient: + def __init__(self, url, timeout): + self.url = url + self.timeout = timeout + self.queue = asyncio.Queue() + + async def send_loop(self): + async with aiohttp.ClientSession() as session: + while True: + data = await self.queue.get() + try: + async with session.post(self.url, json=data, timeout=self.timeout) as response: + logger.info(f"HTTP 响应: {await response.text()}") + except Exception as e: + logger.exception(f"HTTP 发送失败: {e}, 数据: {data}") + + async def send(self, data: dict): + await self.queue.put(data) diff --git a/model_wrapper.py b/model_wrapper.py new file mode 100644 index 0000000..ecc9c01 --- /dev/null +++ b/model_wrapper.py @@ -0,0 +1,54 @@ +from ultralytics import YOLO +import numpy as np + +from global_logger import logger + + +class ModelWrapper: + def __init__(self, model_path, model_size=640, model_names=None, model_warm_up=5, batch_size=1): + self.model_path = model_path + self.model_size = model_size + self.model_names = model_names + self.model_warm_up = model_warm_up + self.batch_size = batch_size + + logger.info(f'start load model {self.model_path}...') + self.model = YOLO(model_path) + self.__warm_up__() + logger.info(f'load model {self.model_path} success!') + + def __warm_up__(self): + if self.model_warm_up > 0: + logger.info(f'warming up model {self.model_path}') + imgsz = self.model_size + if not isinstance(imgsz, list): + imgsz = [imgsz, imgsz] + dummy_input = np.zeros((imgsz[0], imgsz[1], 3)) + dummy_inputs = [] + for _ in range(self.batch_size): + dummy_inputs.append(dummy_input) + for i in range(self.model_warm_up): + self.model.predict(source=dummy_inputs, imgsz=imgsz, verbose=False, save=False, save_txt=False) + logger.info(f'warm up model {self.model_path} success!') + + def predict(self, frame): + results_generator = self.model.predict(source=frame, imgsz=self.model_size, + save_txt=False, + save=False, + verbose=False, stream=True) + results = list(results_generator) + return results[0].boxes if len(results) > 0 else [] + + def batch_predict(self, frames): + results_generator = self.model.predict(source=frames, imgsz=self.model_size, + save_txt=False, + save=False, + verbose=False, stream=True) + result_boxes = [] + for r in results_generator: + result_boxes.append(r.boxes) + + return result_boxes + + def get_label(self, cls): + return self.model_names.get(cls, str(cls)) diff --git a/tcp_client.py b/tcp_client.py new file mode 100644 index 0000000..ffff57c --- /dev/null +++ b/tcp_client.py @@ -0,0 +1,162 @@ +import threading +import socket +import queue +import time +import asyncio + +from global_logger import logger + + +class AsyncTCPClient: + def __init__(self, server_ip, server_port): + self.server_ip = server_ip + self.server_port = server_port + self.writer = None + self.queue = asyncio.Queue() + self.connected = False + self.running = True + + async def connect(self): + # 先关闭现有的writer + if self.writer is not None: + try: + self.writer.close() + await self.writer.wait_closed() + except Exception as e: + logger.exception(f"关闭现有连接时出错: {e}") + self.writer = None + + while self.running: + try: + logger.info(f"连接 TCP 服务器 {self.server_ip}:{self.server_port}") + _, self.writer = await asyncio.open_connection(self.server_ip, self.server_port) + self.connected = True + logger.info("TCP 连接成功") + return + except Exception as e: + logger.exception(f"TCP 连接失败: {e}, 5 秒后重试") + self.connected = False + await asyncio.sleep(5) + + async def send_loop(self): + while self.running: + if not self.connected: + logger.info(f"TCP 服务器 {self.server_ip}:{self.server_port}连接断开,重新连接") + await self.connect() + try: + message, msg_time, expire_second = await self.queue.get() + # 检查消息是否已过期 + current_time = time.time() + if current_time - msg_time > expire_second: + logger.info(f"丢弃过期消息: {message}, 已过期 {current_time - msg_time:.1f} 秒") + continue + + self.writer.write(message.encode('utf-8')) + await self.writer.drain() + logger.info(f"TCP 发送数据: {message}") + except Exception as e: + logger.exception(f"TCP 发送失败: {e}") + self.connected = False + await asyncio.sleep(5) + + async def send(self, message: str, expire_second=60): + """向队列添加消息,带过期时间 + Args: + message: 要发送的消息 + expire_second: 消息过期时间(秒),默认60秒 + """ + logger.debug(f"添加消息到TCP队列: {message}") + await self.queue.put((message, time.time(), expire_second)) + + +class TCPClient: + """TCP客户端管理""" + + def __init__(self, server_ip, server_port): + self.server_ip = server_ip + self.server_port = server_port + self.sock = None + self.queue = queue.Queue() + self.lock = threading.Lock() + self.connected = False + self.running = True + self.send_thread = None + + def connect(self): + """建立TCP连接""" + while self.running: + try: + logger.info(f"连接 TCP 服务器 {self.server_ip}:{self.server_port}") + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.server_ip, self.server_port)) + self.connected = True + logger.info("TCP 连接成功") + return True + except Exception as e: + logger.exception(f"TCP 连接失败: {e}, 5 秒后重试") + self.connected = False + time.sleep(5) + return False + + def send_data(self, data: str): + """线程安全的数据发送""" + with self.lock: + if not self.connected: + if not self.connect(): + return False + + try: + self.sock.sendall(data.encode('utf-8')) + logger.info(f"TCP 发送数据: {data}") + return True + except Exception as e: + logger.exception(f"TCP 发送失败: {e}") + self.connected = False + self.sock.close() + self.sock = None + return False + + def send_loop(self): + """发送循环处理队列数据""" + while self.running: + try: + if not self.connected: + logger.info(f"TCP 服务器 {self.server_ip}:{self.server_port}连接断开,重新连接") + self.connect() + + message, msg_time, expire_second = self.queue.get(timeout=1) + # 检查消息是否已过期 + current_time = time.time() + if current_time - msg_time > expire_second: + logger.info(f"丢弃过期消息: {message}, 已过期 {current_time - msg_time:.1f} 秒") + continue + + if not self.send_data(message): + self.queue.put((message, msg_time, expire_second)) # 重新放回队列? + time.sleep(5) + except queue.Empty: + continue + except Exception as e: + logger.exception(f"TCP 工作线程错误: {e}") + + def start(self): + """启动发送线程""" + self.send_thread = threading.Thread(target=self.send_loop, daemon=True) + self.send_thread.start() + return self + + def send(self, message: str, expire_second=60): + """向队列添加消息,带过期时间 + + Args: + message: 要发送的消息 + expire_second: 消息过期时间(秒),默认60秒 + """ + logger.debug(f"添加消息到TCP队列: {message}") + self.queue.put((message, time.time(), expire_second)) + + def stop(self): + """停止服务""" + self.running = False + if self.sock: + self.sock.close() diff --git a/weights/go-v8s-20250117.pt b/weights/go-v8s-20250117.pt new file mode 100644 index 0000000..fad8f10 --- /dev/null +++ b/weights/go-v8s-20250117.pt Binary files differ