diff --git a/README.md b/README.md index 9b195a6..e22f461 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,13 @@ | 8 | 标志位 | FA 表示结束 | ## 安装 -```angular2html -docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server ultralytics/ultralytics:latest-jetson-jetpack5 -``` +### 算法服务 ```angular2html docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server go-algo-server:v1.0 +``` + +### nginx +```angular2html +docker run --name nginx -d -p 80:80 --restart always -v /home/unitree/web/nginx.conf:/etc/nginx/nginx.conf -v /home/unitree/web/:/usr/share/nginx/html nginx:stable-alpine-slim ``` \ No newline at end of file diff --git a/README.md b/README.md index 9b195a6..e22f461 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,13 @@ | 8 | 标志位 | FA 表示结束 | ## 安装 -```angular2html -docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server ultralytics/ultralytics:latest-jetson-jetpack5 -``` +### 算法服务 ```angular2html docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server go-algo-server:v1.0 +``` + +### nginx +```angular2html +docker run --name nginx -d -p 80:80 --restart always -v /home/unitree/web/nginx.conf:/etc/nginx/nginx.conf -v /home/unitree/web/:/usr/share/nginx/html nginx:stable-alpine-slim ``` \ No newline at end of file diff --git a/app.py b/app.py index e32cd3b..8d34327 100644 --- a/app.py +++ b/app.py @@ -5,27 +5,42 @@ from http_client import AsyncHTTPClient from model_wrapper import ModelWrapper from camera_processor import CameraProcessor +from handle_tcp_command import HandelTCPCommand from global_logger import logger async def main(): logger.info("开始启动算法分析服务") + + cap_camera_processors = {} + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) - http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + cap_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + alarm_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) # 启动 TCP 和 HTTP 的发送任务 asyncio.create_task(tcp_client.send_loop()) - asyncio.create_task(http_client.send_loop()) + asyncio.create_task(tcp_client.receive_loop()) + asyncio.create_task(cap_upload_http_client.send_loop()) + asyncio.create_task(alarm_upload_http_client.send_loop()) model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) # 为每个摄像头启动一个处理线程,并传入事件循环 for camera_config in CAMERAS: - camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, alarm_upload_http_client, loop, MODEL["batch_size"]) camera_thread.start() + if camera_config.get("receive_capture_command", False): + cam_id = camera_config.get('cam_id') + cap_camera_processors[cam_id] = camera_thread + + # 创建TCP命令处理器并注册回调,直接传入http_client + handler = HandelTCPCommand(cap_camera_processors, cap_upload_http_client) + tcp_client.register_command_handler(handler.handle_capture_command) + while True: await asyncio.sleep(1) diff --git a/README.md b/README.md index 9b195a6..e22f461 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,13 @@ | 8 | 标志位 | FA 表示结束 | ## 安装 -```angular2html -docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server ultralytics/ultralytics:latest-jetson-jetpack5 -``` +### 算法服务 ```angular2html docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server go-algo-server:v1.0 +``` + +### nginx +```angular2html +docker run --name nginx -d -p 80:80 --restart always -v /home/unitree/web/nginx.conf:/etc/nginx/nginx.conf -v /home/unitree/web/:/usr/share/nginx/html nginx:stable-alpine-slim ``` \ No newline at end of file diff --git a/app.py b/app.py index e32cd3b..8d34327 100644 --- a/app.py +++ b/app.py @@ -5,27 +5,42 @@ from http_client import AsyncHTTPClient from model_wrapper import ModelWrapper from camera_processor import CameraProcessor +from handle_tcp_command import HandelTCPCommand from global_logger import logger async def main(): logger.info("开始启动算法分析服务") + + cap_camera_processors = {} + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) - http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + cap_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + alarm_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) # 启动 TCP 和 HTTP 的发送任务 asyncio.create_task(tcp_client.send_loop()) - asyncio.create_task(http_client.send_loop()) + asyncio.create_task(tcp_client.receive_loop()) + asyncio.create_task(cap_upload_http_client.send_loop()) + asyncio.create_task(alarm_upload_http_client.send_loop()) model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) # 为每个摄像头启动一个处理线程,并传入事件循环 for camera_config in CAMERAS: - camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, alarm_upload_http_client, loop, MODEL["batch_size"]) camera_thread.start() + if camera_config.get("receive_capture_command", False): + cam_id = camera_config.get('cam_id') + cap_camera_processors[cam_id] = camera_thread + + # 创建TCP命令处理器并注册回调,直接传入http_client + handler = HandelTCPCommand(cap_camera_processors, cap_upload_http_client) + tcp_client.register_command_handler(handler.handle_capture_command) + while True: await asyncio.sleep(1) diff --git a/camera_processor.py b/camera_processor.py index 93277e8..5675e58 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -49,13 +49,17 @@ # 添加图像保存相关参数 self.save_annotated_images = self.camera_config.get('save_annotated_images', False) self.save_path = self.camera_config.get('save_path', './saved_images') - self.save_interval = self.camera_config.get('save_interval', 25) # 每隔多少帧保存一次 + self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") + + # 最后捕获的帧 + self.last_frame = None + self.last_frame_lock = threading.Lock() def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -212,6 +216,18 @@ logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}") self.fps_ts = current_time + def get_last_frame(self): + """获取最后捕获的帧,如果没有则返回None + + Returns: + numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None + """ + with self.last_frame_lock: + if self.last_frame is None: + return None + return self.last_frame.copy() + + def run(self): """摄像头处理主循环""" logger.info(f"摄像头处理线程 {self.cam_id} 启动") @@ -251,6 +267,11 @@ else: # 读取成功后,重置失败计数 failure_count = 0 + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + # 抽帧处理 self.frame_count += 1 if self.frame_count % self.frame_interval == 0: diff --git a/README.md b/README.md index 9b195a6..e22f461 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,13 @@ | 8 | 标志位 | FA 表示结束 | ## 安装 -```angular2html -docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server ultralytics/ultralytics:latest-jetson-jetpack5 -``` +### 算法服务 ```angular2html docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server go-algo-server:v1.0 +``` + +### nginx +```angular2html +docker run --name nginx -d -p 80:80 --restart always -v /home/unitree/web/nginx.conf:/etc/nginx/nginx.conf -v /home/unitree/web/:/usr/share/nginx/html nginx:stable-alpine-slim ``` \ No newline at end of file diff --git a/app.py b/app.py index e32cd3b..8d34327 100644 --- a/app.py +++ b/app.py @@ -5,27 +5,42 @@ from http_client import AsyncHTTPClient from model_wrapper import ModelWrapper from camera_processor import CameraProcessor +from handle_tcp_command import HandelTCPCommand from global_logger import logger async def main(): logger.info("开始启动算法分析服务") + + cap_camera_processors = {} + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) - http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + cap_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + alarm_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) # 启动 TCP 和 HTTP 的发送任务 asyncio.create_task(tcp_client.send_loop()) - asyncio.create_task(http_client.send_loop()) + asyncio.create_task(tcp_client.receive_loop()) + asyncio.create_task(cap_upload_http_client.send_loop()) + asyncio.create_task(alarm_upload_http_client.send_loop()) model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) # 为每个摄像头启动一个处理线程,并传入事件循环 for camera_config in CAMERAS: - camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, alarm_upload_http_client, loop, MODEL["batch_size"]) camera_thread.start() + if camera_config.get("receive_capture_command", False): + cam_id = camera_config.get('cam_id') + cap_camera_processors[cam_id] = camera_thread + + # 创建TCP命令处理器并注册回调,直接传入http_client + handler = HandelTCPCommand(cap_camera_processors, cap_upload_http_client) + tcp_client.register_command_handler(handler.handle_capture_command) + while True: await asyncio.sleep(1) diff --git a/camera_processor.py b/camera_processor.py index 93277e8..5675e58 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -49,13 +49,17 @@ # 添加图像保存相关参数 self.save_annotated_images = self.camera_config.get('save_annotated_images', False) self.save_path = self.camera_config.get('save_path', './saved_images') - self.save_interval = self.camera_config.get('save_interval', 25) # 每隔多少帧保存一次 + self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") + + # 最后捕获的帧 + self.last_frame = None + self.last_frame_lock = threading.Lock() def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -212,6 +216,18 @@ logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}") self.fps_ts = current_time + def get_last_frame(self): + """获取最后捕获的帧,如果没有则返回None + + Returns: + numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None + """ + with self.last_frame_lock: + if self.last_frame is None: + return None + return self.last_frame.copy() + + def run(self): """摄像头处理主循环""" logger.info(f"摄像头处理线程 {self.cam_id} 启动") @@ -251,6 +267,11 @@ else: # 读取成功后,重置失败计数 failure_count = 0 + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + # 抽帧处理 self.frame_count += 1 if self.frame_count % self.frame_interval == 0: diff --git a/config.py b/config.py index b6177c9..1e33aea 100644 --- a/config.py +++ b/config.py @@ -1,54 +1,58 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头", + # "save_annotated_images": True, # "frame_interval": 5, - # "remark": "本地测试摄像头", - # "save_annotated_images": True + # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { - "host": "127.0.0.1", + "host": "192.168.123.18", "port": 8888 } HTTP_SERVER = { - "url": "http://127.0.0.1:8000/alert", + "url": "http://127.0.0.1:8000/test", "timeout": 5 # 超时重试 } diff --git a/README.md b/README.md index 9b195a6..e22f461 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,13 @@ | 8 | 标志位 | FA 表示结束 | ## 安装 -```angular2html -docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server ultralytics/ultralytics:latest-jetson-jetpack5 -``` +### 算法服务 ```angular2html docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server go-algo-server:v1.0 +``` + +### nginx +```angular2html +docker run --name nginx -d -p 80:80 --restart always -v /home/unitree/web/nginx.conf:/etc/nginx/nginx.conf -v /home/unitree/web/:/usr/share/nginx/html nginx:stable-alpine-slim ``` \ No newline at end of file diff --git a/app.py b/app.py index e32cd3b..8d34327 100644 --- a/app.py +++ b/app.py @@ -5,27 +5,42 @@ from http_client import AsyncHTTPClient from model_wrapper import ModelWrapper from camera_processor import CameraProcessor +from handle_tcp_command import HandelTCPCommand from global_logger import logger async def main(): logger.info("开始启动算法分析服务") + + cap_camera_processors = {} + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) - http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + cap_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + alarm_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) # 启动 TCP 和 HTTP 的发送任务 asyncio.create_task(tcp_client.send_loop()) - asyncio.create_task(http_client.send_loop()) + asyncio.create_task(tcp_client.receive_loop()) + asyncio.create_task(cap_upload_http_client.send_loop()) + asyncio.create_task(alarm_upload_http_client.send_loop()) model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) # 为每个摄像头启动一个处理线程,并传入事件循环 for camera_config in CAMERAS: - camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, alarm_upload_http_client, loop, MODEL["batch_size"]) camera_thread.start() + if camera_config.get("receive_capture_command", False): + cam_id = camera_config.get('cam_id') + cap_camera_processors[cam_id] = camera_thread + + # 创建TCP命令处理器并注册回调,直接传入http_client + handler = HandelTCPCommand(cap_camera_processors, cap_upload_http_client) + tcp_client.register_command_handler(handler.handle_capture_command) + while True: await asyncio.sleep(1) diff --git a/camera_processor.py b/camera_processor.py index 93277e8..5675e58 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -49,13 +49,17 @@ # 添加图像保存相关参数 self.save_annotated_images = self.camera_config.get('save_annotated_images', False) self.save_path = self.camera_config.get('save_path', './saved_images') - self.save_interval = self.camera_config.get('save_interval', 25) # 每隔多少帧保存一次 + self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") + + # 最后捕获的帧 + self.last_frame = None + self.last_frame_lock = threading.Lock() def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -212,6 +216,18 @@ logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}") self.fps_ts = current_time + def get_last_frame(self): + """获取最后捕获的帧,如果没有则返回None + + Returns: + numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None + """ + with self.last_frame_lock: + if self.last_frame is None: + return None + return self.last_frame.copy() + + def run(self): """摄像头处理主循环""" logger.info(f"摄像头处理线程 {self.cam_id} 启动") @@ -251,6 +267,11 @@ else: # 读取成功后,重置失败计数 failure_count = 0 + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + # 抽帧处理 self.frame_count += 1 if self.frame_count % self.frame_interval == 0: diff --git a/config.py b/config.py index b6177c9..1e33aea 100644 --- a/config.py +++ b/config.py @@ -1,54 +1,58 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头", + # "save_annotated_images": True, # "frame_interval": 5, - # "remark": "本地测试摄像头", - # "save_annotated_images": True + # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { - "host": "127.0.0.1", + "host": "192.168.123.18", "port": 8888 } HTTP_SERVER = { - "url": "http://127.0.0.1:8000/alert", + "url": "http://127.0.0.1:8000/test", "timeout": 5 # 超时重试 } diff --git a/handle_tcp_command.py b/handle_tcp_command.py new file mode 100644 index 0000000..c05d85c --- /dev/null +++ b/handle_tcp_command.py @@ -0,0 +1,95 @@ +import asyncio +import base64 +import json +import time +import cv2 +from global_logger import logger + + +class HandelTCPCommand: + def __init__(self, camera_processors, http_client): + self.camera_processors = camera_processors + self.http_client = http_client + + async def capture_and_send_current_frame(self, camera_processor, message): + """捕获当前帧并通过HTTP发送到后台""" + + cam_id = camera_processor.cam_id + logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") + + # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 + frame = camera_processor.get_last_frame() + if frame is None: + logger.warning(f"摄像头 {cam_id} 没有可用的帧") + return False + + try: + # 将图像编码为JPEG + success, jpg_data = cv2.imencode('.jpg', frame) + if not success: + logger.error(f"摄像头 {cam_id} 编码图像失败") + return False + + def get_cap_content(message): + map = { + "1": ('点1', '完成1'), + "2": ('点2', '完成2'), + "3": ('点3', '完成3'), + "4": ('点4', '完成4'), + } + return map.get(str(message).strip(), ('', '')) + + # 构建请求数据 + content, status = get_cap_content(message) + request_data = { + "camera_id": cam_id, + "timestamp": time.time(), + "image": base64.b64encode(jpg_data.tobytes()).decode('utf-8'), + 'content': content, + "status": status + } + + # 发送HTTP请求 + await self.http_client.send(json.dumps(request_data)) + logger.info(f"摄像头 {cam_id} 已发送当前帧到后台") + return True + except Exception as e: + logger.exception(f"发送当前帧时发生错误: {e}") + return False + + async def handle_capture_command(self, message): + """处理TCP服务器发来的命令""" + try: + logger.info(f"处理TCP命令: {message}") + + # 检查是否是捕获图像的命令 + if str(message) in ['1', '2', '3', '4']: # todo + + # 等待5秒 + logger.info(f"收到捕获图像命令,等待5秒...") + await asyncio.sleep(5) + + # 对所有摄像头执行 + for cam_id, processor in self.camera_processors.items(): + await self.capture_and_send_current_frame(processor, message) + + logger.info("图像捕获和发送完成") + + # 发送处理完成的响应 + response = { + "status": "success", + "timestamp": time.time(), + "message": "图像已成功捕获并发送" + } + return response + except json.JSONDecodeError: + logger.error(f"无法解析JSON命令: {message}") + except Exception as e: + logger.exception(f"处理TCP命令时出错: {e}") + + # 发送错误响应 + return { + "status": "error", + "timestamp": time.time(), + "message": "处理命令时出错" + } diff --git a/README.md b/README.md index 9b195a6..e22f461 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,13 @@ | 8 | 标志位 | FA 表示结束 | ## 安装 -```angular2html -docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server ultralytics/ultralytics:latest-jetson-jetpack5 -``` +### 算法服务 ```angular2html docker run -d --ipc=host --runtime=nvidia --network=host --privileged -v /var/run/docker.sock:/var/run/docker.sock -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro -v ~/docker_data:/data/ -v ~/docker_src:/code/ --restart=always --entrypoint /code/go-algo-server/run.sh --name algo-server go-algo-server:v1.0 +``` + +### nginx +```angular2html +docker run --name nginx -d -p 80:80 --restart always -v /home/unitree/web/nginx.conf:/etc/nginx/nginx.conf -v /home/unitree/web/:/usr/share/nginx/html nginx:stable-alpine-slim ``` \ No newline at end of file diff --git a/app.py b/app.py index e32cd3b..8d34327 100644 --- a/app.py +++ b/app.py @@ -5,27 +5,42 @@ from http_client import AsyncHTTPClient from model_wrapper import ModelWrapper from camera_processor import CameraProcessor +from handle_tcp_command import HandelTCPCommand from global_logger import logger async def main(): logger.info("开始启动算法分析服务") + + cap_camera_processors = {} + loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) - http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + cap_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) + alarm_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) # 启动 TCP 和 HTTP 的发送任务 asyncio.create_task(tcp_client.send_loop()) - asyncio.create_task(http_client.send_loop()) + asyncio.create_task(tcp_client.receive_loop()) + asyncio.create_task(cap_upload_http_client.send_loop()) + asyncio.create_task(alarm_upload_http_client.send_loop()) model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) # 为每个摄像头启动一个处理线程,并传入事件循环 for camera_config in CAMERAS: - camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, http_client, loop, + camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, alarm_upload_http_client, loop, MODEL["batch_size"]) camera_thread.start() + if camera_config.get("receive_capture_command", False): + cam_id = camera_config.get('cam_id') + cap_camera_processors[cam_id] = camera_thread + + # 创建TCP命令处理器并注册回调,直接传入http_client + handler = HandelTCPCommand(cap_camera_processors, cap_upload_http_client) + tcp_client.register_command_handler(handler.handle_capture_command) + while True: await asyncio.sleep(1) diff --git a/camera_processor.py b/camera_processor.py index 93277e8..5675e58 100644 --- a/camera_processor.py +++ b/camera_processor.py @@ -49,13 +49,17 @@ # 添加图像保存相关参数 self.save_annotated_images = self.camera_config.get('save_annotated_images', False) self.save_path = self.camera_config.get('save_path', './saved_images') - self.save_interval = self.camera_config.get('save_interval', 25) # 每隔多少帧保存一次 + self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") + + # 最后捕获的帧 + self.last_frame = None + self.last_frame_lock = threading.Lock() def _open_camera(self): """尝试打开摄像头,返回是否成功""" @@ -212,6 +216,18 @@ logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}") self.fps_ts = current_time + def get_last_frame(self): + """获取最后捕获的帧,如果没有则返回None + + Returns: + numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None + """ + with self.last_frame_lock: + if self.last_frame is None: + return None + return self.last_frame.copy() + + def run(self): """摄像头处理主循环""" logger.info(f"摄像头处理线程 {self.cam_id} 启动") @@ -251,6 +267,11 @@ else: # 读取成功后,重置失败计数 failure_count = 0 + + # 保存最后一帧 + with self.last_frame_lock: + self.last_frame = frame.copy() + # 抽帧处理 self.frame_count += 1 if self.frame_count % self.frame_interval == 0: diff --git a/config.py b/config.py index b6177c9..1e33aea 100644 --- a/config.py +++ b/config.py @@ -1,54 +1,58 @@ # config.py CAMERAS = [ - { - "cam_id": 0, - "gst_str": ( - "v4l2src device=/dev/video0 ! " - "image/jpeg, width=1280, height=720, framerate=30/1 ! " - "jpegdec ! videoconvert ! appsink" - ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! " - # "image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! appsink" - # ), - # "gst_str": ( - # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " - # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" - # ), - "tcp_send_cls": ["井盖眼"], - "remark": "机械臂摄像头", - "save_annotated_images": True - }, - { - "cam_id": 1, - "gst_str": ( - "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " - "application/x-rtp, media=video, encoding-name=H264 ! " - "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " - "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" - ), - "tcp_send_cls": [], - "remark": "机器狗前置摄像头", - "save_annotated_images": True - }, # { # "cam_id": 0, - # "gst_str": 0, + # "gst_str": ( + # "v4l2src device=/dev/video0 ! " + # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # "jpegdec ! videoconvert ! appsink" + # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! " + # # "image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! appsink" + # # ), + # # "gst_str": ( + # # "v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=30/1 ! " + # # "jpegparse ! jpegdec ! videoconvert ! video/x-raw, format=BGR ! appsink drop=true sync=false" + # # ), # "tcp_send_cls": ["井盖眼"], + # "remark": "机械臂摄像头", + # "save_annotated_images": True, # "frame_interval": 5, - # "remark": "本地测试摄像头", - # "save_annotated_images": True + # "receive_capture_command": True # }, + # { + # "cam_id": 1, + # "gst_str": ( + # "udpsrc address=230.1.1.1 port=1720 multicast-iface=eth0 ! " + # "application/x-rtp, media=video, encoding-name=H264 ! " + # "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! " + # "video/x-raw,width=1280,height=720,format=BGR ! appsink drop=1" + # ), + # "tcp_send_cls": [], + # "remark": "机器狗前置摄像头", + # "save_annotated_images": True, + # "frame_interval": 5, + # }, + { + "cam_id": 0, + "gst_str": 0, + "tcp_send_cls": ["井盖眼"], + "frame_interval": 5, + "remark": "本地测试摄像头", + "save_annotated_images": True, + "receive_capture_command": True + }, ] TCP_SERVER = { - "host": "127.0.0.1", + "host": "192.168.123.18", "port": 8888 } HTTP_SERVER = { - "url": "http://127.0.0.1:8000/alert", + "url": "http://127.0.0.1:8000/test", "timeout": 5 # 超时重试 } diff --git a/handle_tcp_command.py b/handle_tcp_command.py new file mode 100644 index 0000000..c05d85c --- /dev/null +++ b/handle_tcp_command.py @@ -0,0 +1,95 @@ +import asyncio +import base64 +import json +import time +import cv2 +from global_logger import logger + + +class HandelTCPCommand: + def __init__(self, camera_processors, http_client): + self.camera_processors = camera_processors + self.http_client = http_client + + async def capture_and_send_current_frame(self, camera_processor, message): + """捕获当前帧并通过HTTP发送到后台""" + + cam_id = camera_processor.cam_id + logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") + + # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 + frame = camera_processor.get_last_frame() + if frame is None: + logger.warning(f"摄像头 {cam_id} 没有可用的帧") + return False + + try: + # 将图像编码为JPEG + success, jpg_data = cv2.imencode('.jpg', frame) + if not success: + logger.error(f"摄像头 {cam_id} 编码图像失败") + return False + + def get_cap_content(message): + map = { + "1": ('点1', '完成1'), + "2": ('点2', '完成2'), + "3": ('点3', '完成3'), + "4": ('点4', '完成4'), + } + return map.get(str(message).strip(), ('', '')) + + # 构建请求数据 + content, status = get_cap_content(message) + request_data = { + "camera_id": cam_id, + "timestamp": time.time(), + "image": base64.b64encode(jpg_data.tobytes()).decode('utf-8'), + 'content': content, + "status": status + } + + # 发送HTTP请求 + await self.http_client.send(json.dumps(request_data)) + logger.info(f"摄像头 {cam_id} 已发送当前帧到后台") + return True + except Exception as e: + logger.exception(f"发送当前帧时发生错误: {e}") + return False + + async def handle_capture_command(self, message): + """处理TCP服务器发来的命令""" + try: + logger.info(f"处理TCP命令: {message}") + + # 检查是否是捕获图像的命令 + if str(message) in ['1', '2', '3', '4']: # todo + + # 等待5秒 + logger.info(f"收到捕获图像命令,等待5秒...") + await asyncio.sleep(5) + + # 对所有摄像头执行 + for cam_id, processor in self.camera_processors.items(): + await self.capture_and_send_current_frame(processor, message) + + logger.info("图像捕获和发送完成") + + # 发送处理完成的响应 + response = { + "status": "success", + "timestamp": time.time(), + "message": "图像已成功捕获并发送" + } + return response + except json.JSONDecodeError: + logger.error(f"无法解析JSON命令: {message}") + except Exception as e: + logger.exception(f"处理TCP命令时出错: {e}") + + # 发送错误响应 + return { + "status": "error", + "timestamp": time.time(), + "message": "处理命令时出错" + } diff --git a/tcp_client.py b/tcp_client.py index ffff57c..96c2e0e 100644 --- a/tcp_client.py +++ b/tcp_client.py @@ -12,9 +12,20 @@ self.server_ip = server_ip self.server_port = server_port self.writer = None + self.reader = None self.queue = asyncio.Queue() self.connected = False self.running = True + self.command_callbacks = [] + + def register_command_handler(self, callback): + """注册命令处理回调函数 + + Args: + callback: 异步回调函数,接收消息字符串作为参数 + """ + self.command_callbacks.append(callback) + logger.info(f"已注册TCP命令处理回调函数: {callback.__name__}") async def connect(self): # 先关闭现有的writer @@ -29,7 +40,7 @@ while self.running: try: logger.info(f"连接 TCP 服务器 {self.server_ip}:{self.server_port}") - _, self.writer = await asyncio.open_connection(self.server_ip, self.server_port) + self.reader, self.writer = await asyncio.open_connection(self.server_ip, self.server_port) self.connected = True logger.info("TCP 连接成功") return @@ -68,6 +79,34 @@ logger.debug(f"添加消息到TCP队列: {message}") await self.queue.put((message, time.time(), expire_second)) + async def receive_loop(self): + """接收服务器消息的循环""" + while self.running: + if not self.connected: + logger.info(f"TCP 服务器 {self.server_ip}:{self.server_port}连接断开,重新连接") + await self.connect() + try: + if self.reader: + data = await self.reader.read(1024) + if not data: + logger.info("服务器断开连接") + self.connected = False + continue + + message = data.decode('utf-8').strip() + logger.info(f"收到TCP服务器消息: {message}") + + # 调用所有注册的回调函数处理消息 + for callback in self.command_callbacks: + try: + await callback(message) + except Exception as e: + logger.exception(f"处理TCP消息时出错: {e}") + except Exception as e: + logger.exception(f"TCP 接收失败: {e}") + self.connected = False + await asyncio.sleep(5) + class TCPClient: """TCP客户端管理"""