diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/entity/push_config.py b/entity/push_config.py new file mode 100644 index 0000000..493d443 --- /dev/null +++ b/entity/push_config.py @@ -0,0 +1,20 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class PushConfigBase(SQLModel): + push_url: str + push_interval: Optional[int] = 0 + push_type: int + + +class PushConfig(PushConfigBase, TimestampMixin, table=True): + __tablename__ = "push_config" # 显式指定表名 + id: Optional[int] = Field(default=None, primary_key=True) + + +class PushConfigCreate(PushConfigBase): + pass diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/entity/push_config.py b/entity/push_config.py new file mode 100644 index 0000000..493d443 --- /dev/null +++ b/entity/push_config.py @@ -0,0 +1,20 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class PushConfigBase(SQLModel): + push_url: str + push_interval: Optional[int] = 0 + push_type: int + + +class PushConfig(PushConfigBase, TimestampMixin, table=True): + __tablename__ = "push_config" # 显式指定表名 + id: Optional[int] = Field(default=None, primary_key=True) + + +class PushConfigCreate(PushConfigBase): + pass diff --git a/requirements.txt b/requirements.txt index af16eda..a1b369b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pandas starlette uvicorn -sqlalchemy \ No newline at end of file +sqlalchemy +aiohttp \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/entity/push_config.py b/entity/push_config.py new file mode 100644 index 0000000..493d443 --- /dev/null +++ b/entity/push_config.py @@ -0,0 +1,20 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class PushConfigBase(SQLModel): + push_url: str + push_interval: Optional[int] = 0 + push_type: int + + +class PushConfig(PushConfigBase, TimestampMixin, table=True): + __tablename__ = "push_config" # 显式指定表名 + id: Optional[int] = Field(default=None, primary_key=True) + + +class PushConfigCreate(PushConfigBase): + pass diff --git a/requirements.txt b/requirements.txt index af16eda..a1b369b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pandas starlette uvicorn -sqlalchemy \ No newline at end of file +sqlalchemy +aiohttp \ No newline at end of file diff --git a/services/data_gas_service.py b/services/data_gas_service.py index 52b4d33..fe6ce53 100644 --- a/services/data_gas_service.py +++ b/services/data_gas_service.py @@ -4,7 +4,8 @@ from sqlalchemy import func from sqlmodel import Session, select -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo +from entity.device import Device class DataGasService: @@ -23,7 +24,7 @@ end_time: Optional[datetime] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[DataGas], int]: + ) -> Tuple[Sequence[DataGasInfo], int]: statement = self.gas_query(device_code, end_time, start_time) # 查询总记录数 @@ -34,20 +35,44 @@ statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 - results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + + return data_gas_info_list, total # 返回分页数据和总数 def get_data_gas_list(self, device_code: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, - ) -> Sequence[DataGas]: + ) -> Sequence[DataGasInfo]: statement = self.gas_query(device_code, end_time, start_time) - results = self.db.exec(statement) - return results.all() + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + return data_gas_info_list def gas_query(self, device_code, end_time, start_time): - statement = select(DataGas) + statement = ( + select(DataGas, Device.name) + .join(Device, DataGas.device_code == Device.code) + ) if device_code: statement = statement.where(DataGas.device_code.like(f"%{device_code}%")) if start_time: diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/entity/push_config.py b/entity/push_config.py new file mode 100644 index 0000000..493d443 --- /dev/null +++ b/entity/push_config.py @@ -0,0 +1,20 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class PushConfigBase(SQLModel): + push_url: str + push_interval: Optional[int] = 0 + push_type: int + + +class PushConfig(PushConfigBase, TimestampMixin, table=True): + __tablename__ = "push_config" # 显式指定表名 + id: Optional[int] = Field(default=None, primary_key=True) + + +class PushConfigCreate(PushConfigBase): + pass diff --git a/requirements.txt b/requirements.txt index af16eda..a1b369b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pandas starlette uvicorn -sqlalchemy \ No newline at end of file +sqlalchemy +aiohttp \ No newline at end of file diff --git a/services/data_gas_service.py b/services/data_gas_service.py index 52b4d33..fe6ce53 100644 --- a/services/data_gas_service.py +++ b/services/data_gas_service.py @@ -4,7 +4,8 @@ from sqlalchemy import func from sqlmodel import Session, select -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo +from entity.device import Device class DataGasService: @@ -23,7 +24,7 @@ end_time: Optional[datetime] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[DataGas], int]: + ) -> Tuple[Sequence[DataGasInfo], int]: statement = self.gas_query(device_code, end_time, start_time) # 查询总记录数 @@ -34,20 +35,44 @@ statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 - results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + + return data_gas_info_list, total # 返回分页数据和总数 def get_data_gas_list(self, device_code: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, - ) -> Sequence[DataGas]: + ) -> Sequence[DataGasInfo]: statement = self.gas_query(device_code, end_time, start_time) - results = self.db.exec(statement) - return results.all() + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + return data_gas_info_list def gas_query(self, device_code, end_time, start_time): - statement = select(DataGas) + statement = ( + select(DataGas, Device.name) + .join(Device, DataGas.device_code == Device.code) + ) if device_code: statement = statement.where(DataGas.device_code.like(f"%{device_code}%")) if start_time: diff --git a/services/global_config.py b/services/global_config.py new file mode 100644 index 0000000..7dae73b --- /dev/null +++ b/services/global_config.py @@ -0,0 +1,67 @@ +from common.consts import PUSH_TYPE +from db.database import get_db +from entity.push_config import PushConfig +from services.push_config_service import PushConfigService + + +class GlobalConfig: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, "initialized"): # 确保只初始化一次 + self.initialized = True + self.config_service = None + self.gas_push_config = None + self.algo_result_push_config = None + self.alarm_push_config = None + self.init_config() # 进行初始化 + + def init_config(self): + # 初始化配置逻辑 + with next(get_db()) as db: + self.config_service = PushConfigService(db) + self.set_gas_push_config(self.config_service.get_push_config(PUSH_TYPE.GAS)) + self.set_algo_result_push_config(self.config_service.get_push_config(PUSH_TYPE.ALGO_RESULT)) + self.set_alarm_push_config(self.config_service.get_push_config(PUSH_TYPE.ALARM)) + + self.config_service.register_change_callback(self.on_config_change) + + def on_config_change(self, config: PushConfig): + if config.push_type == PUSH_TYPE.GAS: + self.set_gas_push_config(config) + elif config.push_type == PUSH_TYPE.ALGO_RESULT: + self.set_algo_result_push_config(config) + elif config.push_type == PUSH_TYPE.ALARM: + self.set_alarm_push_config(config) + + def get_gas_push_config(self): + """获取 gas_push_config 配置""" + return self.gas_push_config + + def set_gas_push_config(self, config): + """设置 gas_push_config 配置""" + if config: + self.gas_push_config = config + + def get_algo_result_push_config(self): + """获取 algo_result_push_config 配置""" + return self.algo_result_push_config + + def set_algo_result_push_config(self, config): + """设置 algo_result_push_config 配置""" + if config: + self.algo_result_push_config = config + + def get_alarm_push_config(self): + """获取 algo_result_push_config 配置""" + return self.alarm_push_config + + def set_alarm_push_config(self, config): + """设置 algo_result_push_config 配置""" + if config: + self.alarm_push_config = config diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/entity/push_config.py b/entity/push_config.py new file mode 100644 index 0000000..493d443 --- /dev/null +++ b/entity/push_config.py @@ -0,0 +1,20 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class PushConfigBase(SQLModel): + push_url: str + push_interval: Optional[int] = 0 + push_type: int + + +class PushConfig(PushConfigBase, TimestampMixin, table=True): + __tablename__ = "push_config" # 显式指定表名 + id: Optional[int] = Field(default=None, primary_key=True) + + +class PushConfigCreate(PushConfigBase): + pass diff --git a/requirements.txt b/requirements.txt index af16eda..a1b369b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pandas starlette uvicorn -sqlalchemy \ No newline at end of file +sqlalchemy +aiohttp \ No newline at end of file diff --git a/services/data_gas_service.py b/services/data_gas_service.py index 52b4d33..fe6ce53 100644 --- a/services/data_gas_service.py +++ b/services/data_gas_service.py @@ -4,7 +4,8 @@ from sqlalchemy import func from sqlmodel import Session, select -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo +from entity.device import Device class DataGasService: @@ -23,7 +24,7 @@ end_time: Optional[datetime] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[DataGas], int]: + ) -> Tuple[Sequence[DataGasInfo], int]: statement = self.gas_query(device_code, end_time, start_time) # 查询总记录数 @@ -34,20 +35,44 @@ statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 - results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + + return data_gas_info_list, total # 返回分页数据和总数 def get_data_gas_list(self, device_code: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, - ) -> Sequence[DataGas]: + ) -> Sequence[DataGasInfo]: statement = self.gas_query(device_code, end_time, start_time) - results = self.db.exec(statement) - return results.all() + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + return data_gas_info_list def gas_query(self, device_code, end_time, start_time): - statement = select(DataGas) + statement = ( + select(DataGas, Device.name) + .join(Device, DataGas.device_code == Device.code) + ) if device_code: statement = statement.where(DataGas.device_code.like(f"%{device_code}%")) if start_time: diff --git a/services/global_config.py b/services/global_config.py new file mode 100644 index 0000000..7dae73b --- /dev/null +++ b/services/global_config.py @@ -0,0 +1,67 @@ +from common.consts import PUSH_TYPE +from db.database import get_db +from entity.push_config import PushConfig +from services.push_config_service import PushConfigService + + +class GlobalConfig: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, "initialized"): # 确保只初始化一次 + self.initialized = True + self.config_service = None + self.gas_push_config = None + self.algo_result_push_config = None + self.alarm_push_config = None + self.init_config() # 进行初始化 + + def init_config(self): + # 初始化配置逻辑 + with next(get_db()) as db: + self.config_service = PushConfigService(db) + self.set_gas_push_config(self.config_service.get_push_config(PUSH_TYPE.GAS)) + self.set_algo_result_push_config(self.config_service.get_push_config(PUSH_TYPE.ALGO_RESULT)) + self.set_alarm_push_config(self.config_service.get_push_config(PUSH_TYPE.ALARM)) + + self.config_service.register_change_callback(self.on_config_change) + + def on_config_change(self, config: PushConfig): + if config.push_type == PUSH_TYPE.GAS: + self.set_gas_push_config(config) + elif config.push_type == PUSH_TYPE.ALGO_RESULT: + self.set_algo_result_push_config(config) + elif config.push_type == PUSH_TYPE.ALARM: + self.set_alarm_push_config(config) + + def get_gas_push_config(self): + """获取 gas_push_config 配置""" + return self.gas_push_config + + def set_gas_push_config(self, config): + """设置 gas_push_config 配置""" + if config: + self.gas_push_config = config + + def get_algo_result_push_config(self): + """获取 algo_result_push_config 配置""" + return self.algo_result_push_config + + def set_algo_result_push_config(self, config): + """设置 algo_result_push_config 配置""" + if config: + self.algo_result_push_config = config + + def get_alarm_push_config(self): + """获取 algo_result_push_config 配置""" + return self.alarm_push_config + + def set_alarm_push_config(self, config): + """设置 algo_result_push_config 配置""" + if config: + self.alarm_push_config = config diff --git a/services/push_config_service.py b/services/push_config_service.py new file mode 100644 index 0000000..8a2c298 --- /dev/null +++ b/services/push_config_service.py @@ -0,0 +1,57 @@ +from datetime import datetime + +from sqlmodel import Session, select + +from entity.push_config import PushConfigCreate, PushConfig + + +class PushConfigService: + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, db: Session): + if not hasattr(self, 'initialized'): + self.db = db + self.__push_change_callbacks = [] # 用于存储回调函数 + self.initialized = True + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__push_change_callbacks.append(callback) + + def notify_change(self, push_config): + for callback in self.__push_change_callbacks: + callback(push_config) + + def set_push_config(self, push_config_create: PushConfigCreate): + push_config = self.get_push_config(push_config_create.push_type) + if push_config: + update_data = push_config_create.dict(exclude_unset=True) + for key, value in update_data.items(): + setattr(push_config, key, value) + push_config.update_time = datetime.now() + else: + push_config = PushConfig.model_validate(push_config_create) + push_config.create_time = datetime.now() + push_config.update_time = datetime.now() + + self.db.add(push_config) + self.db.commit() + self.db.refresh(push_config) + + self.notify_change(push_config) + return push_config + + def get_push_config(self, push_type): + statement = select(PushConfig).where(PushConfig.push_type == push_type) + results = self.db.exec(statement) + return results.first() + + def get_push_config_list(self): + statement = select(PushConfig) + results = self.db.exec(statement) + return results.all() \ No newline at end of file diff --git a/algo/device_detection_task.py b/algo/device_detection_task.py index e222ca7..dbc3fee 100644 --- a/algo/device_detection_task.py +++ b/algo/device_detection_task.py @@ -1,3 +1,5 @@ +import asyncio +import json from dataclasses import dataclass import importlib from datetime import datetime @@ -9,15 +11,14 @@ from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool -from common.string_utils import camel_to_snake, get_class +from common.http_utils import send_request +from common.string_utils import camel_to_snake, get_class, default_serializer from db.database import get_db from entity.device import Device from entity.frame_analysis_result import FrameAnalysisResultCreate from services.device_frame_service import DeviceFrameService from services.frame_analysis_result_service import FrameAnalysisResultService - - - +from services.global_config import GlobalConfig @dataclass @@ -43,6 +44,7 @@ self.model_exec_list = model_exec_list self.__stop_event = Event() # 使用 Event 控制线程的运行状态 self.frame_ts = None + self.push_ts = None self.thread_id = thread_id with next(get_db()) as db: @@ -91,6 +93,22 @@ ) frame_results.append(frame_result) self.frame_analysis_result_service.add_frame_analysis_results(frame_results) + self.thread_pool.submit_task(self.push_frame_results, frame_results) + + def push_frame_results(self, frame_results): + global_config = GlobalConfig() + push_config = global_config.get_algo_result_push_config() + if push_config: + last_ts = self.push_ts + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > push_config.push_interval: + send_request( + push_config.push_url, + json.dumps([r.dict() for r in frame_results], default=default_serializer) + ) + self.push_ts = current_time # 更新推送时间戳 def run(self): while not self.stream_loader.init: diff --git a/apis/data_gas.py b/apis/data_gas.py index c9ce70f..fc8ec54 100644 --- a/apis/data_gas.py +++ b/apis/data_gas.py @@ -10,13 +10,13 @@ from apis.base import StandardResponse, PageResponse, convert_page_param, standard_response from db.database import get_db from entity.base import parse_datetime -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo from services.data_gas_service import DataGasService router = APIRouter() -@router.get("/page", response_model=StandardResponse[PageResponse[DataGas]]) +@router.get("/page", response_model=StandardResponse[PageResponse[DataGasInfo]]) def get_gas_page( device_code: Optional[str] = None, start_time: Optional[str] = None, @@ -45,7 +45,9 @@ data = service.get_data_gas_list(device_code, start_time, end_time) # 将查询结果转换为 DataFrame - data = [{"设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} for item in data] + data = [ + {"设备名称": item.device_name, "设备编号": item.device_code, "燃气浓度(ppm.m)": item.gas_value, "时间": item.ts} + for item in data] df = pd.DataFrame(data) # 使用 BytesIO 生成内存中的 Excel 文件 diff --git a/apis/device_model_realtion.py b/apis/device_model_realtion.py index 443176b..e61e1d6 100644 --- a/apis/device_model_realtion.py +++ b/apis/device_model_realtion.py @@ -14,7 +14,7 @@ app = get_app() def get_service(): - return app.state.device_model_relation_service + return app.state.model_relation_service @router.get("/list_by_device", response_model=StandardResponse[List[DeviceModelRelationInfo]]) diff --git a/apis/model.py b/apis/model.py index 78c064f..a1161eb 100644 --- a/apis/model.py +++ b/apis/model.py @@ -9,13 +9,14 @@ from entity.model import AlgoModel, AlgoModelCreate, AlgoModelUpdate, AlgoModelInfo from services.model_service import ModelService - router = APIRouter() app = get_app() + def get_service(): return app.state.model_service + @router.get("/list", response_model=StandardResponse[List[AlgoModelInfo]]) def get_model_list( name: Optional[str] = None, diff --git a/apis/push_config.py b/apis/push_config.py new file mode 100644 index 0000000..5e25579 --- /dev/null +++ b/apis/push_config.py @@ -0,0 +1,32 @@ +from typing import List + +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from apis.base import standard_response, StandardResponse +from db.database import get_db +from entity.push_config import PushConfig, PushConfigCreate +from services.push_config_service import PushConfigService + +router = APIRouter() + + +@router.get("/list", response_model=StandardResponse[List[PushConfig]]) +def get_push_config_list(db: Session = Depends(get_db)): + service = PushConfigService(db) + push_configs = service.get_push_config_list() + return standard_response(data=push_configs) + + +@router.get("/get_by_type", response_model=StandardResponse[PushConfig]) +def get_by_type(push_type: int, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.get_push_config(push_type) + return standard_response(data=push_config) + + +@router.post("/set_push_config", response_model=StandardResponse[PushConfig]) +def set_push_config(push_config: PushConfigCreate, db: Session = Depends(get_db)): + service = PushConfigService(db) + push_config = service.set_push_config(push_config) + return standard_response(data=push_config) diff --git a/apis/router.py b/apis/router.py index eb8fb59..18e1311 100644 --- a/apis/router.py +++ b/apis/router.py @@ -8,6 +8,7 @@ from .frame import router as frame_router from .data_gas import router as gas_router from .control import router as control_router +from .push_config import router as push_config_router # 创建一个全局的 router @@ -22,3 +23,5 @@ router.include_router(frame_router, prefix="/frame", tags=["DeviceFrame"]) router.include_router(gas_router, prefix="/gas", tags=["DataGas"]) router.include_router(control_router,prefix="/control", tags=["Control"]) +router.include_router(push_config_router,prefix="/push", tags=["PushConfig"]) + diff --git a/common/consts.py b/common/consts.py index 7a73c81..5a3286c 100644 --- a/common/consts.py +++ b/common/consts.py @@ -31,3 +31,8 @@ ALGO = 1 SCENE = 2 NONE = 0 + +class PUSH_TYPE(Constants): + GAS = 1 + ALGO_RESULT = 2 + ALARM = 3 diff --git a/common/http_utils.py b/common/http_utils.py new file mode 100644 index 0000000..5f15707 --- /dev/null +++ b/common/http_utils.py @@ -0,0 +1,24 @@ +import requests + +from common.global_logger import logger +import aiohttp + + +async def send_request_async(push_url, data): + try: + async with aiohttp.ClientSession() as session: + logger.info(f"Push to {push_url}, data = {data}") + async with session.post(push_url, json=data) as response: + response_text = await response.text() + logger.info(f"Response: {response.status}, {response_text}") + except aiohttp.ClientError as e: + logger.error(f"Failed to push data: {e}") + + +def send_request(push_url, data): + try: + logger.info(f"Push to {push_url}, data = {data}") + response = requests.post(push_url, json=data) + logger.info(f"Response: {response.status_code}, {response.text}") + except requests.RequestException as e: + logger.error(f"Failed to push data: {e}") diff --git a/common/string_utils.py b/common/string_utils.py index 7ff7f03..6c06477 100644 --- a/common/string_utils.py +++ b/common/string_utils.py @@ -1,5 +1,7 @@ import importlib import re +from datetime import datetime + def get_class(module_name, class_name): # 动态导入模块 @@ -15,4 +17,10 @@ def snake_to_camel(name): # 将下划线后的字母转换为大写,并删除下划线 - return ''.join(word.capitalize() for word in name.split('_')) \ No newline at end of file + return ''.join(word.capitalize() for word in name.split('_')) + + +def default_serializer(obj): + if isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") diff --git a/entity/data_gas.py b/entity/data_gas.py index 6eb3e50..b15a9b8 100644 --- a/entity/data_gas.py +++ b/entity/data_gas.py @@ -22,3 +22,7 @@ class DataGasCreate(DataGasBase): pass + +class DataGasInfo(DataGasBase): + id: int + device_name: str diff --git a/entity/push_config.py b/entity/push_config.py new file mode 100644 index 0000000..493d443 --- /dev/null +++ b/entity/push_config.py @@ -0,0 +1,20 @@ +from typing import Optional + +from sqlmodel import SQLModel, Field + +from entity.base import TimestampMixin + + +class PushConfigBase(SQLModel): + push_url: str + push_interval: Optional[int] = 0 + push_type: int + + +class PushConfig(PushConfigBase, TimestampMixin, table=True): + __tablename__ = "push_config" # 显式指定表名 + id: Optional[int] = Field(default=None, primary_key=True) + + +class PushConfigCreate(PushConfigBase): + pass diff --git a/requirements.txt b/requirements.txt index af16eda..a1b369b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pandas starlette uvicorn -sqlalchemy \ No newline at end of file +sqlalchemy +aiohttp \ No newline at end of file diff --git a/services/data_gas_service.py b/services/data_gas_service.py index 52b4d33..fe6ce53 100644 --- a/services/data_gas_service.py +++ b/services/data_gas_service.py @@ -4,7 +4,8 @@ from sqlalchemy import func from sqlmodel import Session, select -from entity.data_gas import DataGas +from entity.data_gas import DataGas, DataGasInfo +from entity.device import Device class DataGasService: @@ -23,7 +24,7 @@ end_time: Optional[datetime] = None, offset: int = 0, limit: int = 10 - ) -> Tuple[Sequence[DataGas], int]: + ) -> Tuple[Sequence[DataGasInfo], int]: statement = self.gas_query(device_code, end_time, start_time) # 查询总记录数 @@ -34,20 +35,44 @@ statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 - results = self.db.exec(statement) - return results.all(), total # 返回分页数据和总数 + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + + return data_gas_info_list, total # 返回分页数据和总数 def get_data_gas_list(self, device_code: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, - ) -> Sequence[DataGas]: + ) -> Sequence[DataGasInfo]: statement = self.gas_query(device_code, end_time, start_time) - results = self.db.exec(statement) - return results.all() + results = self.db.exec(statement).all() + data_gas_info_list = [ + DataGasInfo( + id=data_gas.id, + device_code=data_gas.device_code, + gas_value=data_gas.gas_value, + ts=data_gas.ts, + device_name=device_name + ) + for data_gas, device_name in results + ] + return data_gas_info_list def gas_query(self, device_code, end_time, start_time): - statement = select(DataGas) + statement = ( + select(DataGas, Device.name) + .join(Device, DataGas.device_code == Device.code) + ) if device_code: statement = statement.where(DataGas.device_code.like(f"%{device_code}%")) if start_time: diff --git a/services/global_config.py b/services/global_config.py new file mode 100644 index 0000000..7dae73b --- /dev/null +++ b/services/global_config.py @@ -0,0 +1,67 @@ +from common.consts import PUSH_TYPE +from db.database import get_db +from entity.push_config import PushConfig +from services.push_config_service import PushConfigService + + +class GlobalConfig: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, "initialized"): # 确保只初始化一次 + self.initialized = True + self.config_service = None + self.gas_push_config = None + self.algo_result_push_config = None + self.alarm_push_config = None + self.init_config() # 进行初始化 + + def init_config(self): + # 初始化配置逻辑 + with next(get_db()) as db: + self.config_service = PushConfigService(db) + self.set_gas_push_config(self.config_service.get_push_config(PUSH_TYPE.GAS)) + self.set_algo_result_push_config(self.config_service.get_push_config(PUSH_TYPE.ALGO_RESULT)) + self.set_alarm_push_config(self.config_service.get_push_config(PUSH_TYPE.ALARM)) + + self.config_service.register_change_callback(self.on_config_change) + + def on_config_change(self, config: PushConfig): + if config.push_type == PUSH_TYPE.GAS: + self.set_gas_push_config(config) + elif config.push_type == PUSH_TYPE.ALGO_RESULT: + self.set_algo_result_push_config(config) + elif config.push_type == PUSH_TYPE.ALARM: + self.set_alarm_push_config(config) + + def get_gas_push_config(self): + """获取 gas_push_config 配置""" + return self.gas_push_config + + def set_gas_push_config(self, config): + """设置 gas_push_config 配置""" + if config: + self.gas_push_config = config + + def get_algo_result_push_config(self): + """获取 algo_result_push_config 配置""" + return self.algo_result_push_config + + def set_algo_result_push_config(self, config): + """设置 algo_result_push_config 配置""" + if config: + self.algo_result_push_config = config + + def get_alarm_push_config(self): + """获取 algo_result_push_config 配置""" + return self.alarm_push_config + + def set_alarm_push_config(self, config): + """设置 algo_result_push_config 配置""" + if config: + self.alarm_push_config = config diff --git a/services/push_config_service.py b/services/push_config_service.py new file mode 100644 index 0000000..8a2c298 --- /dev/null +++ b/services/push_config_service.py @@ -0,0 +1,57 @@ +from datetime import datetime + +from sqlmodel import Session, select + +from entity.push_config import PushConfigCreate, PushConfig + + +class PushConfigService: + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, db: Session): + if not hasattr(self, 'initialized'): + self.db = db + self.__push_change_callbacks = [] # 用于存储回调函数 + self.initialized = True + + def register_change_callback(self, callback): + """注册设备变化回调函数""" + self.__push_change_callbacks.append(callback) + + def notify_change(self, push_config): + for callback in self.__push_change_callbacks: + callback(push_config) + + def set_push_config(self, push_config_create: PushConfigCreate): + push_config = self.get_push_config(push_config_create.push_type) + if push_config: + update_data = push_config_create.dict(exclude_unset=True) + for key, value in update_data.items(): + setattr(push_config, key, value) + push_config.update_time = datetime.now() + else: + push_config = PushConfig.model_validate(push_config_create) + push_config.create_time = datetime.now() + push_config.update_time = datetime.now() + + self.db.add(push_config) + self.db.commit() + self.db.refresh(push_config) + + self.notify_change(push_config) + return push_config + + def get_push_config(self, push_type): + statement = select(PushConfig).where(PushConfig.push_type == push_type) + results = self.db.exec(statement) + return results.first() + + def get_push_config_list(self): + statement = select(PushConfig) + results = self.db.exec(statement) + return results.all() \ No newline at end of file diff --git a/tcp/tcp_client_connector.py b/tcp/tcp_client_connector.py index a62758e..3d6d076 100644 --- a/tcp/tcp_client_connector.py +++ b/tcp/tcp_client_connector.py @@ -1,11 +1,14 @@ import asyncio +from datetime import datetime from common.byte_utils import format_bytes from common.consts import TREE_COMMAND from common.global_logger import logger +from common.http_utils import send_request_async from db.database import get_db from entity.data_gas import DataGas from services.data_gas_service import DataGasService +from services.global_config import GlobalConfig def parse_gas_data(data): @@ -59,6 +62,8 @@ self.timeout = timeout # 连接/发送超时时间 self.is_connected = False # 连接状态标志 + self.push_ts_dict = {} + async def connect(self): """连接到设备""" try: @@ -116,14 +121,29 @@ logger.info(res) with next(get_db()) as db: data_gas_service = DataGasService(db) - data_gas = data_gas = DataGas( + data_gas = DataGas( device_code=res['device_code'], gas_value=res['gas_value'] ) + data_gas_service.add_data_gas(data_gas) + self.gas_push(data_gas) + except Exception as e: logger.error(f"Parse and save gas data failed: {e}") + def gas_push(self, data_gas): + global_config = GlobalConfig() + gas_push_config = global_config.get_gas_push_config() + if gas_push_config: + last_ts = self.push_ts_dict.get(data_gas.device_code) + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > gas_push_config.push_interval: + asyncio.create_task(send_request_async(gas_push_config.push_url, data_gas.json())) + self.push_ts_dict[data_gas.device_code] = current_time # 更新推送时间戳 + async def send_message(self, message: bytes, have_response=True): """发送自定义消息的接口,供其他类调用""" try: