python - 多进程
发布日期:2021-06-30 19:50:34 浏览次数:2 分类:技术文章

本文共 5131 字,大约阅读时间需要 17 分钟。

问题导读:

  1. Process
  2. Process class
  3. Lock
  4. Semaphore
  5. Event
  6. Queue
  7. Pool

解决方案:

Process

#!/usr/bin/env python# coding=utf8import multiprocessingimport timedef sayHello(interval):    for i in range(5):        print 'The time is {0}'.format(time.ctime())        time.sleep(interval)if __name__=='__main__':    p = multiprocessing.Process(target=sayHello, args=(3,))    p.start()    p1 = multiprocessing.Process(target=sayHello, args=(3,))    p1.start()    print 'pid:', p.pid, ' ', p1.pid    print 'name:' ,p.name, ' ', p1.name    print 'is_alive:', p.is_alive(), ' ', p1.is_alive()

Process Class

#!/usr/bin/env python# coding=utf-8import multiprocessingimport timeclass SayHello(multiprocessing.Process):    def __init__(self, interval):        multiprocessing.Process.__init__(self)        self.interval = interval    def run(self):        for i in range(4):            print 'time is {0} {1}'.format(time.ctime(),self.pid)            time.sleep(self.interval)if __name__=='__main__':    for i in range(5):        p = SayHello(1)        p.start()

Lock

#!/usr/bin/env python# coding=utf-8import multiprocessingimport timedef sayHello(lock, i):    # 获取锁    lock.acquire()    print 'start:{0}'.format(time.ctime())    try:        with open('./data.txt','a') as f:            f.write(time.ctime() + ' id:' + str(i) + '\n')    finally:        # 释放锁        lock.release()    time.sleep(1)    print 'end:{0}'.format(time.ctime())if __name__=='__main__':    lock = multiprocessing.Lock()    p1 = multiprocessing.Process(target=sayHello, args = (lock,3))    p2 = multiprocessing.Process(target=sayHello, args = (lock,4))    # 主进程结束,子线程结束    p1.daemon = True    p2.daemon = True    p1.start()    p2.start()    # 主进程会在结束之前检查是否有子线程未完成    p1.join()    p2.join()    print 'end!'

Semaphore

#!/usr/bin/env python# coding=utf-8import multiprocessingimport timedef sayHello(s, i):    # 获取    s.acquire()    print multiprocessing.current_process().name + ' acquire'    time.sleep(i)    print multiprocessing.current_process().name + ' release'    # 释放    s.release()if __name__=='__main__':    # 控制共享资源的访问数量    s = multiprocessing.Semaphore(2)    for i in range(4):        p = multiprocessing.Process(target=sayHello, args=(s, i))        p.start()

Event

#!/usr/bin/env python# coding=utf-8import multiprocessingimport timedef wait_for1(e):    print 'wait_for_1: starting'    # 等待1s    e.wait(1)    print 'wait_for_1:{0}'.format(str(e.is_set()))def wait_for2(e):    print 'wait_for_2: starting'    # 等待5s     e.wait(5)    print 'wait_for_2:{0}'.format(str(e.is_set()))if __name__=='__main__':    # 在进程之间传递状态    e = multiprocessing.Event()    e1 = multiprocessing.Process(name = 'p1', target = wait_for1, args = (e,))    e2 = multiprocessing.Process(name = 'p2', target = wait_for2, args=(e,))        e1.start()    e2.start()    # 等待3s 之后设置状态    time.sleep(3)    e.set()    print 'Event Set Ok!'

Queue

  1. Queue.qsize() 返回队列的大小  
  2. Queue.empty() 如果队列为空,返回True,反之False  
  3. Queue.full() 如果队列满了,返回True,反之False 
  4. Queue.get([block[, timeout]]) 获取队列,timeout等待时间  
  5. Queue.get_nowait() 相当Queue.get(False) 
  6. 非阻塞 Queue.put(item) 写入队列,timeout等待时间  
  7. Queue.put_nowait(item) 相当Queue.put(item, False)
#!/usr/bin/env python# coding=utf-8import multiprocessingimport timedef writer_proc(q):    try:        for i in range(5):            time.sleep(1)                        q.put(i,timeout = 2)            print 'put:',i    except:        passdef reader_proc(q):    try:        for i in range(5):            j = q.get(timeout = 2)            print 'get:',j    except:        passif __name__=='__main__':    q = multiprocessing.Queue()    writer = multiprocessing.Process(target = writer_proc, args=(q,))    writer.start()    reader = multiprocessing.Process(target = reader_proc, args=(q,))    reader.start()

Pool

# coding: utf-8import multiprocessingimport os, time, randomdef Lee():    print "\nRun task Lee-%s" % (os.getpid())  # os.getpid()获取当前的进程的ID    start = time.time()    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数    end = time.time()    print 'Task Lee, runs %0.2f seconds.' % (end - start)def Marlon():    print "\nRun task Marlon-%s" % (os.getpid())    start = time.time()    time.sleep(random.random() * 40)    end = time.time()    print 'Task Marlon runs %0.2f seconds.' % (end - start)def Allen():    print "\nRun task Allen-%s" % (os.getpid())    start = time.time()    time.sleep(random.random() * 30)    end = time.time()    print 'Task Allen runs %0.2f seconds.' % (end - start)def Frank():    print "\nRun task Frank-%s" % (os.getpid())    start = time.time()    time.sleep(random.random() * 20)    end = time.time()    print 'Task Frank runs %0.2f seconds.' % (end - start)if __name__ == '__main__':    function_list = [Lee, Marlon, Allen, Frank]    print "parent process %s" % (os.getpid())    pool = multiprocessing.Pool(4)    for func in function_list:        # apply 线程阻塞,执行线程代码的时候,会阻塞主进程        pool.apply_async(func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中    print 'Waiting for all subprocesses done...'    pool.close()    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束    print 'All subprocesses done.'

转载地址:https://lipenglin.blog.csdn.net/article/details/53304328 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python - 多进程spider
下一篇:MAC - HBase(伪分布式)

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年05月01日 17时22分42秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章