python系列——多线程之queue及线程池
发布日期:2021-09-30 09:33:40 浏览次数:5 分类:技术文章

本文共 5922 字,大约阅读时间需要 19 分钟。

参考博客:

1、作用

Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。

2、种类

  • Queue.Queue(maxsize)  FIFO(先进先出队列)
  •     Queue.LifoQueue(maxsize)  LIFO(先进后出队列)
  •     Queue.PriorityQueue(maxsize)  为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。

3、一些方法:

FIFO是常用的队列,其一些常用的方法有:

  •     Queue.qsize()  返回队列大小
  •     Queue.empty()  判断队列是否为空
  •     Queue.full()  判断队列是否满了
  •     Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。
  •     Queue.put(...[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
  •     Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成
  •     Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

消息队列实例中维护的有待完成任务变量。每接收到一个消息该值自增一次。每调用一次.task_done()可以使该值减1,当待完成任务值为0的时候,join函数才会返回。

4、queue例子:

import threadingimport Queueimport timeclass worker(threading.Thread): def __init__(self,queue):  threading.Thread.__init__(self)  self.queue=queue  self.thread_stop=False  def run(self):  while not self.thread_stop:   print("thread%d %s: waiting for tast" %(self.ident,self.name))   try:    task=q.get(block=True, timeout=20)#接收消息   except Queue.Empty:    print("Nothing to do!i will go home!")    self.thread_stop=True    break   print("task recv:%s ,task No:%d" % (task[0],task[1]))   print("i am working")   time.sleep(3)   print("work finished!")   q.task_done()#完成一个任务   res=q.qsize()#判断消息队列大小   if res>0:    print("fuck!There are still %d tasks to do" % (res))  def stop(self):  self.thread_stop = True if __name__ == "__main__": q=Queue.Queue(3) worker=worker(q) worker.start() q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息 q.put(["produce one desk!",2], block=True, timeout=None) q.put(["produce one apple!",3], block=True, timeout=None) q.put(["produce one banana!",4], block=True, timeout=None) q.put(["produce one bag!",5], block=True, timeout=None) print("***************leader:wait for finish!") q.join()#等待所有任务完成 print("***************leader:all task finished!")

输出:

thread139958685849344 Thread-1: waiting for tast 1    task recv:produce one cup! ,task No:1    i am working    work finished!    fuck!There are still 3 tasks to do    thread139958685849344 Thread-1: waiting for tast 1    task recv:produce one desk! ,task No:2    i am workingleader:wait for finish!    work finished!    fuck!There are still 3 tasks to do    thread139958685849344 Thread-1: waiting for tast 1    task recv:produce one apple! ,task No:3    i am working    work finished!    fuck!There are still 2 tasks to do    thread139958685849344 Thread-1: waiting for tast 1    task recv:produce one banana! ,task No:4    i am working    work finished!    fuck!There are still 1 tasks to do    thread139958685849344 Thread-1: waiting for tast 1    task recv:produce one bag! ,task No:5    i am working    work finished!    thread139958685849344 Thread-1: waiting for tast 1     ***************leader:all task finished!    Nothing to do!i will go home!

运行一下就知道,上例中并没有性能的提升(毕竟还是只有一个线程在跑)。线程队列的意义并不是进一步提高运行效率,而是使线程的并发更加有组织。可以看到,在增加了线程队列之后,程序对于线程的并发数量就有了控制。新线程想要加入队列开始执行,必须等一个既存的线程完成之后才可以。

5、线程池:

# -*- coding:utf-8 -*-import threadingimport Queueimport timeimport randomfrom faker import Fakerclass MyThread(threading.Thread):  '''  线程模型  '''  def __init__(self,queue):    threading.Thread.__init__(self)    self.queue = queue    self.start()  # 因为作为一个工具,线程必须永远“在线”,所以不如让它在创建完成后直接运行,省得我们手动再去start它    def run(self):        while True:  # 除非确认队列中已经无任务,否则时刻保持线程在运行            try:                task = self.queue.get(block=False)    # 如果队列空了,直接结束线程。根据具体场景不同可能不合理,可以修改                time.sleep(random.random())  # 假设处理了一段时间                print 'Task %s Done' % task  # 提示信息而已                self.queue.task_done()            except Exception,e:                breakclass MyThreadPool():    def __init__(self,queue,size):        self.queue = queue        self.pool = []        for i in range(size):            self.pool.append(MyThread(queue))    def joinAll(self):        for thd in self.pool:            if thd.isAlive():  thd.join()if __name__ == '__main__':    q = Queue.Queue(10)    fake = Faker()    for i in range(5):        q.put(fake.word())    pool = MyThreadPool(queue=q,size=2)    pool.joinAll()

网上有一部分示例,将队列作为一个属性维护在了线程池类中,也不失为一种办法,我这里为了能够条理清晰,没有放在类里面。这段程序首先生成了一个maxsize是10的队列。fake.word()可以随机生成一个单词,这里仅作测试用。所以向队列中添加了5个task。

  这里有个坑: 如果put的数量大于队列最大长度,而且put没有设置block=False的话,那么显然程序会阻塞在put这边。此时ThreadPool未被建立,也就是说工作线程都还没有启动,因此会引起这样一个死锁。如果把线程池的建立放到put之前也不行,此时线程发现队列为空,所以所有线程都会直接结束(当然这是线程中get的block是False的时候,如果为True那么也是死锁),最终队列中的task没人处理,程序输出为空。解决这个坑的办法,一个是像上面一样保持最开始put的量小于队列长度;第二个就是干脆不要限制队列长度,用q = Queue.Queue()生产队列即可。

  好的,继续往下,进入了线程池的生成。线程池内部的列表才是真·线程池,另外其关联了queue对象,所以在创建的时候可以将队列对象传递给线程对象。线程对象在创建时就启动了,并且被添加到线程池的那个列表中。线程池的大小由参数给出,线程启动后会去队列里面get任务,并且进行处理。处理完成后进行task_done声明并且再次去尝试get。如果队列为空那么就直接抛出异常,也就是跳出循环,线程结束。

  通过这样一个模型,根据线程池的大小,这才真正地给线程并发做了一个限制,可促进较大程度的资源利用。

6、进阶:

在上面这个示例中,实际上处理任务的实际逻辑是被写在了MyThread类里面。如果我们想要一个通用性更加高的工具类,那么势必要想想如何将这个线程类解耦具体逻辑。另一方面,队列中的任务的内容,不仅仅可以是字符串,也可以是任何python对象。这就使得灵活性大大提高。

  比如我们可以在队列中put内容是(func, args, kwargs)这样一个元组。其中func是一个函数对象,描述了任务的处理逻辑过程,args是一个元组,代表所有func函数的匿名参数,kwargs则是func函数的所有具名参数。如此,可以将线程类的run方法改写成这样:

def run(self):    while True:        try:            func,args,kwargs = self.queue.get()            try:                func(*args,**kwargs)            except Exception,e:                raise ('bad execution: %s' % str(e))            self.queue.task_done()        except Exception,e:            break

这样一个run就可以做到很大程度的解耦了。

  类似的思想,线程池类和线程类也不必是一一对应的。可以将线程类作为一个参数传递给线程池类。这样一个线程池类就可以作为容器容纳各种各样的线程了。具体实例就不写了。

 

转载地址:https://blog.csdn.net/h_jlwg6688/article/details/108506960 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python系列——多进程之multiprocessing多进程管理包
下一篇:python系列——多线程之条件变量condition

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年03月24日 20时18分27秒