Python系列六之多进程

在 Python 里,我们使用 multiprocessing 这个模块来进行多进程的操作。

multiprocessing 模块通过创建子进程的方式来运行多进程,因此绕过了 Python 里 GIL 的限制,可以充分利用机器上的多个处理器。

1、多进程使用示例

多进程的使用方式和多线程的方式类似,这里使用到的是 multiprocessing.Process 类,下面是一个简单的示例:

from multiprocessing import Process
import time

def f(x):
    if x % 2 == 1:
        time.sleep(x+1)
    print(x * x)
    return x * x


def test_multi_process():
    processes = []

    for i in range(5):
        processes.append(Process(target=f, args=(i,)))

    for p in processes:
        p.start()

    for p in processes:
        p.join(0.5)

    for p in processes:
        print(p, p.is_alive(), p.pid, p._parent_pid)


if __name__ == "__main__":
    test_multi_process()

在上面的示例中,test_multi_process() 函数里使用多进程的方式调用 f 函数,和多线程的调用方式一致,通过 start() 方法启动进程活动,使用 join() 方法阻塞调用其的进程。

接下来介绍一下 multiprocessing.Process 的一些方法和属性。

1. run()

表示进程活动的方法,可以在子类中重载此方法,比如多线程笔记的操作里重写 run() 对函数执行报错进行了处理,并返回了执行结果

2. start()

启动进程活动,将对象的 run() 方法在一个单独的进程中调用

3. join()

阻塞调用 join() 方法的进程,在上面的示例中也就是父进程,默认值为 None,也就表示阻塞操作。

如果设置为其他正数值,那么则最多会阻塞多少秒,比如上面的示例为 0.5 秒,如果超时,那么父进程则会继续往后执行。

比如上面的示例输出结果如下:

0
4
16
<Process name='Process-1' pid=6600 parent=24248 stopped exitcode=0> False 6600 24248
<Process name='Process-2' pid=4368 parent=24248 started> True 4368 24248
<Process name='Process-3' pid=13024 parent=24248 stopped exitcode=0> False 13024 24248
<Process name='Process-4' pid=3288 parent=24248 started> True 3288 24248
<Process name='Process-5' pid=16880 parent=24248 stopped exitcode=0> False 16880 24248
1
9

在打印每个进程的信息时,f() 函数内部进行 sleep 的进程还没有执行结束,但是进程已经超时了,所以不再阻塞父进程向下执行。

4. is_alive()

上面有打印出信息,返回布尔值,表示该进程是否还活着。

5. pid 和 parent_pid

上面使用 .pid 和 ._parent_pid 属性打印出了每个进程的 id 和其父进程的 id。

2、进程池

进程使用的对象是 multiprocessing.pool.Pool()。

接受 processes 参数为进程数,表示要使用的工作进程数目,如果不传入,则默认使用 cpu 的核数,根据 os.cpu_count() 获取。

接下来分别使用示例介绍 multiprocessing.pool 下的几个调用方法,进程池的使用可以使用 map() 和 starmap() 两个函数。

1. map()

map() 接受两个参数,func 表示多进程要执行的函数,iterable 表示要执行的 func 函数输入的参数的迭代对象。

这里需要注意一下,map() 函数使用的 func 函数只能接受一个参数,比如我们前面定义的 f 函数,下面是其使用示例:

def f(x):
    return x * x


def test_pool_map():
    with Pool(processes=4) as pool:
        results = pool.map(func=f, iterable=range(10))

    print(results)

2. starmap()

starmap() 函数与 map() 使用方法类似,但是 iterable 迭代参数的元素是 func 函数的多个参数,比如我们想要对下面的 add() 函数使用多进程:

def f_add(x, y):
    return x + y

它的调用方式如下:

def test_pool_starmap():
    with Pool(processes=4) as pool:
        results = pool.starmap(func=f_add, iterable=zip(range(6), range(6, 12)))
    
    print(results)

这里返回的 results 是一个列表,元素是每个进程执行的函数的返回结果。

3、进程间交换对象

前面介绍了,多进程的运行方式是通过建立子进程的形式来操作,而不同进程间数据是不共享的,这一点不同于多线程。

