本文共 5922 字,大约阅读时间需要 19 分钟。
参考博客:
1、作用
Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。
2、种类
- Queue.Queue(maxsize) FIFO(先进先出队列)
- Queue.LifoQueue(maxsize) LIFO(先进后出队列)
- Queue.PriorityQueue(maxsize) 为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。
3、一些方法:
FIFO是常用的队列,其一些常用的方法有:
- Queue.qsize() 返回队列大小
- Queue.empty() 判断队列是否为空
- Queue.full() 判断队列是否满了
- Queue.get([block[,timeout]]) 从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。
- Queue.put(...[,block[,timeout]]) 向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
- Queue.task_done() 从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成
- Queue.join() 监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。
消息队列实例中维护的有待完成任务变量。每接收到一个消息该值自增一次。每调用一次.task_done()可以使该值减1,当待完成任务值为0的时候,join函数才会返回。
4、queue例子:
import threadingimport Queueimport timeclass worker(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue=queue self.thread_stop=False def run(self): while not self.thread_stop: print("thread%d %s: waiting for tast" %(self.ident,self.name)) try: task=q.get(block=True, timeout=20)#接收消息 except Queue.Empty: print("Nothing to do!i will go home!") self.thread_stop=True break print("task recv:%s ,task No:%d" % (task[0],task[1])) print("i am working") time.sleep(3) print("work finished!") q.task_done()#完成一个任务 res=q.qsize()#判断消息队列大小 if res>0: print("fuck!There are still %d tasks to do" % (res)) def stop(self): self.thread_stop = True if __name__ == "__main__": q=Queue.Queue(3) worker=worker(q) worker.start() q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息 q.put(["produce one desk!",2], block=True, timeout=None) q.put(["produce one apple!",3], block=True, timeout=None) q.put(["produce one banana!",4], block=True, timeout=None) q.put(["produce one bag!",5], block=True, timeout=None) print("***************leader:wait for finish!") q.join()#等待所有任务完成 print("***************leader:all task finished!")
输出:
thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one cup! ,task No:1 i am working work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one desk! ,task No:2 i am workingleader:wait for finish! work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one apple! ,task No:3 i am working work finished! fuck!There are still 2 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one banana! ,task No:4 i am working work finished! fuck!There are still 1 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one bag! ,task No:5 i am working work finished! thread139958685849344 Thread-1: waiting for tast 1 ***************leader:all task finished! Nothing to do!i will go home!
运行一下就知道,上例中并没有性能的提升(毕竟还是只有一个线程在跑)。线程队列的意义并不是进一步提高运行效率,而是使线程的并发更加有组织。可以看到,在增加了线程队列之后,程序对于线程的并发数量就有了控制。新线程想要加入队列开始执行,必须等一个既存的线程完成之后才可以。
5、线程池:
# -*- coding:utf-8 -*-import threadingimport Queueimport timeimport randomfrom faker import Fakerclass MyThread(threading.Thread): ''' 线程模型 ''' def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue self.start() # 因为作为一个工具,线程必须永远“在线”,所以不如让它在创建完成后直接运行,省得我们手动再去start它 def run(self): while True: # 除非确认队列中已经无任务,否则时刻保持线程在运行 try: task = self.queue.get(block=False) # 如果队列空了,直接结束线程。根据具体场景不同可能不合理,可以修改 time.sleep(random.random()) # 假设处理了一段时间 print 'Task %s Done' % task # 提示信息而已 self.queue.task_done() except Exception,e: breakclass MyThreadPool(): def __init__(self,queue,size): self.queue = queue self.pool = [] for i in range(size): self.pool.append(MyThread(queue)) def joinAll(self): for thd in self.pool: if thd.isAlive(): thd.join()if __name__ == '__main__': q = Queue.Queue(10) fake = Faker() for i in range(5): q.put(fake.word()) pool = MyThreadPool(queue=q,size=2) pool.joinAll()
网上有一部分示例,将队列作为一个属性维护在了线程池类中,也不失为一种办法,我这里为了能够条理清晰,没有放在类里面。这段程序首先生成了一个maxsize是10的队列。fake.word()可以随机生成一个单词,这里仅作测试用。所以向队列中添加了5个task。
这里有个坑: 如果put的数量大于队列最大长度,而且put没有设置block=False的话,那么显然程序会阻塞在put这边。此时ThreadPool未被建立,也就是说工作线程都还没有启动,因此会引起这样一个死锁。如果把线程池的建立放到put之前也不行,此时线程发现队列为空,所以所有线程都会直接结束(当然这是线程中get的block是False的时候,如果为True那么也是死锁),最终队列中的task没人处理,程序输出为空。解决这个坑的办法,一个是像上面一样保持最开始put的量小于队列长度;第二个就是干脆不要限制队列长度,用q = Queue.Queue()生产队列即可。
好的,继续往下,进入了线程池的生成。线程池内部的列表才是真·线程池,另外其关联了queue对象,所以在创建的时候可以将队列对象传递给线程对象。线程对象在创建时就启动了,并且被添加到线程池的那个列表中。线程池的大小由参数给出,线程启动后会去队列里面get任务,并且进行处理。处理完成后进行task_done声明并且再次去尝试get。如果队列为空那么就直接抛出异常,也就是跳出循环,线程结束。
通过这样一个模型,根据线程池的大小,这才真正地给线程并发做了一个限制,可促进较大程度的资源利用。
6、进阶:
在上面这个示例中,实际上处理任务的实际逻辑是被写在了MyThread类里面。如果我们想要一个通用性更加高的工具类,那么势必要想想如何将这个线程类解耦具体逻辑。另一方面,队列中的任务的内容,不仅仅可以是字符串,也可以是任何python对象。这就使得灵活性大大提高。
比如我们可以在队列中put内容是(func, args, kwargs)这样一个元组。其中func是一个函数对象,描述了任务的处理逻辑过程,args是一个元组,代表所有func函数的匿名参数,kwargs则是func函数的所有具名参数。如此,可以将线程类的run方法改写成这样:
def run(self): while True: try: func,args,kwargs = self.queue.get() try: func(*args,**kwargs) except Exception,e: raise ('bad execution: %s' % str(e)) self.queue.task_done() except Exception,e: break
这样一个run就可以做到很大程度的解耦了。
类似的思想,线程池类和线程类也不必是一一对应的。可以将线程类作为一个参数传递给线程池类。这样一个线程池类就可以作为容器容纳各种各样的线程了。具体实例就不写了。
转载地址:https://blog.csdn.net/h_jlwg6688/article/details/108506960 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!