diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py index 5d19c54..af46aec 100644 --- a/scene_handler/helmet_data_processor.py +++ b/scene_handler/helmet_data_processor.py @@ -39,7 +39,7 @@ } self.last_ts = None # 上次读取的数据的生成时间戳 - def getNewData(self): + def getNewData(self, retry_until_success=True): """ 阻塞进程 :return: @@ -60,7 +60,11 @@ return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') else: print("无法访问到心率血氧数据") - time.sleep(5) + + if not retry_until_success: + return None, None # 只尝试一次,直接返回空值 + else: + time.sleep(5) def isDataNormal(self, blood_oxygen, heartrate): if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py index 5d19c54..af46aec 100644 --- a/scene_handler/helmet_data_processor.py +++ b/scene_handler/helmet_data_processor.py @@ -39,7 +39,7 @@ } self.last_ts = None # 上次读取的数据的生成时间戳 - def getNewData(self): + def getNewData(self, retry_until_success=True): """ 阻塞进程 :return: @@ -60,7 +60,11 @@ return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') else: print("无法访问到心率血氧数据") - time.sleep(5) + + if not retry_until_success: + return None, None # 只尝试一次,直接返回空值 + else: + time.sleep(5) def isDataNormal(self, blood_oxygen, heartrate): if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 diff --git a/scene_handler/internet_limit_space_scene_handler.py b/scene_handler/internet_limit_space_scene_handler.py index cf9199e..5213e22 100644 --- a/scene_handler/internet_limit_space_scene_handler.py +++ b/scene_handler/internet_limit_space_scene_handler.py @@ -355,7 +355,8 @@ if float(ch4) > 10.0 \ or float(co) > 10.0 \ or float(h2s) > 120.0 \ - or float(o2) < 15: + or float(o2) < 19: + logger.info("四合一数据异常") return False # 气体异常 else: return True # 气体正常 @@ -404,8 +405,9 @@ class Laobaocheck(): def __init__(self, eventController=None, alarm=None): - self.laobao_model = YOLO("weights/labor-v8-20241114.pt") - self.jiaodi_model = YOLO("weights/jiaodi.pt") + # self.laobao_model = YOLO("weights/labor-v8-20250115.engine",task='detect') + self.laobao_model = YOLO("weights/labor-v8-20241114.pt", task='detect') + self.jiaodi_model = YOLO("weights/jiaodi.pt", task='classify') self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} # OD 模型有无检测这些目标 @@ -590,7 +592,8 @@ class YinHuanCheck: def __init__(self, device_code, eventController, alarm, frame_threshold=20, batch_size=1): # 初始化YOLO模型及其他参数 - self.model = YOLO("weights/yinhuan.pt") + # self.model = YOLO("weights/yinhuan.engine",task='detect') + self.model = YOLO("weights/yinhuan.pt", task='detect') self.eventController = eventController self.alarm = alarm self.device_code = device_code @@ -633,7 +636,7 @@ skip = True if skip: return [] - + people_results = self.model.track(source=frames, conf=0.6, classes=[0], save=False, verbose=False) # 检测人(类别0) results = [] @@ -655,32 +658,61 @@ 针对每个人的图像区域进行目标检测(安全帽、工服、烟头、电话、袖标)。 返回格式:list,每项为字典,键为 person_id,值为该人身上检测到的目标字典 {label: xyxy} """ - # 收集所有人的图像区域 - person_images = [] - for frame_persons in people_results: - for info in frame_persons.values(): - person_images.append(info['crop']) - if len(person_images) == 0: - return [] - # 对所有人的区域进行目标检测 - results = self.model.predict(source=person_images, conf=0.6, classes=[2, 3, 4, 5, 6], - save=False, verbose=False) - person_detect_targets = [] - result_idx = 0 - for frame_persons in people_results: - frame_detection = {} - for person_id in frame_persons.keys(): - person_result = results[result_idx] + batch_size = self.batch_size # 设定固定的批次大小 + + # 收集所有人的图像和ID + all_images = [] + all_ids = [] + all_frame_indices = [] + + for frame_idx, frame_persons in enumerate(people_results): + for person_id, info in frame_persons.items(): + all_images.append(info['crop']) + all_ids.append(person_id) + all_frame_indices.append(frame_idx) + + # 如果没有检测到人,直接返回空结果 + if not all_images: + return [{}] * len(people_results) + + # 初始化返回结果列表 + person_detect_targets = [{} for _ in range(len(people_results))] + + # 将图像列表按batch_size分组处理 + for i in range(0, len(all_images), batch_size): + batch_images = all_images[i:i + batch_size] + batch_ids = all_ids[i:i + batch_size] + batch_frames = all_frame_indices[i:i + batch_size] + + # 如果这个批次不足batch_size,用第一张图填充 + while len(batch_images) < batch_size: + batch_images.append(batch_images[0]) + + # 进行目标检测 + results = self.model.predict(source=batch_images[:batch_size], + conf=0.6, + classes=[2, 3, 4, 5, 6], + save=False, + verbose=False) + + # 只处理实际的图像结果(忽略填充的结果) + valid_count = min(len(batch_ids), len(results)) + for j in range(valid_count): + frame_idx = batch_frames[j] + person_id = batch_ids[j] + result = results[j] + + # 为当前人物创建检测结果字典 detection_dict = {} - for box in person_result.boxes: + for box in result.boxes: label = int(box.cls.item()) label_name = self.id2name(label) xyxy = box.xyxy.squeeze().tolist() - # 假设每个box只对应一种类别,直接存入字典 detection_dict[label_name[0]] = xyxy - frame_detection[person_id] = detection_dict - result_idx += 1 - person_detect_targets.append(frame_detection) + + # 将结果添加到对应帧的字典中 + person_detect_targets[frame_idx][person_id] = detection_dict + return person_detect_targets def annotate_alarm(self, frame, condition, person_box=None, detection=None): @@ -1042,8 +1074,10 @@ self.start_time = time.time() # 脚本启动时间戳 print(f'start time = {self.start_time}') + self.batch_size = 1 + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, - device_thread_id=thread_id) + device_thread_id=thread_id, batch_size=self.batch_size) self.executor = ThreadPoolExecutor(max_workers=10) self.loop = asyncio.get_running_loop() @@ -1052,7 +1086,7 @@ self.alarm = Alarm(device, thread_id, tcp_manager, main_loop, self.eventController) self.laobao_check = Laobaocheck(self.eventController, self.alarm) self.yinhuan_check = YinHuanCheck(device_code=device.code, eventController=self.eventController, - alarm=self.alarm) + alarm=self.alarm, batch_size=self.batch_size) self.anQuanMaoList = [] self.siHeyiList = [] @@ -1062,12 +1096,12 @@ for health_device_code in health_device_codes: self.anQuanMaoList.append(HelmetDataProcessor(health_device_code, self.alarm.alarm_record_center)) for harmful_device_code in harmful_device_codes: - self.siHeyiList.append(SiHeYi(harmful_device_code,self.harmful_gas_manager)) + self.siHeyiList.append(SiHeYi(harmful_device_code, self.harmful_gas_manager)) if self.siHeyiList: self.siHeyiUmd = self.siHeyiList[0] # todo 暂时先用第一个,后期要有标识标明用哪个 else: - self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98',self.harmful_gas_manager) + self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98', self.harmful_gas_manager) async def laobaoCheck_task(self): # executor = ThreadPoolExecutor(max_workers=3) @@ -1087,8 +1121,9 @@ while True: # 模拟循环检测气体 if eventController.timeout_event.is_set(): # 超时退出 return - - ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, self.siHeyiUmd.getNewDataRemote) # 判断气体是否合规 + print('uMDGasCheck_task') + ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, + self.siHeyiUmd.getNewDataRemote) # 判断气体是否合规 flag = self.siHeyiUmd.isDataNormal(ch4, co, h2s, o2) if flag == False: tflag_pool.clear() @@ -1122,8 +1157,9 @@ async def xinlvCheck_task(self): def fun(anQuanMao): - blood_oxygen, heartrate = anQuanMao.getNewData() - if not anQuanMao.isDataNormal(blood_oxygen, heartrate): + print(f'xinlvCheck_task for helmet: {anQuanMao.helmet_code}') + blood_oxygen, heartrate = anQuanMao.getNewData(retry_until_success=False) + if blood_oxygen and heartrate and not anQuanMao.isDataNormal(blood_oxygen, heartrate): self.alarm.addAlarm(ALARM_DICT['health']) anQuanMao.sendAlarmRecord(blood_oxygen, heartrate) # @@ -1133,8 +1169,17 @@ # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() - for aqm in self.anQuanMaoList: - await self.loop.run_in_executor(self.executor, fun, aqm) + + # for aqm in self.anQuanMaoList: + # await self.loop.run_in_executor(self.executor, fun, aqm) + while True: + tasks = [] + for aqm in self.anQuanMaoList: + # 为每个 siHeYi 创建一个任务 + task = self.loop.run_in_executor(self.executor, fun, aqm) + tasks.append(task) + await asyncio.gather(*tasks) # 并行执行所有任务 + await asyncio.sleep(60) async def gasCheck(self): """ @@ -1143,6 +1188,7 @@ """ def fun(siHeyi): + print(f'gasCheck for siHeyi: {siHeyi.harmful_device_code}') ch4, co, h2s, o2 = siHeyi.getNewDataRemote() flag = siHeyi.isDataNormal(ch4, co, h2s, o2) if flag == False: @@ -1150,8 +1196,17 @@ # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() - for siHeYi in self.siHeyiList: - await self.loop.run_in_executor(self.executor, fun, siHeYi) + + # for siHeYi in self.siHeyiList: + # await self.loop.run_in_executor(self.executor, fun, siHeYi) + while True: + tasks = [] + for siHeYi in self.siHeyiList: + # 为每个 siHeYi 创建一个任务 + task = self.loop.run_in_executor(self.executor, fun, siHeYi) + tasks.append(task) + await asyncio.gather(*tasks) # 并行执行所有任务 + await asyncio.sleep(60) def run(self): async def fun(): @@ -1176,7 +1231,7 @@ task.add_done_callback(handle_task_exception) # alarm_task.add_done_callback(handle_task_exception) - done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=60*10) + done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=60 * 1) if uMDGasCheck_task in done and laobaoCheck_task in done: await uMDGasCheck_task diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py index 5d19c54..af46aec 100644 --- a/scene_handler/helmet_data_processor.py +++ b/scene_handler/helmet_data_processor.py @@ -39,7 +39,7 @@ } self.last_ts = None # 上次读取的数据的生成时间戳 - def getNewData(self): + def getNewData(self, retry_until_success=True): """ 阻塞进程 :return: @@ -60,7 +60,11 @@ return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') else: print("无法访问到心率血氧数据") - time.sleep(5) + + if not retry_until_success: + return None, None # 只尝试一次,直接返回空值 + else: + time.sleep(5) def isDataNormal(self, blood_oxygen, heartrate): if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 diff --git a/scene_handler/internet_limit_space_scene_handler.py b/scene_handler/internet_limit_space_scene_handler.py index cf9199e..5213e22 100644 --- a/scene_handler/internet_limit_space_scene_handler.py +++ b/scene_handler/internet_limit_space_scene_handler.py @@ -355,7 +355,8 @@ if float(ch4) > 10.0 \ or float(co) > 10.0 \ or float(h2s) > 120.0 \ - or float(o2) < 15: + or float(o2) < 19: + logger.info("四合一数据异常") return False # 气体异常 else: return True # 气体正常 @@ -404,8 +405,9 @@ class Laobaocheck(): def __init__(self, eventController=None, alarm=None): - self.laobao_model = YOLO("weights/labor-v8-20241114.pt") - self.jiaodi_model = YOLO("weights/jiaodi.pt") + # self.laobao_model = YOLO("weights/labor-v8-20250115.engine",task='detect') + self.laobao_model = YOLO("weights/labor-v8-20241114.pt", task='detect') + self.jiaodi_model = YOLO("weights/jiaodi.pt", task='classify') self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} # OD 模型有无检测这些目标 @@ -590,7 +592,8 @@ class YinHuanCheck: def __init__(self, device_code, eventController, alarm, frame_threshold=20, batch_size=1): # 初始化YOLO模型及其他参数 - self.model = YOLO("weights/yinhuan.pt") + # self.model = YOLO("weights/yinhuan.engine",task='detect') + self.model = YOLO("weights/yinhuan.pt", task='detect') self.eventController = eventController self.alarm = alarm self.device_code = device_code @@ -633,7 +636,7 @@ skip = True if skip: return [] - + people_results = self.model.track(source=frames, conf=0.6, classes=[0], save=False, verbose=False) # 检测人(类别0) results = [] @@ -655,32 +658,61 @@ 针对每个人的图像区域进行目标检测(安全帽、工服、烟头、电话、袖标)。 返回格式:list,每项为字典,键为 person_id,值为该人身上检测到的目标字典 {label: xyxy} """ - # 收集所有人的图像区域 - person_images = [] - for frame_persons in people_results: - for info in frame_persons.values(): - person_images.append(info['crop']) - if len(person_images) == 0: - return [] - # 对所有人的区域进行目标检测 - results = self.model.predict(source=person_images, conf=0.6, classes=[2, 3, 4, 5, 6], - save=False, verbose=False) - person_detect_targets = [] - result_idx = 0 - for frame_persons in people_results: - frame_detection = {} - for person_id in frame_persons.keys(): - person_result = results[result_idx] + batch_size = self.batch_size # 设定固定的批次大小 + + # 收集所有人的图像和ID + all_images = [] + all_ids = [] + all_frame_indices = [] + + for frame_idx, frame_persons in enumerate(people_results): + for person_id, info in frame_persons.items(): + all_images.append(info['crop']) + all_ids.append(person_id) + all_frame_indices.append(frame_idx) + + # 如果没有检测到人,直接返回空结果 + if not all_images: + return [{}] * len(people_results) + + # 初始化返回结果列表 + person_detect_targets = [{} for _ in range(len(people_results))] + + # 将图像列表按batch_size分组处理 + for i in range(0, len(all_images), batch_size): + batch_images = all_images[i:i + batch_size] + batch_ids = all_ids[i:i + batch_size] + batch_frames = all_frame_indices[i:i + batch_size] + + # 如果这个批次不足batch_size,用第一张图填充 + while len(batch_images) < batch_size: + batch_images.append(batch_images[0]) + + # 进行目标检测 + results = self.model.predict(source=batch_images[:batch_size], + conf=0.6, + classes=[2, 3, 4, 5, 6], + save=False, + verbose=False) + + # 只处理实际的图像结果(忽略填充的结果) + valid_count = min(len(batch_ids), len(results)) + for j in range(valid_count): + frame_idx = batch_frames[j] + person_id = batch_ids[j] + result = results[j] + + # 为当前人物创建检测结果字典 detection_dict = {} - for box in person_result.boxes: + for box in result.boxes: label = int(box.cls.item()) label_name = self.id2name(label) xyxy = box.xyxy.squeeze().tolist() - # 假设每个box只对应一种类别,直接存入字典 detection_dict[label_name[0]] = xyxy - frame_detection[person_id] = detection_dict - result_idx += 1 - person_detect_targets.append(frame_detection) + + # 将结果添加到对应帧的字典中 + person_detect_targets[frame_idx][person_id] = detection_dict + return person_detect_targets def annotate_alarm(self, frame, condition, person_box=None, detection=None): @@ -1042,8 +1074,10 @@ self.start_time = time.time() # 脚本启动时间戳 print(f'start time = {self.start_time}') + self.batch_size = 1 + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, - device_thread_id=thread_id) + device_thread_id=thread_id, batch_size=self.batch_size) self.executor = ThreadPoolExecutor(max_workers=10) self.loop = asyncio.get_running_loop() @@ -1052,7 +1086,7 @@ self.alarm = Alarm(device, thread_id, tcp_manager, main_loop, self.eventController) self.laobao_check = Laobaocheck(self.eventController, self.alarm) self.yinhuan_check = YinHuanCheck(device_code=device.code, eventController=self.eventController, - alarm=self.alarm) + alarm=self.alarm, batch_size=self.batch_size) self.anQuanMaoList = [] self.siHeyiList = [] @@ -1062,12 +1096,12 @@ for health_device_code in health_device_codes: self.anQuanMaoList.append(HelmetDataProcessor(health_device_code, self.alarm.alarm_record_center)) for harmful_device_code in harmful_device_codes: - self.siHeyiList.append(SiHeYi(harmful_device_code,self.harmful_gas_manager)) + self.siHeyiList.append(SiHeYi(harmful_device_code, self.harmful_gas_manager)) if self.siHeyiList: self.siHeyiUmd = self.siHeyiList[0] # todo 暂时先用第一个,后期要有标识标明用哪个 else: - self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98',self.harmful_gas_manager) + self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98', self.harmful_gas_manager) async def laobaoCheck_task(self): # executor = ThreadPoolExecutor(max_workers=3) @@ -1087,8 +1121,9 @@ while True: # 模拟循环检测气体 if eventController.timeout_event.is_set(): # 超时退出 return - - ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, self.siHeyiUmd.getNewDataRemote) # 判断气体是否合规 + print('uMDGasCheck_task') + ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, + self.siHeyiUmd.getNewDataRemote) # 判断气体是否合规 flag = self.siHeyiUmd.isDataNormal(ch4, co, h2s, o2) if flag == False: tflag_pool.clear() @@ -1122,8 +1157,9 @@ async def xinlvCheck_task(self): def fun(anQuanMao): - blood_oxygen, heartrate = anQuanMao.getNewData() - if not anQuanMao.isDataNormal(blood_oxygen, heartrate): + print(f'xinlvCheck_task for helmet: {anQuanMao.helmet_code}') + blood_oxygen, heartrate = anQuanMao.getNewData(retry_until_success=False) + if blood_oxygen and heartrate and not anQuanMao.isDataNormal(blood_oxygen, heartrate): self.alarm.addAlarm(ALARM_DICT['health']) anQuanMao.sendAlarmRecord(blood_oxygen, heartrate) # @@ -1133,8 +1169,17 @@ # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() - for aqm in self.anQuanMaoList: - await self.loop.run_in_executor(self.executor, fun, aqm) + + # for aqm in self.anQuanMaoList: + # await self.loop.run_in_executor(self.executor, fun, aqm) + while True: + tasks = [] + for aqm in self.anQuanMaoList: + # 为每个 siHeYi 创建一个任务 + task = self.loop.run_in_executor(self.executor, fun, aqm) + tasks.append(task) + await asyncio.gather(*tasks) # 并行执行所有任务 + await asyncio.sleep(60) async def gasCheck(self): """ @@ -1143,6 +1188,7 @@ """ def fun(siHeyi): + print(f'gasCheck for siHeyi: {siHeyi.harmful_device_code}') ch4, co, h2s, o2 = siHeyi.getNewDataRemote() flag = siHeyi.isDataNormal(ch4, co, h2s, o2) if flag == False: @@ -1150,8 +1196,17 @@ # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() - for siHeYi in self.siHeyiList: - await self.loop.run_in_executor(self.executor, fun, siHeYi) + + # for siHeYi in self.siHeyiList: + # await self.loop.run_in_executor(self.executor, fun, siHeYi) + while True: + tasks = [] + for siHeYi in self.siHeyiList: + # 为每个 siHeYi 创建一个任务 + task = self.loop.run_in_executor(self.executor, fun, siHeYi) + tasks.append(task) + await asyncio.gather(*tasks) # 并行执行所有任务 + await asyncio.sleep(60) def run(self): async def fun(): @@ -1176,7 +1231,7 @@ task.add_done_callback(handle_task_exception) # alarm_task.add_done_callback(handle_task_exception) - done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=60*10) + done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=60 * 1) if uMDGasCheck_task in done and laobaoCheck_task in done: await uMDGasCheck_task diff --git a/services/scene_service.py b/services/scene_service.py index 3fbdf0a..764b389 100644 --- a/services/scene_service.py +++ b/services/scene_service.py @@ -175,7 +175,7 @@ select(DeviceSceneRelation) .where(DeviceSceneRelation.scene_id == scene_id) ) - relation_in_use_exec = self.db.execute(statement) + relation_in_use_exec = await self.db.execute(statement) relation_in_use = relation_in_use_exec.scalars().first() # 如果存在启用的绑定关系,提示无法删除