分类 技术 下的文章

运行task进程:

use \Workerman\Worker;
use \Workerman\Connection\TcpConnection;

// Task worker listening with the Text protocol (newline-delimited frames).
$task_worker = new Worker('Text://0.0.0.0:12345');
// Scale the number of task processes up as needed.
$task_worker->count = 10;
$task_worker->name = 'TaskWorker';
// Handle one incoming task frame: decode the JSON payload, simulate the
// work, then send a JSON result back on the same connection.
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
    echo "接收到任务: " . $task_data . PHP_EOL;

    // The payload is expected to be JSON.
    $task_data = json_decode($task_data, true);

    // Guard: malformed JSON (or a payload without task_id) would otherwise
    // fatally index a non-array below and kill the worker callback.
    if (!is_array($task_data) || !isset($task_data['task_id'])) {
        $connection->send(json_encode(array('error' => 'invalid task payload')));
        return;
    }

    // Run the task logic appropriate to task_data ... (computation of the
    // real result is omitted in this demo; sleep(3) simulates the work).
    sleep(3);
    $task_result = array(
        'task_id' => $task_data['task_id'],
        'result' => 'task result',
    );

    // Send the result back to the client.
    $connection->send(json_encode($task_result));
};

Worker::runAll();

- 阅读剩余部分 -

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import os
import random
import threading

def task(name, delay=2):
    """Run a demo task: log start/finish and return a result string.

    Args:
        name: Label used in the log output.
        delay: Seconds to sleep to simulate work. Defaults to 2, matching
            the original hard-coded ``time.sleep(2)``.

    Returns:
        The string ``"Result of <name>"``.
    """
    # Note: the original blog extraction lost the indentation; body restored.
    print(f"Hello from task {name} started (PID: {os.getpid()})")
    time.sleep(delay)
    print(f"Task {name} finished")
    return f"Result of {name}"

def task1(name, delay=None):
    """Run a demo task with a variable duration and return a result string.

    Args:
        name: Label used in the log output.
        delay: Seconds to sleep. When ``None`` (the default), a random
            1-4 second delay is used, matching the original behavior.

    Returns:
        The string ``"Result of <name>"``.
    """
    # Note: the original blog extraction lost the indentation; body restored.
    print(f"Hello from task {name} started (PID: {os.getpid()})")
    time.sleep(random.randint(1, 4) if delay is None else delay)
    print(f"Task {name} finished")
    return f"Result of {name}"

def task2(name, stop_event=None):
    """Loop, logging periodically, until the stop event is set.

    Args:
        name: Label used in the log output.
        stop_event: A ``threading.Event``-like object checked at the top of
            every iteration. Defaults to the module-level ``exit_event``
            global that the original article relies on — TODO confirm that
            global is defined where this snippet is used.

    Returns:
        None.
    """
    # Note: the original blog extraction lost the indentation; body restored.
    print(f"Hello from task {name} started (PID: {os.getpid()})")
    event = exit_event if stop_event is None else stop_event
    while True:
        if event.is_set():
            print("任务中断...")
            break
        time.sleep(random.randint(1, 4))
        print(f"Task {name} running...")

# Entry-point guard. Bug fix: the original checked ``name`` (an undefined
# variable, raising NameError) instead of the ``__name__`` dunder, so the
# demo could never run. The guard is mandatory here because
# ProcessPoolExecutor re-imports this module in child processes.
if __name__ == '__main__':
    print(f"主进程 (PID: {os.getpid()})")

    # Thread-pool variant, results fetched in submission order:
    # with ThreadPoolExecutor(max_workers=3) as executor:
    #     future1 = executor.submit(task, "A")
    #     future2 = executor.submit(task, "B")
    #     future3 = executor.submit(task, "C")

    #     print(future1.result())
    #     print(future2.result())
    #     print(future3.result())

    # Multi-threaded variant:
    # with ThreadPoolExecutor(max_workers=3) as executor:

    #     futures = [executor.submit(task1, f"task_{i}") for i in range(5)]
    #     # Fetch results in completion order
    #     for future in as_completed(futures):
    #         print(future.result())

    # Multi-process variant (backed by the multiprocessing module):
    with ProcessPoolExecutor(max_workers=3) as executor:

        futures = [executor.submit(task1, f"task_{i}") for i in range(5)]
        # Fetch results in completion order
        for future in as_completed(futures):
            print(future.result())





from multiprocessing import Process, Pool, current_process, cpu_count, Queue, Lock, Manager, Value, Array
import os
import time
import random
from queue import Empty, Full
import ctypes

def mul(x, delay=2):
    """Return ``x * x`` after simulating some work.

    Args:
        x: The value to square.
        delay: Seconds to sleep to simulate work. Defaults to 2, matching
            the original hard-coded ``time.sleep(2)``.

    Returns:
        ``x * x``.
    """
    # Log which worker process handled this value.
    print(f'当前进程 (PID: {os.getpid()}): {x}')
    time.sleep(delay)
    return x * x

def push(q, delay=2):
    """Put the string ``"hello"`` onto queue ``q`` after a delay.

    Args:
        q: Any queue-like object exposing ``put``.
        delay: Seconds to sleep first. Defaults to 2, matching the original
            hard-coded ``time.sleep(2)``.
    """
    time.sleep(delay)
    q.put("hello")

def worker(delay=5):
    """Log start/end of a child process, sleeping in between.

    Args:
        delay: Seconds to sleep to simulate work. Defaults to 5, matching
            the original hard-coded ``time.sleep(5)``.
    """
    print(f'子进程 (PID: {os.getpid()})')
    time.sleep(delay)
    print(f'子进程结束 (PID: {os.getpid()})')

- 阅读剩余部分 -

追加文件内容

# Append the contents of local /tmp/country.csv to the HDFS file /my/country
hadoop fs -appendToFile /tmp/country.csv /my/country

创建文件夹

# Create the HDFS directory /my/files; -p also creates missing parent dirs
hadoop fs -mkdir -p /my/files

删除文件夹

# Recursively (-r) delete HDFS directory /my; -f suppresses missing-path errors
hadoop fs -rm -r -f /my
# NOTE(review): redundant if the previous command already ran — /my/files
# is removed together with /my; kept here as the article shows both forms
hadoop fs -rm -r -f /my/files