diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..bb36274 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +set -e +set -x + +export PYTHONIOENCODING=utf-8 +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 + +cd /code/safe-algo-pro +python3 main.py diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..bb36274 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +set -e +set -x + +export PYTHONIOENCODING=utf-8 +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 + +cd /code/safe-algo-pro +python3 main.py diff --git a/services/device_model_relation_service.py b/services/device_model_relation_service.py index 49e167a..c32b5a5 100644 --- a/services/device_model_relation_service.py +++ b/services/device_model_relation_service.py @@ -41,8 +41,6 @@ algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, algo_model_name=model.name, algo_model_version=model.version, algo_model_path=model.path, @@ -52,14 +50,14 @@ ] return models_info + + def add_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): new_relations = [ DeviceModelRelation( algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, device_id=device_id, # 统一赋值 device_id createtime=datetime.now(), updatetime=datetime.now(), diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..bb36274 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +set -e +set -x + +export PYTHONIOENCODING=utf-8 +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 + +cd /code/safe-algo-pro +python3 main.py diff --git a/services/device_model_relation_service.py b/services/device_model_relation_service.py index 49e167a..c32b5a5 100644 --- a/services/device_model_relation_service.py +++ b/services/device_model_relation_service.py @@ -41,8 +41,6 @@ algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, algo_model_name=model.name, algo_model_version=model.version, algo_model_path=model.path, @@ -52,14 +50,14 @@ ] return models_info + + def add_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): new_relations = [ DeviceModelRelation( algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, device_id=device_id, # 统一赋值 device_id createtime=datetime.now(), updatetime=datetime.now(), diff --git a/services/device_scene_relation_service.py b/services/device_scene_relation_service.py new file mode 100644 index 0000000..592ee98 --- /dev/null +++ b/services/device_scene_relation_service.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import List, Optional + +from sqlmodel import Session, select, delete + +from common.consts import NotifyChangeType +from common.global_thread_pool import GlobalThreadPool +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation +from entity.scene import Scene + + +class DeviceSceneRelationService: + def __init__(self, db: Session): + self.db = db + self.__relation_change_callbacks = [] # 用于存储回调函数 + self.thread_pool = GlobalThreadPool() + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__relation_change_callbacks.append(callback) + + def notify_change(self, device_id, change_type): + """当设备发生变化时,调用回调通知变化""" + for callback in self.__relation_change_callbacks: + self.thread_pool.executor.submit(callback, device_id, change_type) + + def get_device_scene(self, device_id: int) -> Optional[DeviceSceneRelationInfo]: + statement = ( + select(DeviceSceneRelation, Scene) + .join(Scene, DeviceSceneRelation.scene_id == Scene.id) + .where(DeviceSceneRelation.device_id == device_id) + ) + + # 执行联表查询 + result = self.db.exec(statement).first() + + scene_info = None + if result: + relation, scene = result[0], result[1] + scene_info = DeviceSceneRelationInfo( + id=relation.id, + device_id=relation.device_id, + scene_id=relation.scene_id, + scene_name=scene.name, + scene_version=scene.version, + scene_handle_task=scene.handle_task, + scene_remark=scene.remark, + ) + return scene_info + + def add_relation_by_device(self, device_id: int, scene_id: int): + new_relation = DeviceSceneRelation(device_id=device_id, scene_id=scene_id) + new_relation.create_time = datetime.now() + new_relation.update_time = datetime.now() + self.db.add(new_relation) + self.db.commit() + self.db.refresh(new_relation) + return new_relation + + def delete_relation_by_device(self, device_id: int): + statement = delete(DeviceSceneRelation).where(DeviceSceneRelation.device_id == device_id) + count = self.db.exec(statement) + self.db.commit() + return count.rowcount + + def update_relation_by_device(self, device_id: int, scene_id: int): + self.delete_relation_by_device(device_id) + new_relation = self.add_relation_by_device(device_id, scene_id) + self.notify_change(device_id, NotifyChangeType.DEVICE_SCENE_RELATION_UPDATE) + return new_relation diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..bb36274 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +set -e +set -x + +export PYTHONIOENCODING=utf-8 +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 + +cd /code/safe-algo-pro +python3 main.py diff --git a/services/device_model_relation_service.py b/services/device_model_relation_service.py index 49e167a..c32b5a5 100644 --- a/services/device_model_relation_service.py +++ b/services/device_model_relation_service.py @@ -41,8 +41,6 @@ algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, algo_model_name=model.name, algo_model_version=model.version, algo_model_path=model.path, @@ -52,14 +50,14 @@ ] return models_info + + def add_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): new_relations = [ DeviceModelRelation( algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, device_id=device_id, # 统一赋值 device_id createtime=datetime.now(), updatetime=datetime.now(), diff --git a/services/device_scene_relation_service.py b/services/device_scene_relation_service.py new file mode 100644 index 0000000..592ee98 --- /dev/null +++ b/services/device_scene_relation_service.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import List, Optional + +from sqlmodel import Session, select, delete + +from common.consts import NotifyChangeType +from common.global_thread_pool import GlobalThreadPool +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation +from entity.scene import Scene + + +class DeviceSceneRelationService: + def __init__(self, db: Session): + self.db = db + self.__relation_change_callbacks = [] # 用于存储回调函数 + self.thread_pool = GlobalThreadPool() + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__relation_change_callbacks.append(callback) + + def notify_change(self, device_id, change_type): + """当设备发生变化时,调用回调通知变化""" + for callback in self.__relation_change_callbacks: + self.thread_pool.executor.submit(callback, device_id, change_type) + + def get_device_scene(self, device_id: int) -> Optional[DeviceSceneRelationInfo]: + statement = ( + select(DeviceSceneRelation, Scene) + .join(Scene, DeviceSceneRelation.scene_id == Scene.id) + .where(DeviceSceneRelation.device_id == device_id) + ) + + # 执行联表查询 + result = self.db.exec(statement).first() + + scene_info = None + if result: + relation, scene = result[0], result[1] + scene_info = DeviceSceneRelationInfo( + id=relation.id, + device_id=relation.device_id, + scene_id=relation.scene_id, + scene_name=scene.name, + scene_version=scene.version, + scene_handle_task=scene.handle_task, + scene_remark=scene.remark, + ) + return scene_info + + def add_relation_by_device(self, device_id: int, scene_id: int): + new_relation = DeviceSceneRelation(device_id=device_id, scene_id=scene_id) + new_relation.create_time = datetime.now() + new_relation.update_time = datetime.now() + self.db.add(new_relation) + self.db.commit() + self.db.refresh(new_relation) + return new_relation + + def delete_relation_by_device(self, device_id: int): + statement = delete(DeviceSceneRelation).where(DeviceSceneRelation.device_id == device_id) + count = self.db.exec(statement) + self.db.commit() + return count.rowcount + + def update_relation_by_device(self, device_id: int, scene_id: int): + self.delete_relation_by_device(device_id) + new_relation = self.add_relation_by_device(device_id, scene_id) + self.notify_change(device_id, NotifyChangeType.DEVICE_SCENE_RELATION_UPDATE) + return new_relation diff --git a/services/device_service.py b/services/device_service.py index 4aa8bd5..cad46d3 100644 --- a/services/device_service.py +++ b/services/device_service.py @@ -6,10 +6,12 @@ from sqlalchemy import func from sqlmodel import Session, select +from common.device_status_manager import DeviceStatusManager from common.global_thread_pool import GlobalThreadPool -from common.consts import NotifyChangeType -from entity.device import Device, DeviceCreate, DeviceUpdate +from common.consts import NotifyChangeType, DEVICE_MODE +from entity.device import Device, DeviceCreate, DeviceUpdate, DeviceInfo from services.device_model_relation_service import DeviceModelRelationService +from services.device_scene_relation_service import DeviceSceneRelationService class DeviceService: @@ -65,7 +67,7 @@ device_type: Optional[str] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[Device], int]: + ) -> Tuple[Sequence[DeviceInfo], int]: statement = self.device_query(code, device_type, name) # 查询总记录数 @@ -77,7 +79,36 @@ # 执行查询并返回结果 results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + device_list = results.all() + device_info_list = [] + if device_list: + device_model_relation_service = DeviceModelRelationService(self.db) + device_scene_relation_service = DeviceSceneRelationService(self.db) + device_status_manager = DeviceStatusManager() + for device in device_list: + model_relations = device_model_relation_service.get_device_models(device.id) + scene_relation = device_scene_relation_service.get_device_scene(device.id) + + device_info_list.append(DeviceInfo( + id=device.id, + name=device.name, + code=device.code, + type=device.type, + ip=device.ip, + gas_ip=device.gas_ip, + mode=device.mode, + input_stream_url=device.input_stream_url, + output_stream_url=device.output_stream_url, + image_save_interval=device.image_save_interval, + alarm_interval=device.alarm_interval, + + status="在线" if device_status_manager.get_status(device.id) else "离线", + relation_model_names=", ".join( + [relation.algo_model_name for relation in model_relations if relation.is_use == 1] + ) if model_relations else "—", + relation_scene_name=scene_relation.scene_name if scene_relation else "—" + )) + return device_info_list, total # 返回分页数据和总数 def device_query(self, code, device_type, name): # 构建查询语句 @@ -94,6 +125,7 @@ device = Device.model_validate(device_data) device.create_time = datetime.now() device.update_time = datetime.now() + self.handle_device_mode(device) self.db.add(device) self.db.commit() self.db.refresh(device) @@ -110,6 +142,7 @@ for key, value in update_data.items(): setattr(device, key, value) + self.handle_device_mode(device) device.update_time = datetime.now() self.db.add(device) self.db.commit() @@ -126,9 +159,24 @@ self.db.commit() self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE) - relation_service = DeviceModelRelationService(self.db) - relation_service.delete_relations_by_device(device_id) + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device_id) + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) return device + def handle_device_mode(self, device): + if device.mode == DEVICE_MODE.ALGO: + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) + elif device.mode == DEVICE_MODE.SCENE: + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device.id) + else: + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device.id) + def get_device(self, device_id: int): return self.db.get(Device, device_id) diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..bb36274 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +set -e +set -x + +export PYTHONIOENCODING=utf-8 +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 + +cd /code/safe-algo-pro +python3 main.py diff --git a/services/device_model_relation_service.py b/services/device_model_relation_service.py index 49e167a..c32b5a5 100644 --- a/services/device_model_relation_service.py +++ b/services/device_model_relation_service.py @@ -41,8 +41,6 @@ algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, algo_model_name=model.name, algo_model_version=model.version, algo_model_path=model.path, @@ -52,14 +50,14 @@ ] return models_info + + def add_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): new_relations = [ DeviceModelRelation( algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, device_id=device_id, # 统一赋值 device_id createtime=datetime.now(), updatetime=datetime.now(), diff --git a/services/device_scene_relation_service.py b/services/device_scene_relation_service.py new file mode 100644 index 0000000..592ee98 --- /dev/null +++ b/services/device_scene_relation_service.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import List, Optional + +from sqlmodel import Session, select, delete + +from common.consts import NotifyChangeType +from common.global_thread_pool import GlobalThreadPool +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation +from entity.scene import Scene + + +class DeviceSceneRelationService: + def __init__(self, db: Session): + self.db = db + self.__relation_change_callbacks = [] # 用于存储回调函数 + self.thread_pool = GlobalThreadPool() + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__relation_change_callbacks.append(callback) + + def notify_change(self, device_id, change_type): + """当设备发生变化时,调用回调通知变化""" + for callback in self.__relation_change_callbacks: + self.thread_pool.executor.submit(callback, device_id, change_type) + + def get_device_scene(self, device_id: int) -> Optional[DeviceSceneRelationInfo]: + statement = ( + select(DeviceSceneRelation, Scene) + .join(Scene, DeviceSceneRelation.scene_id == Scene.id) + .where(DeviceSceneRelation.device_id == device_id) + ) + + # 执行联表查询 + result = self.db.exec(statement).first() + + scene_info = None + if result: + relation, scene = result[0], result[1] + scene_info = DeviceSceneRelationInfo( + id=relation.id, + device_id=relation.device_id, + scene_id=relation.scene_id, + scene_name=scene.name, + scene_version=scene.version, + scene_handle_task=scene.handle_task, + scene_remark=scene.remark, + ) + return scene_info + + def add_relation_by_device(self, device_id: int, scene_id: int): + new_relation = DeviceSceneRelation(device_id=device_id, scene_id=scene_id) + new_relation.create_time = datetime.now() + new_relation.update_time = datetime.now() + self.db.add(new_relation) + self.db.commit() + self.db.refresh(new_relation) + return new_relation + + def delete_relation_by_device(self, device_id: int): + statement = delete(DeviceSceneRelation).where(DeviceSceneRelation.device_id == device_id) + count = self.db.exec(statement) + self.db.commit() + return count.rowcount + + def update_relation_by_device(self, device_id: int, scene_id: int): + self.delete_relation_by_device(device_id) + new_relation = self.add_relation_by_device(device_id, scene_id) + self.notify_change(device_id, NotifyChangeType.DEVICE_SCENE_RELATION_UPDATE) + return new_relation diff --git a/services/device_service.py b/services/device_service.py index 4aa8bd5..cad46d3 100644 --- a/services/device_service.py +++ b/services/device_service.py @@ -6,10 +6,12 @@ from sqlalchemy import func from sqlmodel import Session, select +from common.device_status_manager import DeviceStatusManager from common.global_thread_pool import GlobalThreadPool -from common.consts import NotifyChangeType -from entity.device import Device, DeviceCreate, DeviceUpdate +from common.consts import NotifyChangeType, DEVICE_MODE +from entity.device import Device, DeviceCreate, DeviceUpdate, DeviceInfo from services.device_model_relation_service import DeviceModelRelationService +from services.device_scene_relation_service import DeviceSceneRelationService class DeviceService: @@ -65,7 +67,7 @@ device_type: Optional[str] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[Device], int]: + ) -> Tuple[Sequence[DeviceInfo], int]: statement = self.device_query(code, device_type, name) # 查询总记录数 @@ -77,7 +79,36 @@ # 执行查询并返回结果 results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + device_list = results.all() + device_info_list = [] + if device_list: + device_model_relation_service = DeviceModelRelationService(self.db) + device_scene_relation_service = DeviceSceneRelationService(self.db) + device_status_manager = DeviceStatusManager() + for device in device_list: + model_relations = device_model_relation_service.get_device_models(device.id) + scene_relation = device_scene_relation_service.get_device_scene(device.id) + + device_info_list.append(DeviceInfo( + id=device.id, + name=device.name, + code=device.code, + type=device.type, + ip=device.ip, + gas_ip=device.gas_ip, + mode=device.mode, + input_stream_url=device.input_stream_url, + output_stream_url=device.output_stream_url, + image_save_interval=device.image_save_interval, + alarm_interval=device.alarm_interval, + + status="在线" if device_status_manager.get_status(device.id) else "离线", + relation_model_names=", ".join( + [relation.algo_model_name for relation in model_relations if relation.is_use == 1] + ) if model_relations else "—", + relation_scene_name=scene_relation.scene_name if scene_relation else "—" + )) + return device_info_list, total # 返回分页数据和总数 def device_query(self, code, device_type, name): # 构建查询语句 @@ -94,6 +125,7 @@ device = Device.model_validate(device_data) device.create_time = datetime.now() device.update_time = datetime.now() + self.handle_device_mode(device) self.db.add(device) self.db.commit() self.db.refresh(device) @@ -110,6 +142,7 @@ for key, value in update_data.items(): setattr(device, key, value) + self.handle_device_mode(device) device.update_time = datetime.now() self.db.add(device) self.db.commit() @@ -126,9 +159,24 @@ self.db.commit() self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE) - relation_service = DeviceModelRelationService(self.db) - relation_service.delete_relations_by_device(device_id) + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device_id) + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) return device + def handle_device_mode(self, device): + if device.mode == DEVICE_MODE.ALGO: + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) + elif device.mode == DEVICE_MODE.SCENE: + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device.id) + else: + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device.id) + def get_device(self, device_id: int): return self.db.get(Device, device_id) diff --git a/services/model_service.py b/services/model_service.py index 6f93d40..ec1fd2a 100644 --- a/services/model_service.py +++ b/services/model_service.py @@ -12,7 +12,7 @@ from common.biz_exception import BizException from common.string_utils import snake_to_camel from entity.device_model_relation import DeviceModelRelation -from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate +from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from common.global_thread_pool import GlobalThreadPool from common.consts import NotifyChangeType @@ -45,7 +45,7 @@ remark: Optional[str] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[AlgoModel], int]: + ) -> Tuple[Sequence[AlgoModelInfo], int]: statement = self.model_query(name, remark) # 查询总记录数 @@ -56,8 +56,16 @@ statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 - results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + model_list = self.db.exec(statement) + model_info_list: List[AlgoModelInfo] = [] + if model_list: + for model in model_list: + model_info_list.append(AlgoModelInfo( + **model.dict(), + usage_status="使用中" if self.get_model_usage(model.id) else "未使用" + )) + + return model_info_list, total # 返回分页数据和总数 def model_query(self, name, remark): # 构建查询语句 @@ -184,5 +192,16 @@ results = self.db.exec(statement).all() return results + def get_model_usage(self, algo_model_id) -> bool: + statement = ( + select(DeviceModelRelation) + .where( + DeviceModelRelation.is_use == 1, + DeviceModelRelation.algo_model_id == algo_model_id, + ) + ) + result = self.db.exec(statement).all() + return len(result) > 0 + def get_model_by_id(self, model_id): return self.db.get(AlgoModel, model_id) diff --git a/.gitignore b/.gitignore index 6817508..cdefb40 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ test.py -/logs/* .idea storage -weights \ No newline at end of file +weights +logs +test* \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index 4ce760c..b5a2d9e 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -6,6 +6,7 @@ from algo.model_manager import AlgoModelExec from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.string_utils import camel_to_snake @@ -53,6 +54,7 @@ self.frame_analysis_result_service = FrameAnalysisResultService(db) self.thread_pool = GlobalThreadPool() + self.device_status_manager = DeviceStatusManager() self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) @@ -106,6 +108,7 @@ if frame is None: continue + self.device_status_manager.set_status(device_id=self.device.id) results_map = {} for model_exec in self.model_exec_list: handle_task_name = model_exec.algo_model_info.handle_task diff --git a/apis/control.py b/apis/control.py new file mode 100644 index 0000000..1aaf3ea --- /dev/null +++ b/apis/control.py @@ -0,0 +1,86 @@ +import os +import platform +import subprocess +import sys +import threading +import time +import traceback + +from fastapi import APIRouter +import docker +import socket + +from apis.base import standard_error_response, standard_response +from common.global_logger import logger + +router = APIRouter() + + +def is_running_in_docker(): + """ 检测当前程序是否在 Docker 容器中运行 """ + try: + with open('/proc/1/cgroup', 'rt') as f: + return 'docker' in f.read() + except Exception: + return False + + +def is_windows_host(): + try: + # 尝试解析 host.docker.internal + socket.gethostbyname("host.docker.internal") + return True + except socket.error: + return False + + +@router.get("/restart") +def restart(): + try: + # 立即返回响应的函数 + def restart_container_async(): + try: + if is_running_in_docker(): + if is_windows_host(): + client = docker.DockerClient(base_url='tcp://host.docker.internal:2375') + else: + client = docker.DockerClient(base_url='unix://var/run/docker.sock') + container_id = client.containers.get(socket.gethostname()) + + # 重启容器 + container = client.containers.get(container_id) + container.restart() + logger.info("Container restarted successfully.") + else: + os_type = platform.system() + + # 获取当前脚本路径和命令行参数 + command = [sys.executable] + sys.argv + + if os_type == "Windows": + # Windows 环境下的重启逻辑 + subprocess.Popen(["start", "python"] + sys.argv, shell=True) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + elif os_type == "Linux" or os_type == "Darwin": + # Linux 和 macOS 环境下的重启逻辑 + full_command = f"nohup {' '.join(command)} &" + subprocess.Popen(full_command, shell=True, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + time.sleep(1) # 确保新进程启动 + sys.exit(0) # 退出当前进程 + + else: + print(f"Unsupported OS for restart: {os_type}") + except Exception as ex: + traceback.print_exc() + logger.error(f"Failed to restart container asynchronously: {ex}") + + # 在新线程中启动重启操作 + threading.Thread(target=restart_container_async).start() + + return standard_response() + except Exception as e: + traceback.print_exc() + return standard_error_response(code=500, message=f"Failed to restart container: {e}") diff --git a/apis/data_gas.py b/apis/data_gas.py index 1e59979..c9ce70f 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -22,6 +22,7 @@ start_time: Optional[str] = None, end_time: Optional[str] = None, offset: int = Query(0, ge=1), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 db: Session = Depends(get_db)): service = DataGasService(db) diff --git a/apis/device.py b/apis/device.py index 15eed9a..f786851 100644 --- a/apis/device.py +++ b/apis/device.py @@ -25,7 +25,7 @@ return standard_response(data=devices) -@router.get("/page/", response_model=StandardResponse[PageResponse[Device]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceInfo]]) def get_device_page( name: Optional[str] = None, code: Optional[str] = None, diff --git a/apis/device_scene_realtion.py b/apis/device_scene_realtion.py new file mode 100644 index 0000000..9a30a44 --- /dev/null +++ b/apis/device_scene_realtion.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlmodel import Session + +from apis.base import StandardResponse, standard_response +from db.database import get_db +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation + +from services.device_scene_relation_service import DeviceSceneRelationService + +router = APIRouter() + + +@router.get("/get_by_device", response_model=StandardResponse[DeviceSceneRelationInfo]) +def list_by_device( + device_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + scene = service.get_device_scene(device_id) + return standard_response(data=scene) + + +@router.post("/update_by_device", response_model=StandardResponse[DeviceSceneRelation]) +def update_by_device(device_id: int, scene_id: int, + db: Session = Depends(get_db)): + service = DeviceSceneRelationService(db) + relation = service.update_relation_by_device(device_id, scene_id) + return standard_response(data=relation) diff --git a/apis/model.py b/apis/model.py index 83c594f..264e10a 100644 --- a/apis/model.py +++ b/apis/model.py @@ -24,7 +24,7 @@ return standard_response(data=models) -@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModel]]) +@router.get("/page/", response_model=StandardResponse[PageResponse[AlgoModelInfo]]) def get_model_page( name: Optional[str] = None, remark: Optional[str] = None, diff --git a/apis/router.py b/apis/router.py index 52ffa76..eb8fb59 100644 --- a/apis/router.py +++ b/apis/router.py @@ -3,8 +3,11 @@ from .device import router as devices_router from .model import router as models_router from .device_model_realtion import router as device_model_relation_router +from .scene import router as scene_router +from .device_scene_realtion import router as device_scene_relation_router from .frame import router as frame_router from .data_gas import router as gas_router +from .control import router as control_router # 创建一个全局的 router @@ -14,6 +17,8 @@ router.include_router(devices_router, prefix="/device", tags=["Devices"]) router.include_router(models_router, prefix="/model", tags=["Models"]) router.include_router(device_model_relation_router, prefix="/device_model_relation", tags=["DeviceModelRelations"]) +router.include_router(scene_router,prefix="/scene", tags=["Scene"]) +router.include_router(device_scene_relation_router, prefix="/device_scene_relation", tags=["DeviceSceneRelations"]) router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) - +router.include_router(control_router,prefix="/control", tags=["Control"]) diff --git a/apis/scene.py b/apis/scene.py new file mode 100644 index 0000000..b0a4d22 --- /dev/null +++ b/apis/scene.py @@ -0,0 +1,79 @@ +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query, UploadFile, File, Form +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse, PageResponse, standard_error_response, convert_page_param +from db.database import get_db +from entity.scene import Scene, SceneCreate, SceneUpdate, SceneInfo +from entity.scene import SceneInfo +from services.scene_service import SceneService + +from algo.algo_runner import AlgoRunner +from algo.algo_runner_manager import get_algo_runner + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[Scene]]) +def get_scene_list( + name: Optional[str] = None, + remark: Optional[str] = None, + db: Session = Depends(get_db)): + service = SceneService(db) + scenes = list(service.get_scene_list(name, remark)) + return standard_response(data=scenes) + + +@router.get("/page/", response_model=StandardResponse[PageResponse[SceneInfo]]) +def get_scene_page( + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = Query(0, ge=0), # 从第几页开始 + limit: int = Query(10, ge=1), # 每页显示多少条记录 + db: Session = Depends(get_db)): + service = SceneService(db) + + # 获取分页后的设备列表和总数 + offset, limit = convert_page_param(offset, limit) + scenes, total = service.get_scene_page(name, remark, offset, limit) + + return standard_response( + data=PageResponse(total=total, items=scenes) + ) + + +@router.post("/add", response_model=StandardResponse[SceneInfo]) +def create_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneCreate结构"), + file: UploadFile = File(..., description="模型文件"), + db: Session = Depends(get_db), + ): + # 检查文件类型 + if not file.filename.endswith(".zip"): + return standard_error_response(code=500, message="Only .zip files are allowed.") + + scene_data = SceneCreate.parse_raw(json_data) + service = SceneService(db) + scene = service.create_scene(scene_data, file) + return standard_response(data=scene) + + +@router.post("/update", response_model=StandardResponse[SceneInfo]) +def update_scene(json_data: str = Form(..., description="JSON数据字段,内容为SceneUpdate结构"), + file: UploadFile = File(None, description="模型文件"), + db: Session = Depends(get_db)): + service = SceneService(db) + scene_data = SceneUpdate.parse_raw(json_data) + scene = service.update_scene(scene_data, file) + if not scene: + return standard_error_response(data=scene_data, message="Scene not found") + return standard_response(data=scene) + + +@router.delete("/delete", response_model=StandardResponse[int]) +def delete_scene(scene_id: int, db: Session = Depends(get_db)): + service = SceneService(db) + scene = service.delete_scene(scene_id) + if not scene: + return standard_error_response(data=scene_id, message="Scene not found") + return standard_response(data=scene_id) diff --git a/common/consts.py b/common/consts.py index d24bd72..7a73c81 100644 --- a/common/consts.py +++ b/common/consts.py @@ -15,6 +15,8 @@ DEVICE_MODEL_RELATION_UPDATE = "device_model_relation_update" # 绑定关系变化 应该只用这个吧?? DEVICE_MODEL_RELATION_DELETE = "device_model_relation_delete" + SCENE_UPDATE = "scene_update" + DEVICE_SCENE_RELATION_UPDATE = "device_model_relation_update" class DEVICE_TYPE(Constants): CAMERA = 1 @@ -24,3 +26,8 @@ class TREE_COMMAND(Constants): # 甲烷查询指令 GAS_QUERY = b'\xAA\x01\x00\x95\x00\x00\x96' + +class DEVICE_MODE(Constants): + ALGO = 1 + SCENE = 2 + NONE = 0 diff --git a/common/device_status_manager.py b/common/device_status_manager.py new file mode 100644 index 0000000..c8a9f70 --- /dev/null +++ b/common/device_status_manager.py @@ -0,0 +1,36 @@ +import threading +from datetime import datetime + + +class DeviceStatusManager: + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + # 确保线程安全的单例模式 + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(DeviceStatusManager, cls).__new__(cls) + return cls._instance + + def __init__(self): + # 初始化一次,避免重复初始化 + if not hasattr(self, "device_status"): + self.device_status = {} + self.lock = threading.Lock() + + def get_status(self, device_id): + """获取指定设备的在线状态""" + with self.lock: + return self.device_status.get(device_id, None) + + def set_status(self, device_id): + """设置指定设备的在线状态""" + with self.lock: + self.device_status[device_id] = datetime.now() + + def is_device_online(self, device_id): + ts = self.get_status(device_id) + return ts is not None and (datetime.now() - ts).seconds < 60 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 9973e56..b1d8fab 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/entity/device.py b/entity/device.py index 0a7aefa..89dd3a1 100644 --- a/entity/device.py +++ b/entity/device.py @@ -9,10 +9,12 @@ code: str type: Optional[str] = None ip: str + mode: int gas_ip: Optional[str] = None input_stream_url: Optional[str] = None output_stream_url: Optional[str] = None image_save_interval: Optional[int] = None + alarm_interval: Optional[int] = None class Device(DeviceBase, TimestampMixin, table=True): @@ -37,3 +39,6 @@ class DeviceInfo(DeviceBase, TimestampMixin): id: int + status: Optional[str] = None + relation_scene_name: Optional[str] = None + relation_model_names: Optional[str] = None diff --git a/entity/device_model_relation.py b/entity/device_model_relation.py index 2e58a39..e1d48d2 100644 --- a/entity/device_model_relation.py +++ b/entity/device_model_relation.py @@ -9,8 +9,6 @@ algo_model_id: int is_use: int threshold: Optional[float] = None - alarm_interval: Optional[int] = None - alarm_type: Optional[str] = None class DeviceModelRelation(DeviceModelRelationBase, TimestampMixin, table=True): @@ -19,6 +17,7 @@ device_id: int class DeviceModelRelationCreate(DeviceModelRelationBase): + # 批量新增,device_id单独传参,不在DeviceModelRelationCreate对象中 pass diff --git a/entity/device_scene_relation.py b/entity/device_scene_relation.py new file mode 100644 index 0000000..c4543d7 --- /dev/null +++ b/entity/device_scene_relation.py @@ -0,0 +1,23 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class DeviceSceneRelationBase(SQLModel): + scene_id: int + device_id: int + + +class DeviceSceneRelation(DeviceSceneRelationBase, TimestampMixin, table=True): + __tablename__ = 'device_scene_relation' + id: Optional[int] = Field(default=None, primary_key=True) + + +class DeviceSceneRelationInfo(DeviceSceneRelationBase, TimestampMixin): + id: int + scene_name: str + scene_version: str + scene_handle_task: str + scene_remark: Optional[str] = None diff --git a/entity/model.py b/entity/model.py index e96925b..878f4d7 100644 --- a/entity/model.py +++ b/entity/model.py @@ -32,4 +32,5 @@ class AlgoModelInfo(AlgoModelBase, TimestampMixin): id: int + usage_status: Optional[str] = None handle_task: Optional[str] = 'BaseModelHandler' diff --git a/entity/scene.py b/entity/scene.py new file mode 100644 index 0000000..d532953 --- /dev/null +++ b/entity/scene.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class SceneBase(SQLModel): + name: str + version: str + handle_task: str + remark: Optional[str] = None + + +class Scene(SceneBase, TimestampMixin, table=True): + __tablename__ = "scene" # 显式指定表名 + + id: Optional[int] = Field(default=None, primary_key=True) + + +class SceneCreate(SceneBase): + handle_task: Optional[str] = None + + +class SceneUpdate(SceneBase): + id: int + name: Optional[str] = None + version: Optional[str] = None + handle_task: Optional[str] = None + + +class SceneInfo(SceneBase, TimestampMixin): + id: int + usage_status: Optional[str] = None diff --git a/requirements.txt b/requirements.txt index 2432943..9a7d88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi sqlmodel openpyxl -python-multipart \ No newline at end of file +python-multipart +docker \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..bb36274 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +set -e +set -x + +export PYTHONIOENCODING=utf-8 +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 + +cd /code/safe-algo-pro +python3 main.py diff --git a/services/device_model_relation_service.py b/services/device_model_relation_service.py index 49e167a..c32b5a5 100644 --- a/services/device_model_relation_service.py +++ b/services/device_model_relation_service.py @@ -41,8 +41,6 @@ algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, algo_model_name=model.name, algo_model_version=model.version, algo_model_path=model.path, @@ -52,14 +50,14 @@ ] return models_info + + def add_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): new_relations = [ DeviceModelRelation( algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, - alarm_interval=relation.alarm_interval, - alarm_type=relation.alarm_type, device_id=device_id, # 统一赋值 device_id createtime=datetime.now(), updatetime=datetime.now(), diff --git a/services/device_scene_relation_service.py b/services/device_scene_relation_service.py new file mode 100644 index 0000000..592ee98 --- /dev/null +++ b/services/device_scene_relation_service.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import List, Optional + +from sqlmodel import Session, select, delete + +from common.consts import NotifyChangeType +from common.global_thread_pool import GlobalThreadPool +from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation +from entity.scene import Scene + + +class DeviceSceneRelationService: + def __init__(self, db: Session): + self.db = db + self.__relation_change_callbacks = [] # 用于存储回调函数 + self.thread_pool = GlobalThreadPool() + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__relation_change_callbacks.append(callback) + + def notify_change(self, device_id, change_type): + """当设备发生变化时,调用回调通知变化""" + for callback in self.__relation_change_callbacks: + self.thread_pool.executor.submit(callback, device_id, change_type) + + def get_device_scene(self, device_id: int) -> Optional[DeviceSceneRelationInfo]: + statement = ( + select(DeviceSceneRelation, Scene) + .join(Scene, DeviceSceneRelation.scene_id == Scene.id) + .where(DeviceSceneRelation.device_id == device_id) + ) + + # 执行联表查询 + result = self.db.exec(statement).first() + + scene_info = None + if result: + relation, scene = result[0], result[1] + scene_info = DeviceSceneRelationInfo( + id=relation.id, + device_id=relation.device_id, + scene_id=relation.scene_id, + scene_name=scene.name, + scene_version=scene.version, + scene_handle_task=scene.handle_task, + scene_remark=scene.remark, + ) + return scene_info + + def add_relation_by_device(self, device_id: int, scene_id: int): + new_relation = DeviceSceneRelation(device_id=device_id, scene_id=scene_id) + new_relation.create_time = datetime.now() + new_relation.update_time = datetime.now() + self.db.add(new_relation) + self.db.commit() + self.db.refresh(new_relation) + return new_relation + + def delete_relation_by_device(self, device_id: int): + statement = delete(DeviceSceneRelation).where(DeviceSceneRelation.device_id == device_id) + count = self.db.exec(statement) + self.db.commit() + return count.rowcount + + def update_relation_by_device(self, device_id: int, scene_id: int): + self.delete_relation_by_device(device_id) + new_relation = self.add_relation_by_device(device_id, scene_id) + self.notify_change(device_id, NotifyChangeType.DEVICE_SCENE_RELATION_UPDATE) + return new_relation diff --git a/services/device_service.py b/services/device_service.py index 4aa8bd5..cad46d3 100644 --- a/services/device_service.py +++ b/services/device_service.py @@ -6,10 +6,12 @@ from sqlalchemy import func from sqlmodel import Session, select +from common.device_status_manager import DeviceStatusManager from common.global_thread_pool import GlobalThreadPool -from common.consts import NotifyChangeType -from entity.device import Device, DeviceCreate, DeviceUpdate +from common.consts import NotifyChangeType, DEVICE_MODE +from entity.device import Device, DeviceCreate, DeviceUpdate, DeviceInfo from services.device_model_relation_service import DeviceModelRelationService +from services.device_scene_relation_service import DeviceSceneRelationService class DeviceService: @@ -65,7 +67,7 @@ device_type: Optional[str] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[Device], int]: + ) -> Tuple[Sequence[DeviceInfo], int]: statement = self.device_query(code, device_type, name) # 查询总记录数 @@ -77,7 +79,36 @@ # 执行查询并返回结果 results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + device_list = results.all() + device_info_list = [] + if device_list: + device_model_relation_service = DeviceModelRelationService(self.db) + device_scene_relation_service = DeviceSceneRelationService(self.db) + device_status_manager = DeviceStatusManager() + for device in device_list: + model_relations = device_model_relation_service.get_device_models(device.id) + scene_relation = device_scene_relation_service.get_device_scene(device.id) + + device_info_list.append(DeviceInfo( + id=device.id, + name=device.name, + code=device.code, + type=device.type, + ip=device.ip, + gas_ip=device.gas_ip, + mode=device.mode, + input_stream_url=device.input_stream_url, + output_stream_url=device.output_stream_url, + image_save_interval=device.image_save_interval, + alarm_interval=device.alarm_interval, + + status="在线" if device_status_manager.get_status(device.id) else "离线", + relation_model_names=", ".join( + [relation.algo_model_name for relation in model_relations if relation.is_use == 1] + ) if model_relations else "—", + relation_scene_name=scene_relation.scene_name if scene_relation else "—" + )) + return device_info_list, total # 返回分页数据和总数 def device_query(self, code, device_type, name): # 构建查询语句 @@ -94,6 +125,7 @@ device = Device.model_validate(device_data) device.create_time = datetime.now() device.update_time = datetime.now() + self.handle_device_mode(device) self.db.add(device) self.db.commit() self.db.refresh(device) @@ -110,6 +142,7 @@ for key, value in update_data.items(): setattr(device, key, value) + self.handle_device_mode(device) device.update_time = datetime.now() self.db.add(device) self.db.commit() @@ -126,9 +159,24 @@ self.db.commit() self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE) - relation_service = DeviceModelRelationService(self.db) - relation_service.delete_relations_by_device(device_id) + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device_id) + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) return device + def handle_device_mode(self, device): + if device.mode == DEVICE_MODE.ALGO: + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) + elif device.mode == DEVICE_MODE.SCENE: + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device.id) + else: + scene_relation_service = DeviceSceneRelationService(self.db) + scene_relation_service.delete_relation_by_device(device.id) + model_relation_service = DeviceModelRelationService(self.db) + model_relation_service.delete_relations_by_device(device.id) + def get_device(self, device_id: int): return self.db.get(Device, device_id) diff --git a/services/model_service.py b/services/model_service.py index 6f93d40..ec1fd2a 100644 --- a/services/model_service.py +++ b/services/model_service.py @@ -12,7 +12,7 @@ from common.biz_exception import BizException from common.string_utils import snake_to_camel from entity.device_model_relation import DeviceModelRelation -from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate +from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from common.global_thread_pool import GlobalThreadPool from common.consts import NotifyChangeType @@ -45,7 +45,7 @@ remark: Optional[str] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[AlgoModel], int]: + ) -> Tuple[Sequence[AlgoModelInfo], int]: statement = self.model_query(name, remark) # 查询总记录数 @@ -56,8 +56,16 @@ statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 - results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + model_list = self.db.exec(statement) + model_info_list: List[AlgoModelInfo] = [] + if model_list: + for model in model_list: + model_info_list.append(AlgoModelInfo( + **model.dict(), + usage_status="使用中" if self.get_model_usage(model.id) else "未使用" + )) + + return model_info_list, total # 返回分页数据和总数 def model_query(self, name, remark): # 构建查询语句 @@ -184,5 +192,16 @@ results = self.db.exec(statement).all() return results + def get_model_usage(self, algo_model_id) -> bool: + statement = ( + select(DeviceModelRelation) + .where( + DeviceModelRelation.is_use == 1, + DeviceModelRelation.algo_model_id == algo_model_id, + ) + ) + result = self.db.exec(statement).all() + return len(result) > 0 + def get_model_by_id(self, model_id): return self.db.get(AlgoModel, model_id) diff --git a/services/scene_service.py b/services/scene_service.py new file mode 100644 index 0000000..17c8f52 --- /dev/null +++ b/services/scene_service.py @@ -0,0 +1,201 @@ +import os +import uuid +import zipfile +from datetime import datetime +from pathlib import Path +from typing import Optional, Sequence, Tuple, List + +from fastapi import UploadFile +from sqlalchemy import func +from sqlmodel import Session, select + +from common.biz_exception import BizException +from common.consts import NotifyChangeType +from common.global_thread_pool import GlobalThreadPool +from common.string_utils import snake_to_camel +from entity.device_scene_relation import DeviceSceneRelation +from entity.scene import Scene, SceneInfo, SceneCreate, SceneUpdate + + +class SceneService: + def __init__(self, db: Session): + self.db = db + self.__scene_change_callbacks = [] # 用于存储回调函数 + self.thread_pool = GlobalThreadPool() + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__scene_change_callbacks.append(callback) + + def notify_change(self, scene_id, change_type): + """当设备发生变化时,调用回调通知变化""" + for callback in self.__scene_change_callbacks: + self.thread_pool.executor.submit(callback, scene_id, change_type) + + def get_scene_list(self, + name: Optional[str] = None, + remark: Optional[str] = None, + ) -> Sequence[Scene]: + statement = self.scene_query(name, remark) + results = self.db.exec(statement) + return results.all() + + def get_scene_page(self, + name: Optional[str] = None, + remark: Optional[str] = None, + offset: int = 0, + limit: int = 10 + ) -> Tuple[Sequence[SceneInfo], int]: + statement = self.scene_query(name, remark) + + # 查询总记录数 + total_statement = select(func.count()).select_from(statement.subquery()) + total = self.db.exec(total_statement).one() + + # 添加分页限制 + statement = statement.offset(offset).limit(limit) + + # 执行查询并返回结果 + scene_list = self.db.exec(statement) + scene_info_list: List[SceneInfo] = [] + if scene_list: + for scene in scene_list: + scene_info_list.append(SceneInfo( + **scene.dict(), + usage_status="使用中" if self.get_scene_usage(scene.id) else "未使用" + )) + + return scene_info_list, total # 返回分页数据和总数 + + def scene_query(self, name, remark): + # 构建查询语句 + statement = select(Scene) + if name: + statement = statement.where(Scene.name.like(f"%{name}%")) + if remark: + statement = statement.where(Scene.remark.like(f"%{remark}%")) + return statement + + def process_zip(self, file: UploadFile): + model_dir = Path('weights/') + scene_handle_dir = Path('scene_handler/') + model_dir.mkdir(parents=True, exist_ok=True) + scene_handle_dir.mkdir(parents=True, exist_ok=True) + # 支持的模型文件扩展名 + SUPPORTED_MODEL_EXTENSIONS = {".pt", ".onnx", ".engine"} + + # 临时保存上传文件 + temp_path = Path(f"temp_upload_{uuid.uuid4()}.zip") + with open(temp_path, "wb") as temp_file: + temp_file.write(file.file.read()) + + model_file_paths = [] + handle_file_path = None + + try: + with zipfile.ZipFile(temp_path, 'r') as zip_ref: + # 获取压缩包文件列表 + file_list = zip_ref.namelist() + + model_files = [f for f in file_list if Path(f).suffix in SUPPORTED_MODEL_EXTENSIONS] + + # 解压所有模型文件到模型目录 + for model_file in model_files: + zip_ref.extract(model_file, model_dir) + model_file_paths.append(model_dir / model_file) + + # 检查是否有可选的 Python 脚本 + handle_file = next((f for f in file_list if f.endswith(".py")), None) + if handle_file: + zip_ref.extract(handle_file, scene_handle_dir) + handle_file_path = scene_handle_dir / handle_file + else: + raise BizException( + status_code=400, + message=f"handle file (.py) is required in the zip." + ) + + except zipfile.BadZipFile: + raise BizException(status_code=400, message="Invalid zip file.") + finally: + # 删除临时文件 + temp_path.unlink() + + return [str(path) for path in model_file_paths], str(handle_file_path) + + def process_scene_file(self, file, scene): + model_file_paths, handle_file_path = self.process_zip(file) + scene.handle_task = snake_to_camel(os.path.splitext(os.path.basename(handle_file_path))[0]) + + def create_scene(self, scene_data: SceneCreate, file: UploadFile): + self.process_scene_file(file, scene_data) + scene = Scene.model_validate(scene_data) + scene.create_time = datetime.now() + scene.update_time = datetime.now() + + self.db.add(scene) + self.db.commit() + self.db.refresh(scene) + + return scene + + def update_scene(self, scene_data: SceneUpdate, file: UploadFile): + scene = self.db.get(Scene, scene_data.id) + if not scene: + return None + + update_data = scene_data.dict(exclude_unset=True) + for key, value in update_data.items(): + setattr(scene, key, value) + + scene.update_time = datetime.now() + if file: + self.process_scene_file(file, scene) + self.db.add(scene) + self.db.commit() + self.db.refresh(scene) + self.notify_change(scene.id, NotifyChangeType.SCENE_UPDATE) + + return scene + + def delete_scene(self, scene_id: int): + scene = self.db.get(Scene, scene_id) + if not scene: + return None + # 查询 device_scene_relation 中是否存在启用的绑定关系 + statement = ( + select(DeviceSceneRelation) + .where(DeviceSceneRelation.scene_id == scene_id) + ) + relation_in_use = self.db.exec(statement).first() + + # 如果存在启用的绑定关系,提示无法删除 + if relation_in_use: + raise BizException(message=f"场景 {scene.name} 正在被设备使用,无法删除") + + self.db.delete(scene) + self.db.commit() + return scene + + def get_scenes_in_use(self) -> Sequence[Scene]: + """获取所有在 device_scene_relation 表里有绑定关系的模型信息""" + statement = ( + select(Scene) + .join(DeviceSceneRelation, DeviceSceneRelation.scene_id == Scene.id) + .group_by(Scene.id) + ) + results = self.db.exec(statement).all() + return results + + def get_scene_usage(self, scene_id) -> bool: + statement = ( + select(DeviceSceneRelation) + .where( + DeviceSceneRelation.scene_id == scene_id, + ) + ) + result = self.db.exec(statement).all() + return len(result) > 0 + + def get_scene_by_id(self, scene_id): + return self.db.get(Scene, scene_id) \ No newline at end of file