import asyncio import os import shutil from datetime import datetime, timedelta from dateutil.relativedelta import relativedelta from common.global_logger import logger from db.database import get_db from services.alarm_record_service import AlarmRecordService from services.device_frame_service import DeviceFrameService import re import gzip async def start_scheduler(): await delete_frames() await delete_alarms() await compress_day_logs() await organize_and_compress_month_logs() async def delete_frames(save_month=3): ''' 删除3个月前的帧数据 :param save_month: 默认保留3个月的帧数据 :return: ''' base_folder = './storage/frames' if not os.path.exists(base_folder): return async for db in get_db(): frame_service = DeviceFrameService(db) now = datetime.now() latest_time = now - relativedelta(months=save_month) latest_time = latest_time.replace(hour=0, minute=0, second=0, microsecond=0) max_frame_id = await frame_service.select_max_frame_id(latest_time) if max_frame_id > 0: await frame_service.delete_frame(max_frame_id) logger.info(f"Delete frames before {latest_time}, max_frame_id = {max_frame_id}") # 遍历子文件夹 for folder_name in os.listdir(base_folder): folder_path = os.path.join(base_folder, folder_name) if os.path.isdir(folder_path): try: # 尝试将文件夹名称解析为日期 folder_date = datetime.strptime(folder_name, "%Y-%m-%d") folder_date = folder_date.replace(hour=0, minute=0, second=0, microsecond=0) # 比较日期并删除早于3个月前的文件夹 if folder_date < latest_time: logger.info(f"Deleting folder: {folder_path}") shutil.rmtree(folder_path) # 递归删除文件夹及其内容 except ValueError: # 忽略无法解析为日期的文件夹名称 logger.warning(f"Skipping non-date folder: {folder_name}") async def delete_alarms(save_month=3): ''' 删除3个月前的报警记录 :param save_month: 默认保留3个月的报警记录 :return: ''' base_folder = './storage/alarms' if not os.path.exists(base_folder): return async for db in get_db(): alarm_service = AlarmRecordService(db) now = datetime.now() latest_time = now - relativedelta(months=save_month) latest_time = latest_time.replace(hour=0, minute=0, second=0, microsecond=0) max_alarm_id = await alarm_service.select_max_id(latest_time) if max_alarm_id > 0: await alarm_service.delete_frame(max_alarm_id) logger.info(f"Delete alarms before {latest_time}, max_alarm_id = {max_alarm_id}") # 遍历子文件夹 for folder_name in os.listdir(base_folder): folder_path = os.path.join(base_folder, folder_name) if os.path.isdir(folder_path): try: # 尝试将文件夹名称解析为日期 folder_date = datetime.strptime(folder_name, "%Y-%m-%d") folder_date = folder_date.replace(hour=0, minute=0, second=0, microsecond=0) # 比较日期并删除早于3个月前的文件夹 if folder_date < latest_time: logger.info(f"Deleting folder: {folder_path}") shutil.rmtree(folder_path) # 递归删除文件夹及其内容 except ValueError: # 忽略无法解析为日期的文件夹名称 logger.warning(f"Skipping non-date folder: {folder_name}") async def compress_day_logs(): """ 异步压缩日志目录中符合特定日期格式 (年-月-日) 的日志文件,并删除原始文件。 :param log_dir: 日志目录路径 :param base_filename: 日志的基础文件名,例如 'app.log' """ log_dir = 'logs' base_filename = 'app.log' loop = asyncio.get_running_loop() base_filename_pattern = re.escape(base_filename) + r"\.\d{4}-\d{2}-\d{2}" # 匹配 app.log.年-月-日 格式 for filename in os.listdir(log_dir): file_path = os.path.join(log_dir, filename) if os.path.isfile(file_path) and re.fullmatch(base_filename_pattern, filename): try: compressed_file_path = f"{file_path}.gz" # 使用线程池在后台执行文件操作以避免阻塞事件循环 await loop.run_in_executor(None, compress_and_remove, file_path, compressed_file_path) print(f"Compressed and removed: {file_path}") except Exception as e: print(f"Error compressing {file_path}: {e}") def compress_and_remove(file_path, compressed_file_path): """同步函数,用于压缩并删除文件""" with open(file_path, 'rb') as f_in: with gzip.open(compressed_file_path, 'wb') as f_out: f_out.writelines(f_in) os.remove(file_path) # 删除未压缩的旧文件 async def organize_and_compress_month_logs(): loop = asyncio.get_running_loop() base_log_dir = 'logs' # 日志目录路径 archive_dir = 'logs/archive' # 日志归档目录路径 os.makedirs(archive_dir, exist_ok=True) # 获取当前日期以确定要处理的月份 today = datetime.now() year_month_pattern = re.compile(r'\d{4}-\d{2}') # 匹配 YYYY-MM 格式 # 遍历日志目录并识别所有符合 YYYY-MM 格式的日志文件 processed_months = set() for filename in os.listdir(base_log_dir): file_path = os.path.join(base_log_dir, filename) if os.path.isfile(file_path): match = year_month_pattern.search(filename) if match: month_str = match.group(0) # 忽略当前月份 if month_str < today.strftime('%Y-%m'): processed_months.add(month_str) # 逐月处理日志文件 for month_str in sorted(processed_months): archive_path = os.path.join(archive_dir, month_str) os.makedirs(archive_path, exist_ok=True) # 移动符合该月份的日志文件 for filename in os.listdir(base_log_dir): file_path = os.path.join(base_log_dir, filename) if os.path.isfile(file_path) and month_str in filename: try: # 使用线程池执行同步的移动操作 await loop.run_in_executor(None, shutil.move, file_path, os.path.join(archive_path, filename)) except Exception as e: logger.error(f"Error moving file {filename}: {e}") # 压缩归档文件夹 archive_tar_path = os.path.join(archive_dir, f"{month_str}.tar.gz") try: # 使用线程池执行同步的压缩操作 await loop.run_in_executor(None, shutil.make_archive, archive_tar_path.replace('.tar.gz', ''), 'gztar', archive_dir, month_str) # 使用线程池执行同步的删除操作 await loop.run_in_executor(None, shutil.rmtree, archive_path) logger.info(f"Archived and compressed logs for {month_str} to {archive_tar_path}") except Exception as e: logger.error(f"Error compressing logs for {month_str}: {e}")