A Brief Look at How the Python Standard Library queue Module Works
Environment: Python 3.5.2
How the queue module is implemented
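The queue module provides a queue that can be shared safely between threads. Under the hood it is built on the Condition primitive from Python's threading module (see my earlier post for an explanation of how Condition works). Let's start with a short usage example: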
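```python
import queue
import threading

q = queue.Queue()          # unbounded FIFO queue shared by all threads


def worker():
    while True:
        item = q.get()     # blocks until an item is available
        if item is None:   # None is the sentinel that tells the worker to exit
            break
        print('item ', item)
        q.task_done()      # tell the queue this item has been processed


threads = []
for i in range(10):        # start 10 consumer threads
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in range(20):     # produce 20 items
    q.put(item)

q.join()                   # block until task_done() was called for every item

for i in range(10):        # one sentinel per worker so every thread exits
    q.put(None)
for t in threads:
    t.join()
```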
The output is as follows:
```
item  0
item  1
item  2
item  3
item  4
item  5
item  6
item  7
item  8
item  9
item  10
item  11
item  12
item  13
item  14
item  15
item  16
item  17
item  18
item  19
```
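The output shows that each of the 20 items was handled exactly once, even though ten threads were calling get() concurrently (the order in which the lines are printed is not guaranteed and can differ between runs). Once a Queue instance is created, the main thread puts data into it while the worker threads block in get(); the queue makes sure the workers take items under mutual exclusion, so each one can process what it received independently. The rest of this post outlines how the Queue class achieves this.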
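To check the claim that every item goes to exactly one consumer, the example can be varied so that workers collect what they receive instead of printing it. This is a sketch: `consume_all`, `results` and `results_lock` are hypothetical names, not part of the original example, and unlike the original the worker also calls task_done() for the None sentinel so the unfinished-task count ends at zero.

```python
import queue
import threading


def consume_all(num_workers=10, num_items=20):
    """Hypothetical helper: run the producer/consumer example, return what workers got."""
    q = queue.Queue()
    results = []                      # every item any worker received
    results_lock = threading.Lock()   # protects results across worker threads

    def worker():
        while True:
            item = q.get()            # blocks until an item is available
            if item is None:          # sentinel: stop this worker
                q.task_done()         # keep the unfinished-task count balanced
                break
            with results_lock:
                results.append(item)
            q.task_done()             # one task_done() per successful get()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in range(num_items):
        q.put(item)
    q.join()                          # waits until every put item is marked done
    for _ in threads:
        q.put(None)                   # one sentinel per worker
    for t in threads:
        t.join()
    return results


items = consume_all()
# Each item appears exactly once; only the order may vary between runs.
print(sorted(items) == list(range(20)))
```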
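How the Queue class works
First, let's look at the definition of the Queue class as it appears in the standard library's queue.py: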
```python
# Module-level names the class relies on (defined earlier in queue.py)
import threading
from collections import deque
from time import monotonic as time   # timeouts are measured with a monotonic clock


class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
    pass


class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass


class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize       # maximum size; <= 0 means unbounded
        self._init(maxsize)          # create the underlying deque

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()                       # the one lock shared by all conditions

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)    # getters wait on this

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)     # putters wait on this

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)  # join() waits on this
        self.unfinished_tasks = 0    # items put() but not yet marked with task_done()

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:                   # acquire the shared mutex
            unfinished = self.unfinished_tasks - 1  # one more task finished
            if unfinished <= 0:
                if unfinished < 0:                  # more task_done() calls than put() calls
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()    # count hit 0: wake every thread in join()
            self.unfinished_tasks = unfinished      # store the new count

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:                   # acquire the shared mutex
            while self.unfinished_tasks:            # as long as any task is unfinished
                self.all_tasks_done.wait()          # release the mutex and block until notified

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:                            # acquire the lock
            return self._qsize()                    # current number of items

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:                            # acquire the lock
            return not self._qsize()                # True if there are no items

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:                            # acquire the lock
            return 0 < self.maxsize <= self._qsize()  # only a bounded queue can be full

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:                         # acquire the shared mutex via not_full
            if self.maxsize > 0:                    # only bounded queues need a free-slot check
                if not block:                       # non-blocking put
                    if self._qsize() >= self.maxsize:
                        raise Full                  # no free slot right now
                elif timeout is None:               # block with no time limit
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()        # wait until a get() frees a slot
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout      # absolute deadline
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()  # time left before the deadline
                        if remaining <= 0.0:
                            raise Full              # deadline passed and still full
                        self.not_full.wait(remaining)  # wait at most `remaining` seconds
            self._put(item)                         # append the item to the deque
            self.unfinished_tasks += 1              # one more task for join() to wait on
            self.not_empty.notify()                 # wake one thread waiting in get()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:                        # acquire the shared mutex via not_empty
            if not block:                           # non-blocking get
                if not self._qsize():
                    raise Empty                     # nothing available right now
            elif timeout is None:                   # block with no time limit
                while not self._qsize():
                    self.not_empty.wait()           # wait until a put() adds an item
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout          # absolute deadline
                while not self._qsize():
                    remaining = endtime - time()    # time left before the deadline
                    if remaining <= 0.0:
                        raise Empty                 # deadline passed and still empty
                    self.not_empty.wait(remaining)  # wait at most `remaining` seconds
            item = self._get()                      # remove the oldest item
            self.not_full.notify()                  # wake one thread waiting in put()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)          # raises Full at once if the queue is full

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)                # raises Empty at once if the queue is empty

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()                        # underlying storage

    def _qsize(self):
        return len(self.queue)                      # number of stored items

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)                     # add at the right end

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()                 # remove from the left end (FIFO)
```
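The comment near the end of the listing says the four underscore methods are the hooks for other orderings such as a stack. As a sketch of that idea, a last-in-first-out queue only needs to override those hooks and inherits all locking, blocking and task accounting. The standard library already ships this as queue.LifoQueue; `StackQueue` below is a hypothetical name used purely for illustration.

```python
import queue


class StackQueue(queue.Queue):
    """Hypothetical LIFO queue built only by overriding Queue's storage hooks.

    Locking, blocking and task counting are all inherited from queue.Queue;
    Queue calls these hooks only while its mutex is held.
    """

    def _init(self, maxsize):
        self.queue = []               # a plain list used as a stack

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)       # push onto the top

    def _get(self):
        return self.queue.pop()       # pop from the top: last in, first out


s = StackQueue()
for n in (1, 2, 3):
    s.put(n)
order = [s.get() for _ in range(3)]
print(order)                          # [3, 2, 1]
```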
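Reading the class as a whole: each call to q.task_done() decrements the count of unfinished tasks, and when the count reaches zero it calls all_tasks_done.notify_all(), which wakes every thread blocked in join(); those threads re-check the count in their while loop and return once all tasks are done. Tracking outstanding work for join() is the main purpose of the all_tasks_done condition. In q.put(), the queue first checks whether the item can be inserted. If there is room (or the queue is unbounded), the item is appended and not_empty.notify() wakes one thread waiting in get(). If the queue is full, the caller blocks on the not_full condition; later, when another thread removes an item with get(), get() calls not_full.notify(), which wakes one waiting producer so it can re-check the size and insert its item. q.get() works symmetrically: it waits on not_empty while the queue is empty and notifies not_full after removing an item. In short, this post has briefly walked through how the Queue class is implemented; it relies on Condition to coordinate how data is handed between threads.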
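The put() and get() branches described above can be exercised directly with a bounded queue. The sketch below runs in a single thread, so nothing ever wakes a waiting call: put_nowait() on a full queue raises queue.Full immediately, put() and get() with a timeout give up with queue.Full or queue.Empty once the deadline passes, and one task_done() call too many raises ValueError. The variable names `events`, `got` and `waited` are only illustrative.

```python
import queue
import time

events = []                     # records which branch each call ended up in

q = queue.Queue(maxsize=2)      # maxsize > 0: put() has to check for a free slot
q.put('a')
q.put('b')
events.append(('full', q.full()))   # 0 < maxsize <= _qsize(), so True

try:
    q.put_nowait('c')           # block=False and no free slot: raise Full at once
except queue.Full:
    events.append('put_nowait -> Full')

try:
    q.put('c', timeout=0.1)     # waits on not_full for up to 0.1 s; nobody calls get()
except queue.Full:
    events.append('put(timeout) -> Full')

got = [q.get(), q.get()]        # FIFO order; each get() calls not_full.notify()

start = time.monotonic()
try:
    q.get(timeout=0.2)          # waits on not_empty for up to 0.2 s; nobody calls put()
except queue.Empty:
    waited = time.monotonic() - start

q.task_done()
q.task_done()                   # two items were put, so two task_done() calls
q.join()                        # unfinished_tasks is 0, so this returns immediately
try:
    q.task_done()               # one call more than there were items
except ValueError as exc:
    events.append(str(exc))

print(events)
print(got)
```

Note that the failed put calls raise before `unfinished_tasks += 1`, which is why exactly two task_done() calls bring the count back to zero.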
Summary
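The thread-safe Queue discussed in this post is built on Condition, and the mechanism is not very complicated: it comes down to blocking and waking threads on three condition variables (not_empty, not_full and all_tasks_done) that all share one mutex. Because every operation that reads or modifies the underlying deque holds that mutex, reads and writes from different threads are atomic with respect to each other, and threads block and resume correctly when the queue reaches its maximum size or becomes empty. This post is fairly short; beyond the example, readers are encouraged to study further cases on their own. My knowledge is limited, so corrections for any mistakes are welcome.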
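To make the summary concrete, here is a stripped-down sketch of the same pattern: one Lock shared by two Condition objects, where each side waits in a while loop and notifies the other side after changing the buffer. `BoundedBuffer` is a hypothetical name; it omits timeouts, non-blocking calls and task counting, so it only illustrates the technique and is not a replacement for queue.Queue.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Hypothetical minimal bounded FIFO with the same lock/condition layout as Queue."""

    def __init__(self, maxsize):
        if maxsize <= 0:
            raise ValueError('maxsize must be positive')
        self.maxsize = maxsize
        self.items = deque()
        self.mutex = threading.Lock()                      # the single shared lock
        self.not_empty = threading.Condition(self.mutex)   # getters wait here
        self.not_full = threading.Condition(self.mutex)    # putters wait here

    def put(self, item):
        with self.not_full:                    # acquires the shared mutex
            while len(self.items) >= self.maxsize:
                self.not_full.wait()           # releases the mutex while blocked
            self.items.append(item)
            self.not_empty.notify()            # wake one waiting getter

    def get(self):
        with self.not_empty:                   # same mutex as not_full
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()             # wake one waiting putter
            return item


buf = BoundedBuffer(maxsize=2)
received = []


def consumer():
    for _ in range(5):
        received.append(buf.get())


t = threading.Thread(target=consumer)
t.start()
for n in range(5):
    buf.put(n)          # blocks whenever two items are already waiting
t.join()
print(received)         # [0, 1, 2, 3, 4]
```

The while loops around wait() matter: a woken thread re-checks the condition under the lock before acting, the same way Queue.put() and Queue.get() do.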
Reposted from: https://blog.csdn.net/qq_33339479/article/details/86706626