🔄 Mô Hình WaitGroup Giống Go trong Xử Lý OCR Không Đồng Bộ
📍 Vị Trí Triển Khai WaitGroup
Mô hình WaitGroup giống Go được triển khai trong:
- Tệp:
src/core/async_ocr_integration.py - Lớp:
AsyncOCRWaitGroup(dòng 28-84) - Cách sử dụng: Phương thức
_process_concurrent_operations()
🔄 Cách Hoạt Động của WaitGroup (Giống sync.WaitGroup của Go)
1. Khởi Tạo WaitGroup
python
# Dòng 189-190: Tạo WaitGroup để phối hợp các thao tác không đồng bộ
wait_group = AsyncOCRWaitGroup()
2. Thêm Thao Tác vào WaitGroup
python
# Dòng 330: Thêm N thao tác vào WaitGroup (giống như wg.Add(n) của Go)
await wait_group.add(len(all_operations))
Kết quả: 🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)
3. Bắt Đầu Thao Tác Đồng Thời
python
# Dòng 332-340: Tạo các tác vụ không đồng bộ cho mỗi phát hiện
tasks = []
for i, operation in enumerate(all_operations):
task = asyncio.create_task(
self._process_single_operation_safe(operation, wait_group, updated_detections, i)
)
tasks.append(task)
4. Mỗi Thao Tác Thông Báo Hoàn Thành
python
# Dòng 472-473: Mỗi thao tác gọi done() khi hoàn thành (giống như wg.Done() của Go)
finally:
await wait_group.done() # Giảm đếm xuống 1
Kết quả: ✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 2
Kết quả: ✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 1
Kết quả: ✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 0
5. Chờ Tất Cả Thao Tác Hoàn Thành
python
# Dòng 199: Chờ tất cả thao tác hoàn thành (giống như wg.Wait() của Go)
await wait_group.wait(timeout=self.operation_timeout)
Kết quả: 🎉 [AsyncOCRWaitGroup] Tất cả 3 thao tác đã hoàn thành
🎯 Sơ Đồ Quy Trình Chi Tiết
plaintext
┌─────────────────────────────────────────────────────────────┐
│ QUY TRÌNH WAITGROUP ASYNC OCR │
└─────────────────────────────────────────────────────────────┘
1. KHỞI TẠO
┌──────────────────┐
│ Nhiều │
│ nhãn thực phẩm │──┐
│ phát hiện tìm thấy│ │
└──────────────────┘ │ ┌─────────────────────┐
├────▶│ Tạo WaitGroup │
┌──────────────────┐ │ │ wait_group = new() │
│ Phát hiện QR │ │ └─────────────────────┘
│ tìm thấy │──┘ │
└──────────────────┘ │
▼
2. THÊM THAO TÁC VÀO WAITGROUP
┌─────────────────────────────────────────────────────┐
│ await wait_group.add(3) # 3 phát hiện cần xử lý │
│ 🔢 Counter: 3 (delta: +3) │
└─────────────────────────────────────────────────────┘
│
▼
3. BẮT ĐẦU CÁC TÁC VỤ ĐỒNG THỜI
┌─────────────────────────────────────────────────────────┐
│ THỰC THI ĐỒNG THỜI │
│ │
│ Tác vụ 1: nhãn thực phẩm Tác vụ 2: nhãn thực phẩm Tác vụ 3: QR │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────┐ │
│ │ Xử lý OCR │ │ Xử lý OCR │ │ QR Proc │ │
│ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────┐ │ │
│ │ │ gRPC │ │ │ │ gRPC │ │ │ │gRPC │ │ │
│ │ │ Dịch vụ OCR │ │ │ │ Dịch vụ OCR │ │ │ │ QR │ │ │
│ │ └─────────────┘ │ │ └─────────────┘ │ │ └─────┘ │ │
│ └─────────────────┘ └─────────────────┘ └─────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────┐ │
│ │ await wg.done() │ │ await wg.done() │ │ await │ │
│ │ Counter: 2 │ │ Counter: 1 │ │ wg.done()│ │
│ └─────────────────┘ └─────────────────┘ │Counter:0│ │
│ └─────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
4. ĐIỀU PHỐI WAITGROUP
┌─────────────────────────────────────────────────────────┐
│ ĐIỀU PHỐI WAITGROUP │
│ │
│ ✅ Thao tác 1 hoàn thành → Counter: 2 │
│ ✅ Thao tác 2 hoàn thành → Counter: 1 │
│ ✅ Thao tác 3 hoàn thành → Counter: 0 │
│ │
│ Khi Counter = 0: │
│ 🎉 Tất cả 3 thao tác đã hoàn thành! │
│ 📤 Giải phóng await wait_group.wait() │
└─────────────────────────────────────────────────────────┘
│
▼
5. HOÀN THÀNH
┌─────────────────────────────────────────────────────────┐
│ ✨ Đã xử lý thành công 3 thao tác đồng thời │
│ 📊 Trả về các phát hiện đã cập nhật với kết quả OCR │
│ ⏱️ Thời gian tổng: ~200ms (thay vì 600ms tuần tự) │
└─────────────────────────────────────────────────────────┘
🔍 Giải Thích Các Thành Phần Chính
Lớp AsyncOCRWaitGroup (dòng 28-84)
python
class AsyncOCRWaitGroup:
def __init__(self):
self._counter = 0 # Số lượng thao tác còn lại
self._event = asyncio.Event() # Sự kiện để thông báo hoàn thành
self._lock = asyncio.Lock() # Cập nhật đếm an toàn với luồng
async def add(self, delta: int = 1):
# Giống như wg.Add(n) của Go - thêm n thao tác để chờ
self._counter += delta
print(f"🔢 Counter: {self._counter} (delta: {delta})")
async def done(self):
# Giống như wg.Done() của Go - thông báo một thao tác đã hoàn thành
self._counter -= 1
print(f"✅ Thao tác hoàn thành, counter: {self._counter}")
if self._counter == 0:
self._event.set() # Đánh thức wait()
async def wait(self, timeout=None):
# Giống như wg.Wait() của Go - chặn cho đến khi counter = 0
await self._event.wait()
print(f"🎉 Tất cả {self._initial_counter} thao tác đã hoàn thành")
Nơi Bắt Đầu Các Thao Tác (dòng 330)
python
# Thêm tất cả thao tác vào WaitGroup
await wait_group.add(len(all_operations)) # ví dụ: add(3)
Nơi Mỗi Thao Tác Kết Thúc (dòng 472-473)
python
# Trong _process_single_operation_async
finally:
# Luôn đánh dấu thao tác là hoàn thành trong WaitGroup
await wait_group.done() # Giảm đếm
Nơi Chúng Ta Chờ Tất Cả (dòng 199)
python
# Chờ tất cả thao tác hoàn thành với thời gian chờ
await wait_group.wait(timeout=self.operation_timeout)
📊 Ví Dụ Nhật Ký Thực Tế
Khi bạn có 3 phát hiện nhãn thực phẩm:
plaintext
🚀 [AsyncOCR] Đang xử lý 3 phát hiện đồng thời
🚀 [AsyncOCR] Bắt đầu 3 thao tác đồng thời
🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3) ← WaitGroup.Add(3)
[Các cuộc gọi gRPC OCR đồng thời diễn ra]
✅ [AsyncOCR] Hoàn thành thao tác nhãn thực phẩm cho phát hiện 0
✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 2 ← WaitGroup.Done()
✅ [AsyncOCR] Hoàn thành thao tác nhãn thực phẩm cho phát hiện 1
✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 1 ← WaitGroup.Done()
✅ [AsyncOCR] Hoàn thành thao tác nhãn thực phẩm cho phát hiện 2
✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 0 ← WaitGroup.Done()
🎉 [AsyncOCRWaitGroup] Tất cả 3 thao tác đã hoàn thành ← WaitGroup.Wait() giải phóng
✅ [AsyncOCR] Hoàn thành 3 thao tác trong 221.9ms
✨ [AsyncOCR] Đã xử lý thành công 2 thao tác đồng thời
🎯 So Sánh Go và Python
Phiên Bản Go
go
var wg sync.WaitGroup
wg.Add(3) // 3 thao tác
// Bắt đầu goroutines
for _, detection := range detections {
go func(d Detection) {
defer wg.Done() // Thông báo hoàn thành
processOCR(d)
}(detection)
}
wg.Wait() // Chờ tất cả hoàn thành
Phiên Bản Python (Triển Khai của Chúng Tôi)
python
wait_group = AsyncOCRWaitGroup()
await wait_group.add(3) # 3 thao tác
# Bắt đầu các tác vụ không đồng bộ
tasks = []
for detection in detections:
task = asyncio.create_task(process_ocr_async(detection, wait_group))
tasks.append(task)
await wait_group.wait() # Chờ tất cả hoàn thành
# Trong mỗi process_ocr_async:
try:
# Xử lý OCR
result = await ocr_service.process(detection)
finally:
await wait_group.done() # Thông báo hoàn thành
Mô hình này là giống hệt nhau - chúng ta chờ đợi một bộ đếm đạt đến 0 khi mỗi thao tác đồng thời thông báo hoàn thành! 🎉