因为多线程的操作是在同一个进程内实现的,所以线程间数据是共享数据资源的。

接下来介绍一下如何在进程间进行对象的交换,其实进程间进行对象的交换是一个子命题,更高层级的概括是在进程间进行通信,在官方的文档中对其进行了细分,所以这里也对其进行分类别的介绍。

在进程间进行对象交换的方式有两种,一种是队列,一种是管道。

1. 队列

1) 队列的代码示例

这里的模块的引入是 multiprocess.Queue,这个类近似于是 queue.Queue 的克隆,以下是官方文档的一个示例,内容是在父进程中创建一个队列,然后在子进程中写入数据,然后再在父进程中读取:


from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

队列的写入使用 put(),读取使用 get()。

get() 还可以加上两个参数,block 和 timeout,block 表示是否阻塞,timeout 表示获取的超时时间。

接下来我们再实现一个功能,两个子进程写入数据,一个子进程读取数据,代码示例如下:

from multiprocessing import Queue, Process


def f_write(q, n, name):
    for i in range(n):
        q.put(f"{name}_{i}")
        time.sleep(0.1)


def f_read(q):
    while q.qsize() > 0:
        print(q.get(block=False, timeout=1))
        time.sleep(0.5)


def test_queue():
    # 三个进程,一个写进程,两个读进程
    q = Queue()
    q.put("origin_value")
    q.put("b")

    # p1 = Process(target=f_queue, args=(q, "c"))
    # p2 = Process(target=f_queue, args=(q, ))
    p1 = Process(target=f_write, args=(q, 5, "a"))
    p2 = Process(target=f_write, args=(q, 8, "b"))
    p3 = Process(target=f_read, args=(q,))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print("total: ", q.qsize())

if __name__ == "__main__":
    test_queue()

2) 队列的相关方法

关于队列的相关函数,除了前面介绍的几种,还有比如判断队列的长度,是否为空等。

a) Queue()

在定义一个队列的时候,我们前面是直接定义 q=Queue(),不为其设置元素长度,而如果我们想要为其设置一个最大的长度,可以加上 maxsize 参数:

q = Queue(maxsize=3)

那么队列里最多只能有三个元素,而如果队列满了还往其中 put() 加入操作,则会阻塞,直到其他进程对其读取其中的数据。

b) put()

put() 函数表示的是往队列里添加元素,元素的类型不限,添加数字,字符串,字典,列表都可以:

q = Queue()
q.put(1)
q.put({"a": 4})
q.put([1,3,4])

前面介绍了,如果队列满了,还往队列里进行 put() 操作,则会进入阻塞操作,可以通过添加 block 或者 timeout 来进行避免。

block 表示是否阻塞,为 True 的话则会进入阻塞等待状态。False 的话则会引发异常。

timeout 表示超时,尝试往队列里添加数据,超出等待时间同样已发队列已满的异常。

c) get()

get() 函数表示从队列中读取元素,队列的写入和读取的原则是先入先出,最先进去的最先出来。

而为了避免队列为空的情况下进行 get() 进入阻塞状态,get() 可以使用两个参数,一个是 block,表示是否阻塞,一个是 timeout,表示超时时间。

如果队列为空还进行 get() 操作,使用上面这两个操作则会 raise 一个 Empty 的 error。

d) qsize()

返回队列的长度,但由于多进程或多线程的上下文,这个数字是不可靠的。

e) empty()

如果队列是空的,则返回 True,否则返回 False,由于多进程或多线程的环境,该状态是不可靠的。

f) full()

如果队列设置了 maxsize 参数,那么如果队列满了,则返回 True,否则返回 False,由于多进程或多线程的环境,该状态是不可靠的。

g) close()

关闭队列,如果执行了 q.close(),再往里面添加元素执行 q.put() 操作,则会引发报错。

2. 管道

1) 管道的相关函数

管道的引入方式如下:

from multiprocessing import Pipe

管道的定义可以直接实例化 Pipe,返回管道的两端:

conn1, conn2 = Pipe()

