0
0
Lập trình
Admin Team
Admin Teamtechmely

Giao tiếp giữa các tiến trình trong Python Multiprocessing

Đăng vào 5 tháng trước

• 7 phút đọc

Giao tiếp giữa các tiến trình (IPC) trong Python

Giao tiếp giữa các tiến trình (IPC) là một cơ chế cho phép các tiến trình độc lập trao đổi dữ liệu và phối hợp hành động của chúng, vì mỗi tiến trình có không gian bộ nhớ riêng. Trong Python, việc giao tiếp giữa các tiến trình được thực hiện qua các công cụ như Queue, Pipe, Manager, Value, ArraySharedMemory. Bài viết này sẽ giúp bạn hiểu rõ hơn về các công cụ này và cách sử dụng chúng hiệu quả trong lập trình đa tiến trình.

1. Tổng quan về IPC

Giao tiếp giữa các tiến trình là rất quan trọng trong lập trình đa tiến trình vì nó giúp các tiến trình khác nhau có thể tương tác với nhau mà không gây ra xung đột. Các phương pháp IPC phổ biến trong Python bao gồm:

  • Queue: Dùng để trao đổi dữ liệu giữa các tiến trình một cách an toàn.
  • Pipe: Cung cấp giao tiếp hai chiều giữa hai tiến trình.
  • Manager: Cho phép chia sẻ các đối tượng Python giữa các tiến trình.
  • Value và Array: Cung cấp khả năng chia sẻ các giá trị đơn giản và mảng trong bộ nhớ chia sẻ.
  • SharedMemory: Cung cấp một khối bộ nhớ chia sẻ cho nhiều tiến trình mà không cần serial hóa.

2. multiprocessing.Queue

Trong mô hình đa tiến trình, Queue là một cách an toàn để các tiến trình trao đổi dữ liệu. Nó sử dụng các pipe và khóa để đảm bảo rằng nhiều tiến trình có thể thực hiện put()get() mà không bị xung đột.

2.1 Cách hoạt động của Queue

  • Khi một tiến trình thực hiện put() vào Queue, dữ liệu sẽ được pickle (chuỗi hóa), gửi qua một pipe và sau đó được unpickle (giải mã) ở phía nhận.
  • Queue hoạt động giống như queue.Queue trong threading, nhưng được thiết kế cho các tiến trình.

2.2 Ví dụ sử dụng Queue

python Copy
from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(5):
        print(f"Producing {i}")
        q.put(i)
        time.sleep(0.5)
    q.put(None)  # Sentinel: cho biết "chúng ta đã xong"

def consumer(q):
    while True:
        item = q.get()  # chặn cho đến khi có dữ liệu
        if item is None:
            break
        print(f"Consumed {item}")

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

2.3 Các hàm quan trọng

  • q.put(item): Thêm một mục vào hàng đợi.
  • q.get(): Lấy và trả về một mục (chặn nếu hàng đợi rỗng).
  • q.empty(): Trả về True nếu hàng đợi rỗng (không hoàn toàn đáng tin cậy).

3. multiprocessing.Pipe

Pipe là dạng giao tiếp giữa các tiến trình cơ bản nhất. Nó tương tự như một đường dây điện thoại hai chiều kết nối hai tiến trình. Pipe trả về hai đối tượng kết nối (conn1, conn2).

3.1 Cách hoạt động của Pipe

  • Một đầu gửi (send()), đầu còn lại nhận (recv()).
  • Không giống như Queue, Pipe là giao tiếp điểm-điểm (giữa hai tiến trình).

3.2 Ví dụ sử dụng Pipe

python Copy
from multiprocessing import Process, Pipe

def worker(conn):
    conn.send("Message from worker")
    msg = conn.recv()
    print("Worker got:", msg)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()

    print("Parent got:", parent_conn.recv())
    parent_conn.send("Ack from parent")

    p.join()

4. multiprocessing.Manager

Manager cho phép các tiến trình chia sẻ các đối tượng Python (list, dict, Namespace, v.v.) một cách an toàn. Mặc dù nó chậm hơn so với QueuePipe vì nó sử dụng proxy và pickle.

4.1 Ví dụ sử dụng Manager

python Copy
from multiprocessing import Process, Manager

