Contents
简介
在源码文件的首行就说明了该 Queue 是一个支持多生产者、多消费者的队列。
"""A multi-producer, multi-consumer queue."""
既然提到有多个生产者、多个消费者了,说明该 Queue 是支持线程安全的,并且内部的实现应该是在一个正常的队列数据结构基础加上了锁操作(这是我看到这句话所想到的,后面看代码也基本是按照这个思路来的)。
Queue
先入先出队列
支持创建一个有 maxsize 大小的队列,如果 maxsize <= 0,则该队列大小无限大(视可用内存而定)。
默认创建大小无限的队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
# 该队列中的核心互斥锁,任何操作都需要进行加锁解锁操作
self.mutex = _threading.Lock()
# 用于通知一个元素被放到队列里了
self.not_empty = _threading.Condition(self.mutex)
# 用于通知一个元素从队列里移出了
self.not_full = _threading.Condition(self.mutex)
# 用于通知队列中的所有任务都取出来并且完成了
self.all_tasks_done = _threading.Condition(self.mutex)
# 未完成的任务数 ( 该任务或者在队列里,或者被取出队列 )
self.unfinished_tasks = 0
def task_done(self):
""" 通常在消费者线程中取出任务并执行完成后
通知主线程我的任务完成了
"""
# 获取所有任务完成的条件变量
self.all_tasks_done.acquire()
try:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
# 每个任务完成后只能调用该方法一次
if unfinished < 0:
raise ValueError('task_done() called too many times')
# 通知所有任务已完成
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()
def join(self):
""" 通常在主线程里调用该方法,等待所有的消费者线程完成所有任务 """
# 获取所有任务完成的条件变量
self.all_tasks_done.acquire()
try:
# 如果还有任务未完成,
# 则一直等待所有任务完成的条件成立
while self.unfinished_tasks:
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()
def qsize(self):
self.mutex.acquire()
n = self._qsize()
self.mutex.release()
return n
def empty(self):
self.mutex.acquire()
n = not self._qsize()
self.mutex.release()
return n
def full(self):
self.mutex.acquire()
# 不是简单的非空逻辑,还要判断无限大小的情况
n = 0 < self.maxsize == self._qsize()
self.mutex.release()
return n
def put(self, item, block=True, timeout=None):
# 取得队列空闲的条件变量
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
# 如果队列一直为满,
# 则等待队列空闲这个条件满足
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
# 有限的时间等待队列空闲这个条件满足
self.not_full.wait(remaining)
self._put(item)
# 放入一元素,就相当于放入一个非完成的任务
self.unfinished_tasks += 1
# 通知当前队列非空
self.not_empty.notify()
finally:
self.not_full.release()
def put_nowait(self, item):
""" 将一个元素放置到队列里,非阻塞版本
如果队列满则抛出 `Full` 异常
"""
return self.put(item, False)
def get(self, block=True, timeout=None):
# 取得队列非空的条件变量
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
# 如果当前队列一直为空,
# 则等待队列非空这个条件成立
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
# 在有限的时间内等待队列非空这个条件成立
self.not_empty.wait(remaining)
item = self._get()
# 通知队列有空闲空间
self.not_full.notify()
return item
finally:
self.not_empty.release()
def get_nowait(self):
""" 从队列中取出一个元素,非阻塞版本
如果队列为空,则抛出 `Empty` 异常
"""
return self.get(False)
# 下面几个方法可以进行重载来实现新的队列类型。
# 比如下面的 `PriorityQueue` 和 `LifoQueue` 就重载了这些方法。
def _init(self, maxsize):
self.queue = deque()
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.popleft()
|
PriorityQueue
优先级队列
内部队列实现为一个 [] ,且优先级的排序使用的是堆排 heapq 。
传入的每个元素都为一个元组 ( 优先级序号, 数据 ) 。
优先级序号越小,其优先级越高。
1 2 3 4 5 6 7 8 9 10 11 12 13 | class PriorityQueue(Queue):
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
|
LifoQueue
后入先出队列
内部的队列实现为一个 [] 。
1 2 3 4 5 6 7 8 9 10 11 12 13 | class LifoQueue(Queue):
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
|
comments powered by Disqus