默认情况下,Pipe() 的参数 duplex 值为 True,表示管道是双工的,也就是可以双向通信的,比如 conn1 可以写入,也可以读出,conn2 可以写入也可以读出数据。

而如果手动设置 duplex 为 False,那么管道则是单向的,conn1 只能用于接收消息,conn2 只能发送消息。

管道用于发送和接收的函数分别如下:

发送信息

conn.send(obj)

发送的对象可以是字符串,也可以是其他对象,比如列表,字典等。

接收信息

conn.recv()

关闭连接对象

我们可以使用 close() 来关闭连接对象,当连接对象被垃圾回收时会自动调用:

conn.close()

判断连接对象中是否有可以读取的数据

如果我们直接使用 conn.recv() 的时候,如果管道内没有可接收的对象,会进入阻塞状态,直到管道内传入数据。

我们可以使用 poll() 函数判断管道内是否有可以读取的数据,返回的是一个布尔型数据,表示是否有数据:

has_data = conn.poll()

但是如果不设置超时时间,同样会进入等待状态,所以可以设置一个最大阻塞秒数:

has_data = conn.pool(timeout=3)  # 等待 3 秒

2) 管道的代码示例

接下来我们用下面的代码来进行管道的双工测试,即从管道的两端分别写入和读取数据。

from multiprocessing import Process, Pipe


def send_info(conn, info):
    conn.send(info)
    conn.close()


def read_info(conn):
    while conn.poll(timeout=2):
        info = conn.recv()
        print(info)


def test_pipe():
    # 两个 conn 分别都往里面读和写
    parent_conn, child_conn = Pipe()

    # p1 向 child 管道写入
    print("id out of func: ", id(child_conn))
    p1 = Process(target=send_info, args=(child_conn, "send_info_from_child"))
    p1.start()
    p1.join()

    # p2 从 parent 管道读取
    p2 = Process(target=read_info, args=(parent_conn,))
    p2.start()
    p2.join()

    # p3 向 parent 管道写入
    p3 = Process(target=send_info, args=(parent_conn, "send_info_from_parent"))
    p3.start()
    p3.join()

    # p4 从 child 管道读取
    p4 = Process(target=read_info, args=(child_conn,))
    p4.start()
    p4.join()


if __name__ == "__main__":
    test_pipe()

注意 :如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

4、进程间同步

与多线程一样,多进程也可以使用锁来确保一次只有一个进程来执行一个操作,比如有一个打印到标准输出的操作,我们需要确保其打印的日志不紊乱,就可以使用下面的操作:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print("hello ", i)
    finally:
        l.release()

if __name__ == "__main__":
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

而如果不使用锁,我们重写 f 函数如下:

def f(l, i):
    print("hello ", i)

多执行几次,我们可以看到控制台的输出会出现错乱的情况,这样就可能对输出信息不能直观查看,比如:

hello  2
hello  0
hello  4
hello hello  3
 1
hello  5
hello  6
hello  8
hello  9
hello  7

5、进程间共享状态

在并发编程的时候,应当尽量避免使用共享状态,尤其是多进程操作时,但如果真的有这个需求,需要共享一些数据,multiprocessing 提供了两种方法,一种是共享内存,一种是服务进程。

1. 共享内存

我们可以使用 Value 或者 Array 将数据存储在共享内存映射中。

Value 是存储的单个变量,Array 存储的是数组,注意下,这里的 Value 和 Array 在定义的时候都需要指定元素类型。

其引入及代码示例如下:

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 5
    a[0] = 100


if __name__ == "__main__":
    num = Value('d', 1)
    arr = Array('i', range(5))
    print(num.value)
    print(arr[:])

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

其中,引入的方式可以直接从 multiprocessing 中引入,在定义 Value 和 Array 的时候,第一个参数是 'd' 和 'i',分别表示类型是双精度浮点数和有符号整数。

这些共享对象将是进程和线程安全的。

更多的关于共享内存的信息,可以使用 multiprocessing.sharedctypes 模块。

2. 服务进程

我们可以使用 Manager() 返回的管理对象控制一个服务进程,这个进程还可以保存 Python 对象并允许其他进程使用代理操作它们。