def worker(shared_list, shared_dict):
    shared_list.append("hello")
    shared_dict["count"] = shared_dict.get("count", 0) + 1 

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()
        shared_dict = manager.dict()

        processes = [Process(target=worker, args=(shared_list, shared_dict)) for _ in range(3)]
        for p in processes: p.start()
        for p in processes: p.join()

        print("Final list:", list(shared_list))
        print("Final dict:", dict(shared_dict))

5. multiprocessing.Valuemultiprocessing.Array

Value tạo ra một biến scalar đơn (như int, double, char, v.v.) trong bộ nhớ chia sẻ mà nhiều tiến trình có thể truy cập và sửa đổi một cách an toàn. Array tạo ra một mảng kích thước cố định của các phần tử trong bộ nhớ chia sẻ.

5.1 Ví dụ sử dụng ValueArray

python Copy
from multiprocessing import Process, Value, Array

def worker(num, arr):
    num.value += 1
    for i in range(len(arr)):
        arr[i] *= -1

if __name__ == "__main__":
    num = Value('i', 0)
    arr = Array('i', [1, 2, 3])
    p = Process(target=worker, args=(num, arr))
    p.start(); p.join()
    print("num:", num.value)    # 1
    print("arr:", arr[:])       # [-1, -2, -3]

6. multiprocessing.shared_memory (Python 3.8+)

multiprocessing.shared_memory cung cấp khối bộ nhớ chia sẻ sống ngoài bất kỳ tiến trình nào. Nó cung cấp quyền truy cập trực tiếp vào một khối bộ nhớ mà nhiều tiến trình có thể sử dụng mà không cần pickle, sao chép hoặc proxy. Điều này nhanh hơn nhiều so với Manager.

6.1 Ví dụ sử dụng shared_memory

python Copy
from multiprocessing import shared_memory, Process
import numpy as np

def worker(name, shape):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=np.int64, buffer=shm.buf)
    arr *= 2  # gấp đôi tất cả giá trị
    shm.close()

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=5 * 8)
    arr = np.ndarray((5,), dtype=np.int64, buffer=shm.buf)
    arr[:] = [1, 2, 3, 4, 5]

    print("Before:", arr)

    p = Process(target=worker, args=(shm.name, arr.shape))
    p.start(); p.join()

    print("After:", arr)

    shm.close()
    shm.unlink()

7. So sánh các công cụ IPC

Công cụ IPC Tốt nhất cho Ghi chú
Queue Vấn đề nhà sản xuất-tiêu thụ FIFO, an toàn, đơn giản
Pipe Giao tiếp hai chiều Cấp thấp, cặp đôi
Manager Chia sẻ đối tượng Python Chậm hơn, dựa trên proxy
Value, Array Trạng thái chia sẻ số đơn giản Nhanh, cấp thấp
shared_memory Dữ liệu lớn (NumPy, ML) Không sao chép, hiệu quả

8. Những lưu ý quan trọng

8.1 Vấn đề serialisation

Khi dữ liệu được chia sẻ hoặc truyền giữa các tiến trình, Python không thể đơn giản chuyển giao các con trỏ bộ nhớ. Thay vào đó, nó phải:

  1. Serialise (pickle) đối tượng trong tiến trình gửi.
  2. Gửi chuỗi byte đó.
  3. Deserialise (unpickle) lại thành một đối tượng Python trong tiến trình nhận.

8.2 Sử dụng pickle

pickle là thư viện serialisation mặc định của Python. Nó hỗ trợ hầu hết các kiểu dữ liệu tích hợp.

8.3 Sử dụng cloudpickle

cloudpickle là thư viện serialisation mạnh mẽ hơn, có thể pickle lambdas và các hàm lồng nhau.

9. Kết luận

Giao tiếp giữa các tiến trình là một phần thiết yếu trong lập trình đa tiến trình. Việc hiểu rõ về các công cụ IPC trong Python sẽ giúp bạn tối ưu hóa hiệu suất và hiệu quả của ứng dụng. Hãy thử nghiệm với các ví dụ trên và áp dụng vào dự án của bạn.

Nếu bạn có bất kỳ câu hỏi nào, đừng ngần ngại liên hệ với chúng tôi để được hỗ trợ thêm!

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào