January Star
  • Home
  • Categories
  • Tags
  • Archives

Queue标准库源码学习

Contents

  • 简介
  • Queue
  • PriorityQueue
  • LifoQueue

简介

在源码文件的首行就说明了该 Queue 是一个支持多生产者、多消费者的队列。

"""A multi-producer, multi-consumer queue."""

既然提到有多个生产者、多个消费者了,说明该 Queue 是支持线程安全的,并且内部的实现应该是在一个正常的队列数据结构基础加上了锁操作(这是我看到这句话所想到的,后面看代码也基本是按照这个思路来的)。

Queue

先入先出队列

支持创建一个有 maxsize 大小的队列,如果 maxsize <= 0,则该队列大小无限大(视可用内存而定)。

默认创建大小无限的队列。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # 该队列中的核心互斥锁,任何操作都需要进行加锁解锁操作 
        self.mutex = _threading.Lock()
        # 用于通知一个元素被放到队列里了 
        self.not_empty = _threading.Condition(self.mutex)
        # 用于通知一个元素从队列里移出了 
        self.not_full = _threading.Condition(self.mutex)
        # 用于通知队列中的所有任务都取出来并且完成了 
        self.all_tasks_done = _threading.Condition(self.mutex)
        # 未完成的任务数 ( 该任务或者在队列里,或者被取出队列 )
        self.unfinished_tasks = 0

    def task_done(self):
        """ 通常在消费者线程中取出任务并执行完成后 
        通知主线程我的任务完成了 
        """
        # 获取所有任务完成的条件变量 
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                # 每个任务完成后只能调用该方法一次 
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                # 通知所有任务已完成 
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished
        finally:
            self.all_tasks_done.release()

    def join(self):
        """ 通常在主线程里调用该方法,等待所有的消费者线程完成所有任务 """
        # 获取所有任务完成的条件变量 
        self.all_tasks_done.acquire()
        try:
            # 如果还有任务未完成,
            # 则一直等待所有任务完成的条件成立 
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    def qsize(self):
        self.mutex.acquire()
        n = self._qsize()
        self.mutex.release()
        return n

    def empty(self):
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n

    def full(self):
        self.mutex.acquire()
        # 不是简单的非空逻辑,还要判断无限大小的情况 
        n = 0 < self.maxsize == self._qsize()
        self.mutex.release()
        return n

    def put(self, item, block=True, timeout=None):
        # 取得队列空闲的条件变量 
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        # 如果队列一直为满,
                        # 则等待队列空闲这个条件满足 
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        # 有限的时间等待队列空闲这个条件满足 
                        self.not_full.wait(remaining)
            self._put(item)
            # 放入一元素,就相当于放入一个非完成的任务 
            self.unfinished_tasks += 1
            # 通知当前队列非空 
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """ 将一个元素放置到队列里,非阻塞版本 
        如果队列满则抛出 `Full` 异常 
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        # 取得队列非空的条件变量 
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    # 如果当前队列一直为空,
                    # 则等待队列非空这个条件成立 
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    # 在有限的时间内等待队列非空这个条件成立 
                    self.not_empty.wait(remaining)
            item = self._get()
            # 通知队列有空闲空间 
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """ 从队列中取出一个元素,非阻塞版本 
        如果队列为空,则抛出 `Empty` 异常 
        """
        return self.get(False)

    # 下面几个方法可以进行重载来实现新的队列类型。
    # 比如下面的 `PriorityQueue` 和 `LifoQueue` 就重载了这些方法。
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.popleft()

PriorityQueue

优先级队列

内部队列实现为一个 [] ,且优先级的排序使用的是堆排 heapq 。

传入的每个元素都为一个元组 ( 优先级序号, 数据 ) 。

优先级序号越小,其优先级越高。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class PriorityQueue(Queue):

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self.queue)

LifoQueue

后入先出队列

内部的队列实现为一个 [] 。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class LifoQueue(Queue):

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()
Comments
comments powered by Disqus

Published

Sep 15, 2014

Category

Python

Tags

  • python 23
  • stdlibs 15

Contact

  • Powered by Pelican. Theme: Elegant by Talha Mansoor