这个操作的意思就是使用 Manager() 会跟多进程的操作方式一样,创建一个子进程,然后将一些需要共享的数据都放到这个子进程里,其他子进程可以操作这个子进程的数据来达到数据共享的目的。

Manager() 支持的数据类型有:list,dict,Namespace,Lock,Value,Array 等,下面介绍一下代码示例:

from multiprocessing import Process, Manager


def f(d, l):
    d["a"] = 1
    d["b"] = 2
    l[0] = 100


if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。

此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/593490.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[C++基础学习-04]----C++数组详解

前言 在C中&#xff0c;数组是一种用来存储相同类型元素的数据结构。一维数组是最简单的数组形式&#xff0c;它由一系列按顺序存储的元素组成。二维数组则是由一维数组构成的数组&#xff0c;可以看作是一堆一维数组堆叠在一起形成的矩阵。 正文 01-数组简介 一维数组和二维…

库存管理系统开源啦

软件介绍 ModernWMS是一个针对小型物流仓储供应链流程的开源库存管理系统。该系统的开发初衷是为了满足中小型企业在有限IT预算下对仓储管理的需求。通过总结多年ERP系统研发经验&#xff0c;项目团队开发了这套适用于中小型企业的系统&#xff0c;以帮助那些有特定需求的用户。…

计算机毕业设计springboot基于vue电商抢购限时秒杀系统ch0h8

技术栈 ide工具&#xff1a;IDEA 或者eclipse 编程语言: java 数据库: mysql5.7以上版本 可选框架&#xff1a;ssmspringboot都有的 前端&#xff1a;vue.jsElementUI 详细技术&#xff1a;springbootSSMvueMYSQLMAVEN 数据库工具&#xff1a;Navicat/SQLyog都可以 开发工具 Ec…

【iOS】KVC

文章目录 前言一、KVC常用方法二、key与keypath区别key用法keypath用法 三、批量存值操作四、字典与模型相互转化五、KVC底层原理KVC设值底层原理KVC取值底层原理 前言 KVC的全称是Key-Value Coding&#xff0c;翻译成中文叫做键值编码 KVC提供了一种间接访问属性方法或成员变…

基于stm32的USB虚拟U盘+FATFS+W25Q64

基于stm32的USB虚拟U盘FATFSW25Q64 本文目标&#xff1a;基于stm32的USB虚拟U盘FATFSW25Q64 按照本文的描述&#xff0c;简单跑通USB的MSC类来进行简单交互。 先决条件&#xff1a;拥有C语言基础&#xff0c;装有编译和集成的开发环境&#xff0c;比如&#xff1a;Keil uVis…

mysql其它补充

exist和in的区别 exists 用于对外表记录做筛选。 exists 会遍历外表&#xff0c;将外查询表的每一行&#xff0c;代入内查询进行判断。 当 exists 里的条件语句能够返回记录行时&#xff0c;条件就为真&#xff0c;返回外表当前记录。反之如果 exists 里的条件语句不能返回记…

使用python开发的词云图生成器2.0

使用python开发的词云图生成器2.0 更新部分词云图主要三方库工具介绍和效果工具界面&#xff1a; 代码 更新部分 1.支持选择字体&#xff1b; 2.支持选择词云图形状 词云图 词云图啊&#xff0c;简单来说&#xff0c;它可以把文本数据中的高频关键词变成不同大小、颜色的词汇…

一篇文章带你深入了解“指针”

一篇文章带你深入了解“指针” 内存和地址了解指针指针类型const修饰指针指针的运算指针与整数之间的运算指针与指针之间的运算指针的关系运算 void* 指针传值调用和传址调用数组和指针的关系野指针野指针的形成原因规避野指针 二级指针字符指针指针数组数组指针数组传参一维数…

(三)JVM实战——对象的内存布局与执行引擎详解

对象的内存布局 对象的实例化 对象的创建方式 - new的方式 - Class的newInstance():反射的方式 - Construct的newInstance() - clone:实现Cloneable接口,默认浅拷贝 - 使用反序列化&#xff1a;将二进制流转化为内存对象 创建对象的步骤 - 判断对象对应的类是否加载、链接、初…

