Newer
Older
safe-algo-pro / services / schedule_job.py
import asyncio
import os
import shutil
from datetime import datetime, timedelta

from dateutil.relativedelta import relativedelta

from common.global_logger import logger
from db.database import get_db
from services.alarm_record_service import AlarmRecordService
from services.device_frame_service import DeviceFrameService


import re
import gzip


async def start_scheduler():
    """
    Run the maintenance jobs once, strictly one after another.

    Order: frame clean-up, alarm clean-up, daily log compression, monthly log
    archiving. Each job is awaited to completion before the next one starts;
    an exception raised by a job propagates to the caller and the remaining
    jobs are not run.

    :return: None
    """
    jobs = (
        delete_frames,
        delete_alarms,
        compress_day_logs,
        organize_and_compress_month_logs,
    )
    for job in jobs:
        await job()


async def delete_frames(save_month=3):
    """
    Delete frame data that is older than the retention window.

    The cut-off is midnight of the day ``save_month`` months before now.
    The frame service is asked for the max frame id for that cut-off and, when
    it is positive, that id is handed to ``delete_frame``. Afterwards every
    sub-folder of ``./storage/frames`` whose name parses as ``YYYY-MM-DD`` and
    is earlier than the cut-off is removed recursively.

    Nothing is done (database included) when ``./storage/frames`` does not exist.

    :param save_month: number of months of frame data to keep (default 3)
    :return: None
    """
    base_folder = './storage/frames'
    if not os.path.exists(base_folder):
        return

    loop = asyncio.get_running_loop()

    async for db in get_db():
        frame_service = DeviceFrameService(db)

        latest_time = datetime.now() - relativedelta(months=save_month)
        latest_time = latest_time.replace(hour=0, minute=0, second=0, microsecond=0)

        max_frame_id = await frame_service.select_max_frame_id(latest_time)
        # The truthiness check also skips a None result, which would otherwise
        # raise TypeError on the comparison — presumably returned when no frame
        # is older than the cut-off; verify against DeviceFrameService.
        if max_frame_id and max_frame_id > 0:
            await frame_service.delete_frame(max_frame_id)
        logger.info(f"Delete frames before {latest_time}, max_frame_id = {max_frame_id}")

        # Walk the dated sub-folders.
        for folder_name in os.listdir(base_folder):
            folder_path = os.path.join(base_folder, folder_name)
            if not os.path.isdir(folder_path):
                continue

            try:
                # A date-only format already yields midnight, so the result
                # compares directly against the cut-off.
                folder_date = datetime.strptime(folder_name, "%Y-%m-%d")
            except ValueError:
                # Folder name is not a date: leave it alone.
                logger.warning(f"Skipping non-date folder: {folder_name}")
                continue

            if folder_date >= latest_time:
                continue

            logger.info(f"Deleting folder: {folder_path}")
            try:
                # Recursive deletion can be slow; keep it off the event loop.
                await loop.run_in_executor(None, shutil.rmtree, folder_path)
            except OSError as e:
                # One undeletable folder must not abort the rest of the clean-up.
                logger.error(f"Error deleting folder {folder_path}: {e}")

async def delete_alarms(save_month=3):
    """
    Delete alarm records that are older than the retention window.

    The cut-off is midnight of the day ``save_month`` months before now.
    The alarm service is asked for the max id for that cut-off and, when it is
    positive, that id is handed to the service for deletion. Afterwards every
    sub-folder of ``./storage/alarms`` whose name parses as ``YYYY-MM-DD`` and
    is earlier than the cut-off is removed recursively.

    Nothing is done (database included) when ``./storage/alarms`` does not exist.

    :param save_month: number of months of alarm records to keep (default 3)
    :return: None
    """
    base_folder = './storage/alarms'
    if not os.path.exists(base_folder):
        return

    loop = asyncio.get_running_loop()

    async for db in get_db():
        alarm_service = AlarmRecordService(db)

        latest_time = datetime.now() - relativedelta(months=save_month)
        latest_time = latest_time.replace(hour=0, minute=0, second=0, microsecond=0)

        max_alarm_id = await alarm_service.select_max_id(latest_time)
        # The truthiness check also skips a None result, which would otherwise
        # raise TypeError on the comparison — presumably returned when no alarm
        # is older than the cut-off; verify against AlarmRecordService.
        if max_alarm_id and max_alarm_id > 0:
            # NOTE(review): the alarm service is called through a method named
            # `delete_frame`; confirm this is the intended AlarmRecordService API
            # and not a copy-paste leftover from the frame clean-up.
            await alarm_service.delete_frame(max_alarm_id)
        logger.info(f"Delete alarms before {latest_time}, max_alarm_id = {max_alarm_id}")

        # Walk the dated sub-folders.
        for folder_name in os.listdir(base_folder):
            folder_path = os.path.join(base_folder, folder_name)
            if not os.path.isdir(folder_path):
                continue

            try:
                # A date-only format already yields midnight, so the result
                # compares directly against the cut-off.
                folder_date = datetime.strptime(folder_name, "%Y-%m-%d")
            except ValueError:
                # Folder name is not a date: leave it alone.
                logger.warning(f"Skipping non-date folder: {folder_name}")
                continue

            if folder_date >= latest_time:
                continue

            logger.info(f"Deleting folder: {folder_path}")
            try:
                # Recursive deletion can be slow; keep it off the event loop.
                await loop.run_in_executor(None, shutil.rmtree, folder_path)
            except OSError as e:
                # One undeletable folder must not abort the rest of the clean-up.
                logger.error(f"Error deleting folder {folder_path}: {e}")

