Giao tiếp giữa các tiến trình (IPC) trong Python
Giao tiếp giữa các tiến trình (IPC) là một cơ chế cho phép các tiến trình độc lập trao đổi dữ liệu và phối hợp hành động của chúng, vì mỗi tiến trình có không gian bộ nhớ riêng. Trong Python, việc giao tiếp giữa các tiến trình được thực hiện qua các công cụ như Queue, Pipe, Manager, Value, Array và SharedMemory. Bài viết này sẽ giúp bạn hiểu rõ hơn về các công cụ này và cách sử dụng chúng hiệu quả trong lập trình đa tiến trình.
1. Tổng quan về IPC
Giao tiếp giữa các tiến trình là rất quan trọng trong lập trình đa tiến trình vì nó giúp các tiến trình khác nhau có thể tương tác với nhau mà không gây ra xung đột. Các phương pháp IPC phổ biến trong Python bao gồm:
- Queue: Dùng để trao đổi dữ liệu giữa các tiến trình một cách an toàn.
- Pipe: Cung cấp giao tiếp hai chiều giữa hai tiến trình.
- Manager: Cho phép chia sẻ các đối tượng Python giữa các tiến trình.
- Value và Array: Cung cấp khả năng chia sẻ các giá trị đơn giản và mảng trong bộ nhớ chia sẻ.
- SharedMemory: Cung cấp một khối bộ nhớ chia sẻ cho nhiều tiến trình mà không cần serial hóa.
2. multiprocessing.Queue
Trong mô hình đa tiến trình, Queue là một cách an toàn để các tiến trình trao đổi dữ liệu. Nó sử dụng các pipe và khóa để đảm bảo rằng nhiều tiến trình có thể thực hiện put() và get() mà không bị xung đột.
2.1 Cách hoạt động của Queue
- Khi một tiến trình thực hiện
put()vàoQueue, dữ liệu sẽ được pickle (chuỗi hóa), gửi qua một pipe và sau đó được unpickle (giải mã) ở phía nhận. Queuehoạt động giống nhưqueue.Queuetrong threading, nhưng được thiết kế cho các tiến trình.
2.2 Ví dụ sử dụng Queue
python
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(5):
print(f"Producing {i}")
q.put(i)
time.sleep(0.5)
q.put(None) # Sentinel: cho biết "chúng ta đã xong"
def consumer(q):
while True:
item = q.get() # chặn cho đến khi có dữ liệu
if item is None:
break
print(f"Consumed {item}")
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
2.3 Các hàm quan trọng
q.put(item): Thêm một mục vào hàng đợi.q.get(): Lấy và trả về một mục (chặn nếu hàng đợi rỗng).q.empty(): Trả vềTruenếu hàng đợi rỗng (không hoàn toàn đáng tin cậy).
3. multiprocessing.Pipe
Pipe là dạng giao tiếp giữa các tiến trình cơ bản nhất. Nó tương tự như một đường dây điện thoại hai chiều kết nối hai tiến trình. Pipe trả về hai đối tượng kết nối (conn1, conn2).
3.1 Cách hoạt động của Pipe
- Một đầu gửi (
send()), đầu còn lại nhận (recv()). - Không giống như Queue, Pipe là giao tiếp điểm-điểm (giữa hai tiến trình).
3.2 Ví dụ sử dụng Pipe
python
from multiprocessing import Process, Pipe
def worker(conn):
conn.send("Message from worker")
msg = conn.recv()
print("Worker got:", msg)
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print("Parent got:", parent_conn.recv())
parent_conn.send("Ack from parent")
p.join()
4. multiprocessing.Manager
Manager cho phép các tiến trình chia sẻ các đối tượng Python (list, dict, Namespace, v.v.) một cách an toàn. Mặc dù nó chậm hơn so với Queue và Pipe vì nó sử dụng proxy và pickle.
4.1 Ví dụ sử dụng Manager
python
from multiprocessing import Process, Manager
def worker(shared_list, shared_dict):
shared_list.append("hello")
shared_dict["count"] = shared_dict.get("count", 0) + 1
if __name__ == "__main__":
with Manager() as manager:
shared_list = manager.list()
shared_dict = manager.dict()
processes = [Process(target=worker, args=(shared_list, shared_dict)) for _ in range(3)]
for p in processes: p.start()
for p in processes: p.join()
print("Final list:", list(shared_list))
print("Final dict:", dict(shared_dict))
5. multiprocessing.Value và multiprocessing.Array
Value tạo ra một biến scalar đơn (như int, double, char, v.v.) trong bộ nhớ chia sẻ mà nhiều tiến trình có thể truy cập và sửa đổi một cách an toàn. Array tạo ra một mảng kích thước cố định của các phần tử trong bộ nhớ chia sẻ.
5.1 Ví dụ sử dụng Value và Array
python
from multiprocessing import Process, Value, Array
def worker(num, arr):
num.value += 1
for i in range(len(arr)):
arr[i] *= -1
if __name__ == "__main__":
num = Value('i', 0)
arr = Array('i', [1, 2, 3])
p = Process(target=worker, args=(num, arr))
p.start(); p.join()
print("num:", num.value) # 1
print("arr:", arr[:]) # [-1, -2, -3]
6. multiprocessing.shared_memory (Python 3.8+)
multiprocessing.shared_memory cung cấp khối bộ nhớ chia sẻ sống ngoài bất kỳ tiến trình nào. Nó cung cấp quyền truy cập trực tiếp vào một khối bộ nhớ mà nhiều tiến trình có thể sử dụng mà không cần pickle, sao chép hoặc proxy. Điều này nhanh hơn nhiều so với Manager.
6.1 Ví dụ sử dụng shared_memory
python
from multiprocessing import shared_memory, Process
import numpy as np
def worker(name, shape):
shm = shared_memory.SharedMemory(name=name)
arr = np.ndarray(shape, dtype=np.int64, buffer=shm.buf)
arr *= 2 # gấp đôi tất cả giá trị
shm.close()
if __name__ == "__main__":
shm = shared_memory.SharedMemory(create=True, size=5 * 8)
arr = np.ndarray((5,), dtype=np.int64, buffer=shm.buf)
arr[:] = [1, 2, 3, 4, 5]
print("Before:", arr)
p = Process(target=worker, args=(shm.name, arr.shape))
p.start(); p.join()
print("After:", arr)
shm.close()
shm.unlink()
7. So sánh các công cụ IPC
| Công cụ IPC | Tốt nhất cho | Ghi chú |
|---|---|---|
Queue |
Vấn đề nhà sản xuất-tiêu thụ | FIFO, an toàn, đơn giản |
Pipe |
Giao tiếp hai chiều | Cấp thấp, cặp đôi |
Manager |
Chia sẻ đối tượng Python | Chậm hơn, dựa trên proxy |
Value, Array |
Trạng thái chia sẻ số đơn giản | Nhanh, cấp thấp |
shared_memory |
Dữ liệu lớn (NumPy, ML) | Không sao chép, hiệu quả |
8. Những lưu ý quan trọng
8.1 Vấn đề serialisation
Khi dữ liệu được chia sẻ hoặc truyền giữa các tiến trình, Python không thể đơn giản chuyển giao các con trỏ bộ nhớ. Thay vào đó, nó phải:
- Serialise (pickle) đối tượng trong tiến trình gửi.
- Gửi chuỗi byte đó.
- Deserialise (unpickle) lại thành một đối tượng Python trong tiến trình nhận.
8.2 Sử dụng pickle
pickle là thư viện serialisation mặc định của Python. Nó hỗ trợ hầu hết các kiểu dữ liệu tích hợp.
8.3 Sử dụng cloudpickle
cloudpickle là thư viện serialisation mạnh mẽ hơn, có thể pickle lambdas và các hàm lồng nhau.
9. Kết luận
Giao tiếp giữa các tiến trình là một phần thiết yếu trong lập trình đa tiến trình. Việc hiểu rõ về các công cụ IPC trong Python sẽ giúp bạn tối ưu hóa hiệu suất và hiệu quả của ứng dụng. Hãy thử nghiệm với các ví dụ trên và áp dụng vào dự án của bạn.
Nếu bạn có bất kỳ câu hỏi nào, đừng ngần ngại liên hệ với chúng tôi để được hỗ trợ thêm!