multiprocessing模块、进程间通信、进程Queue、进程池

鳄鱼君

发表文章数:523

热门标签

, , , ,

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python教程 » multiprocessing模块、进程间通信、进程Queue、进程池

计算占用CPU(1+1),IO操作不占用CPU(读取网上数据、从内存读取数据)。Python的多线程,就是单线程在不断地进行上下文切换,上下文切换也耗费资源,它需要保存当前线程的现场,然后切换到另一个线程,再切回来。大量的计算,如果使用多线程,效率更低,甚至比单线程还慢!

Python的多线程不适合CPU操作密集型的任务,适合IO操作密集型的任务!

对于CPU密集型操作就需要使用多进程。多进程的进程之间是独立的。Python的进程和线程都是使用操作系统的原生进程和线程,原生进程和线程是由操作系统自己维护的,Python解释器只是通过C语言的接口启动进程和线程。操作系统本身没有GIL全局解释器锁,进程与进程之间的数据是完全独立的,不能互相访问,也就不需要锁!

假设我现在有8核的机器,启动8个进程,每个进程都有一个主线程,也就是8个线程,这就可以利用多核了。唯一的缺点就是这8个线程之间是相互独立的!

Process

Python中开启多进程需要使用multiprocessing模块下的Process

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello',name)

if __name__=='__main__':
    # # 一个进程
    # p=Process(target=f,args=('zjj',))
    # p.start()
    # p.join()

    # 多个进程
    for i in range(10):
        t=Process(target=f,args=('eyujun %s' % str(i),))
        t.start()

每个进程里面还可以启动线程:

from multiprocessing import Process
import time
import threading
def thread_run():
    print(threading.get_ident()) # 打印线程ID

def f(name):
    time.sleep(2)
    print('hello',name)
    t=threading.Thread(target=thread_run,)
    t.start()

if __name__=='__main__':
    # 多个进程
    for i in range(10):
        t=Process(target=f,args=('eyujun %s' % str(i),))
        t.start()

每一个进程都有一个父进程,显示各个进程ID:

from multiprocessing import Process
import os
def info():
    print('module name:', __name__)
    print('parent process:', os.getppid()) # 父进程的id
    print('process id:', os.getpid()) # 自身id

def f(name):
    info() # 在紫金城调用info
    print('hello', name)

if __name__ == '__main__':
    info() # 在主进程调用info
    p = Process(target=f, args=('bob',)) # 启动一个进程
    p.start()
    p.join()

进程之间通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以使用进程Queue,不是线程里的queue。

from multiprocessing import Process, Queue
def f(q):
    q.put([42, None, 'hello']) # 在q里面放数据

if __name__ == '__main__':
    q = Queue() #一个queue
    p = Process(target=f, args=(q,)) # 传给子进程
    p.start() # 启动子进程

    print(q.get())
    p.join()

线程和线程之间是可以通讯的:

import threading
import queue
def f():
    q.put([42, None, 'hello']) # 在q里面放数据

if __name__ == '__main__':
    q = queue.Queue() #一个queue
    p = threading.Thread(target=f,)
    p.start() # 启动子线程

    print(q.get()) #
    p.join()

使用线程的queue实现主进程跟子进程之间通讯:

from multiprocessing import Process
import queue
def f(q):
    q.put([42, None, 'hello']) # 在q里面放数据

if __name__ == '__main__':
    q = queue.Queue() #一个queue
    p = Process(target=f,args=(q,)) # 将queue传递给子进程
    p.start() # 启动子进程

    print(q.get()) #
    p.join()

# TypeError: can't pickle _thread.lock objects

代码中,将一个线程queue传递给子进程,不可以的!只能使用进程queue

我们还可以通过管道来实现进程之间的通讯:

from multiprocessing import Process, Pipe
def f(conn):
    conn.send('Hello my parent')
    print(conn.recv()) # 儿子收父亲 hello my child
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() # 生成一个管道实例,返回两个对象,可以理解为:父亲,儿子
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # 父亲收儿子: Hello my parent
    parent_conn.send('hello my child') # 父亲给儿子发
    p.join()

进程之间数据的共享,同时修改数据:

from multiprocessing import Process, Manager
import os
def f(d, l):
    d[1] = '1' # 字典添加
    l.append(os.getpid()) # 列表添加进程号
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict() # 生成一个字典,可在多进程间共享和传递

        l = manager.list(range(5)) # 生成一个列表,可在多进程间共享和传递
        p_list = []  # 存放所有的子进程
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list: # 等待所有子进程都执行完成
            res.join()

        print(d)
        print(l)

进程锁(屏幕修改锁,而不是数据修改锁):

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print('hello world', i)
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

进程池

启动进程数量过大,你会发现程序明显变慢,启动一个进程将相当于克隆父进程的内存数据,如果父进程占1G,启动100个进程就相当于101G,开销非常大。为了避免这种情况,就可以使用进程池,进程池中有两个方法:

  1. apply:串行
  2. apply_async:异步执行
from multiprocessing import  Pool
import time,os
def Foo(i): # 一个子进程
    time.sleep(2)
    print('in process',os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid()) # 父进程id

if __name__ == '__main__':
    pool = Pool(5) # 允许进程池同时放入5个进程池

    for i in range(10): # 启动10个进程
        # pool.apply_async(func=Foo, args=(i,)) # 进程池放入子进程
        pool.apply_async(func=Foo, args=(i,), callback=Bar) # callback回调,Foo执行完毕,执行Bar(父进程)
        # pool.apply(func=Foo, args=(i,)) # 串行

    print('end',os.getpid()) # 父进程id
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《multiprocessing模块、进程间通信、进程Queue、进程池》 发布于2020-03-28

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

6 + 3 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册