import asyncio
import base64
import time
from datetime import datetime

import cv2

from common.global_thread_pool import GlobalThreadPool
from common.http_utils import send_request
from db.database import get_db
from entity.alarm_record import AlarmRecordCreate
from services.alarm_record_service import AlarmRecordService
from services.global_config import GlobalConfig


def image_to_base64(numpy_image, format='jpg'):
    """
    Encode an image array as a Base64 text string.

    The returned value is the bare Base64 payload; no ``data:image/...``
    prefix is added.

    :param numpy_image: image array accepted by ``cv2.imencode``
    :param format: file extension (without the dot) that selects the
        encoder, e.g. ``'jpg'`` or ``'png'``
    :return: Base64 representation of the encoded image, as ``str``
    :raises ValueError: if ``cv2.imencode`` reports failure
    """
    # Encode the array into an in-memory image file of the requested format.
    success, encoded_image = cv2.imencode(f'.{format}', numpy_image)
    if not success:
        raise ValueError("Could not encode image")

    # Base64-encode the buffer, then turn the resulting bytes into text.
    return base64.b64encode(encoded_image).decode('utf-8')


class AlarmRecordCenter:
    """
    Rate-limits alarm persistence and alarm pushing per device / alarm type.

    Two independent throttles are tracked: one for saving records to the
    database (``save_interval``) and one for pushing records to a remote
    endpoint (interval taken from the alarm push configuration).
    """

    def __init__(self, save_interval=-1, main_loop=None):
        """
        Initialise the alarm record center.

        :param save_interval: minimum number of seconds between two saved
            records of the same alarm type on the same device; a value
            ``<= 0`` disables saving altogether
        :param main_loop: asyncio event loop on which database saves are
            scheduled via ``asyncio.run_coroutine_threadsafe``; it must be
            provided when saving is enabled
        """
        self.main_loop = main_loop
        self.save_interval = save_interval
        self.thread_pool = GlobalThreadPool()
        self.global_config = GlobalConfig()
        # key: device_code, value: {alarm_type: last upload time}
        self.device_alarm_upload_time = {}
        # key: device_code, value: {alarm_type: last save time}
        self.device_alarm_save_time = {}

    def need_alarm(self, device_code, alarm_dict):
        """
        Tell whether an alarm should currently be processed at all.

        :param device_code: device identifier
        :param alarm_dict: alarm description; must contain ``'alarmType'``
        :return: ``True`` if the alarm is due to be saved or uploaded
        """
        current_time = time.time()
        alarm_type = alarm_dict['alarmType']
        return (
            self.need_save_alarm_record(device_code, current_time, alarm_type)
            or self.need_upload_alarm_record(device_code, current_time, alarm_type)
        )

    def need_save_alarm_record(self, device_code, current_time, alarm_type):
        """
        Tell whether a record of this alarm type is due to be saved.

        As a side effect the per-device bookkeeping dict is created, which
        ``upload_alarm_record`` relies on when it stores the save time.

        :param device_code: device identifier
        :param current_time: current timestamp in seconds (``time.time()``)
        :param alarm_type: alarm type key
        :return: ``True`` if saving is enabled and the save interval elapsed
        """
        if self.save_interval <= 0:
            return False

        last_save_time = self.device_alarm_save_time.setdefault(device_code, {}).get(alarm_type)
        return last_save_time is None or (current_time - last_save_time) > self.save_interval

    def need_upload_alarm_record(self, device_code, current_time, alarm_type):
        """
        Tell whether a record of this alarm type is due to be uploaded.

        As a side effect the per-device bookkeeping dict is created, which
        ``upload_alarm_record`` relies on when it stores the upload time.

        :param device_code: device identifier
        :param current_time: current timestamp in seconds (``time.time()``)
        :param alarm_type: alarm type key
        :return: ``True`` if a push URL is configured and the configured
            upload interval elapsed
        """
        push_config = self.global_config.get_alarm_push_config()
        if not push_config or not push_config.push_url:
            return False

        last_upload_time = self.device_alarm_upload_time.setdefault(device_code, {}).get(alarm_type)
        return (
            last_upload_time is None
            or (current_time - last_upload_time) > push_config.upload_interval
        )

    async def save_record(self, alarm_data, alarm_np_img):
        """
        Persist one alarm record through ``AlarmRecordService``.

        :param alarm_data: ``AlarmRecordCreate`` payload to store
        :param alarm_np_img: alarm image array, or ``None``
        """
        async for db in get_db():
            alarm_record_service = AlarmRecordService(db)
            await alarm_record_service.add_alarm(alarm_data=alarm_data, alarm_np_img=alarm_np_img)

    def upload_alarm_record(self, device_code, alarm_dict, alarm_np_img=None, alarm_value=None):
        """
        Save and/or push an alarm record, each subject to its own throttle.

        :param device_code: device identifier
        :param alarm_dict: alarm description; must contain ``'alarmType'``
            and ``'alarmContent'``
        :param alarm_np_img: alarm image (NumPy array), or ``None``
        :param alarm_value: optional alarm value; falsy values are treated
            as "no value"
        """
        current_time = time.time()
        alarm_type = alarm_dict['alarmType']

        if self.need_save_alarm_record(device_code, current_time, alarm_type):
            alarm_record_data = AlarmRecordCreate(
                device_code=device_code,
                # NOTE(review): device_id is hard-coded to 100 -- confirm
                # whether it should be resolved from device_code.
                device_id=100,
                alarm_type=alarm_type,
                alarm_content=alarm_dict['alarmContent'],
                alarm_time=datetime.now(),
                alarm_value=alarm_value if alarm_value else None,
            )
            # Hand the coroutine to the main event loop; this method itself
            # is synchronous and does not wait for the save to finish.
            asyncio.run_coroutine_threadsafe(
                self.save_record(alarm_record_data, alarm_np_img),
                self.main_loop)
            self.device_alarm_save_time[device_code][alarm_type] = current_time

        if self.need_upload_alarm_record(device_code, current_time, alarm_type):
            alarm_record = {
                'devcode': device_code,
                'alarmTime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'alarmType': alarm_dict['alarmType'],
                'alarmContent': alarm_dict['alarmContent'],
            }
            if alarm_value:
                alarm_record['alarmValue'] = alarm_value
            # Compare against None explicitly: the truth value of a
            # multi-element NumPy array is ambiguous and raises ValueError.
            if alarm_np_img is not None:
                alarm_record['alarmImage'] = image_to_base64(alarm_np_img)

            push_config = self.global_config.get_alarm_push_config()
            # NOTE(review): need_upload_alarm_record checks
            # push_config.push_url, while the request goes to
            # push_config.upload_url -- confirm both attributes exist and
            # that this is the intended target.
            self.thread_pool.submit_task(send_request, push_config.upload_url, alarm_record)
            self.device_alarm_upload_time[device_code][alarm_type] = current_time