diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 668520b..191f99f 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 668520b..191f99f 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 729d298..051927e 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -6,7 +6,7 @@ ''' 队列消息取出规则: -- 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。 +- 按 alarmCategory和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: @@ -38,6 +38,7 @@ message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: + print(message) self.queue.append(message) # 动态更新优先级映射 diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 668520b..191f99f 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 729d298..051927e 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -6,7 +6,7 @@ ''' 队列消息取出规则: -- 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。 +- 按 alarmCategory和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: @@ -38,6 +38,7 @@ message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: + print(message) self.queue.append(message) # 动态更新优先级映射 diff --git a/scene_handler/alarm_record_center.py b/scene_handler/alarm_record_center.py index 5373cd5..2607912 100644 --- a/scene_handler/alarm_record_center.py +++ b/scene_handler/alarm_record_center.py @@ -1,5 +1,6 @@ import asyncio import base64 +import copy import time from datetime import datetime @@ -34,13 +35,11 @@ def __init__(self, save_interval=-1, main_loop=None): """ 初始化报警记录中心 - :param upload_interval: 报警上传间隔(秒),如果 <= 0,则不报警 """ self.main_loop = main_loop self.save_interval = save_interval self.thread_pool = GlobalThreadPool() self.global_config = GlobalConfig() - # self.upload_interval = upload_interval self.device_alarm_upload_time = {} # key: device_code, value: {alarm_type: last upload time} self.device_alarm_save_time = {} @@ -70,13 +69,13 @@ def need_upload_alarm_record(self, device_code, current_time, alarm_type): push_config = self.global_config.get_alarm_push_config() - if not push_config or not push_config.push_url: + if not push_config or not push_config.push_url or push_config.push_interval <= 0: return False if device_code not in self.device_alarm_upload_time: self.device_alarm_upload_time[device_code] = {} last_upload_time = self.device_alarm_upload_time[device_code].get(alarm_type) - if last_upload_time is None or (current_time - last_upload_time) > push_config.upload_interval: + if last_upload_time is None or (current_time - last_upload_time) > push_config.push_interval: return True return False @@ -118,11 +117,14 @@ 'alarmType': alarm_dict['alarmType'], 'alarmContent': alarm_dict['alarmContent'], } - if alarm_value: + if alarm_value is not None: alarm_record['alarmValue'] = alarm_value - if alarm_np_img: + if alarm_np_img is not None: alarm_record['alarmImage'] = image_to_base64(alarm_np_img) + log_data = copy.deepcopy(alarm_record) + if log_data.get('alarmImage', None): + log_data.pop('alarmImage') push_config = self.global_config.get_alarm_push_config() - self.thread_pool.submit_task(send_request, push_config.upload_url, alarm_record) + self.thread_pool.submit_task(send_request, push_config.push_url, alarm_record, log_data) self.device_alarm_upload_time[device_code][alarm_type] = current_time diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 668520b..191f99f 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 729d298..051927e 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -6,7 +6,7 @@ ''' 队列消息取出规则: -- 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。 +- 按 alarmCategory和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: @@ -38,6 +38,7 @@ message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: + print(message) self.queue.append(message) # 动态更新优先级映射 diff --git a/scene_handler/alarm_record_center.py b/scene_handler/alarm_record_center.py index 5373cd5..2607912 100644 --- a/scene_handler/alarm_record_center.py +++ b/scene_handler/alarm_record_center.py @@ -1,5 +1,6 @@ import asyncio import base64 +import copy import time from datetime import datetime @@ -34,13 +35,11 @@ def __init__(self, save_interval=-1, main_loop=None): """ 初始化报警记录中心 - :param upload_interval: 报警上传间隔(秒),如果 <= 0,则不报警 """ self.main_loop = main_loop self.save_interval = save_interval self.thread_pool = GlobalThreadPool() self.global_config = GlobalConfig() - # self.upload_interval = upload_interval self.device_alarm_upload_time = {} # key: device_code, value: {alarm_type: last upload time} self.device_alarm_save_time = {} @@ -70,13 +69,13 @@ def need_upload_alarm_record(self, device_code, current_time, alarm_type): push_config = self.global_config.get_alarm_push_config() - if not push_config or not push_config.push_url: + if not push_config or not push_config.push_url or push_config.push_interval <= 0: return False if device_code not in self.device_alarm_upload_time: self.device_alarm_upload_time[device_code] = {} last_upload_time = self.device_alarm_upload_time[device_code].get(alarm_type) - if last_upload_time is None or (current_time - last_upload_time) > push_config.upload_interval: + if last_upload_time is None or (current_time - last_upload_time) > push_config.push_interval: return True return False @@ -118,11 +117,14 @@ 'alarmType': alarm_dict['alarmType'], 'alarmContent': alarm_dict['alarmContent'], } - if alarm_value: + if alarm_value is not None: alarm_record['alarmValue'] = alarm_value - if alarm_np_img: + if alarm_np_img is not None: alarm_record['alarmImage'] = image_to_base64(alarm_np_img) + log_data = copy.deepcopy(alarm_record) + if log_data.get('alarmImage', None): + log_data.pop('alarmImage') push_config = self.global_config.get_alarm_push_config() - self.thread_pool.submit_task(send_request, push_config.upload_url, alarm_record) + self.thread_pool.submit_task(send_request, push_config.push_url, alarm_record, log_data) self.device_alarm_upload_time[device_code][alarm_type] = current_time diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 1f25d14..c1696af 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,7 +4,8 @@ from copy import deepcopy from datetime import datetime -from flatbuffers.builder import np +import numpy as np +import asyncio from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad @@ -15,11 +16,13 @@ from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager from common.image_plotting import Annotator, colors +from db.database import get_db from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter from scene_handler.base_scene_handler import BaseSceneHandler from scene_handler.limit_space_scene_handler import is_overlapping +from services.data_gas_service import DataGasService from services.global_config import GlobalConfig from tcp.tcp_manager import TcpManager @@ -43,102 +46,102 @@ ALARM_DICT = [ { 'alarmCategory': 0, - 'alarmType': '1', + 'alarmType': '14', 'handelType': 1, 'category_order': 1, 'class_idx': [34], 'alarm_name': 'no_fire_extinguisher', 'alarmContent': '未检测到灭火器', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '2', + 'alarmType': '15', 'handelType': 1, 'category_order': 2, 'class_idx': [43], 'alarm_name': 'no_barrier_tape', 'alarmContent': '未检测到警戒线', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '3', + 'alarmType': '16', 'handelType': 1, 'category_order': 3, 'class_idx': [48], 'alarm_name': 'no_cone', 'alarmContent': '未检测到锥桶', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '4', + 'alarmType': '17', 'handelType': 1, 'category_order': 4, 'class_idx': [4, 5, 16], 'alarm_name': 'no_board', 'alarmContent': '未检测到指示牌', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '5', + 'alarmType': '2', 'handelType': 2, 'category_order': -1, 'class_idx': [18], 'alarm_name': 'no_helmet', 'alarmContent': '未佩戴安全帽', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', 'label': '未佩戴安全帽', }, # todo 明火 { 'alarmCategory': 1, - 'alarmType': '7', + 'alarmType': '1', 'handelType': 3, 'category_order': 1, 'class_idx': [], 'alarm_name': 'gas_alarm', 'alarmContent': '甲烷浓度超限', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8', 'label': '', }, { 'alarmCategory': 1, - 'alarmType': '8', + 'alarmType': '', 'handelType': 3, 'category_order': 2, 'class_idx': [], 'alarm_name': 'harmful_alarm', 'alarmContent': '有害气体浓度超标', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', 'label': '', }, { 'alarmCategory': 2, - 'alarmType': '9', + 'alarmType': '18', 'handelType': 3, 'category_order': -1, 'class_idx': [], 'alarm_name': 'health_alarm', 'alarmContent': '心率血氧异常', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', 'label': '', }, { 'alarmCategory': 3, - 'alarmType': '10', + 'alarmType': '3', 'handelType': 2, 'category_order': 4, 'class_idx': [24], 'alarm_name': 'break_in_alarm', 'alarmContent': '非法闯入', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', 'label': '非法闯入', }, @@ -158,7 +161,9 @@ self.thread_pool = GlobalThreadPool() self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, - category_priority={2: 0, 1: 1, 3: 2, 0: 3}) + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # (优先级:2 > 1 > 3 > 0) self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() @@ -168,6 +173,7 @@ self.health_device_codes = ['HWIH061000056395'] self.harmful_device_codes = ['862635063168165A'] + self.thread_pool.submit_task(self.gas_data_task, device.code) for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: @@ -176,15 +182,15 @@ self.thread_pool.submit_task(self.alarm_message_center.process_messages) # todo 明火 - self.model = YOLO('weights/labor-v8-20241114.pt') + self.model = YOLO('weights/labor-v8-20250115-fp16.engine') self.model_classes = { - 0: '三脚架', + # 0: '三脚架', 3: '人', 4: '作业信息公示牌', 6: '危险告知牌', 9: '反光衣', - 11: '呼吸面罩', - 13: '四合一', + # 11: '呼吸面罩', + # 13: '四合一', 15: '头', 16: '安全告知牌', 18: '安全帽', @@ -198,19 +204,24 @@ self.PERSON_CLASS_IDX = 3 self.HEAD_CLASS_IDX = 15 + self.vid_stride = 3 + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, - device_thread_id=thread_id) + device_thread_id=thread_id, vid_stride=self.vid_stride) self.range_points = range_points self.abs_range_points = self.get_absolute_range() self.tracking_status = {} # 跟踪每个行人的状态 - self.max_missing_frames = 25 # 报警的阈值 - self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.max_missing_frames = 25 / self.vid_stride # 报警的阈值 + self.disappear_threshold = 25 * 3 / self.vid_stride # 移除行人的阈值 self.frames_detected = 0 self.fps_ts = None def get_absolute_range(self): + if not self.range_points: + return None + fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: abs_points = [] @@ -226,6 +237,31 @@ else: return None + def gas_data_task(self, tree_device_code): + while not self.__stop_event.is_set(): + asyncio.run_coroutine_threadsafe( + self.handle_gas_alarm(tree_device_code), self.main_loop + ) + time.sleep(5) + + async def handle_gas_alarm(self, tree_device_code): + try: + async for db in get_db(): + gasService = DataGasService(db) + gas_data = await gasService.latest_query(tree_device_code) + if gas_data and time.time() - gas_data.ts.timestamp() < 60: + if gas_data.gas_value > 200.0: + alarm_dict = [d for d in ALARM_DICT if + d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], + alarm_np_img=None,alarm_value=gas_data.gas_value) + except Exception as e: + print(f"Error in latest_query: {e}") + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): @@ -249,12 +285,12 @@ uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") if last_ts is None or (uptime.timestamp() - last_ts) > 0: self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() - if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 ch4 = data.get('ch4') co = data.get('co') h2s = data.get('h2s') o2 = data.get('o2') - self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2) time.sleep(5) def health_data_task(self, helmet_code): @@ -280,7 +316,7 @@ def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' f'upload_timestamp = {upload_timestamp}') - if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: + if heartrate < 50 or heartrate > 120 or blood_oxygen < 85: alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) @@ -291,7 +327,7 @@ or float(co) > 10.0 \ or float(h2s) > 120.0 \ or float(o2) < 15: - alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) @@ -308,7 +344,7 @@ alarm = gas_value > 10 if alarm: - alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 @@ -343,7 +379,7 @@ color=box_color, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 1: # 检测不到报警 if object_boxes: @@ -362,7 +398,7 @@ color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 2: # 人未穿戴报警 person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] @@ -410,7 +446,7 @@ color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] @@ -486,9 +522,11 @@ if not frames: continue + t1 = time.time() self.device_status_manager.set_status(device_id=self.device.id) result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + t2 = time.time() for idx, frame_boxes in enumerate(result_boxes): current_person_ids = {int(box.id) for box in frame_boxes if box.cls is not None and box.id is not None and int( @@ -507,19 +545,23 @@ self.handle_behave_alarm(frames, result_boxes) self.handle_break_in_alarm(frames, result_boxes) + t3 = time.time() # for person_id in self.tracking_status.keys(): # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') - # for idx, frame in enumerate(frames): - # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") - # frame_boxes = result_boxes[idx] - # for s_box in frame_boxes: - # annotator.box_label(s_box.xyxy.cpu().squeeze(), - # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", - # color=colors(int(s_box.cls)), - # rotated=False) - # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + for idx, frame in enumerate(frames): + annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + frame_boxes = result_boxes[idx] + for s_box in frame_boxes: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + self.display_frame_manager.add_frame(self.device.id, annotator.result()) # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + + t4 = time.time() + print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms') self.log_fps(len(frames)) except Exception as ex: diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 668520b..191f99f 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 729d298..051927e 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -6,7 +6,7 @@ ''' 队列消息取出规则: -- 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。 +- 按 alarmCategory和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: @@ -38,6 +38,7 @@ message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: + print(message) self.queue.append(message) # 动态更新优先级映射 diff --git a/scene_handler/alarm_record_center.py b/scene_handler/alarm_record_center.py index 5373cd5..2607912 100644 --- a/scene_handler/alarm_record_center.py +++ b/scene_handler/alarm_record_center.py @@ -1,5 +1,6 @@ import asyncio import base64 +import copy import time from datetime import datetime @@ -34,13 +35,11 @@ def __init__(self, save_interval=-1, main_loop=None): """ 初始化报警记录中心 - :param upload_interval: 报警上传间隔(秒),如果 <= 0,则不报警 """ self.main_loop = main_loop self.save_interval = save_interval self.thread_pool = GlobalThreadPool() self.global_config = GlobalConfig() - # self.upload_interval = upload_interval self.device_alarm_upload_time = {} # key: device_code, value: {alarm_type: last upload time} self.device_alarm_save_time = {} @@ -70,13 +69,13 @@ def need_upload_alarm_record(self, device_code, current_time, alarm_type): push_config = self.global_config.get_alarm_push_config() - if not push_config or not push_config.push_url: + if not push_config or not push_config.push_url or push_config.push_interval <= 0: return False if device_code not in self.device_alarm_upload_time: self.device_alarm_upload_time[device_code] = {} last_upload_time = self.device_alarm_upload_time[device_code].get(alarm_type) - if last_upload_time is None or (current_time - last_upload_time) > push_config.upload_interval: + if last_upload_time is None or (current_time - last_upload_time) > push_config.push_interval: return True return False @@ -118,11 +117,14 @@ 'alarmType': alarm_dict['alarmType'], 'alarmContent': alarm_dict['alarmContent'], } - if alarm_value: + if alarm_value is not None: alarm_record['alarmValue'] = alarm_value - if alarm_np_img: + if alarm_np_img is not None: alarm_record['alarmImage'] = image_to_base64(alarm_np_img) + log_data = copy.deepcopy(alarm_record) + if log_data.get('alarmImage', None): + log_data.pop('alarmImage') push_config = self.global_config.get_alarm_push_config() - self.thread_pool.submit_task(send_request, push_config.upload_url, alarm_record) + self.thread_pool.submit_task(send_request, push_config.push_url, alarm_record, log_data) self.device_alarm_upload_time[device_code][alarm_type] = current_time diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 1f25d14..c1696af 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,7 +4,8 @@ from copy import deepcopy from datetime import datetime -from flatbuffers.builder import np +import numpy as np +import asyncio from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad @@ -15,11 +16,13 @@ from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager from common.image_plotting import Annotator, colors +from db.database import get_db from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter from scene_handler.base_scene_handler import BaseSceneHandler from scene_handler.limit_space_scene_handler import is_overlapping +from services.data_gas_service import DataGasService from services.global_config import GlobalConfig from tcp.tcp_manager import TcpManager @@ -43,102 +46,102 @@ ALARM_DICT = [ { 'alarmCategory': 0, - 'alarmType': '1', + 'alarmType': '14', 'handelType': 1, 'category_order': 1, 'class_idx': [34], 'alarm_name': 'no_fire_extinguisher', 'alarmContent': '未检测到灭火器', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '2', + 'alarmType': '15', 'handelType': 1, 'category_order': 2, 'class_idx': [43], 'alarm_name': 'no_barrier_tape', 'alarmContent': '未检测到警戒线', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '3', + 'alarmType': '16', 'handelType': 1, 'category_order': 3, 'class_idx': [48], 'alarm_name': 'no_cone', 'alarmContent': '未检测到锥桶', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '4', + 'alarmType': '17', 'handelType': 1, 'category_order': 4, 'class_idx': [4, 5, 16], 'alarm_name': 'no_board', 'alarmContent': '未检测到指示牌', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '5', + 'alarmType': '2', 'handelType': 2, 'category_order': -1, 'class_idx': [18], 'alarm_name': 'no_helmet', 'alarmContent': '未佩戴安全帽', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', 'label': '未佩戴安全帽', }, # todo 明火 { 'alarmCategory': 1, - 'alarmType': '7', + 'alarmType': '1', 'handelType': 3, 'category_order': 1, 'class_idx': [], 'alarm_name': 'gas_alarm', 'alarmContent': '甲烷浓度超限', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8', 'label': '', }, { 'alarmCategory': 1, - 'alarmType': '8', + 'alarmType': '', 'handelType': 3, 'category_order': 2, 'class_idx': [], 'alarm_name': 'harmful_alarm', 'alarmContent': '有害气体浓度超标', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', 'label': '', }, { 'alarmCategory': 2, - 'alarmType': '9', + 'alarmType': '18', 'handelType': 3, 'category_order': -1, 'class_idx': [], 'alarm_name': 'health_alarm', 'alarmContent': '心率血氧异常', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', 'label': '', }, { 'alarmCategory': 3, - 'alarmType': '10', + 'alarmType': '3', 'handelType': 2, 'category_order': 4, 'class_idx': [24], 'alarm_name': 'break_in_alarm', 'alarmContent': '非法闯入', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', 'label': '非法闯入', }, @@ -158,7 +161,9 @@ self.thread_pool = GlobalThreadPool() self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, - category_priority={2: 0, 1: 1, 3: 2, 0: 3}) + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # (优先级:2 > 1 > 3 > 0) self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() @@ -168,6 +173,7 @@ self.health_device_codes = ['HWIH061000056395'] self.harmful_device_codes = ['862635063168165A'] + self.thread_pool.submit_task(self.gas_data_task, device.code) for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: @@ -176,15 +182,15 @@ self.thread_pool.submit_task(self.alarm_message_center.process_messages) # todo 明火 - self.model = YOLO('weights/labor-v8-20241114.pt') + self.model = YOLO('weights/labor-v8-20250115-fp16.engine') self.model_classes = { - 0: '三脚架', + # 0: '三脚架', 3: '人', 4: '作业信息公示牌', 6: '危险告知牌', 9: '反光衣', - 11: '呼吸面罩', - 13: '四合一', + # 11: '呼吸面罩', + # 13: '四合一', 15: '头', 16: '安全告知牌', 18: '安全帽', @@ -198,19 +204,24 @@ self.PERSON_CLASS_IDX = 3 self.HEAD_CLASS_IDX = 15 + self.vid_stride = 3 + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, - device_thread_id=thread_id) + device_thread_id=thread_id, vid_stride=self.vid_stride) self.range_points = range_points self.abs_range_points = self.get_absolute_range() self.tracking_status = {} # 跟踪每个行人的状态 - self.max_missing_frames = 25 # 报警的阈值 - self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.max_missing_frames = 25 / self.vid_stride # 报警的阈值 + self.disappear_threshold = 25 * 3 / self.vid_stride # 移除行人的阈值 self.frames_detected = 0 self.fps_ts = None def get_absolute_range(self): + if not self.range_points: + return None + fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: abs_points = [] @@ -226,6 +237,31 @@ else: return None + def gas_data_task(self, tree_device_code): + while not self.__stop_event.is_set(): + asyncio.run_coroutine_threadsafe( + self.handle_gas_alarm(tree_device_code), self.main_loop + ) + time.sleep(5) + + async def handle_gas_alarm(self, tree_device_code): + try: + async for db in get_db(): + gasService = DataGasService(db) + gas_data = await gasService.latest_query(tree_device_code) + if gas_data and time.time() - gas_data.ts.timestamp() < 60: + if gas_data.gas_value > 200.0: + alarm_dict = [d for d in ALARM_DICT if + d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], + alarm_np_img=None,alarm_value=gas_data.gas_value) + except Exception as e: + print(f"Error in latest_query: {e}") + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): @@ -249,12 +285,12 @@ uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") if last_ts is None or (uptime.timestamp() - last_ts) > 0: self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() - if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 ch4 = data.get('ch4') co = data.get('co') h2s = data.get('h2s') o2 = data.get('o2') - self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2) time.sleep(5) def health_data_task(self, helmet_code): @@ -280,7 +316,7 @@ def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' f'upload_timestamp = {upload_timestamp}') - if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: + if heartrate < 50 or heartrate > 120 or blood_oxygen < 85: alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) @@ -291,7 +327,7 @@ or float(co) > 10.0 \ or float(h2s) > 120.0 \ or float(o2) < 15: - alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) @@ -308,7 +344,7 @@ alarm = gas_value > 10 if alarm: - alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 @@ -343,7 +379,7 @@ color=box_color, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 1: # 检测不到报警 if object_boxes: @@ -362,7 +398,7 @@ color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 2: # 人未穿戴报警 person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] @@ -410,7 +446,7 @@ color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] @@ -486,9 +522,11 @@ if not frames: continue + t1 = time.time() self.device_status_manager.set_status(device_id=self.device.id) result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + t2 = time.time() for idx, frame_boxes in enumerate(result_boxes): current_person_ids = {int(box.id) for box in frame_boxes if box.cls is not None and box.id is not None and int( @@ -507,19 +545,23 @@ self.handle_behave_alarm(frames, result_boxes) self.handle_break_in_alarm(frames, result_boxes) + t3 = time.time() # for person_id in self.tracking_status.keys(): # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') - # for idx, frame in enumerate(frames): - # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") - # frame_boxes = result_boxes[idx] - # for s_box in frame_boxes: - # annotator.box_label(s_box.xyxy.cpu().squeeze(), - # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", - # color=colors(int(s_box.cls)), - # rotated=False) - # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + for idx, frame in enumerate(frames): + annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + frame_boxes = result_boxes[idx] + for s_box in frame_boxes: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + self.display_frame_manager.add_frame(self.device.id, annotator.result()) # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + + t4 = time.time() + print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms') self.log_fps(len(frames)) except Exception as ex: diff --git a/services/data_gas_service.py b/services/data_gas_service.py index 6ac5665..cf3ea14 100644 --- a/services/data_gas_service.py +++ b/services/data_gas_service.py @@ -20,12 +20,12 @@ return data_gas async def get_data_gas_page(self, - device_code: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, - offset: int = 0, - limit: int = 10 - ) -> Tuple[Sequence[DataGasInfo], int]: + device_code: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + offset: int = 0, + limit: int = 10 + ) -> Tuple[Sequence[DataGasInfo], int]: statement = self.gas_query(device_code, end_time, start_time) # 查询总记录数 @@ -40,7 +40,6 @@ results = await self.db.execute(statement) rows = results.all() - data_gas_info_list = [ DataGasInfo( id=data_gas.id, @@ -55,10 +54,10 @@ return data_gas_info_list, total # 返回分页数据和总数 async def get_data_gas_list(self, - device_code: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, - ) -> Sequence[DataGasInfo]: + device_code: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + ) -> Sequence[DataGasInfo]: statement = self.gas_query(device_code, end_time, start_time) results = await self.db.execute(statement) rows = results.all() @@ -87,3 +86,11 @@ statement = statement.where(DataGas.ts <= end_time) statement = statement.order_by(DataGas.ts.desc()) return statement + + async def latest_query(self, device_code) -> DataGas: + statement = (select(DataGas) + .where(DataGas.device_code == device_code) + .order_by(DataGas.ts.desc()) + .limit(1)) + result = await self.db.execute(statement) + return result.scalars().first() diff --git a/.gitignore b/.gitignore index cdefb40..b82300a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ storage weights logs -test* \ No newline at end of file +test* +*.zip +.idea/* \ No newline at end of file diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 202213c..2202399 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -125,36 +125,46 @@ def update(self): vid_n = 0 - log_n = 0 + # log_n = 0 while not self.__stop_event.is_set(): # if not self.init: # self.init_cap() if self.cap is None: continue - if vid_n % self.vid_stride == 0: - try: - ret, frame = self.cap.read() + + try: + grabbed = self.cap.grab() + if not grabbed: + logger.info(f"{self.url} disconnect, try to reconnect...") + self.cap.release() # 释放当前的捕获对象 + self.cap = self.get_connect() # 尝试重新连接 + self.frame = None + continue # 跳过当前循环的剩余部分 + + if vid_n % self.vid_stride == 0: + ret, frame = self.cap.retrieve() if not ret: logger.info(f"{self.url} disconnect, try to reconnect...") - self.cap.release() # 释放当前的捕获对象 - self.cap = self.get_connect() # 尝试重新连接 - self.frame = None - continue # 跳过当前循环的剩余部分 - else: - vid_n += 1 - self.frame = frame - self.frames_read += 1 - if not self.frame_queue.full(): - self.frame_queue.put(frame) - if log_n % 1000 == 0: - logger.debug(f'{self.url} cap success') - log_n = (log_n + 1) % 250 - except Exception as e: - logger.error(f"{self.url} update fail", exc_info=e) - if self.cap is not None: self.cap.release() - self.frame = None - self.cap = self.get_connect() # 尝试重新连接 + self.cap = self.get_connect() + self.frame = None + continue + + self.frame = frame + self.frames_read += 1 + if not self.frame_queue.full(): + self.frame_queue.put(frame) + # if log_n % 1000 == 0: + # logger.debug(f'{self.url} cap success') + # log_n = (log_n + 1) % 1000 + vid_n = (vid_n + 1) % self.vid_stride + except Exception as e: + logger.error(f"{self.url} update fail", exc_info=e) + if self.cap is not None: + self.cap.release() + self.frame = None + self.cap = self.get_connect() # 尝试重新连接 + time.sleep(0.1) self.log_fps() def __iter__(self): diff --git a/common/http_utils.py b/common/http_utils.py index a49c6ea..405d951 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -1,3 +1,5 @@ +import json + import requests from common.global_logger import logger @@ -6,18 +8,21 @@ async def send_request_async(push_url, data): try: + headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: logger.info(f"Push to {push_url}, data = {data}") - async with session.post(push_url, json=data) as response: + async with session.post(push_url, json=data, headers=headers) as response: response_text = await response.text() logger.info(f"Response: {response.status}, {response_text}") except aiohttp.ClientError as e: logger.error(f"Failed to push data: {e}") -def send_request(push_url, data): +def send_request(push_url, data, log_data = None): try: - logger.info(f"Push to {push_url}, data = {data}") + if log_data is None: + log_data = data + logger.info(f"Push to {push_url}, data = {log_data}") response = requests.post(push_url, json=data) logger.info(f"Response: {response.status_code}, {response.text}") except requests.RequestException as e: diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 668520b..191f99f 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 729d298..051927e 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -6,7 +6,7 @@ ''' 队列消息取出规则: -- 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。 +- 按 alarmCategory和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: @@ -38,6 +38,7 @@ message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: + print(message) self.queue.append(message) # 动态更新优先级映射 diff --git a/scene_handler/alarm_record_center.py b/scene_handler/alarm_record_center.py index 5373cd5..2607912 100644 --- a/scene_handler/alarm_record_center.py +++ b/scene_handler/alarm_record_center.py @@ -1,5 +1,6 @@ import asyncio import base64 +import copy import time from datetime import datetime @@ -34,13 +35,11 @@ def __init__(self, save_interval=-1, main_loop=None): """ 初始化报警记录中心 - :param upload_interval: 报警上传间隔(秒),如果 <= 0,则不报警 """ self.main_loop = main_loop self.save_interval = save_interval self.thread_pool = GlobalThreadPool() self.global_config = GlobalConfig() - # self.upload_interval = upload_interval self.device_alarm_upload_time = {} # key: device_code, value: {alarm_type: last upload time} self.device_alarm_save_time = {} @@ -70,13 +69,13 @@ def need_upload_alarm_record(self, device_code, current_time, alarm_type): push_config = self.global_config.get_alarm_push_config() - if not push_config or not push_config.push_url: + if not push_config or not push_config.push_url or push_config.push_interval <= 0: return False if device_code not in self.device_alarm_upload_time: self.device_alarm_upload_time[device_code] = {} last_upload_time = self.device_alarm_upload_time[device_code].get(alarm_type) - if last_upload_time is None or (current_time - last_upload_time) > push_config.upload_interval: + if last_upload_time is None or (current_time - last_upload_time) > push_config.push_interval: return True return False @@ -118,11 +117,14 @@ 'alarmType': alarm_dict['alarmType'], 'alarmContent': alarm_dict['alarmContent'], } - if alarm_value: + if alarm_value is not None: alarm_record['alarmValue'] = alarm_value - if alarm_np_img: + if alarm_np_img is not None: alarm_record['alarmImage'] = image_to_base64(alarm_np_img) + log_data = copy.deepcopy(alarm_record) + if log_data.get('alarmImage', None): + log_data.pop('alarmImage') push_config = self.global_config.get_alarm_push_config() - self.thread_pool.submit_task(send_request, push_config.upload_url, alarm_record) + self.thread_pool.submit_task(send_request, push_config.push_url, alarm_record, log_data) self.device_alarm_upload_time[device_code][alarm_type] = current_time diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 1f25d14..c1696af 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,7 +4,8 @@ from copy import deepcopy from datetime import datetime -from flatbuffers.builder import np +import numpy as np +import asyncio from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad @@ -15,11 +16,13 @@ from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager from common.image_plotting import Annotator, colors +from db.database import get_db from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter from scene_handler.base_scene_handler import BaseSceneHandler from scene_handler.limit_space_scene_handler import is_overlapping +from services.data_gas_service import DataGasService from services.global_config import GlobalConfig from tcp.tcp_manager import TcpManager @@ -43,102 +46,102 @@ ALARM_DICT = [ { 'alarmCategory': 0, - 'alarmType': '1', + 'alarmType': '14', 'handelType': 1, 'category_order': 1, 'class_idx': [34], 'alarm_name': 'no_fire_extinguisher', 'alarmContent': '未检测到灭火器', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '2', + 'alarmType': '15', 'handelType': 1, 'category_order': 2, 'class_idx': [43], 'alarm_name': 'no_barrier_tape', 'alarmContent': '未检测到警戒线', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '3', + 'alarmType': '16', 'handelType': 1, 'category_order': 3, 'class_idx': [48], 'alarm_name': 'no_cone', 'alarmContent': '未检测到锥桶', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '4', + 'alarmType': '17', 'handelType': 1, 'category_order': 4, 'class_idx': [4, 5, 16], 'alarm_name': 'no_board', 'alarmContent': '未检测到指示牌', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', 'label': '', }, { 'alarmCategory': 0, - 'alarmType': '5', + 'alarmType': '2', 'handelType': 2, 'category_order': -1, 'class_idx': [18], 'alarm_name': 'no_helmet', 'alarmContent': '未佩戴安全帽', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', 'label': '未佩戴安全帽', }, # todo 明火 { 'alarmCategory': 1, - 'alarmType': '7', + 'alarmType': '1', 'handelType': 3, 'category_order': 1, 'class_idx': [], 'alarm_name': 'gas_alarm', 'alarmContent': '甲烷浓度超限', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8', 'label': '', }, { 'alarmCategory': 1, - 'alarmType': '8', + 'alarmType': '', 'handelType': 3, 'category_order': 2, 'class_idx': [], 'alarm_name': 'harmful_alarm', 'alarmContent': '有害气体浓度超标', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', 'label': '', }, { 'alarmCategory': 2, - 'alarmType': '9', + 'alarmType': '18', 'handelType': 3, 'category_order': -1, 'class_idx': [], 'alarm_name': 'health_alarm', 'alarmContent': '心率血氧异常', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', 'label': '', }, { 'alarmCategory': 3, - 'alarmType': '10', + 'alarmType': '3', 'handelType': 2, 'category_order': 4, 'class_idx': [24], 'alarm_name': 'break_in_alarm', 'alarmContent': '非法闯入', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', 'label': '非法闯入', }, @@ -158,7 +161,9 @@ self.thread_pool = GlobalThreadPool() self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, - category_priority={2: 0, 1: 1, 3: 2, 0: 3}) + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # (优先级:2 > 1 > 3 > 0) self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() @@ -168,6 +173,7 @@ self.health_device_codes = ['HWIH061000056395'] self.harmful_device_codes = ['862635063168165A'] + self.thread_pool.submit_task(self.gas_data_task, device.code) for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: @@ -176,15 +182,15 @@ self.thread_pool.submit_task(self.alarm_message_center.process_messages) # todo 明火 - self.model = YOLO('weights/labor-v8-20241114.pt') + self.model = YOLO('weights/labor-v8-20250115-fp16.engine') self.model_classes = { - 0: '三脚架', + # 0: '三脚架', 3: '人', 4: '作业信息公示牌', 6: '危险告知牌', 9: '反光衣', - 11: '呼吸面罩', - 13: '四合一', + # 11: '呼吸面罩', + # 13: '四合一', 15: '头', 16: '安全告知牌', 18: '安全帽', @@ -198,19 +204,24 @@ self.PERSON_CLASS_IDX = 3 self.HEAD_CLASS_IDX = 15 + self.vid_stride = 3 + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, - device_thread_id=thread_id) + device_thread_id=thread_id, vid_stride=self.vid_stride) self.range_points = range_points self.abs_range_points = self.get_absolute_range() self.tracking_status = {} # 跟踪每个行人的状态 - self.max_missing_frames = 25 # 报警的阈值 - self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.max_missing_frames = 25 / self.vid_stride # 报警的阈值 + self.disappear_threshold = 25 * 3 / self.vid_stride # 移除行人的阈值 self.frames_detected = 0 self.fps_ts = None def get_absolute_range(self): + if not self.range_points: + return None + fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: abs_points = [] @@ -226,6 +237,31 @@ else: return None + def gas_data_task(self, tree_device_code): + while not self.__stop_event.is_set(): + asyncio.run_coroutine_threadsafe( + self.handle_gas_alarm(tree_device_code), self.main_loop + ) + time.sleep(5) + + async def handle_gas_alarm(self, tree_device_code): + try: + async for db in get_db(): + gasService = DataGasService(db) + gas_data = await gasService.latest_query(tree_device_code) + if gas_data and time.time() - gas_data.ts.timestamp() < 60: + if gas_data.gas_value > 200.0: + alarm_dict = [d for d in ALARM_DICT if + d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], + alarm_np_img=None,alarm_value=gas_data.gas_value) + except Exception as e: + print(f"Error in latest_query: {e}") + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): @@ -249,12 +285,12 @@ uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") if last_ts is None or (uptime.timestamp() - last_ts) > 0: self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() - if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 ch4 = data.get('ch4') co = data.get('co') h2s = data.get('h2s') o2 = data.get('o2') - self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2) time.sleep(5) def health_data_task(self, helmet_code): @@ -280,7 +316,7 @@ def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' f'upload_timestamp = {upload_timestamp}') - if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: + if heartrate < 50 or heartrate > 120 or blood_oxygen < 85: alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) @@ -291,7 +327,7 @@ or float(co) > 10.0 \ or float(h2s) > 120.0 \ or float(o2) < 15: - alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) @@ -308,7 +344,7 @@ alarm = gas_value > 10 if alarm: - alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 @@ -343,7 +379,7 @@ color=box_color, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 1: # 检测不到报警 if object_boxes: @@ -362,7 +398,7 @@ color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 2: # 人未穿戴报警 person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] @@ -410,7 +446,7 @@ color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, - annotator.result()) + alarm_np_img=annotator.result()) def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] @@ -486,9 +522,11 @@ if not frames: continue + t1 = time.time() self.device_status_manager.set_status(device_id=self.device.id) result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + t2 = time.time() for idx, frame_boxes in enumerate(result_boxes): current_person_ids = {int(box.id) for box in frame_boxes if box.cls is not None and box.id is not None and int( @@ -507,19 +545,23 @@ self.handle_behave_alarm(frames, result_boxes) self.handle_break_in_alarm(frames, result_boxes) + t3 = time.time() # for person_id in self.tracking_status.keys(): # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') - # for idx, frame in enumerate(frames): - # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") - # frame_boxes = result_boxes[idx] - # for s_box in frame_boxes: - # annotator.box_label(s_box.xyxy.cpu().squeeze(), - # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", - # color=colors(int(s_box.cls)), - # rotated=False) - # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + for idx, frame in enumerate(frames): + annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + frame_boxes = result_boxes[idx] + for s_box in frame_boxes: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + self.display_frame_manager.add_frame(self.device.id, annotator.result()) # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + + t4 = time.time() + print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms') self.log_fps(len(frames)) except Exception as ex: diff --git a/services/data_gas_service.py b/services/data_gas_service.py index 6ac5665..cf3ea14 100644 --- a/services/data_gas_service.py +++ b/services/data_gas_service.py @@ -20,12 +20,12 @@ return data_gas async def get_data_gas_page(self, - device_code: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, - offset: int = 0, - limit: int = 10 - ) -> Tuple[Sequence[DataGasInfo], int]: + device_code: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + offset: int = 0, + limit: int = 10 + ) -> Tuple[Sequence[DataGasInfo], int]: statement = self.gas_query(device_code, end_time, start_time) # 查询总记录数 @@ -40,7 +40,6 @@ results = await self.db.execute(statement) rows = results.all() - data_gas_info_list = [ DataGasInfo( id=data_gas.id, @@ -55,10 +54,10 @@ return data_gas_info_list, total # 返回分页数据和总数 async def get_data_gas_list(self, - device_code: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, - ) -> Sequence[DataGasInfo]: + device_code: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + ) -> Sequence[DataGasInfo]: statement = self.gas_query(device_code, end_time, start_time) results = await self.db.execute(statement) rows = results.all() @@ -87,3 +86,11 @@ statement = statement.where(DataGas.ts <= end_time) statement = statement.order_by(DataGas.ts.desc()) return statement + + async def latest_query(self, device_code) -> DataGas: + statement = (select(DataGas) + .where(DataGas.device_code == device_code) + .order_by(DataGas.ts.desc()) + .limit(1)) + result = await self.db.execute(statement) + return result.scalars().first() diff --git a/tcp/tcp_client_connector.py b/tcp/tcp_client_connector.py index de6205f..8dbd86c 100644 --- a/tcp/tcp_client_connector.py +++ b/tcp/tcp_client_connector.py @@ -1,5 +1,7 @@ import asyncio -from collections import deque +import copy +import json + from datetime import datetime from common.byte_utils import format_bytes @@ -97,6 +99,10 @@ logger.error(f"连接到 {self.ip}:{self.port} 失败,错误: {e}") logger.info(f"{self.reconnect_interval} 秒后将重连到 {self.ip}:{self.port}") await asyncio.sleep(self.reconnect_interval) + except Exception as e: + # 给未知异常一个兜底打印,方便排查 + logger.exception(f"连接到 {self.ip}:{self.port} 遇到未知错误: {e}") + await asyncio.sleep(self.reconnect_interval) async def reconnect(self): """处理断线重连""" @@ -175,7 +181,9 @@ # 检查是否需要推送数据 if last_ts is None or (current_time - last_ts).total_seconds() > gas_push_config.push_interval: - asyncio.create_task(send_request_async(gas_push_config.push_url, data_gas.json())) + send_data = json.loads(copy.deepcopy(data_gas.json())) + send_data.pop("id") + asyncio.create_task(send_request_async(gas_push_config.push_url, send_data)) self.push_ts_dict[data_gas.device_code] = current_time # 更新推送时间戳 async def send_message(self, message: bytes, have_response=True): @@ -213,7 +221,7 @@ if have_response: async with self.read_lock: # Ensure only one coroutine reads - data = await asyncio.wait_for(self.reader.read(1024), timeout=self.timeout) + data = await asyncio.wait_for(self.reader.read(1024 * 3), timeout=self.timeout) await self.parse_response(data) return # Exit loop on success