from multiprocessing import Process, Pool, current_process, cpu_count, Queue, Lock, Manager, Value, Array
import os
import time
import random
from queue import Empty, Full
import ctypes

def mul(x, delay=2):
    """Return the square of ``x`` after simulating a slow task.

    Args:
        x: Number to square.
        delay: Seconds to sleep before returning, standing in for real
            work. Defaults to 2, the value that was previously hard-coded.

    Returns:
        ``x * x``.
    """
    # Report which process is handling this input.
    print(f'当前进程 (PID: {os.getpid()}): {x}')
    time.sleep(delay)
    return x * x

def push(q):
    """Put the string ``"hello"`` on ``q`` after a two-second pause.

    Args:
        q: Queue-like object exposing ``put``.
    """
    time.sleep(2)
    q.put("hello")

def worker():
    """Print a start message, sleep five seconds, then print an end message.

    Both messages include the PID of the process running this function.
    """
    print(f'子进程 (PID: {os.getpid()})')
    time.sleep(5)
    print(f'子进程结束 (PID: {os.getpid()})')

def callback(result):
    """Print the PID of the calling process followed by ``result``.

    Args:
        result: Value to report; it is converted with ``str`` for printing.
    """
    print(f'回调函数 (PID: {os.getpid()})')
    print("结果: " + str(result))

def run_pool_map_sync():
    """Square 0..9 with a blocking ``Pool.map`` call and print the results.

    The pool is sized to ``cpu_count()`` and inputs are handed to the
    workers in chunks of two.
    """
    num_workers = cpu_count()
    print(f'CPU核心数: {num_workers}')
    with Pool(num_workers) as pool:
        # map() blocks until every input has been processed.
        results = pool.map(mul, range(10), chunksize=2)
        print(results)

def run_pool_map_async():
    """Square 0..99 with ``Pool.map_async`` and print the results.

    ``map_async`` returns immediately, so the two marker lines are printed
    before ``get()`` blocks waiting for the collected results.
    """
    num_workers = cpu_count()
    print(f'CPU核心数: {num_workers}')
    with Pool(num_workers) as pool:
        # Alternative kept for reference: deliver the results to a callback.
        # pool.map_async(mul, range(10), chunksize=2, callback=callback)
        # print("map_async结果:", results.get())
        # print(results)

        results = pool.map_async(mul, range(100), chunksize=2)
        print("hello")
        print("world")
        # get() blocks until all chunks have finished.
        print("map_async结果:", results.get())

def run_pool_apply_sync():
    """Run ``mul(8)`` in a pool worker with blocking ``Pool.apply`` and print it."""
    num_workers = cpu_count()
    print(f'CPU核心数: {num_workers}')
    with Pool(num_workers) as pool:
        # apply() waits for the single call to finish and returns its value.
        result = pool.apply(mul, (8,))
        print(result)

def run_pool_apply_async():
    """Submit ``mul(i)`` for i in 0..9 with ``apply_async`` and print each result.

    All ten tasks are submitted before any result is awaited; results are
    then printed in submission order.
    """
    num_workers = cpu_count()
    print(f'CPU核心数: {num_workers}')
    with Pool(num_workers) as pool:
        # Alternatives kept for reference.
        # result = pool.apply_async(mul, (8,))
        # print(result.get())

        # print("hello")
        # print("world")
        # result1 = pool.apply_async(mul, (2,))
        # print(result1.get())

        # apply_async with a callback
        # results = [pool.apply_async(mul, (i,), callback=callback) for i in range(10)]
        # for result in results:
        #     print(result.get())

        pending = [pool.apply_async(mul, (i,)) for i in range(10)]
        for task in pending:
            # get() blocks until this particular task has finished.
            print(task.get())

def producer(q):
    """Put ``product_0`` .. ``product_49`` on ``q``, one per second, then close it.

    Args:
        q: Queue exposing ``put`` and ``close``.
    """
    print(f'生产者 (PID: {os.getpid()})')
    for i in range(50):
        time.sleep(1)
        q.put('product_' + str(i))
    # This process will not put anything else on the queue.
    q.close()

