Newer
Older
safe-algo-pro / common / ipc_utils.py
zhangyingjie 24 days ago 591 bytes 服务拆分(过程版本)
from multiprocessing.connection import Client


def send_to_tcp(device_code, message: bytes) -> None:
    """Send a device-tagged binary message to the local IPC listener.

    The payload is framed as a 4-byte big-endian length of the UTF-8 encoded
    device code, followed by the encoded device code, followed by ``message``
    verbatim. It is delivered with ``multiprocessing.connection.Client`` to
    the listener at ``127.0.0.1:6000``.

    Delivery is best-effort: any failure (building the payload, connecting,
    authenticating, sending) is printed and swallowed, so this function never
    raises to its caller.

    Args:
        device_code: Device identifier; converted with ``str()`` and UTF-8
            encoded before framing.
        message: Bytes-like body appended after the device code. A ``str``
            cannot be concatenated to the frame and is reported as a failed
            send.
    """
    address = ('127.0.0.1', 6000)
    secret = b'from_algo_to_tcp'

    try:
        text_bytes = str(device_code).encode('utf-8')
        text_len = len(text_bytes).to_bytes(4, byteorder='big')  # 4-byte length prefix

        payload = text_len + text_bytes + message

        # The context manager closes the connection even when send_bytes
        # raises, so a failed send no longer leaks the socket.
        with Client(address, authkey=secret) as conn:
            # Log the destination address, not the authkey: the shared
            # secret should not end up in stdout/log files.
            print(f"[IPC] send to {address}:{payload}")
            conn.send_bytes(payload)
    except Exception as e:
        # Deliberately broad: callers rely on this helper never raising.
        print(f"[IPC] send to tcp failed: {e}")