# -*- coding: utf-8 -*- """ * @file bufferpool.py * @author SDK_TEAM * @brief * @version 0.1 * @date 2022-11-3 * Copyright: * © 2018 北京灵汐科技有限公司 版权所有。 * 注意:以下内容均为北京灵汐科技有限公司原创,未经本公司允许,不得转载,否则将视为侵权;对于不遵守此声明或者其他违法使用以下内容者,本公司依法保留追究权。 * © 2018 Lynxi Technologies Co., Ltd. All rights reserved. * NOTICE: All information contained here is, and remains the property of Lynxi. This file can not * be copied or distributed without the permission of Lynxi Technologies Co., Ltd. """ from collections import deque import threading class block_queue: def __init__(self) -> None: self.__mutex = threading.Lock() self.__condtion = threading.Condition(self.__mutex) self.__queue = deque() def put(self, item): with self.__mutex: self.__queue.append(item) self.__condtion.notify() def take(self): with self.__mutex: while len(self.__queue) == 0: self.__condtion.wait() item = self.__queue.popleft() return item def size(self): with self.__mutex: ret = len(self.__queue) return ret