diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/config.py b/config.py index e3438b0..eeffc86 100644 --- a/config.py +++ b/config.py @@ -1,49 +1,51 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True, - "frame_interval": 5, - "receive_capture_command": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True, - "frame_interval": 5, - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], - # "frame_interval": 5, - # "remark": "本地测试摄像头", + # "alarm_send_cls": ["路锥", "反光衣"], + # "remark": "机械臂摄像头", # "save_annotated_images": True, + # "frame_interval": 5, # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "alarm_send_cls": ["路锥", "反光衣"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { @@ -52,32 +54,33 @@ } HTTP_SERVER = { - "url": "http://111.198.10.15:11645/algorithm/receiveData", + # "url": "http://111.198.10.15:11645/algorithm/receiveData", # "url": "http://192.168.150.39:30028/algorithm/receiveData", + "url": "http://127.0.0.1:8000/test", "timeout": 10 # 超时重试 } MODEL_CLASS = { - 0: '井盖', - 1: '井盖塌陷', - 2: '井盖眼', - 3: '井盖破损', - 4: '井盖移位', - 5: '井盖缺失', - 6: '人', - 7: '压路机', - 8: '反光衣', - 9: '土堆', - 10: '土方车', - 11: '头', - 12: '安全帽', - 13: '挖掘机', - 14: '推土机', - 15: '施工路牌', - 16: '水马', - 17: '路锥', - 18: '铁锹', - 19: '防护栏', + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', 20: '风镐' } @@ -86,4 +89,20 @@ "size": 640, "class_map": MODEL_CLASS, "batch_size": 1, +} + +BIZ_CLASS = { + "WELL": ['井盖', '井盖塌陷', '井盖眼', '井盖破损', '井盖移位', '井盖缺失', ], + "ENTER": [], + "BOX": [], + "CONSTRUCTION": ['反光衣', '路锥'] +} + + +PRESET_CONTENT_MAP = { + "1": ('10006', '第三方施工监测', 1), + "2": ('10007', '闸井监测', None), + "3": ('10008', '引入口监测', None), + "4": ('10009', '调压箱监测 ', None), + "5": ('10011', '燃气泄漏', None), # 使用None表示需要根据gas_data计算 } \ No newline at end of file diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/config.py b/config.py index e3438b0..eeffc86 100644 --- a/config.py +++ b/config.py @@ -1,49 +1,51 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True, - "frame_interval": 5, - "receive_capture_command": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True, - "frame_interval": 5, - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], - # "frame_interval": 5, - # "remark": "本地测试摄像头", + # "alarm_send_cls": ["路锥", "反光衣"], + # "remark": "机械臂摄像头", # "save_annotated_images": True, + # "frame_interval": 5, # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "alarm_send_cls": ["路锥", "反光衣"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { @@ -52,32 +54,33 @@ } HTTP_SERVER = { - "url": "http://111.198.10.15:11645/algorithm/receiveData", + # "url": "http://111.198.10.15:11645/algorithm/receiveData", # "url": "http://192.168.150.39:30028/algorithm/receiveData", + "url": "http://127.0.0.1:8000/test", "timeout": 10 # 超时重试 } MODEL_CLASS = { - 0: '井盖', - 1: '井盖塌陷', - 2: '井盖眼', - 3: '井盖破损', - 4: '井盖移位', - 5: '井盖缺失', - 6: '人', - 7: '压路机', - 8: '反光衣', - 9: '土堆', - 10: '土方车', - 11: '头', - 12: '安全帽', - 13: '挖掘机', - 14: '推土机', - 15: '施工路牌', - 16: '水马', - 17: '路锥', - 18: '铁锹', - 19: '防护栏', + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', 20: '风镐' } @@ -86,4 +89,20 @@ "size": 640, "class_map": MODEL_CLASS, "batch_size": 1, +} + +BIZ_CLASS = { + "WELL": ['井盖', '井盖塌陷', '井盖眼', '井盖破损', '井盖移位', '井盖缺失', ], + "ENTER": [], + "BOX": [], + "CONSTRUCTION": ['反光衣', '路锥'] +} + + +PRESET_CONTENT_MAP = { + "1": ('10006', '第三方施工监测', 1), + "2": ('10007', '闸井监测', None), + "3": ('10008', '引入口监测', None), + "4": ('10009', '调压箱监测 ', None), + "5": ('10011', '燃气泄漏', None), # 使用None表示需要根据gas_data计算 } \ No newline at end of file diff --git a/global_logger.py b/global_logger.py index fed8eee..bf4726d 100644 --- a/global_logger.py +++ b/global_logger.py @@ -42,7 +42,8 @@ logger.addHandler(handler) # 创建控制台处理器 -console_handler = logging.StreamHandler() +utf8_stream = open(sys.__stdout__.fileno(), mode='w', encoding='utf-8', buffering=1) +console_handler = logging.StreamHandler(utf8_stream) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(formatter) logger.addHandler(console_handler) diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/config.py b/config.py index e3438b0..eeffc86 100644 --- a/config.py +++ b/config.py @@ -1,49 +1,51 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True, - "frame_interval": 5, - "receive_capture_command": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True, - "frame_interval": 5, - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], - # "frame_interval": 5, - # "remark": "本地测试摄像头", + # "alarm_send_cls": ["路锥", "反光衣"], + # "remark": "机械臂摄像头", # "save_annotated_images": True, + # "frame_interval": 5, # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "alarm_send_cls": ["路锥", "反光衣"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { @@ -52,32 +54,33 @@ } HTTP_SERVER = { - "url": "http://111.198.10.15:11645/algorithm/receiveData", + # "url": "http://111.198.10.15:11645/algorithm/receiveData", # "url": "http://192.168.150.39:30028/algorithm/receiveData", + "url": "http://127.0.0.1:8000/test", "timeout": 10 # 超时重试 } MODEL_CLASS = { - 0: '井盖', - 1: '井盖塌陷', - 2: '井盖眼', - 3: '井盖破损', - 4: '井盖移位', - 5: '井盖缺失', - 6: '人', - 7: '压路机', - 8: '反光衣', - 9: '土堆', - 10: '土方车', - 11: '头', - 12: '安全帽', - 13: '挖掘机', - 14: '推土机', - 15: '施工路牌', - 16: '水马', - 17: '路锥', - 18: '铁锹', - 19: '防护栏', + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', 20: '风镐' } @@ -86,4 +89,20 @@ "size": 640, "class_map": MODEL_CLASS, "batch_size": 1, +} + +BIZ_CLASS = { + "WELL": ['井盖', '井盖塌陷', '井盖眼', '井盖破损', '井盖移位', '井盖缺失', ], + "ENTER": [], + "BOX": [], + "CONSTRUCTION": ['反光衣', '路锥'] +} + + +PRESET_CONTENT_MAP = { + "1": ('10006', '第三方施工监测', 1), + "2": ('10007', '闸井监测', None), + "3": ('10008', '引入口监测', None), + "4": ('10009', '调压箱监测 ', None), + "5": ('10011', '燃气泄漏', None), # 使用None表示需要根据gas_data计算 } \ No newline at end of file diff --git a/global_logger.py b/global_logger.py index fed8eee..bf4726d 100644 --- a/global_logger.py +++ b/global_logger.py @@ -42,7 +42,8 @@ logger.addHandler(handler) # 创建控制台处理器 -console_handler = logging.StreamHandler() +utf8_stream = open(sys.__stdout__.fileno(), mode='w', encoding='utf-8', buffering=1) +console_handler = logging.StreamHandler(utf8_stream) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(formatter) logger.addHandler(console_handler) diff --git a/handle_tcp_command.py b/handle_tcp_command.py index 8e7d5d8..3964d20 100644 --- a/handle_tcp_command.py +++ b/handle_tcp_command.py @@ -3,8 +3,13 @@ import json import re import time +from copy import deepcopy + import cv2 + +from config import BIZ_CLASS, PRESET_CONTENT_MAP from global_logger import logger, process_log_data +from image_plotting import Annotator, COLOR_RED, COLOR_BLUE class HandelTCPCommand: @@ -12,6 +17,47 @@ self.camera_processors = camera_processors self.http_client = http_client + @staticmethod + def get_cap_content(preset, gas_data): + """获取捕获内容的配置信息""" + content = PRESET_CONTENT_MAP.get(str(preset)) + if content is None: + return '', '', 0 + + report_type, desc, status = content + if status is None: # 燃气泄漏特殊处理 + status = 0 if float(gas_data) <= 0 else 1 + return report_type, desc, status + + def _process_boxes_with_annotation(self, frame, biz_boxes, box_type, gas_alarm): + """处理特定类型的检测框并添加标注""" + boxes = [box for box in biz_boxes if box[5] in BIZ_CLASS[box_type]] + if not boxes: + return None, None + + annotator = Annotator(deepcopy(frame)) + color = COLOR_RED if gas_alarm else COLOR_BLUE + + for box in boxes: + annotator.box_label( + box=[box[0], box[1], box[2], box[3]], + label='井盖' if box[5].index('井盖')>=0 else box[5], + color=color + ) + + success, jpg_data = cv2.imencode('.jpg', annotator.result()) + if not success: + return None, None + + # 根据不同类型返回对应的报告信息 + report_info = { + 'WELL': ('10007', f'井盖完整,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'ENTER': ('10008', f'检测到引入口,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'BOX': ('10009', f'检测到调压箱,燃气浓度{"正常" if not gas_alarm else "异常"}') + } + + return jpg_data, report_info.get(box_type) + async def capture_and_send_current_frame(self, camera_processor, preset, gas_data, longitude, latitude): """捕获当前帧并通过HTTP发送到后台""" @@ -19,38 +65,42 @@ logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 - frame = camera_processor.get_last_frame() + frame, biz_boxes = camera_processor.get_last_frame() if frame is None: logger.warning(f"摄像头 {cam_id} 没有可用的帧") return False try: skip_img = int(preset) == 5 and float(gas_data) <= 0 - # 将图像编码为JPEG - success, jpg_data = cv2.imencode('.jpg', frame) if not skip_img else (True, None) - if not success: - logger.error(f"摄像头 {cam_id} 编码图像失败") - return False + gas_alarm = 0 if float(gas_data) <= 0 else 1 - def get_cap_content(preset, gas_data): - map = { - "1": ('10006', '第三方施工监测', 1), - "2": ('10007', '闸井监测', 0), - "3": ('10008', '引入口监测', 0), - "4": ('10009', '调压箱监测 ', 0), - "5": ('10011', '燃气泄漏', 0 if float(gas_data) <= 0 else 1), - } - return map.get(str(preset), ('', '', 0)) + # 获取基础配置 + report_type, content, status = self.get_cap_content(preset, gas_data) + jpg_data = None + + if not skip_img: + # 按优先级依次检查各种类型的检测框 + for box_type in ['WELL', 'ENTER', 'BOX']: + jpg_data, report_info = self._process_boxes_with_annotation( + frame, biz_boxes, box_type, gas_alarm + ) + if jpg_data is not None: + report_type, content = report_info + status = gas_alarm + break + + # 如果没有检测到任何目标,使用原始帧 + if jpg_data is None and not skip_img: + success, jpg_data = cv2.imencode('.jpg', frame) + if not success: + logger.error(f"摄像头 {cam_id} 编码图像失败") + return False - # 构建请求数据 - report_type, content, status = get_cap_content(preset, gas_data) - - if int(preset) == 5: - status = 0 if float(gas_data) <= 0 else 1 - + request_data = { "routeNumber": preset, - "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if (jpg_data is not None and jpg_data.size > 0) else '', + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', "reportType": report_type, "reportContent": content, "isAlarm": status, @@ -79,7 +129,8 @@ if match: preset, gas_data, longitude, latitude = match.groups() - logger.debug(f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") + logger.debug( + f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") # 对所有摄像头执行 for cam_id, processor in self.camera_processors.items(): diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/config.py b/config.py index e3438b0..eeffc86 100644 --- a/config.py +++ b/config.py @@ -1,49 +1,51 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True, - "frame_interval": 5, - "receive_capture_command": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True, - "frame_interval": 5, - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], - # "frame_interval": 5, - # "remark": "本地测试摄像头", + # "alarm_send_cls": ["路锥", "反光衣"], + # "remark": "机械臂摄像头", # "save_annotated_images": True, + # "frame_interval": 5, # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "alarm_send_cls": ["路锥", "反光衣"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { @@ -52,32 +54,33 @@ } HTTP_SERVER = { - "url": "http://111.198.10.15:11645/algorithm/receiveData", + # "url": "http://111.198.10.15:11645/algorithm/receiveData", # "url": "http://192.168.150.39:30028/algorithm/receiveData", + "url": "http://127.0.0.1:8000/test", "timeout": 10 # 超时重试 } MODEL_CLASS = { - 0: '井盖', - 1: '井盖塌陷', - 2: '井盖眼', - 3: '井盖破损', - 4: '井盖移位', - 5: '井盖缺失', - 6: '人', - 7: '压路机', - 8: '反光衣', - 9: '土堆', - 10: '土方车', - 11: '头', - 12: '安全帽', - 13: '挖掘机', - 14: '推土机', - 15: '施工路牌', - 16: '水马', - 17: '路锥', - 18: '铁锹', - 19: '防护栏', + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', 20: '风镐' } @@ -86,4 +89,20 @@ "size": 640, "class_map": MODEL_CLASS, "batch_size": 1, +} + +BIZ_CLASS = { + "WELL": ['井盖', '井盖塌陷', '井盖眼', '井盖破损', '井盖移位', '井盖缺失', ], + "ENTER": [], + "BOX": [], + "CONSTRUCTION": ['反光衣', '路锥'] +} + + +PRESET_CONTENT_MAP = { + "1": ('10006', '第三方施工监测', 1), + "2": ('10007', '闸井监测', None), + "3": ('10008', '引入口监测', None), + "4": ('10009', '调压箱监测 ', None), + "5": ('10011', '燃气泄漏', None), # 使用None表示需要根据gas_data计算 } \ No newline at end of file diff --git a/global_logger.py b/global_logger.py index fed8eee..bf4726d 100644 --- a/global_logger.py +++ b/global_logger.py @@ -42,7 +42,8 @@ logger.addHandler(handler) # 创建控制台处理器 -console_handler = logging.StreamHandler() +utf8_stream = open(sys.__stdout__.fileno(), mode='w', encoding='utf-8', buffering=1) +console_handler = logging.StreamHandler(utf8_stream) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(formatter) logger.addHandler(console_handler) diff --git a/handle_tcp_command.py b/handle_tcp_command.py index 8e7d5d8..3964d20 100644 --- a/handle_tcp_command.py +++ b/handle_tcp_command.py @@ -3,8 +3,13 @@ import json import re import time +from copy import deepcopy + import cv2 + +from config import BIZ_CLASS, PRESET_CONTENT_MAP from global_logger import logger, process_log_data +from image_plotting import Annotator, COLOR_RED, COLOR_BLUE class HandelTCPCommand: @@ -12,6 +17,47 @@ self.camera_processors = camera_processors self.http_client = http_client + @staticmethod + def get_cap_content(preset, gas_data): + """获取捕获内容的配置信息""" + content = PRESET_CONTENT_MAP.get(str(preset)) + if content is None: + return '', '', 0 + + report_type, desc, status = content + if status is None: # 燃气泄漏特殊处理 + status = 0 if float(gas_data) <= 0 else 1 + return report_type, desc, status + + def _process_boxes_with_annotation(self, frame, biz_boxes, box_type, gas_alarm): + """处理特定类型的检测框并添加标注""" + boxes = [box for box in biz_boxes if box[5] in BIZ_CLASS[box_type]] + if not boxes: + return None, None + + annotator = Annotator(deepcopy(frame)) + color = COLOR_RED if gas_alarm else COLOR_BLUE + + for box in boxes: + annotator.box_label( + box=[box[0], box[1], box[2], box[3]], + label='井盖' if box[5].index('井盖')>=0 else box[5], + color=color + ) + + success, jpg_data = cv2.imencode('.jpg', annotator.result()) + if not success: + return None, None + + # 根据不同类型返回对应的报告信息 + report_info = { + 'WELL': ('10007', f'井盖完整,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'ENTER': ('10008', f'检测到引入口,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'BOX': ('10009', f'检测到调压箱,燃气浓度{"正常" if not gas_alarm else "异常"}') + } + + return jpg_data, report_info.get(box_type) + async def capture_and_send_current_frame(self, camera_processor, preset, gas_data, longitude, latitude): """捕获当前帧并通过HTTP发送到后台""" @@ -19,38 +65,42 @@ logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 - frame = camera_processor.get_last_frame() + frame, biz_boxes = camera_processor.get_last_frame() if frame is None: logger.warning(f"摄像头 {cam_id} 没有可用的帧") return False try: skip_img = int(preset) == 5 and float(gas_data) <= 0 - # 将图像编码为JPEG - success, jpg_data = cv2.imencode('.jpg', frame) if not skip_img else (True, None) - if not success: - logger.error(f"摄像头 {cam_id} 编码图像失败") - return False + gas_alarm = 0 if float(gas_data) <= 0 else 1 - def get_cap_content(preset, gas_data): - map = { - "1": ('10006', '第三方施工监测', 1), - "2": ('10007', '闸井监测', 0), - "3": ('10008', '引入口监测', 0), - "4": ('10009', '调压箱监测 ', 0), - "5": ('10011', '燃气泄漏', 0 if float(gas_data) <= 0 else 1), - } - return map.get(str(preset), ('', '', 0)) + # 获取基础配置 + report_type, content, status = self.get_cap_content(preset, gas_data) + jpg_data = None + + if not skip_img: + # 按优先级依次检查各种类型的检测框 + for box_type in ['WELL', 'ENTER', 'BOX']: + jpg_data, report_info = self._process_boxes_with_annotation( + frame, biz_boxes, box_type, gas_alarm + ) + if jpg_data is not None: + report_type, content = report_info + status = gas_alarm + break + + # 如果没有检测到任何目标,使用原始帧 + if jpg_data is None and not skip_img: + success, jpg_data = cv2.imencode('.jpg', frame) + if not success: + logger.error(f"摄像头 {cam_id} 编码图像失败") + return False - # 构建请求数据 - report_type, content, status = get_cap_content(preset, gas_data) - - if int(preset) == 5: - status = 0 if float(gas_data) <= 0 else 1 - + request_data = { "routeNumber": preset, - "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if (jpg_data is not None and jpg_data.size > 0) else '', + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', "reportType": report_type, "reportContent": content, "isAlarm": status, @@ -79,7 +129,8 @@ if match: preset, gas_data, longitude, latitude = match.groups() - logger.debug(f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") + logger.debug( + f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") # 对所有摄像头执行 for cam_id, processor in self.camera_processors.items(): diff --git a/image_plotting.py b/image_plotting.py new file mode 100644 index 0000000..e0af106 --- /dev/null +++ b/image_plotting.py @@ -0,0 +1,130 @@ +from collections.abc import Sequence + +import numpy as np +import cv2 +from PIL import Image, ImageDraw, ImageFont + + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) + + +class Colors: + """ + Ultralytics default color palette https://ultralytics.com/. + + This class provides methods to work with the Ultralytics color palette, including converting hex color codes to + RGB values. + + Attributes: + palette (list of tuple): List of RGB color values. + n (int): The number of colors in the palette. + pose_palette (np.ndarray): A specific color palette array with dtype np.uint8. + """ + + def __init__(self): + """Initialize colors as hex = matplotlib.colors.TABLEAU_COLORS.values().""" + hexs = ( + "FF3838", + "FF9D97", + "FF701F", + "FFB21D", + "CFD231", + "48F90A", + "92CC17", + "3DDB86", + "1A9334", + "00D4BB", + "2C99A8", + "00C2FF", + "344593", + "6473FF", + "0018EC", + "8438FF", + "520085", + "CB38FF", + "FF95C8", + "FF37C7", + ) + self.palette = [self.hex2rgb(f"#{c}") for c in hexs] + self.n = len(self.palette) + self.pose_palette = np.array( + [ + [255, 128, 0], + [255, 153, 51], + [255, 178, 102], + [230, 230, 0], + [255, 153, 255], + [153, 204, 255], + [255, 102, 255], + [255, 51, 255], + [102, 178, 255], + [51, 153, 255], + [255, 153, 153], + [255, 102, 102], + [255, 51, 51], + [153, 255, 153], + [102, 255, 102], + [51, 255, 51], + [0, 255, 0], + [0, 0, 255], + [255, 0, 0], + [255, 255, 255], + ], + dtype=np.uint8, + ) + + def __call__(self, i, bgr=False): + """Converts hex color codes to RGB values.""" + c = self.palette[int(i) % self.n] + return (c[2], c[1], c[0]) if bgr else c + + @staticmethod + def hex2rgb(h): + """Converts hex color codes to RGB values (i.e. default PIL order).""" + return tuple(int(h[1 + i: 1 + i + 2], 16) for i in (0, 2, 4)) + + +colors = Colors() + + +class Annotator: + + def __init__(self, im, line_width=None, font_size=None, font="Arial.ttf", pil=False, example="abc"): + self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2) + + self.im = Image.fromarray(im) + self.draw = ImageDraw.Draw(self.im) + + font = 'static/font/Arial.Unicode.ttf' + size = font_size or max(round(sum(self.im.size) / 2 * 0.035), 12) + self.font = ImageFont.truetype(str(font), size) + if not hasattr(self.font, 'getsize'): + self.font.getsize = lambda x: self.font.getbbox(x)[2:4] + + self.tf = max(self.lw - 1, 1) # font thickness + self.sf = self.lw / 3 # font scale + + def box_label(self, box, label="", color=(128, 128, 128), txt_color=(255, 255, 255), rotated=False): + if not isinstance(box, Sequence): + box = box.tolist() + if rotated: + p1 = box[0] + # NOTE: PIL-version polygon needs tuple type. + self.draw.polygon([tuple(b) for b in box], width=self.lw, outline=color) + else: + p1 = (box[0], box[1]) + self.draw.rectangle(box, width=self.lw, outline=color) # box + if label: + w, h = self.font.getsize(label) # text width, height + outside = p1[1] - h >= 0 # label fits outside box + self.draw.rectangle( + (p1[0], p1[1] - h if outside else p1[1], p1[0] + w + 1, p1[1] + 1 if outside else p1[1] + h + 1), + fill=color, + ) + # self.draw.text((box[0], box[1]), label, fill=txt_color, font=self.font, anchor='ls') # for PIL>8.0 + self.draw.text((p1[0], p1[1] - h if outside else p1[1]), label, fill=txt_color, font=self.font) + + def result(self): + """Return annotated image as array.""" + return np.asarray(self.im) \ No newline at end of file diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/config.py b/config.py index e3438b0..eeffc86 100644 --- a/config.py +++ b/config.py @@ -1,49 +1,51 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True, - "frame_interval": 5, - "receive_capture_command": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True, - "frame_interval": 5, - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], - # "frame_interval": 5, - # "remark": "本地测试摄像头", + # "alarm_send_cls": ["路锥", "反光衣"], + # "remark": "机械臂摄像头", # "save_annotated_images": True, + # "frame_interval": 5, # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "alarm_send_cls": ["路锥", "反光衣"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { @@ -52,32 +54,33 @@ } HTTP_SERVER = { - "url": "http://111.198.10.15:11645/algorithm/receiveData", + # "url": "http://111.198.10.15:11645/algorithm/receiveData", # "url": "http://192.168.150.39:30028/algorithm/receiveData", + "url": "http://127.0.0.1:8000/test", "timeout": 10 # 超时重试 } MODEL_CLASS = { - 0: '井盖', - 1: '井盖塌陷', - 2: '井盖眼', - 3: '井盖破损', - 4: '井盖移位', - 5: '井盖缺失', - 6: '人', - 7: '压路机', - 8: '反光衣', - 9: '土堆', - 10: '土方车', - 11: '头', - 12: '安全帽', - 13: '挖掘机', - 14: '推土机', - 15: '施工路牌', - 16: '水马', - 17: '路锥', - 18: '铁锹', - 19: '防护栏', + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', 20: '风镐' } @@ -86,4 +89,20 @@ "size": 640, "class_map": MODEL_CLASS, "batch_size": 1, +} + +BIZ_CLASS = { + "WELL": ['井盖', '井盖塌陷', '井盖眼', '井盖破损', '井盖移位', '井盖缺失', ], + "ENTER": [], + "BOX": [], + "CONSTRUCTION": ['反光衣', '路锥'] +} + + +PRESET_CONTENT_MAP = { + "1": ('10006', '第三方施工监测', 1), + "2": ('10007', '闸井监测', None), + "3": ('10008', '引入口监测', None), + "4": ('10009', '调压箱监测 ', None), + "5": ('10011', '燃气泄漏', None), # 使用None表示需要根据gas_data计算 } \ No newline at end of file diff --git a/global_logger.py b/global_logger.py index fed8eee..bf4726d 100644 --- a/global_logger.py +++ b/global_logger.py @@ -42,7 +42,8 @@ logger.addHandler(handler) # 创建控制台处理器 -console_handler = logging.StreamHandler() +utf8_stream = open(sys.__stdout__.fileno(), mode='w', encoding='utf-8', buffering=1) +console_handler = logging.StreamHandler(utf8_stream) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(formatter) logger.addHandler(console_handler) diff --git a/handle_tcp_command.py b/handle_tcp_command.py index 8e7d5d8..3964d20 100644 --- a/handle_tcp_command.py +++ b/handle_tcp_command.py @@ -3,8 +3,13 @@ import json import re import time +from copy import deepcopy + import cv2 + +from config import BIZ_CLASS, PRESET_CONTENT_MAP from global_logger import logger, process_log_data +from image_plotting import Annotator, COLOR_RED, COLOR_BLUE class HandelTCPCommand: @@ -12,6 +17,47 @@ self.camera_processors = camera_processors self.http_client = http_client + @staticmethod + def get_cap_content(preset, gas_data): + """获取捕获内容的配置信息""" + content = PRESET_CONTENT_MAP.get(str(preset)) + if content is None: + return '', '', 0 + + report_type, desc, status = content + if status is None: # 燃气泄漏特殊处理 + status = 0 if float(gas_data) <= 0 else 1 + return report_type, desc, status + + def _process_boxes_with_annotation(self, frame, biz_boxes, box_type, gas_alarm): + """处理特定类型的检测框并添加标注""" + boxes = [box for box in biz_boxes if box[5] in BIZ_CLASS[box_type]] + if not boxes: + return None, None + + annotator = Annotator(deepcopy(frame)) + color = COLOR_RED if gas_alarm else COLOR_BLUE + + for box in boxes: + annotator.box_label( + box=[box[0], box[1], box[2], box[3]], + label='井盖' if box[5].index('井盖')>=0 else box[5], + color=color + ) + + success, jpg_data = cv2.imencode('.jpg', annotator.result()) + if not success: + return None, None + + # 根据不同类型返回对应的报告信息 + report_info = { + 'WELL': ('10007', f'井盖完整,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'ENTER': ('10008', f'检测到引入口,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'BOX': ('10009', f'检测到调压箱,燃气浓度{"正常" if not gas_alarm else "异常"}') + } + + return jpg_data, report_info.get(box_type) + async def capture_and_send_current_frame(self, camera_processor, preset, gas_data, longitude, latitude): """捕获当前帧并通过HTTP发送到后台""" @@ -19,38 +65,42 @@ logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 - frame = camera_processor.get_last_frame() + frame, biz_boxes = camera_processor.get_last_frame() if frame is None: logger.warning(f"摄像头 {cam_id} 没有可用的帧") return False try: skip_img = int(preset) == 5 and float(gas_data) <= 0 - # 将图像编码为JPEG - success, jpg_data = cv2.imencode('.jpg', frame) if not skip_img else (True, None) - if not success: - logger.error(f"摄像头 {cam_id} 编码图像失败") - return False + gas_alarm = 0 if float(gas_data) <= 0 else 1 - def get_cap_content(preset, gas_data): - map = { - "1": ('10006', '第三方施工监测', 1), - "2": ('10007', '闸井监测', 0), - "3": ('10008', '引入口监测', 0), - "4": ('10009', '调压箱监测 ', 0), - "5": ('10011', '燃气泄漏', 0 if float(gas_data) <= 0 else 1), - } - return map.get(str(preset), ('', '', 0)) + # 获取基础配置 + report_type, content, status = self.get_cap_content(preset, gas_data) + jpg_data = None + + if not skip_img: + # 按优先级依次检查各种类型的检测框 + for box_type in ['WELL', 'ENTER', 'BOX']: + jpg_data, report_info = self._process_boxes_with_annotation( + frame, biz_boxes, box_type, gas_alarm + ) + if jpg_data is not None: + report_type, content = report_info + status = gas_alarm + break + + # 如果没有检测到任何目标,使用原始帧 + if jpg_data is None and not skip_img: + success, jpg_data = cv2.imencode('.jpg', frame) + if not success: + logger.error(f"摄像头 {cam_id} 编码图像失败") + return False - # 构建请求数据 - report_type, content, status = get_cap_content(preset, gas_data) - - if int(preset) == 5: - status = 0 if float(gas_data) <= 0 else 1 - + request_data = { "routeNumber": preset, - "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if (jpg_data is not None and jpg_data.size > 0) else '', + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', "reportType": report_type, "reportContent": content, "isAlarm": status, @@ -79,7 +129,8 @@ if match: preset, gas_data, longitude, latitude = match.groups() - logger.debug(f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") + logger.debug( + f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") # 对所有摄像头执行 for cam_id, processor in self.camera_processors.items(): diff --git a/image_plotting.py b/image_plotting.py new file mode 100644 index 0000000..e0af106 --- /dev/null +++ b/image_plotting.py @@ -0,0 +1,130 @@ +from collections.abc import Sequence + +import numpy as np +import cv2 +from PIL import Image, ImageDraw, ImageFont + + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) + + +class Colors: + """ + Ultralytics default color palette https://ultralytics.com/. + + This class provides methods to work with the Ultralytics color palette, including converting hex color codes to + RGB values. + + Attributes: + palette (list of tuple): List of RGB color values. + n (int): The number of colors in the palette. + pose_palette (np.ndarray): A specific color palette array with dtype np.uint8. + """ + + def __init__(self): + """Initialize colors as hex = matplotlib.colors.TABLEAU_COLORS.values().""" + hexs = ( + "FF3838", + "FF9D97", + "FF701F", + "FFB21D", + "CFD231", + "48F90A", + "92CC17", + "3DDB86", + "1A9334", + "00D4BB", + "2C99A8", + "00C2FF", + "344593", + "6473FF", + "0018EC", + "8438FF", + "520085", + "CB38FF", + "FF95C8", + "FF37C7", + ) + self.palette = [self.hex2rgb(f"#{c}") for c in hexs] + self.n = len(self.palette) + self.pose_palette = np.array( + [ + [255, 128, 0], + [255, 153, 51], + [255, 178, 102], + [230, 230, 0], + [255, 153, 255], + [153, 204, 255], + [255, 102, 255], + [255, 51, 255], + [102, 178, 255], + [51, 153, 255], + [255, 153, 153], + [255, 102, 102], + [255, 51, 51], + [153, 255, 153], + [102, 255, 102], + [51, 255, 51], + [0, 255, 0], + [0, 0, 255], + [255, 0, 0], + [255, 255, 255], + ], + dtype=np.uint8, + ) + + def __call__(self, i, bgr=False): + """Converts hex color codes to RGB values.""" + c = self.palette[int(i) % self.n] + return (c[2], c[1], c[0]) if bgr else c + + @staticmethod + def hex2rgb(h): + """Converts hex color codes to RGB values (i.e. default PIL order).""" + return tuple(int(h[1 + i: 1 + i + 2], 16) for i in (0, 2, 4)) + + +colors = Colors() + + +class Annotator: + + def __init__(self, im, line_width=None, font_size=None, font="Arial.ttf", pil=False, example="abc"): + self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2) + + self.im = Image.fromarray(im) + self.draw = ImageDraw.Draw(self.im) + + font = 'static/font/Arial.Unicode.ttf' + size = font_size or max(round(sum(self.im.size) / 2 * 0.035), 12) + self.font = ImageFont.truetype(str(font), size) + if not hasattr(self.font, 'getsize'): + self.font.getsize = lambda x: self.font.getbbox(x)[2:4] + + self.tf = max(self.lw - 1, 1) # font thickness + self.sf = self.lw / 3 # font scale + + def box_label(self, box, label="", color=(128, 128, 128), txt_color=(255, 255, 255), rotated=False): + if not isinstance(box, Sequence): + box = box.tolist() + if rotated: + p1 = box[0] + # NOTE: PIL-version polygon needs tuple type. + self.draw.polygon([tuple(b) for b in box], width=self.lw, outline=color) + else: + p1 = (box[0], box[1]) + self.draw.rectangle(box, width=self.lw, outline=color) # box + if label: + w, h = self.font.getsize(label) # text width, height + outside = p1[1] - h >= 0 # label fits outside box + self.draw.rectangle( + (p1[0], p1[1] - h if outside else p1[1], p1[0] + w + 1, p1[1] + 1 if outside else p1[1] + h + 1), + fill=color, + ) + # self.draw.text((box[0], box[1]), label, fill=txt_color, font=self.font, anchor='ls') # for PIL>8.0 + self.draw.text((p1[0], p1[1] - h if outside else p1[1]), label, fill=txt_color, font=self.font) + + def result(self): + """Return annotated image as array.""" + return np.asarray(self.im) \ No newline at end of file diff --git a/static/font/Arial.Unicode.ttf b/static/font/Arial.Unicode.ttf new file mode 100644 index 0000000..1537c5b --- /dev/null +++ b/static/font/Arial.Unicode.ttf Binary files differ diff --git a/camera_processor.py b/camera_processor.py index 5675e58..4b0028c 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -1,4 +1,7 @@ # camera_processor.py +import base64 +from copy import deepcopy + import cv2 import threading import time @@ -6,7 +9,9 @@ import json import os -from global_logger import logger +from config import BIZ_CLASS +from global_logger import logger, process_log_data +from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): @@ -23,6 +28,8 @@ self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) + self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) + self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None @@ -51,15 +58,19 @@ self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 - + # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") - + # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() + self.last_biz_boxes = [] + + self.last_alarm_ts = None + self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -68,8 +79,8 @@ self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") - # self.cap = cv2.VideoCapture(self.gst_str) - self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) + self.cap = cv2.VideoCapture(self.gst_str) + # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") @@ -121,8 +132,6 @@ self.loop ) - # 如果有额外HTTP处理逻辑,可在此添加 - def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: @@ -135,17 +144,24 @@ for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] - + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + self.last_biz_boxes = [] + # 用于保存的带标注图像 - annotated_frame = frame.copy() if self.save_annotated_images else None - + annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None + + frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) + frame_labels.append(label) # print(label) - + # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ @@ -157,34 +173,72 @@ self.loop ) + if label in self.biz_cls: + self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) + # 在复制的图像上绘制标注 if self.save_annotated_images: - self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf) - + annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) + + print(self.last_biz_boxes) + + # 上传第三方施工报警 + if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): + + if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: + continue + + alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] + alarm_annotator = Annotator(deepcopy(frame)) + for box in alarm_boxes: + alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) + + success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) + if not success: + logger.error(f"摄像头 {self.cam_id} 编码图像失败") + continue + + alarm_labels = [box[5] for box in alarm_boxes] + request_data = { + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', + "reportType": '10006', + "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', + "isAlarm": 1, + } + self.last_alarm_ts = time.time() + logger.debug(process_log_data(request_data)) + # 发送HTTP请求 + asyncio.run_coroutine_threadsafe( + self.http_client.send([request_data]), + self.loop + ) + # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: + annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) - + def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - + # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - + return image - + def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" - + # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") @@ -205,7 +259,6 @@ self.frame_buffer = [] self.frame_info_buffer = [] - def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() @@ -224,9 +277,9 @@ """ with self.last_frame_lock: if self.last_frame is None: - return None - return self.last_frame.copy() - + return None, [] + print(f'====================={self.last_biz_boxes}') + return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" @@ -268,9 +321,9 @@ # 读取成功后,重置失败计数 failure_count = 0 - # 保存最后一帧 - with self.last_frame_lock: - self.last_frame = frame.copy() + # # 保存最后一帧 + # with self.last_frame_lock: + # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 diff --git a/config.py b/config.py index e3438b0..eeffc86 100644 --- a/config.py +++ b/config.py @@ -1,49 +1,51 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True, - "frame_interval": 5, - "receive_capture_command": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True, - "frame_interval": 5, - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], - # "frame_interval": 5, - # "remark": "本地测试摄像头", + # "alarm_send_cls": ["路锥", "反光衣"], + # "remark": "机械臂摄像头", # "save_annotated_images": True, + # "frame_interval": 5, # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "alarm_send_cls": ["路锥", "反光衣"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { @@ -52,32 +54,33 @@ } HTTP_SERVER = { - "url": "http://111.198.10.15:11645/algorithm/receiveData", + # "url": "http://111.198.10.15:11645/algorithm/receiveData", # "url": "http://192.168.150.39:30028/algorithm/receiveData", + "url": "http://127.0.0.1:8000/test", "timeout": 10 # 超时重试 } MODEL_CLASS = { - 0: '井盖', - 1: '井盖塌陷', - 2: '井盖眼', - 3: '井盖破损', - 4: '井盖移位', - 5: '井盖缺失', - 6: '人', - 7: '压路机', - 8: '反光衣', - 9: '土堆', - 10: '土方车', - 11: '头', - 12: '安全帽', - 13: '挖掘机', - 14: '推土机', - 15: '施工路牌', - 16: '水马', - 17: '路锥', - 18: '铁锹', - 19: '防护栏', + 0: '井盖', + 1: '井盖塌陷', + 2: '井盖眼', + 3: '井盖破损', + 4: '井盖移位', + 5: '井盖缺失', + 6: '人', + 7: '压路机', + 8: '反光衣', + 9: '土堆', + 10: '土方车', + 11: '头', + 12: '安全帽', + 13: '挖掘机', + 14: '推土机', + 15: '施工路牌', + 16: '水马', + 17: '路锥', + 18: '铁锹', + 19: '防护栏', 20: '风镐' } @@ -86,4 +89,20 @@ "size": 640, "class_map": MODEL_CLASS, "batch_size": 1, +} + +BIZ_CLASS = { + "WELL": ['井盖', '井盖塌陷', '井盖眼', '井盖破损', '井盖移位', '井盖缺失', ], + "ENTER": [], + "BOX": [], + "CONSTRUCTION": ['反光衣', '路锥'] +} + + +PRESET_CONTENT_MAP = { + "1": ('10006', '第三方施工监测', 1), + "2": ('10007', '闸井监测', None), + "3": ('10008', '引入口监测', None), + "4": ('10009', '调压箱监测 ', None), + "5": ('10011', '燃气泄漏', None), # 使用None表示需要根据gas_data计算 } \ No newline at end of file diff --git a/global_logger.py b/global_logger.py index fed8eee..bf4726d 100644 --- a/global_logger.py +++ b/global_logger.py @@ -42,7 +42,8 @@ logger.addHandler(handler) # 创建控制台处理器 -console_handler = logging.StreamHandler() +utf8_stream = open(sys.__stdout__.fileno(), mode='w', encoding='utf-8', buffering=1) +console_handler = logging.StreamHandler(utf8_stream) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(formatter) logger.addHandler(console_handler) diff --git a/handle_tcp_command.py b/handle_tcp_command.py index 8e7d5d8..3964d20 100644 --- a/handle_tcp_command.py +++ b/handle_tcp_command.py @@ -3,8 +3,13 @@ import json import re import time +from copy import deepcopy + import cv2 + +from config import BIZ_CLASS, PRESET_CONTENT_MAP from global_logger import logger, process_log_data +from image_plotting import Annotator, COLOR_RED, COLOR_BLUE class HandelTCPCommand: @@ -12,6 +17,47 @@ self.camera_processors = camera_processors self.http_client = http_client + @staticmethod + def get_cap_content(preset, gas_data): + """获取捕获内容的配置信息""" + content = PRESET_CONTENT_MAP.get(str(preset)) + if content is None: + return '', '', 0 + + report_type, desc, status = content + if status is None: # 燃气泄漏特殊处理 + status = 0 if float(gas_data) <= 0 else 1 + return report_type, desc, status + + def _process_boxes_with_annotation(self, frame, biz_boxes, box_type, gas_alarm): + """处理特定类型的检测框并添加标注""" + boxes = [box for box in biz_boxes if box[5] in BIZ_CLASS[box_type]] + if not boxes: + return None, None + + annotator = Annotator(deepcopy(frame)) + color = COLOR_RED if gas_alarm else COLOR_BLUE + + for box in boxes: + annotator.box_label( + box=[box[0], box[1], box[2], box[3]], + label='井盖' if box[5].index('井盖')>=0 else box[5], + color=color + ) + + success, jpg_data = cv2.imencode('.jpg', annotator.result()) + if not success: + return None, None + + # 根据不同类型返回对应的报告信息 + report_info = { + 'WELL': ('10007', f'井盖完整,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'ENTER': ('10008', f'检测到引入口,燃气浓度{"正常" if not gas_alarm else "异常"}'), + 'BOX': ('10009', f'检测到调压箱,燃气浓度{"正常" if not gas_alarm else "异常"}') + } + + return jpg_data, report_info.get(box_type) + async def capture_and_send_current_frame(self, camera_processor, preset, gas_data, longitude, latitude): """捕获当前帧并通过HTTP发送到后台""" @@ -19,38 +65,42 @@ logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 - frame = camera_processor.get_last_frame() + frame, biz_boxes = camera_processor.get_last_frame() if frame is None: logger.warning(f"摄像头 {cam_id} 没有可用的帧") return False try: skip_img = int(preset) == 5 and float(gas_data) <= 0 - # 将图像编码为JPEG - success, jpg_data = cv2.imencode('.jpg', frame) if not skip_img else (True, None) - if not success: - logger.error(f"摄像头 {cam_id} 编码图像失败") - return False + gas_alarm = 0 if float(gas_data) <= 0 else 1 - def get_cap_content(preset, gas_data): - map = { - "1": ('10006', '第三方施工监测', 1), - "2": ('10007', '闸井监测', 0), - "3": ('10008', '引入口监测', 0), - "4": ('10009', '调压箱监测 ', 0), - "5": ('10011', '燃气泄漏', 0 if float(gas_data) <= 0 else 1), - } - return map.get(str(preset), ('', '', 0)) + # 获取基础配置 + report_type, content, status = self.get_cap_content(preset, gas_data) + jpg_data = None + + if not skip_img: + # 按优先级依次检查各种类型的检测框 + for box_type in ['WELL', 'ENTER', 'BOX']: + jpg_data, report_info = self._process_boxes_with_annotation( + frame, biz_boxes, box_type, gas_alarm + ) + if jpg_data is not None: + report_type, content = report_info + status = gas_alarm + break + + # 如果没有检测到任何目标,使用原始帧 + if jpg_data is None and not skip_img: + success, jpg_data = cv2.imencode('.jpg', frame) + if not success: + logger.error(f"摄像头 {cam_id} 编码图像失败") + return False - # 构建请求数据 - report_type, content, status = get_cap_content(preset, gas_data) - - if int(preset) == 5: - status = 0 if float(gas_data) <= 0 else 1 - + request_data = { "routeNumber": preset, - "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if (jpg_data is not None and jpg_data.size > 0) else '', + "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( + jpg_data is not None and jpg_data.size > 0) else '', "reportType": report_type, "reportContent": content, "isAlarm": status, @@ -79,7 +129,8 @@ if match: preset, gas_data, longitude, latitude = match.groups() - logger.debug(f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") + logger.debug( + f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") # 对所有摄像头执行 for cam_id, processor in self.camera_processors.items(): diff --git a/image_plotting.py b/image_plotting.py new file mode 100644 index 0000000..e0af106 --- /dev/null +++ b/image_plotting.py @@ -0,0 +1,130 @@ +from collections.abc import Sequence + +import numpy as np +import cv2 +from PIL import Image, ImageDraw, ImageFont + + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) + + +class Colors: + """ + Ultralytics default color palette https://ultralytics.com/. + + This class provides methods to work with the Ultralytics color palette, including converting hex color codes to + RGB values. + + Attributes: + palette (list of tuple): List of RGB color values. + n (int): The number of colors in the palette. + pose_palette (np.ndarray): A specific color palette array with dtype np.uint8. + """ + + def __init__(self): + """Initialize colors as hex = matplotlib.colors.TABLEAU_COLORS.values().""" + hexs = ( + "FF3838", + "FF9D97", + "FF701F", + "FFB21D", + "CFD231", + "48F90A", + "92CC17", + "3DDB86", + "1A9334", + "00D4BB", + "2C99A8", + "00C2FF", + "344593", + "6473FF", + "0018EC", + "8438FF", + "520085", + "CB38FF", + "FF95C8", + "FF37C7", + ) + self.palette = [self.hex2rgb(f"#{c}") for c in hexs] + self.n = len(self.palette) + self.pose_palette = np.array( + [ + [255, 128, 0], + [255, 153, 51], + [255, 178, 102], + [230, 230, 0], + [255, 153, 255], + [153, 204, 255], + [255, 102, 255], + [255, 51, 255], + [102, 178, 255], + [51, 153, 255], + [255, 153, 153], + [255, 102, 102], + [255, 51, 51], + [153, 255, 153], + [102, 255, 102], + [51, 255, 51], + [0, 255, 0], + [0, 0, 255], + [255, 0, 0], + [255, 255, 255], + ], + dtype=np.uint8, + ) + + def __call__(self, i, bgr=False): + """Converts hex color codes to RGB values.""" + c = self.palette[int(i) % self.n] + return (c[2], c[1], c[0]) if bgr else c + + @staticmethod + def hex2rgb(h): + """Converts hex color codes to RGB values (i.e. default PIL order).""" + return tuple(int(h[1 + i: 1 + i + 2], 16) for i in (0, 2, 4)) + + +colors = Colors() + + +class Annotator: + + def __init__(self, im, line_width=None, font_size=None, font="Arial.ttf", pil=False, example="abc"): + self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2) + + self.im = Image.fromarray(im) + self.draw = ImageDraw.Draw(self.im) + + font = 'static/font/Arial.Unicode.ttf' + size = font_size or max(round(sum(self.im.size) / 2 * 0.035), 12) + self.font = ImageFont.truetype(str(font), size) + if not hasattr(self.font, 'getsize'): + self.font.getsize = lambda x: self.font.getbbox(x)[2:4] + + self.tf = max(self.lw - 1, 1) # font thickness + self.sf = self.lw / 3 # font scale + + def box_label(self, box, label="", color=(128, 128, 128), txt_color=(255, 255, 255), rotated=False): + if not isinstance(box, Sequence): + box = box.tolist() + if rotated: + p1 = box[0] + # NOTE: PIL-version polygon needs tuple type. + self.draw.polygon([tuple(b) for b in box], width=self.lw, outline=color) + else: + p1 = (box[0], box[1]) + self.draw.rectangle(box, width=self.lw, outline=color) # box + if label: + w, h = self.font.getsize(label) # text width, height + outside = p1[1] - h >= 0 # label fits outside box + self.draw.rectangle( + (p1[0], p1[1] - h if outside else p1[1], p1[0] + w + 1, p1[1] + 1 if outside else p1[1] + h + 1), + fill=color, + ) + # self.draw.text((box[0], box[1]), label, fill=txt_color, font=self.font, anchor='ls') # for PIL>8.0 + self.draw.text((p1[0], p1[1] - h if outside else p1[1]), label, fill=txt_color, font=self.font) + + def result(self): + """Return annotated image as array.""" + return np.asarray(self.im) \ No newline at end of file diff --git a/static/font/Arial.Unicode.ttf b/static/font/Arial.Unicode.ttf new file mode 100644 index 0000000..1537c5b --- /dev/null +++ b/static/font/Arial.Unicode.ttf Binary files differ diff --git a/static/font/Arial.ttf b/static/font/Arial.ttf new file mode 100644 index 0000000..ab68fb1 --- /dev/null +++ b/static/font/Arial.ttf Binary files differ