async def compress_day_logs(log_dir='logs', base_filename='app.log'):
    """
    Gzip the dated log files found in ``log_dir`` and delete the originals.

    A file is processed only when its full name is
    ``<base_filename>.YYYY-MM-DD`` (for example ``app.log.2024-01-31``); the
    compressed copy is written next to it with a ``.gz`` suffix. Files that
    are already compressed, the live ``base_filename`` itself and anything
    else in the directory are left untouched. A failure on one file is logged
    and does not stop the remaining files from being processed.

    Returns immediately when ``log_dir`` does not exist.

    :param log_dir: directory that holds the log files (default ``'logs'``)
    :param base_filename: base name of the log file, e.g. ``'app.log'``
    :return: None
    """
    if not os.path.isdir(log_dir):
        # No log directory yet means there is nothing to compress.
        return

    loop = asyncio.get_running_loop()
    # Matches "<base_filename>.YYYY-MM-DD" exactly (no trailing ".gz").
    dated_log_name = re.compile(re.escape(base_filename) + r"\.\d{4}-\d{2}-\d{2}")

    for filename in os.listdir(log_dir):
        file_path = os.path.join(log_dir, filename)
        if not (os.path.isfile(file_path) and dated_log_name.fullmatch(filename)):
            continue

        try:
            # File work runs in the default thread pool so the event loop is not blocked.
            await loop.run_in_executor(None, compress_and_remove, file_path, f"{file_path}.gz")
        except Exception as e:
            logger.error(f"Error compressing {file_path}: {e}")
        else:
            logger.info(f"Compressed and removed: {file_path}")


def compress_and_remove(file_path, compressed_file_path):
    """
    Synchronously gzip ``file_path`` into ``compressed_file_path``, then delete ``file_path``.

    The original is removed only after both files have been closed without
    an error; if reading or writing raises, the exception propagates and the
    original file is kept.

    :param file_path: path of the uncompressed file to read
    :param compressed_file_path: path of the gzip file to write
    :return: None
    """
    with open(file_path, 'rb') as source, gzip.open(compressed_file_path, 'wb') as target:
        target.writelines(source)

    # Drop the uncompressed original now that the archive is complete.
    os.remove(file_path)


async def organize_and_compress_month_logs(base_log_dir='logs', archive_dir=None):
    """
    Archive the log files of past months into one ``.tar.gz`` per month.

    Every regular file directly inside ``base_log_dir`` whose name contains a
    ``YYYY-MM`` stamp earlier than the current month is moved into
    ``<archive_dir>/<YYYY-MM>/``. That folder is then packed into
    ``<archive_dir>/<YYYY-MM>.tar.gz`` and removed. Files of the current month
    are left where they are.

    An existing archive is never overwritten: when ``<YYYY-MM>.tar.gz`` is
    already present (late files for an already archived month), the new
    archive is written as ``<YYYY-MM>_1.tar.gz``, ``<YYYY-MM>_2.tar.gz``, ...

    Move and compression failures are logged and do not stop the other months
    from being processed.

    :param base_log_dir: directory that holds the log files (default ``'logs'``)
    :param archive_dir: directory that receives the archives; defaults to
        ``<base_log_dir>/archive``
    :return: None
    """
    loop = asyncio.get_running_loop()

    if archive_dir is None:
        archive_dir = os.path.join(base_log_dir, 'archive')
    os.makedirs(archive_dir, exist_ok=True)
    if not os.path.isdir(base_log_dir):
        return

    current_month = datetime.now().strftime('%Y-%m')
    year_month_pattern = re.compile(r'\d{4}-\d{2}')  # YYYY-MM anywhere in the name

    # Single pass over the directory: group file names by the month they carry,
    # skipping the current month.
    files_by_month = {}
    for filename in os.listdir(base_log_dir):
        if not os.path.isfile(os.path.join(base_log_dir, filename)):
            continue
        match = year_month_pattern.search(filename)
        # Zero-padded YYYY-MM strings order chronologically, so a plain string
        # comparison is enough.
        if match and match.group(0) < current_month:
            files_by_month.setdefault(match.group(0), []).append(filename)

    for month_str in sorted(files_by_month):
        archive_path = os.path.join(archive_dir, month_str)
        os.makedirs(archive_path, exist_ok=True)

        # Move this month's files into the staging folder.
        for filename in files_by_month[month_str]:
            try:
                # Blocking move runs in the default thread pool.
                await loop.run_in_executor(
                    None,
                    shutil.move,
                    os.path.join(base_log_dir, filename),
                    os.path.join(archive_path, filename),
                )
            except Exception as e:
                logger.error(f"Error moving file {filename}: {e}")

        # make_archive would silently replace an existing archive and lose the
        # logs stored in it, so pick the first archive name that is still free.
        archive_base = archive_path
        suffix = 0
        while os.path.exists(f"{archive_base}.tar.gz"):
            suffix += 1
            archive_base = os.path.join(archive_dir, f"{month_str}_{suffix}")
        archive_tar_path = f"{archive_base}.tar.gz"

        try:
            # Archive members are stored under "<YYYY-MM>/" relative to archive_dir.
            # NOTE(review): per the shutil documentation make_archive is only
            # thread-safe for tar/zip from Python 3.10.6 on — confirm the
            # runtime version before relying on it inside the executor.
            await loop.run_in_executor(
                None, shutil.make_archive, archive_base, 'gztar', archive_dir, month_str
            )
            # The staging folder is removed only after the archive was written.
            await loop.run_in_executor(None, shutil.rmtree, archive_path)
            logger.info(f"Archived and compressed logs for {month_str} to {archive_tar_path}")
        except Exception as e:
            logger.error(f"Error compressing logs for {month_str}: {e}")