Python 多进程编程
# Python 多进程对效率的提升 一篇[《Python 中单线程、多线程和多进程的效率对比实验》][multiprocess-efficiency]的文章中提到: > Python是运行在解释器中的语言,有一个全局锁(GIL), > 在使用多进程(Thread)的情况下,不能发挥多核的优势。 > 而使用多进程(Multiprocess),则可以发挥多核的优势真正地提高效率。 文章中对 Python 在多线程、多进程的效率进行了对比: | 操作类型 | CPU 密集型 | IO 密集型 | 网络请求密集型 | | ---------- | -------------: | -------------: | -------------: | | 线性操作 | 94.91824996469 | 22.46199995279 | 7.3296000004 | | 多线程操作 | 101.1700000762 | 24.8605000973 | 0.5053332647 | | 多进程操作 | 53.8899999857 | 12.7840000391 | 0.5045000315 | 可见: 1. 多线程操作只在网络请求密集型操作中具有非常明显的优势,其开销小于多进程,可用于网络爬虫。 2. 多进程操作在各种操作中都有效率提升,在 IO 密集型操作中的优势更大。 最近在处理一套出租车数据,出租车数据量非常大,自己搭建数据库, 查询效率非常低。因此采用 Python 脚本进行处理, # Python 开启多进程 Python 中的 `multiprocess` 包提供了多进程支持。可以使用三种方法来创建进程。 ## 创建 Process 对象 最简单的开启 Python 进程的方法,是直接构造 `multiprocess.Process` 对象 ```python from multiprocess import Process process = Process() ``` ### 构造参数 `Process` 对象在构造时主要接收三个参数: - `target`:进程调用的函数; - `args`:进程调用函数时给函数传递的参数,为一个元组; - `name`:别名。 ### 属性 `Process` 的类型有以下属性: - `daemon`:当这个属性设置为 `True` 时,子进程会随着主进程的结束而结束。否则,主进程结束后,子进程依然会继续进行; - `exitcode`:进程在运行时为 None 、如果为 –N ,表示被信号 N 结束; - `name` - `pid` - `authkey` ### 方法 `Process` 有如下方法: - `start()`:调用 `start()` 函数时,子进程开始执行,主进程继续执行。 - `join()`:“[阻塞当前进程,直到调用 join 方法的那个进程执行完,再继续执行当前进程。][join-explain]” - `run()`:当构造时如果没有制定 `target` 参数,那么 `start()` 方法默认执行 `run()` 函数。 - `is_alive()`:判断当前进程是否活动。 ### 调用示例 ```python from multiprocess import Process import os import math def work_fun(work_list): pass def distrib_works(work_list, process_num): group_length = math.ceil(len(filename_list) / process_num) return [work_list[(i*group_length):((i+1)*group_length)] for i in range(process_num)] work_list = os.listdir("../data") process_num = 4 group = distrib_works(work_list, process_num) process_list = [Process(target=work_fun, args=(g,)) for g in group_list] for p in process_list: p.daemon = True p.start() for p in process_list: p.join() ``` ## 将进程定义为类 利用 Python 面向对象的特性,我们可以创建一个类,继承 `Process` 类, 将一些数据直接在构造的时候保存下来,可以无需在调用的时候传入。 例如,当我们在多进程程序中使用 `tqdm` 库显示进度条时,会用到其 `position` 参数来指定当前进度条在控制台中显示的位置,这个参数的值,我们可以直接保存在进程类中, 无需调用的时候再传入。 将进程定义为类的方法如下: ```python from multiprocess import Process class MyProcess(Process): position = 0 works = None def __init__(self, position, works) Process.__init__(self) self.position = poisition = works def run(): pass ``` 如果在构造函数中,调用的 `Process` 的构造函数没有指定 `target`, 进程同样默认执行 ***不带参数的 `run` 函数,即使你的 `run` 函数定义了形参!*** 在创建进程时,只需要将原来调用的 `Process` 的构造函数,改为调用 `MyProcess` 的构造函数即可。 这种创建进程方式的实例如下: ```python from multiprocess import Process, Lock def distrib_works(work_list: List[str], process_num) -> List[List[str]]: group_length = math.ceil(len(work_list) / process_num) return [g for g in [work_list[(i*group_length):((i+1)*group_length)] for i in range(process_num)] if len(g) > 0] class FindTargetTaxiProcess(multiprocessing.Process): def __init__(self, input_files, index, lock, log_file): multiprocessing.Process.__init__(self, target=find_target, args=(lock, log_file)) self.input_files = input_files self.index = index def find_target(self, lock, log_file): for filename in self.input_files: with open(filename) as in_file: for row in tqdm(in_file, ncols=80, position=self.index): cells = row.split(",") if int(cells[0]) == 11865: print(row) if __name__ == '__main__': LOG_FILE = r"E:\出租车点\上下车点\scripts\data\find_error.log" lock = multiprocessing.Lock() # ROOT_DIR = "../data/201502/temp" ROOT_DIR = r"E:\出租车点\201502\RawCSV" INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)] GROUP_LIST = distrib_works(INPUT_FILES, 4) PROCESS_LIST = [FineTargetTaxiProcess(element, i, lock) for i, element in enumerate(GROUP_LIST)] for process in PROCESS_LIST: process.start() ``` > 这种定义为类的方式有一个好处,在用 VSCode 调试的时候,在子进程中打断点是无效的。 > 如果用这种方式,可以将调用的 `start()` 函数改为 `run()` 或其他实际进程执行的函数, > 这样就可以调试进程内部了。当解决了 Bug 后,就可以换回 `start()` 函数并行执行。 ## 进程池(Pool) 可以发现,上面两种创建进程的方式,都是用到了一个 `distrib_works()` 函数来分配各个进程的任务。 这一过程可以被一个叫做进程池的类型代替。 ### 进程池构造 构造进程池的方法非常简单,导入 `Pool` 之后,直接构造 `Pool` 对象。 构造时可以指定最多的进程数量,默认是 CPU 核心数。 ```python from multiprocess import Pool p = Pool(process=6) ``` ### 进程池方法 `Pool` 类型主要有以下方法: - `apply_async()` 和 `apply()`:这两个函数都是让进程池开始执行任务,`apply_async()` 是非阻塞的(主进程继续执行),`apply()` 是阻塞的(主进程等待子进程执行完成后继续执行)。 - `close()`:关闭进程池,不再接收新任务。 - `join()`:[主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。][python多进程-cnblogs] ### 进程池调用示例 将上面一段使用 `FindTargetTaxiProcess` 类编写的代码用 `Pool` 重写: ```python from multiprocess import Process, Pool, Lock def find_target(in_file, lock, log_file): with open(filename) as in_file: for row in tqdm(in_file, ncols=80, position=self.index): cells = row.split(",") if int(cells[0]) == 11865: with lock: with open(log_file, mode="a") as log: print(row, file=log) if __name__ == '__main__': LOCK = multiprocessing.Lock() LOG_FILE = r"E:\出租车点\上下车点\scripts\data\find_error.log" ROOT_DIR = r"E:\出租车点\201502\RawCSV" INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)] POOL = Pool(process=6) for f in INPUT_FILES: POOL.apply_async(find_target, (f, LOCK, LOG_FILE)) POOL.close() POOL.join() ``` # 多进程共享资源 当我们把多个任务分解到 $n$ 个进程上执行时,这 $n$ 个进程往往会存在某种共享的资源, 如共享一个控制台、文件系统、列表或字典。这里存在两个问题: - 当多个进程同时访问这些资源时,就会产生冲突。例如,两个进程同时对控制台输出文本,写入的结果可以错综复杂,并不是两段文本的顺序组合。 - 各个进程有自己的内存空间,变量无法共享。例如,当想要利用多个进程操作主进程的一个列表时,各个进程操作结束后,主进程仍然是原来的状态。 这两个问题的解决,前者靠“锁”机制,后者靠“共享变量”机制。 ## 锁 对于冲突的情况,当使用 tqdm 显示多个进度条时比较明显。在 Windows 上,由于 “tqdm 无法获取默认锁”,因此控制台输出会比较乱,下面是一段程序在 Windows 上运行的效果: ```plain λ python3 Process 0: 0it [00:00, ?it/s] Process 1: 0it [00:00, ?it/s] Process 0: 273516it [00:00, 523719.79it/s] Process 0: 995883it [00:01, 510379.67it/s] Process 0: 1107387it [00:02, 510326.10it/s] Process 0: 1224813it [00:02, 512761.81it/s] Process 0: 3483799it [00:06, 539191.83it/s] Process 1: 3683852it [00:06, 571536.15it/s] Process 0: 3550015it [00:06, 540296.03it/s] Process 0: 3615558it [00:06, 540947.45it/s] Process 0: 3742521it [00:06, 542112.37it/s] ``` 而在 Linux 系统中的运行结果是 ```plain Process 0: 2045720it [00:03, 647073.52it/s] Process 1: 2092184it [00:03, 661530.01it/s] Process 2: 2065411it [00:03, 652446.31it/s] Process 3: 2093610it [00:03, 661782.04it/s] ``` 可见在访问共享资源的时候,加锁是非常有必要的。 ### 互斥锁(Lock) `Lock` 属于“互斥锁”,即[保证在任一时刻,只能有一个线程访问该对象。][使用Lock互斥锁] 通过 `Lock` 类型创建互斥锁后,将其传递到子进程内部,即可在子进程中使用。 使用 `Lock` 时,可以使用 `with` 语句加锁, `with` 语句块执行完成后自动解锁; 也可以通过其 `acquire()` 函数来加锁,使用 `release()` 函数解锁。 使用 `with` 语句进行加锁的示例代码如下: ```python def run(self): for filename in self.input_files: with open(filename, encoding="GB2312") as in_file: for row in tqdm(in_file): cells = row.split(",") if int(cells[0]) == 11865: with self.lock: with open(TARGET_TAXI_FILE, mode="a") as log: print(row, file=log) ``` > 这段代码在 Windows 上运行时,子进程内部的 lock 和 主进程传递进去的 lock 的 id 值不相同。 > 但是在 Linux 系统上时相同的。因此 Windows 上这段代码有可能会出错。 > 不过当文件被一个进程打开时,是无法被另一个进程打开的,因此这段程序的结果倒没出什么错。 ### 可重入锁(RLock) 互斥锁可以解决简单的避免资源冲突的问题,但当一个线程加锁后仍需要再次访问共享资源时, 就形成了嵌套锁,而使用互斥锁时就形成了“死锁”问题。这时我们需要使用 `RLock` 类型, 即“可重入锁”。 > 死锁的含义是:是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象, > 若无外力作用,它们都将无法推进下去。 > 此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 > 避免死锁主要方法是正确有序的分配资源。 `multiprocess` 中的 `RLock` 类型与 `Lock` 类型的区别在于: [RLock允许在同一线程中被多次申请。而 Lock 却不允许这种情况。][Python中Lock与RLock] 因此,[如果使用 `RLock` ,那么 acquire() 和 release() 必须成对出现][Python中Lock与RLock], 调用了几次 `acquire()`,就需要调用几次 `release()`。 ### 条件锁(Condition) 条件同步机制是指:线程 $B$ 等待特定条件 $C$ ,而另一个线程 $A$ 发出特定条件满足的信号 $C$ 。 $B$ 在收到信号 $C$ 时,继续执行。 可以通过“生产者-消费者”模型来理解这一过程。 生产者获取锁,生产一个随机整数,通知消费者并释放锁。 消费者获取锁,如果有整数则消耗一个整数并释放锁,如果没有就等待生产者继续生产。 示例代码如下(参考[《Python 线程同步机制》][Python线程同步机制]并进行修改): ```python import multiprocess class Producer(multiprocessing.Process): def __init__(self, productList, condition): multiprocessing.Process.__init__(self) self.productList = productList # type: List self.condition = condition # type: multiprocess.Condition def run(self): while True: product = random.randint(0, 100) with self.condition: print("条件锁:被 生产者 获取") self.productList.append(product) print(f"生产者:产生了 {product}。") print("生产者:唤醒消费者线程") self.condition.notify() print("条件锁:被 生产者 释放") time.sleep(1) class Customer(multiprocessing.Process): def __init__(self, productList, condition): multiprocessing.Process.__init__(self) self.productList = productList # type: List self.condition = condition # type: multiprocess.Condition def run(self): while True: with self.condition: print("条件锁:被 消费者 获取") while True: if self.productList: product = self.productList.pop() print(f"消费者:消费了 {product}") break print("消费者:等待生产者") self.condition.wait() print("条件锁:被 消费者 释放") def main(): manager = multiprocessing.Manager() productList = manager.list() condition = multiprocessing.Condition() process_producer = Producer(productList, condition) process_customer = Customer(productList, condition) process_producer.start() process_customer.start() process_producer.join() process_customer.join() if __name__ == '__main__': main() ``` 运行的部分结果是: ```plain 条件锁:被 生产者 获取 生产者:产生了 47。 生产者:唤醒消费者线程 条件锁:被 生产者 释放 条件锁:被 消费者 获取 消费者:消费了 47 条件锁:被 消费者 释放 条件锁:被 消费者 获取 消费者:等待生产者 条件锁:被 生产者 获取 生产者:产生了 100。 生产者:唤醒消费者线程 条件锁:被 生产者 释放 消费者:消费了 100 条件锁:被 消费者 释放 条件锁:被 消费者 获取 消费者:等待生产者 条件锁:被 生产者 获取 生产者:产生了 95。 生产者:唤醒消费者线程 条件锁:被 生产者 释放 消费者:消费了 95 条件锁:被 消费者 释放 条件锁:被 消费者 获取 消费者:等待生产者 ``` ### 信号量(Semaphore) 信号量是一个非负整数,所有通过它的进程都会将该整数减一, 当该整数值为零时,所有试图通过它的进程都将处于等待状态。 ```python from multiprocessing import Process, current_process, Semaphore import time def worker(s, i): s.acquire() print(current_process().name + "acquire"); time.sleep(i) print(current_process().name + "release\n"); s.release() if __name__ == "__main__": s = Semaphore(2) for i in range(5): p = Process(target = worker, args=(s, i*2)) p.start() ``` ## 共享变量 在多进程中,是无法直接使用全局变量作为共享变量的,因为不同进程具有不同的内存空间。 但是,共享变量也是不能避免的。Python 中也提供了一些创建共享变量的方法。 - Multiprocess 包内置类型 - 通过 Manager 创建共享变量 ### multiprocess 包内置类型 multiprocess 包提供了两种类型的共享变量: - `Value(typecode_or_type, *args, lock=True)`:表示一个值类型变量。 - `Array(typecode_or_type, size_or_initializer, *, lock=True)`:表示一个数组。这种创建数组的方式能力比较有限,它不支持除了 C 数据类型以外的类型。 `typecode_or_type` 描述了元素的类型,可取值是: | typecode | type | | -------- | --------------- | | 'c' | ctypes.c_char | | 'u' | ctypes.c_wchar | | 'b' | ctypes.c_byte | | 'B' | ctypes.c_ubyte | | 'h' | ctypes.c_short | | 'H' | ctypes.c_ushort | | 'i' | ctypes.c_int | | 'I' | ctypes.c_uint | | 'l' | ctypes.c_long | | 'L' | ctypes.c_ulong | | 'f' | ctypes.c_float | | 'd' | ctypes.c_doubl | 创建后,只要将这些变量传递给子进程即可。 ### 通过 Manager 创建共享变量 Manager() 返回的 manager 对象提供一个服务进程,使得其他进程可以通过代理的方式操作 Python 对象。 Manager 支持 list、dict 等多种数据类型。 ([多进程multiprocess][liujiang]) 把之前的共享变量的代码中,共享的变量由 list 改为 Manager 对象创建的 list,可以得到正确结果。 ```python from multiprocessing import Process, Lock, Manager import time def work(lock, var, index): with lock: var.append(index) print(f"Process {index} apped {index}") if __name__ == '__main__': var = Manager().list() lock = Lock() process_list = [Process(target=work, args=(lock, var, i)) for i in range(8)] for p in process_list: p.start() for p in process_list: p.join() print(var) ``` # 进程间通信 进程间通信,可以起到共享变量的效果,也可以起到锁的效果。 进程间通信的方式有三种: - 事件(Event) - 队列(Queue) - 管道(Pipe) ## 通过事件(Event)通信 Event 是同步通信的方式,有些类似于条件锁。由于是它是同步的,而且不能传递数据。 因此这里就不仔细研究 Event 的作用。 这个例子示例了主进程与子进程之间通过 Event 进行通信的方法。 ```python import multiprocessing import time def wait_for_event(e): print("wait_for_event: starting") e.wait() print("wairt_for_event: e.is_set()->" + str(e.is_set())) def wait_for_event_timeout(e, t): print("wait_for_event_timeout:starting") e.wait(t) print("wait_for_event_timeout:e.is_set->" + str(e.is_set())) if __name__ == "__main__": e = multiprocessing.Event() w1 = multiprocessing.Process(target=wait_for_event, args=(e,)) w2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 6)) w1.start() w2.start() time.sleep(10) print("main: event setting") e.set() print("main: event is set") ``` ## 通过队列(Queue)通信 Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。 Queue 有两个方法: - `put()`:将数据插入队列中。 - `get()`:从队列读取并且删除一个元素。 这两个方法都有两个参数:`blocked`, `timeout`, 控制队满和队空两种情况: - `put`:当队满时,如果 `blocked=True` ,那么会阻塞 `timeout` 指定的时间,直到队列有空间。如果超时,或 `blocked=False` ,则抛出 `Queue.Full` 异常。 - `get`:当队满时,如果 `blocked=True` ,那么会阻塞 `timeout` 指定的时间,直到队列有元素。如果超时,或 `blocked=False` ,则抛出 `Queue.Empty` 异常。 调用实例: ```python class FineTargetTaxiProcess(mp.Process): ''' 处理进程:多进程方式处理文件,结果全部传递给打印进程。 ''' def __init__(self, input_files, index, queue): mp.Process.__init__(self, target=self.pick, args=(queue,)) self.input_files = input_files self.index = index self.lock = lock def pick(self, queue): for filename in tqdm(self.input_files, ncols=80, position=self.index, desc=f"Process {self.index}"): with open(filename, encoding="GB2312") as in_file: for row in in_file: cells = row.split(",") if int(cells[0]) == 11865: try: queue.put(",".join(cells), block=False) except: print("Queue full") class PrinterProcess(mp.Process): ''' 打印进程:维持对输出文件的打开状态,打印数据。 可避免频繁打开、关闭结果文件造成的系统开销, 但是引入了消息传递的开销。 ''' def __init__(self, output_file, log, queue): mp.Process.__init__(self, target=self.write, args=(queue,)) self.output_file = output_file self.log_file = log def write(self, queue): with open(self.output_file, mode="w", newline="\n") as printer, open(self.log_file, mode="w") as log: while True: try: row = queue.get(block=True, timeout=1) print(row, file=printer) except: print("Queue empty", file=log) if __name__ == '__main__': lock = mp.Lock() ROOT_DIR = r"/mnt/e/出租车点/201502/RawCSV" INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)] GROUP_LIST = distrib_works(INPUT_FILES, 6) QUEUE = mp.Queue() PROCESS_LIST = [FineTargetTaxiProcess(element, i, QUEUE) for i, element in enumerate(GROUP_LIST)] PRINTER_PROCESS = PrinterProcess("./data/usequeue.txt", "./data/usequeue.log", QUEUE) for process in PROCESS_LIST: process.daemon = True process.start() PRINTER_PROCESS.daemon = True PRINTER_PROCESS.start() for p in PROCESS_LIST: p.join() ``` ## 通过管道(Pipe)通信 Pipe 是一个可以双向通信的对象,返回 `(conn1, conn2)`, 代表一个管道的两个端, `conn1` 只负责接受消息, `conn2` 只负责发送消息。 如果设置了 `duplex=True` ,那么这个管道是全双工模式, `conn1` 和 `conn2` 均可收发。 ```python class FineTargetTaxiProcess(mp.Process): ''' 处理进程:多进程方式处理文件,结果全部传递给打印进程。 ''' def __init__(self, input_files, index, pipe): mp.Process.__init__(self, target=self.pick, args=(pipe,)) self.input_files = input_files self.index = index self.lock = lock def pick(self, pipe): for filename in tqdm(self.input_files, ncols=80, position=self.index, desc=f"Process {self.index}"): with open(filename, encoding="GB2312") as in_file: for row in in_file: cells = row.split(",") if int(cells[0]) == 11865: try: pipe.send(",".join(cells)) except e as Exception: print("Pipe send error") class PrinterProcess(mp.Process): ''' 打印进程:维持对输出文件的打开状态,打印数据。 可避免频繁打开、关闭结果文件造成的系统开销, 但是引入了消息传递的开销。 ''' def __init__(self, output_file, log, pipe): mp.Process.__init__(self, target=self.write, args=(pipe,)) self.output_file = output_file self.log_file = log def write(self, pipe): with open(self.output_file, mode="w", newline="\n") as printer, open(self.log_file, mode="w") as log: while True: try: row = pipe.recv() print(row, file=printer) except e as Exception: print("Pipe read error", file=log) if __name__ == '__main__': lock = mp.Lock() # ROOT_DIR = "../data/201502/temp" ROOT_DIR = r"/mnt/e/出租车点/201502/RawCSV" INPUT_FILES = [os.path.join(ROOT_DIR, f) for f in os.listdir(ROOT_DIR)] GROUP_LIST = distrib_works(INPUT_FILES, 6) (RECEIVER, SENDER) = mp.Pipe() PROCESS_LIST = [FineTargetTaxiProcess(element, i, SENDER) for i, element in enumerate(GROUP_LIST)] PRINTER_PROCESS = PrinterProcess("./data/usepipe.txt", "./data/usepipe.log", RECEIVER) for process in PROCESS_LIST: process.daemon = True process.start() PRINTER_PROCESS.daemon = True PRINTER_PROCESS.start() for p in PROCESS_LIST: p.join() ``` # 其他 ## tqdm 多进度条 是一个快速,可扩展的 Python 进度条,可以在 Python 长循环中添加一个进度提示信息, 用户只需要封装任意的迭代器 `tqdm(iterator)` 。 这里有一些参数: - `ncols`:整个进度条(包括条以及其他文字)的宽度。最好设置一个小于控制台总宽的值。 - `mininterval`:进度条更新的最小间隔。默认为 0.1。 - `position`:进度条的位置,从0开始。对不同的 tqdm 对象设置不同的 position,可以在控制台的不同位置显示出来,适用于多进程与多线程。 > 由于 Windows 上多进程时 tqdm 无法获取默认的锁,所以会出现进度条错乱。在 Linux 上是没有问题的。 ## Windows 上 Lock 的问题 其实每次传入子进程函数内部的 `Lock`,在各个进程中的 `id` 都不一样。在 Linux 下没有这个问题。 这往往会导致一些程序在 Windows 上不正确。 因此,在 Windows 上最好少用 `Lock`,多采用消息传递或共享变量的方式设计程序。 [multiprocess-efficiency]: [join-explain]: [python多进程-cnblogs]: [Python中Lock与RLock]: [使用Lock互斥锁]: [Python线程同步机制]: [liujiang]:
