Newer
Older
lynxi-casic-demo / server.py
zhangyingjie on 24 Jan 3 KB 增加后台接口调用
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import os
import subprocess
import time

from constants import BOX_PRIVATE_KEY
from rsa_utils import decrypt_message_with_private_key
from urllib.parse import urlparse, parse_qs

from global_logger import logger

def check_secret(encrypt_message):
    """Decrypt an encrypted command message and validate its timestamp.

    The message is decrypted with ``BOX_PRIVATE_KEY`` and split at its last
    underscore into ``<command>`` and ``<timestamp>``. The timestamp is
    compared against the current time in milliseconds.

    Args:
        encrypt_message: The encrypted payload, passed unchanged to
            ``decrypt_message_with_private_key``.

    Returns:
        The command string when the timestamp is at most five minutes older
        than the current time (timestamps in the future also pass),
        the string ``'timeout'`` when it is older than that, or ``None`` when
        decryption or parsing raised any exception (the error is logged).
    """
    max_age_ms = 5 * 60 * 1000
    try:
        plaintext = decrypt_message_with_private_key(BOX_PRIVATE_KEY, encrypt_message)
        # Split on the LAST underscore so the command itself may contain "_".
        command, sent_at = plaintext.rsplit('_', 1)
        now_ms = int(time.time() * 1000)
        age_ms = now_ms - int(sent_at)
    except Exception as e:
        logger.exception(f"Failed to check secret: {e}")
        return None
    return command if age_ms <= max_age_ms else 'timeout'
    
def reboot():
    """Reboot the host machine using the platform's reboot command.

    Runs ``shutdown /r /t 0`` when ``os.name`` is ``"nt"`` and ``sudo reboot``
    otherwise. Any failure, including a non-zero exit status of the command,
    is logged and swallowed, so this function never raises.
    """
    logger.info('trying to reboot')
    # Choose the reboot command for the current operating system.
    if os.name == "nt":  # Windows
        reboot_cmd = ["shutdown", "/r", "/t", "0"]
    else:  # Linux / macOS
        reboot_cmd = ["sudo", "reboot"]
    try:
        # check=True turns a non-zero exit status into CalledProcessError so
        # that a rejected reboot is logged instead of passing silently.
        subprocess.run(reboot_cmd, check=True)
    except Exception as e:
        logger.exception(f"Failed to reboot: {e}")

def sync_data():
    """Placeholder for the data synchronization command; currently a no-op."""
    # TODO: implement data synchronization.
    return None


# 定义HTTP请求处理器
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing ``POST /command`` for encrypted remote commands.

    The encrypted secret is read from the ``secret`` field of a JSON request
    body, falling back to the ``secret`` URL query parameter. It is validated
    with ``check_secret`` and the outcome is reported as a JSON object with
    ``code`` and ``message`` fields (the HTTP status is 200 whenever a secret
    was supplied). After the response has been written, the command
    ``restart`` triggers ``reboot()`` and ``sync`` triggers ``sync_data()``.

    Malformed requests get HTTP 400 and unknown paths get HTTP 404, each with
    a JSON ``{"message": ...}`` body.
    """

    def _send_json(self, status, payload):
        """Send *payload* serialized as JSON with HTTP status *status*."""
        body = json.dumps(payload).encode('utf-8')
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        """Handle a POST request; only the ``/command`` path is served."""
        # Parse the URL and its query parameters.
        parsed_url = urlparse(self.path)
        query_params = parse_qs(parsed_url.query)

        # Validate Content-Length before reading: a non-numeric value would
        # raise, and a negative one would make read() block until EOF.
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self._send_json(400, {"message": "Invalid Content-Length"})
            return
        post_data = self.rfile.read(content_length)

        # Decode the body as JSON when the media type says so. Parameters
        # such as "; charset=utf-8" are ignored for the comparison.
        content_type = self.headers.get("Content-Type") or ""
        media_type = content_type.split(";", 1)[0].strip().lower()
        json_data = {}
        if media_type == "application/json" and post_data:
            try:
                json_data = json.loads(post_data.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                self._send_json(400, {"message": "Invalid JSON format"})
                return

        # Any path other than /command is answered with 404.
        if parsed_url.path != "/command":
            self._send_json(404, {"message": "Not Found"})
            return

        # Prefer the secret from the JSON body; fall back to the query string.
        secret = json_data.get("secret") if isinstance(json_data, dict) else None
        if secret is None:
            secret = query_params.get("secret", [None])[0]
        if secret is None:
            # Reject early instead of failing after the status line was queued.
            self._send_json(400, {"message": "Missing secret"})
            return

        command = check_secret(secret)
        logger.info(f'command={command}')

        res_data = {}
        if not command:
            res_data['code'] = 500
            res_data['message'] = 'Failed to decrypt'
        elif command == 'timeout':
            res_data['code'] = 500
            res_data['message'] = 'invalid timestamp'
        else:
            res_data['code'] = 200
            res_data['message'] = 'success'

        self._send_json(200, res_data)
        # Make sure the reply is on the wire before a possible reboot.
        self.wfile.flush()

        if command == 'restart':
            reboot()
        if command == 'sync':
            sync_data()

# 启动HTTP服务器
def run_server(host="0.0.0.0", port=5000):
    """Run the command HTTP server until it is interrupted.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.

    The call blocks in ``serve_forever``. A ``KeyboardInterrupt`` is logged
    and ends the loop; the listening socket is closed however the loop ends.
    """
    httpd = HTTPServer((host, port), RequestHandler)
    logger.info(f"HTTP server running on {host}:{port}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        logger.error("\nServer is stopping...")
    finally:
        # Close the socket on every exit path, not only on Ctrl+C.
        httpd.server_close()

# Start the HTTP server with its default host and port when run as a script.
if __name__ == "__main__":
    run_server()