0
0
Lập trình
Sơn Tùng Lê
Sơn Tùng Lê103931498422911686980

Mô Hình WaitGroup Giống Go trong Xử Lý OCR Không Đồng Bộ

Đăng vào 8 tháng trước

• 11 phút đọc

Chủ đề:

KungFuTech

🔄 Mô Hình WaitGroup Giống Go trong Xử Lý OCR Không Đồng Bộ

📍 Vị Trí Triển Khai WaitGroup

Mô hình WaitGroup giống Go được triển khai trong:

  • Tệp: src/core/async_ocr_integration.py
  • Lớp: AsyncOCRWaitGroup (dòng 28-84)
  • Cách sử dụng: Phương thức _process_concurrent_operations()

🔄 Cách Hoạt Động của WaitGroup (Giống sync.WaitGroup của Go)

1. Khởi Tạo WaitGroup

python Copy
# Dòng 189-190: Tạo WaitGroup để phối hợp các thao tác không đồng bộ
wait_group = AsyncOCRWaitGroup()

2. Thêm Thao Tác vào WaitGroup

python Copy
# Dòng 330: Thêm N thao tác vào WaitGroup (giống như wg.Add(n) của Go)
await wait_group.add(len(all_operations))

Kết quả: 🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)

3. Bắt Đầu Thao Tác Đồng Thời

python Copy
# Dòng 332-340: Tạo các tác vụ không đồng bộ cho mỗi phát hiện
tasks = []
for i, operation in enumerate(all_operations):
    task = asyncio.create_task(
        self._process_single_operation_safe(operation, wait_group, updated_detections, i)
    )
    tasks.append(task)

4. Mỗi Thao Tác Thông Báo Hoàn Thành

python Copy
# Dòng 472-473: Mỗi thao tác gọi done() khi hoàn thành (giống như wg.Done() của Go)
finally:
    await wait_group.done()  # Giảm đếm xuống 1

Kết quả: ✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 2
Kết quả: ✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 1
Kết quả: ✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 0

5. Chờ Tất Cả Thao Tác Hoàn Thành

python Copy
# Dòng 199: Chờ tất cả thao tác hoàn thành (giống như wg.Wait() của Go)
await wait_group.wait(timeout=self.operation_timeout)

Kết quả: 🎉 [AsyncOCRWaitGroup] Tất cả 3 thao tác đã hoàn thành


🎯 Sơ Đồ Quy Trình Chi Tiết

plaintext Copy
┌─────────────────────────────────────────────────────────────┐
│                   QUY TRÌNH WAITGROUP ASYNC OCR           │
└─────────────────────────────────────────────────────────────┘

1. KHỞI TẠO
   ┌──────────────────┐
   │ Nhiều           │
   │ nhãn thực phẩm   │──┐
   │ phát hiện tìm thấy│  │
   └──────────────────┘  │     ┌─────────────────────┐
                         ├────▶│ Tạo WaitGroup       │
   ┌──────────────────┐  │     │ wait_group = new()  │
   │ Phát hiện QR     │  │     └─────────────────────┘
   │ tìm thấy         │──┘              │
   └──────────────────┘                 │
                                        ▼

2. THÊM THAO TÁC VÀO WAITGROUP
   ┌─────────────────────────────────────────────────────┐
   │ await wait_group.add(3)  # 3 phát hiện cần xử lý    │
   │ 🔢 Counter: 3 (delta: +3)                          │
   └─────────────────────────────────────────────────────┘
                               │
                               ▼

3. BẮT ĐẦU CÁC TÁC VỤ ĐỒNG THỜI
   ┌─────────────────────────────────────────────────────────┐
   │                  THỰC THI ĐỒNG THỜI                    │
   │                                                         │
   │ Tác vụ 1: nhãn thực phẩm   Tác vụ 2: nhãn thực phẩm   Tác vụ 3: QR    │
   │ ┌─────────────────┐  ┌─────────────────┐  ┌─────────┐  │
   │ │ Xử lý OCR      │  │ Xử lý OCR      │  │ QR Proc │  │
   │ │ ┌─────────────┐ │  │ ┌─────────────┐ │  │ ┌─────┐ │  │
   │ │ │   gRPC      │ │  │ │   gRPC      │ │  │ │gRPC │ │  │
   │ │ │ Dịch vụ OCR │ │  │ │ Dịch vụ OCR │ │  │ │ QR  │ │  │
   │ │ └─────────────┘ │  │ └─────────────┘ │  │ └─────┘ │  │
   │ └─────────────────┘  └─────────────────┘  └─────────┘  │
   │         │                    │                 │        │
   │         ▼                    ▼                 ▼        │
   │ ┌─────────────────┐  ┌─────────────────┐  ┌─────────┐  │
   │ │ await wg.done() │  │ await wg.done() │  │ await    │  │
   │ │ Counter: 2      │  │ Counter: 1      │  │ wg.done()│  │
   │ └─────────────────┘  └─────────────────┘  │Counter:0│  │
   │                                           └─────────┘  │
   └─────────────────────────────────────────────────────────┘
                               │
                               ▼