def consumer(q):
    """Consume and print items from ``q`` until a stop condition is met.

    The loop ends when a ``None`` item is received, or when no item
    arrives within five seconds (``Empty`` is raised and reported). Any
    other exception is printed and also ends the loop.

    Args:
        q: Queue exposing ``get(block, timeout)``.
    """
    print(f'消费者 (PID: {os.getpid()})')
    try:
        while True:
            # Wait at most 5 seconds for an item; Empty is raised on timeout.
            product = q.get(block=True, timeout=5)
            if product is None:
                break
            print(f'消费者 (PID: {os.getpid()}) 消费了 {product}')
    except Empty:
        print(f"队列为空, 退出进程(PID: {os.getpid()})")
    except Exception as e:
        # Best effort: report the problem instead of crashing the process.
        print(e)

def run_queue():
    """Start a child that pushes one message and print it in the parent.

    ``"world"`` is printed right after the child starts; ``q.get()`` then
    blocks until the child has put its message on the queue.
    """
    q = Queue()
    child = Process(target=push, args=(q,))
    child.start()
    print("world")
    print(q.get())
    child.join()

def run_producer_consumer_queue():
    """Run one producer and one consumer process over a shared queue.

    Both processes are started, then joined, before the final message is
    printed.
    """
    q = Queue()

    processes = []

    # Start the producer process.
    producer_proc = Process(target=producer, args=(q,))
    producer_proc.start()
    processes.append(producer_proc)

    # Start a single consumer process.
    consumer_proc = Process(target=consumer, args=(q,))
    consumer_proc.start()
    processes.append(consumer_proc)

    # Alternative kept for reference: start several consumers.
    # for i in range(5):
    #     c = Process(target=consumer, args=(q,))
    #     c.start()
    #     processes.append(c)

    # Wait for every started process (producer and consumer) to finish.
    for proc in processes:
        proc.join()

    print("主进程结束")


def producer_async(q):
    """Put ``product_0`` .. ``product_49`` on ``q`` back-to-back, then close it.

    Args:
        q: Queue exposing ``put`` and ``close``.
    """
    print(f'生产者 (PID: {os.getpid()})')
    for i in range(50):
        # No pause between items here.
        q.put('product_' + str(i))
    # This process will not put anything else on the queue.
    q.close()

def consumer_async():
    """Consume and print items from the module-global ``worker_queue``.

    ``worker_queue`` must have been assigned beforehand (``init_worker``
    does this). The loop ends when a ``None`` item is received, or when no
    item arrives within five seconds (``Empty`` is raised and reported).
    Any other exception is printed and also ends the loop.
    """
    print(f'消费者 (PID: {os.getpid()})')
    try:
        while True:
            # Wait at most 5 seconds for an item; Empty is raised on timeout.
            product = worker_queue.get(block=True, timeout=5)
            if product is None:
                break
            print(f'消费者 (PID: {os.getpid()}) 消费了 {product}')
    except Empty:
        print(f"队列为空, 退出进程(PID: {os.getpid()})")
    except Exception as e:
        # Best effort: report the problem instead of crashing the worker.
        print(e)

def init_worker(q):
    """Store ``q`` in the module-global ``worker_queue``.

    Args:
        q: Object to expose to later calls in this process under the
            global name ``worker_queue``.
    """
    global worker_queue
    worker_queue = q

def run_producer_consumer_queue_async():
    """Feed a queue from one producer process and drain it with pool workers.

    The queue is handed to each pool worker through the pool initializer,
    ten consumer tasks are submitted, and each task's result is printed.
    The producer process is joined once the consumers have finished.
    """
    q = Queue()

    # Start the producer process.
    producer_proc = Process(target=producer_async, args=(q,))
    producer_proc.start()

    # Run the consumers in a pool; init_worker gives each worker the queue.
    num_workers = cpu_count()
    print(f'CPU核心数: {num_workers}')
    with Pool(num_workers, initializer=init_worker, initargs=(q,)) as pool:
        pending = [pool.apply_async(consumer_async, ()) for _ in range(10)]
        for task in pending:
            print(task.get())

    # Join only after the consumers are done, so the queue has been drained
    # before we wait on the process that filled it.
    producer_proc.join()

def run_with_lock(lock, i):
    """Print a message and sleep 1-3 seconds while holding ``lock``.

    Args:
        lock: Lock usable as a context manager; it is released even if the
            body raises.
        i: Value included in the printed message.
    """
    with lock:
        print(f'子进程 (PID: {os.getpid()}): {i}')
        time.sleep(random.randint(1, 3))

