前面学习了多线程,接下来学习多进程的创建和使用。多进程更适合计算密集型的操作,他的语法和多线程非常相像,唯一需要注意的是,多线程之间是可以直接共享内存数据的;但是多进程默认每个进程是不能访问其他进程(程序)的内容。我们可以通过一些特殊的方式(队列,数组和字典)来实现,注意这几个数据结构和平常使用的不太一样,是在多进程中特殊定义的。
例如:通过queue来共享数据
#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Alex Lifrom multiprocessing import Processfrom multiprocessing import queuesimport multiprocessingfrom multiprocessing import Arraydef foo(i,arg): arg.put(i) print('say hi',i,arg.qsize())if __name__ == "__main__": # li = [] li = queues.Queue(20,ctx=multiprocessing) for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join()------------------say hi 0 1say hi 1 2say hi 2 3say hi 3 4say hi 4 5say hi 5 6say hi 6 7say hi 7 8say hi 8 9say hi 9 10
例2 通过array来共享数据,注意array初始化的时候就需要固定数据类型和长度
from multiprocessing import Processfrom multiprocessing import queuesimport multiprocessingfrom multiprocessing import Arraydef foo(i,arg): arg[i] = i + 100 for item in arg: print(item) print('================')if __name__ == "__main__": li = Array('i', 10) for i in range(10): p = Process(target=foo,args=(i,li,)) p.start()----------------000000010700================00000001071080================0101000001071080================010100001061071080================01010001051061071080================...(等等省略)
例3 通过字典方式进程间共享
#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Alex Lifrom multiprocessing import Processfrom multiprocessing import queuesimport multiprocessingfrom multiprocessing import Managerdef foo(i,arg): arg[i] = i + 100 print(arg.values())if __name__ == "__main__": # li = [] # li = queues.Queue(20,ctx=multiprocessing) obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join() ----------------[100][100, 101][100, 101, 102][100, 101, 102, 103][100, 101, 102, 103, 104][100, 101, 102, 103, 104, 105][100, 101, 102, 103, 104, 105, 106][100, 101, 102, 103, 104, 105, 106, 107][100, 101, 102, 103, 104, 105, 106, 107, 108][100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
和线程类似,当多个进程操作同一个全局变量的时候,需要加锁,不然可能错误;
比如
#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Alex Lifrom multiprocessing import Processfrom multiprocessing import queuesfrom multiprocessing import Arrayfrom multiprocessing import RLock, Lock, Event, Condition, Semaphoreimport multiprocessingimport timedef foo(i,lis): lis[0] = lis[0] - 1 time.sleep(1) print('say hi',lis[0])if __name__ == "__main__": # li = [] li = Array('i', 1) li[0] = 10 for i in range(10): p = Process(target=foo,args=(i,li)) p.start()-------------say hi 0say hi 0say hi 0say hi 0say hi 0say hi 0say hi 0say hi 0say hi 0say hi 0
如何修复?
两种方式,一个是p.start()下面加个p.join(),那真的就算按顺序一个个执行了;还有一个方式就是加锁
#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Alex Lifrom multiprocessing import Processfrom multiprocessing import queuesfrom multiprocessing import Arrayfrom multiprocessing import RLock, Lock, Event, Condition, Semaphoreimport multiprocessingimport timedef foo(i,lis,lc): lc.acquire() lis[0] = lis[0] - 1 time.sleep(1) print('say hi',lis[0]) lc.release()if __name__ == "__main__": # li = [] li = Array('i', 1) li[0] = 10 lock = RLock() for i in range(10): p = Process(target=foo,args=(i,li,lock)) p.start()--------------say hi 9say hi 8say hi 7say hi 6say hi 5say hi 4say hi 3say hi 2say hi 1say hi 0
和线程池相比,Python已经提供了完备的进程池模块,因此可以直接使用。进程池里面有2种方法,apply或apply_async;前者是阻塞,而后者是非阻塞的
例如下面例子我使用的apply_async,那么所有的进程是(非阻塞)同时执行的,当执行到time.sleep(5),每个子线程会卡5秒,而同时主线程执行到了pool.terminate(),这个时候就直接终止程序了
#!/usr/bin/env python# -*- coding:utf-8 -*-from multiprocessing import Poolimport timedef f1(arg): print(arg,'b') time.sleep(5) print(arg,'a')if __name__ == "__main__": pool = Pool(5) for i in range(30): # pool.apply(func=f1,args=(i,))#按照顺序执行 pool.apply_async(func=f1,args=(i,))#同时执行 # pool.close() # 所有的任务执行完毕 time.sleep(2) pool.terminate() # 立即终止 pool.join() pass--------------"C:\Program Files\Python3\python.exe" C:/temp/s13day11/day11/s16.py0 b1 b2 b3 b4 b
如果改成close(),那么他会等待pool中的任务执行完成之后再中止程序
from multiprocessing import Poolimport timedef f1(arg): print(arg,'b') time.sleep(5) print(arg,'a')if __name__ == "__main__": pool = Pool(5) for i in range(30): # pool.apply(func=f1,args=(i,))#按照顺序执行 pool.apply_async(func=f1,args=(i,))#同时执行 pool.close() # 所有的任务执行完毕 time.sleep(2) # pool.terminate() # 立即终止 pool.join() pass----------"C:\Program Files\Python3\python.exe" C:/temp/s13day11/day11/s16.py0 b1 b2 b3 b4 b0 a5 b1 a6 b2 a7 b3 a8 b4 a9 b5 a10 b6 a11 b7 a8 a12 b13 b9 a14 b10 a15 b11 a16 b13 a12 a18 b17 b14 a19 b15 a20 b16 a21 b17 a18 a22 b23 b19 a24 b20 a25 b21 a26 b22 a27 b23 a28 b24 a29 b25 a26 a27 a28 a29 a
注意和线程类似,进程里面也可以使用join(),确保主进程阻塞在这里直到所有的子进程都结束。