4. ĐIỀU PHỐI WAITGROUP
   ┌─────────────────────────────────────────────────────────┐
   │                ĐIỀU PHỐI WAITGROUP                       │
   │                                                         │
   │ ✅ Thao tác 1 hoàn thành → Counter: 2                    │
   │ ✅ Thao tác 2 hoàn thành → Counter: 1                    │
   │ ✅ Thao tác 3 hoàn thành → Counter: 0                    │
   │                                                         │
   │ Khi Counter = 0:                                       │
   │ 🎉 Tất cả 3 thao tác đã hoàn thành!                    │
   │ 📤 Giải phóng await wait_group.wait()                  │
   └─────────────────────────────────────────────────────────┘
                               │
                               ▼

5. HOÀN THÀNH
   ┌─────────────────────────────────────────────────────────┐
   │ ✨ Đã xử lý thành công 3 thao tác đồng thời           │
   │ 📊 Trả về các phát hiện đã cập nhật với kết quả OCR    │
   │ ⏱️ Thời gian tổng: ~200ms (thay vì 600ms tuần tự)      │
   └─────────────────────────────────────────────────────────┘

🔍 Giải Thích Các Thành Phần Chính

Lớp AsyncOCRWaitGroup (dòng 28-84)

python Copy
class AsyncOCRWaitGroup:
    def __init__(self):
        self._counter = 0           # Số lượng thao tác còn lại
        self._event = asyncio.Event()  # Sự kiện để thông báo hoàn thành
        self._lock = asyncio.Lock()    # Cập nhật đếm an toàn với luồng

    async def add(self, delta: int = 1):
        # Giống như wg.Add(n) của Go - thêm n thao tác để chờ
        self._counter += delta
        print(f"🔢 Counter: {self._counter} (delta: {delta})")

    async def done(self):
        # Giống như wg.Done() của Go - thông báo một thao tác đã hoàn thành
        self._counter -= 1
        print(f"✅ Thao tác hoàn thành, counter: {self._counter}")
        if self._counter == 0:
            self._event.set()  # Đánh thức wait()

    async def wait(self, timeout=None):
        # Giống như wg.Wait() của Go - chặn cho đến khi counter = 0
        await self._event.wait()
        print(f"🎉 Tất cả {self._initial_counter} thao tác đã hoàn thành")

Nơi Bắt Đầu Các Thao Tác (dòng 330)

python Copy
# Thêm tất cả thao tác vào WaitGroup
await wait_group.add(len(all_operations))  # ví dụ: add(3)

Nơi Mỗi Thao Tác Kết Thúc (dòng 472-473)

python Copy
# Trong _process_single_operation_async
finally:
    # Luôn đánh dấu thao tác là hoàn thành trong WaitGroup
    await wait_group.done()  # Giảm đếm

Nơi Chúng Ta Chờ Tất Cả (dòng 199)

python Copy
# Chờ tất cả thao tác hoàn thành với thời gian chờ
await wait_group.wait(timeout=self.operation_timeout)

📊 Ví Dụ Nhật Ký Thực Tế

Khi bạn có 3 phát hiện nhãn thực phẩm:

plaintext Copy
🚀 [AsyncOCR] Đang xử lý 3 phát hiện đồng thời
🚀 [AsyncOCR] Bắt đầu 3 thao tác đồng thời
🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)           ← WaitGroup.Add(3)

[Các cuộc gọi gRPC OCR đồng thời diễn ra]

✅ [AsyncOCR] Hoàn thành thao tác nhãn thực phẩm cho phát hiện 0
✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 2  ← WaitGroup.Done()

✅ [AsyncOCR] Hoàn thành thao tác nhãn thực phẩm cho phát hiện 1  
✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 1  ← WaitGroup.Done()

✅ [AsyncOCR] Hoàn thành thao tác nhãn thực phẩm cho phát hiện 2
✅ [AsyncOCRWaitGroup] Thao tác hoàn thành, counter: 0  ← WaitGroup.Done()

🎉 [AsyncOCRWaitGroup] Tất cả 3 thao tác đã hoàn thành      ← WaitGroup.Wait() giải phóng
✅ [AsyncOCR] Hoàn thành 3 thao tác trong 221.9ms
✨ [AsyncOCR] Đã xử lý thành công 2 thao tác đồng thời

🎯 So Sánh Go và Python

Phiên Bản Go

go Copy
var wg sync.WaitGroup
wg.Add(3)  // 3 thao tác

// Bắt đầu goroutines
for _, detection := range detections {
    go func(d Detection) {
        defer wg.Done()  // Thông báo hoàn thành
        processOCR(d)
    }(detection)
}

wg.Wait()  // Chờ tất cả hoàn thành

Phiên Bản Python (Triển Khai của Chúng Tôi)

python Copy
wait_group = AsyncOCRWaitGroup()
await wait_group.add(3)  # 3 thao tác

# Bắt đầu các tác vụ không đồng bộ
tasks = []
for detection in detections:
    task = asyncio.create_task(process_ocr_async(detection, wait_group))
    tasks.append(task)

await wait_group.wait()  # Chờ tất cả hoàn thành

# Trong mỗi process_ocr_async:
try:
    # Xử lý OCR
    result = await ocr_service.process(detection)
finally:
    await wait_group.done()  # Thông báo hoàn thành

Mô hình này là giống hệt nhau - chúng ta chờ đợi một bộ đếm đạt đến 0 khi mỗi thao tác đồng thời thông báo hoàn thành! 🎉

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào