Python multiprocessing
from multiprocessing import Process, Pool, cpu_count, Queue, Lock, Manager, Value, Array
import os
import time
import random
from queue import Empty
import ctypes
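# Each run_* function below demonstrates one multiprocessing primitive; toggle
# the calls in the __main__ block at the bottom to try them one at a time.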
def mul(x):
    # Report which worker process received this input
    print(f'Current process (PID: {os.getpid()}): {x}')
time.sleep(2)
return x * x
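# Helper for run_queue(): sleep 2 seconds, then put a message on the queue.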
def push(q):
time.sleep(2)
q.put("hello")
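# Simple child-process body for the bare Process demo in the __main__ block.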
def worker():
    print(f'Child process (PID: {os.getpid()})')
    time.sleep(5)
    print(f'Child process finished (PID: {os.getpid()})')
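# Result callback for the Pool demos; callbacks run in the parent process
# (on the pool's result-handler thread), not in a worker process.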
def callback(result):
    print(f'Callback (PID: {os.getpid()})')
    print("Result: " + str(result))
def run_pool_map_sync():
PROCESSES = cpu_count()
    print(f'Number of CPU cores: {PROCESSES}')
with Pool(PROCESSES) as pool:
results = pool.map(mul, range(10), chunksize=2)
print(results)
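# map_async submits every task and returns an AsyncResult immediately; the
# parent keeps running and calls .get() later to block for the results.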
def run_pool_map_async():
PROCESSES = cpu_count()
    print(f'Number of CPU cores: {PROCESSES}')
with Pool(PROCESSES) as pool:
        # results = pool.map_async(mul, range(10), chunksize=2, callback=callback)
        # print("map_async results:", results.get())
        results = pool.map_async(mul, range(100), chunksize=2)
        print("hello")
        print("world")
        print("map_async results:", results.get())  # get() blocks until all tasks finish
def run_pool_apply_sync():
PROCESSES = cpu_count()
    print(f'Number of CPU cores: {PROCESSES}')
with Pool(PROCESSES) as pool:
result = pool.apply(mul, (8,))
print(result)
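# apply() blocks until its single task finishes, so it occupies one worker at
# a time; apply_async() submits without blocking, allowing real parallelism.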
def run_pool_apply_async():
PROCESSES = cpu_count()
    print(f'Number of CPU cores: {PROCESSES}')
with Pool(PROCESSES) as pool:
        # result = pool.apply_async(mul, (8,))
        # print(result.get())
        # print("hello")
        # print("world")
        # result1 = pool.apply_async(mul, (2,))
        # print(result1.get())
        # apply_async with a callback:
        # results = [pool.apply_async(mul, (i,), callback=callback) for i in range(10)]
        # for result in results:
        #     print(result.get())
results = [pool.apply_async(mul, (i,)) for i in range(10)]
for result in results:
print(result.get())
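# Producer-consumer demo: the producer puts 50 items on a shared Queue while
# one or more consumer processes drain it with a 5-second timeout.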
def producer(q):
    print(f'Producer (PID: {os.getpid()})')
for i in range(50):
time.sleep(1)
q.put('product_' + str(i))
q.close()
def consumer(q):
    print(f'Consumer (PID: {os.getpid()})')
    try:
        while True:
            product = q.get(True, 5)  # block up to 5 seconds; raises queue.Empty on timeout
            if product is None:  # optional sentinel: a producer can put None to request shutdown
                break
            print(f'Consumer (PID: {os.getpid()}) consumed {product}')
    except Empty:
        print(f"Queue is empty, exiting process (PID: {os.getpid()})")
except Exception as e:
print(e)
def run_queue():
q = Queue()
p = Process(target=push, args=(q,))
p.start()
print("world")
print(q.get())
p.join()
def run_producer_consumer_queue():
q = Queue()
processes = []
    # Start the producer process
p = Process(target=producer, args=(q,))
p.start()
processes.append(p)
    # Start a single consumer process
c = Process(target=consumer, args=(q,))
c.start()
processes.append(c)
    # # Start multiple consumer processes:
    # for i in range(5):
    #     c = Process(target=consumer, args=(q,))
    #     c.start()
    #     processes.append(c)
    # Wait for all processes (producer and consumers) to finish
for p in processes:
p.join()
print("主进程结束")
def producer_async(q):
    print(f'Producer (PID: {os.getpid()})')
for i in range(50):
# time.sleep(1)
q.put('product_' + str(i))
q.close()
def consumer_async():
    print(f'Consumer (PID: {os.getpid()})')
    try:
        while True:
            product = worker_queue.get(True, 5)  # block up to 5 seconds; raises queue.Empty on timeout
            if product is None:
                break
            print(f'Consumer (PID: {os.getpid()}) consumed {product}')
    except Empty:
        print(f"Queue is empty, exiting process (PID: {os.getpid()})")
except Exception as e:
print(e)
def init_worker(q):
global worker_queue
worker_queue = q
def run_producer_consumer_queue_async():
q = Queue()
    # Start the producer process
p = Process(target=producer_async, args=(q,))
p.start()
    # Run the consumers in a process pool
    PROCESSES = cpu_count()
    print(f'Number of CPU cores: {PROCESSES}')
with Pool(PROCESSES, initializer=init_worker, initargs=(q,)) as pool:
        results = [pool.apply_async(consumer_async, ()) for _ in range(10)]
for result in results:
print(result.get())
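# Lock demo: the try/finally acquire/release serializes the critical section,
# so only one child prints and sleeps at a time.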
def run_with_lock(lock, i):
lock.acquire()
try:
        print(f'Child process (PID: {os.getpid()}): {i}')
time.sleep(random.randint(1, 3))
finally:
lock.release()
def run_lock():
    lock = Lock()
    processes = []
    for i in range(5):
        p = Process(target=run_with_lock, args=(lock, i,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
def worker_with_manager(shared_list, i):
    print(f'Child process (PID: {os.getpid()}): {i}')
time.sleep(random.randint(1, 3))
shared_list[i] = i ** 2
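# Manager demo: manager.list() returns a proxy object backed by a server
# process, so index assignments made in child processes are visible everywhere.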
def run_with_manager():
    # Create a Manager; the shared objects it creates live in a server process
with Manager() as manager:
shared_list = manager.list(range(5))
processes = []
for i in range(5):
p = Process(target=worker_with_manager, args=(shared_list, i))
processes.append(p)
p.start()
for p in processes:
p.join()
        print(shared_list)  # prints: [0, 1, 4, 9, 16]
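# Shared Value demo: counter.value += 1 is a read-modify-write sequence, not
# an atomic operation, so each increment is guarded with the Value's built-in
# lock via get_lock(); without it, updates from the two workers could be lost.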
def worker_val(counter):
    print(f'Child process (PID: {os.getpid()})')
for i in range(100000):
with counter.get_lock():
counter.value += 1
def run_with_shared_value():
    counter = Value(ctypes.c_int, 0)  # shared C int, initial value 0
p1 = Process(target=worker_val, args=(counter,))
p2 = Process(target=worker_val, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"主进程输出最终的 counter.value:{counter.value}")
def worker_arr(arr):
for j in range(10000):
for i in range(5):
arr[i] = j * 10
def run_with_shared_array():
    # Create a shared array of 5 ints, all initialized to 0
    shared_array = Array('i', 5)  # 'i' is the typecode for a signed C int
    # Start several processes; each one rewrites every element of the array
processes = []
for i in range(5):
p = Process(target=worker_arr, args=(shared_array,))
p.start()
processes.append(p)
for p in processes:
p.join()
    # Print the final contents of the array
    print("Final shared_array in the main process:", list(shared_array))
if __name__ == '__main__':
    print(f'Main process (PID: {os.getpid()})')
# p = Process(target=worker, args=())
# p.start()
# print(f'hello')
# print(f'world')
# p.join()
# run_pool_map_sync()
# run_pool_map_async()
# run_pool_apply_sync()
# run_pool_apply_async()
# run_queue()
    # Producer-consumer demos
    # run_producer_consumer_queue()
    # run_producer_consumer_queue_async()
    # Lock demo
    # run_lock()
    # Manager demo
    # run_with_manager()
    # Shared Value demo
    run_with_shared_value()
    # Shared Array demo
    # run_with_shared_array()