def run_lock():
    """Start five processes that share one lock and wait for all of them."""
    lock = Lock()

    processes = []
    for i in range(5):
        proc = Process(target=run_with_lock, args=(lock, i,))
        proc.start()
        processes.append(proc)

    # Wait explicitly for the children, as the other demos in this file do.
    for proc in processes:
        proc.join()

def worker_with_manager(shared_list, i):
    """Sleep 1-3 seconds, then store ``i ** 2`` at index ``i`` of ``shared_list``.

    Args:
        shared_list: Mutable sequence supporting item assignment.
        i: Index to write, and the base of the stored square.
    """
    print(f'子进程 (PID: {os.getpid()}): {i}')
    time.sleep(random.randint(1, 3))
    shared_list[i] = i ** 2

def run_with_manager():
    """Share a ``Manager`` list between five processes and print the result.

    Each child writes the square of its index into the list; the parent
    joins all children before printing.
    """
    # Create a Manager to host the shared list.
    with Manager() as manager:
        shared_list = manager.list(range(5))

        processes = []
        for i in range(5):
            proc = Process(target=worker_with_manager, args=(shared_list, i))
            processes.append(proc)
            proc.start()

        for proc in processes:
            proc.join()

        print(shared_list)  # Prints: [0, 1, 4, 9, 16]

def worker_val(counter):
    """Increment ``counter.value`` 100000 times, holding its lock each time.

    Args:
        counter: Object exposing a ``value`` attribute and a ``get_lock()``
            method that returns a context-manager lock.
    """
    print(f'子进程 (PID: {os.getpid()})')
    for _ in range(100000):
        # ``+=`` is a read followed by a write, so guard both with the lock.
        with counter.get_lock():
            counter.value += 1

def run_with_shared_value():
    """Increment one shared integer from two processes and print the total."""
    # Shared C int, initial value 0.
    counter = Value(ctypes.c_int, 0)

    workers = [
        Process(target=worker_val, args=(counter,)),
        Process(target=worker_val, args=(counter,)),
    ]

    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()

    print(f"主进程输出最终的 counter.value:{counter.value}")

def worker_arr(arr):
    """Repeatedly overwrite the first five elements of ``arr``.

    For ``j`` in 0..9999, indices 0-4 are each set to ``j * 10``, so those
    elements hold 99990 when this call returns.

    Args:
        arr: Mutable sequence with at least five elements.
    """
    for j in range(10000):
        value = j * 10  # same for all five slots in this pass
        for i in range(5):
            arr[i] = value

def run_with_shared_array():
    """Let five processes write to one shared int array and print it."""
    # Shared array of five ints ('i' is the int typecode), zero-initialised.
    shared_array = Array('i', 5)

    # Start five processes; each one rewrites every element of the array.
    processes = []
    for _ in range(5):
        proc = Process(target=worker_arr, args=(shared_array,))
        proc.start()
        processes.append(proc)

    for proc in processes:
        proc.join()

    # Print the final contents of the array.
    print("主进程输出最终的 shared_array:", list(shared_array))

# Entry-point guard: the demos must run only when this file is executed as a
# script, not when it is imported (child processes may re-import the main
# module, depending on the start method).
if __name__ == '__main__':

    print(f'主进程 (PID: {os.getpid()})')

    # Uncomment exactly one of the demos below to run it.

    # Bare Process example
    # p = Process(target=worker, args=())
    # p.start()
    # print(f'hello')
    # print(f'world')
    # p.join()

    # run_pool_map_sync()
    # run_pool_map_async()

    # run_pool_apply_sync()

    # run_pool_apply_async()

    # run_queue()

    # Producer/consumer pattern
    # run_producer_consumer_queue()

    # run_producer_consumer_queue_async()

    # Lock
    # run_lock()

    # Manager
    # run_with_manager()

    # Shared value
    run_with_shared_value()

    # Shared array
    # run_with_shared_array()












# Web-page residue from the blog this code was copied from (a "tags: none"
# line, an "add new comment" link, and a list of emoticon image file names).
# It is not part of the program and is kept only as a comment.