基础I/O--文件系统

文章目录 回顾C文件接口初步理解文件理解文件使用和并认识系统调用open概述标记位传参理解返回值 closewriteread总结 文件描述符fd0&1&2理解 回顾C文件接口 C代码&#xff1a; #include<stdio.h> int main() { FILE *fpfopen("log.txt",&…

08 - 步骤 表输出

简介 表输出&#xff08;Table Output&#xff09;步骤是用于将 Kettle 中的数据写入关系型数据库表的步骤。它允许用户将数据流中的数据插入、更新或删除到目标数据库表中。 使用 场景 我要将处理完的数据流中的sysOrgCode 跟 plateNumber 保存记录到mysql 1、拖拽表输出…

2.VAM新建保存修改场景文件

新建场景 点击返回场景预览 打开游戏的时候&#xff0c;本身就有了一个新场景&#xff0c;因为现在场景里什么也没有&#xff0c;所以是一片黑暗 点击星号打开主菜单会返回主界面 做一个最简单的Helloworld场景 底下有两个模式&#xff0c;游玩模式和编辑模式 编辑场景的时候…

OpenCV如何使用 GDAL 读取地理空间栅格文件(72)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇:OpenCV的周期性噪声去除滤波器(70) 下一篇 :OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 目录 目标 代码&#xff1a; 解释&#xff1a; 如何使用 GDAL 读取栅格数据 注意 …

05_G1垃圾收集器

G1垃圾收集器简介 垃圾优先 Garbage-First&#xff08;G1&#xff09;垃圾收集器面向多处理器机器&#xff0c;适用于大内存场景。它尝试在无需太多配置的情况下实现垃圾收集暂停时间目标&#xff0c;并同时实现高吞吐量。G1旨在通过适用于当前目标应用和环境的功能&#xff0…

go mod

常用命令 初始化模块 go mod init 模块名下载 go.mod 文件中指明的所有依赖 go mod download github.com/gin-gonic/ginv1.9.(依赖路径)依赖对其&#xff08;使引用的都是所依赖的&#xff09; go mod tidy编辑go.mod go mod edit go mod edit -require"github.com/g…

记录几种排序算法

十种常见排序算法可以分类两大类别&#xff1a;比较类排序和非比较类排序。 常见的快速排序、归并排序、堆排序以及冒泡排序等都属于比较类排序算法。比较类排序是通过比较来决定元素间的相对次序&#xff0c;其时间复杂度不能突破 O(nlogn)。在冒泡排序之类的排序中&…

数据结构---时间复杂度+空间复杂度

算法(algorithm)简单说就是解决问题的方法。方法有好坏&#xff0c;同样算法也是&#xff0c;有效率高的算法&#xff0c;也有效率低的算法。衡量算法的好坏一般从时间和空间两个维度衡量&#xff0c;也就是本文要介绍的时间复杂度和空间复杂度。有些时候&#xff0c;时间与空间…

js api part3

环境对象 环境对象&#xff1a; 指的是函数内部特殊的 变量 this &#xff0c; 它代表着当前函数运行时所处的环境 作用&#xff1a; 弄清楚this的指向&#xff0c;可以让我们代码更简洁 函数的调用方式不同&#xff0c;this 指代的对象也不同 【谁调用&#xff0c; this 就是…

springboot模块以及非springboot模块构成的多模块maven项目最佳构建方式

文章目录 背景一般的实现使用spring-boot-dependencies 更优雅的实现. 背景 有时候构建一个多模块maven项目其中某一个模块是web-service需要使用spring boot,其他模块跟spring boot 完全无关,本文总结一下在这个场景下maven项目最佳构建方式. 一般的实现 网上应该也看到过很…

智能工业相机哪家好?

一、什么是智能工业相机 在工业自动化的浪潮中&#xff0c;智能工业相机扮演着至关重要的角色。它们如同工业领域的“眼睛”&#xff0c;为生产过程提供精准的视觉监测和数据采集。然而&#xff0c;面对众多的智能工业相机品牌&#xff0c;如何选择一款真正适合的产品成为了众多…