Newer
Older
lynxi-casic-demo / blockqueue.py
# -*- coding: utf-8 -*-
"""
 * @file bufferpool.py
 * @author SDK_TEAM
 * @brief
 * @version 0.1
 * @date 2022-11-3
 * Copyright:
 * © 2018 北京灵汐科技有限公司 版权所有。
 * 注意:以下内容均为北京灵汐科技有限公司原创,未经本公司允许,不得转载,否则将视为侵权;对于不遵守此声明或者其他违法使用以下内容者,本公司依法保留追究权。
 * © 2018 Lynxi Technologies Co., Ltd. All rights reserved.
 * NOTICE: All information contained here is, and remains the property of Lynxi. This file can not
 * be copied or distributed without the permission of Lynxi Technologies Co., Ltd.
"""

from collections import deque
import threading


class block_queue:
    def __init__(self) -> None:
        self.__mutex = threading.Lock()
        self.__condtion = threading.Condition(self.__mutex)
        self.__queue = deque()

    def put(self, item):
        with self.__mutex:
            self.__queue.append(item)
            self.__condtion.notify()

    def take(self):
        with self.__mutex:
            while len(self.__queue) == 0:
                self.__condtion.wait()
            item = self.__queue.popleft()
            return item

    def size(self):
        with self.__mutex:
            ret = len(self.__queue)
            return ret