1. Consumer API: Vòng Lặp Poll Tương Tác với Kafka
Trái tim của Consumer API là một vòng lặp đơn giản nhưng quan trọng, được sử dụng để liên tục gọi đến máy chủ Kafka nhằm lấy dữ liệu. Như cá mập cần phải chuyển động liên tục để sinh tồn, consumer cũng cần thực hiện việc polling đến Kafka thường xuyên. Nếu không thực hiện, cơ chế của Kafka sẽ coi consumer đó là "đã chết", và các partition mà consumer đang đảm nhận sẽ được chuyển giao cho một consumer khác trong cùng nhóm để tiếp tục tiêu thụ.
Khi sử dụng phương thức poll(), chúng ta truyền vào một tham số chỉ định khoảng thời gian chờ, kiểm soát thời gian mà poll() sẽ bị chặn nếu không có dữ liệu sẵn có trong bộ đệm của consumer. Nếu giá trị này được thiết lập là 0 hoặc đã có dữ liệu có sẵn, poll() sẽ trả về ngay lập tức. Nếu không, nó sẽ chờ trong thời gian được chỉ định.
Vòng lặp poll() không chỉ đơn thuần là thao tác lấy dữ liệu từ Kafka. Trong lần gọi poll() đầu tiên với một consumer mới, nó sẽ tìm kiếm GroupCoordinator, tham gia vào consumer group và nhận các partition được gán. Trong trường hợp có một rebalance xảy ra, nó sẽ được xử lý trong vòng lặp poll(), bao gồm cả các callback liên quan. Điều này có nghĩa là hầu hết mọi vấn đề xảy ra với consumer hoặc trong các callback được sử dụng trong listeners đều có thể xuất hiện dưới dạng ngoại lệ từ poll().
Lưu ý rằng nếu poll() không được gọi trong khoảng thời gian lâu hơn max.poll.interval.ms
, consumer sẽ được xem là đã chết và sẽ bị loại khỏi consumer group. Do đó, bạn cần tránh thực hiện các hoạt động có thể gây chậm trễ kéo dài trong vòng lặp poll().
2. Thread Safety Trong Kafka
Khi triển khai consumer trong Kafka, có một quy tắc quan trọng cần ghi nhớ: Không thể có nhiều consumer thuộc cùng một consumer group trong cùng một thread và cũng không thể có nhiều thread sử dụng cùng một consumer một cách an toàn. Quy tắc này yêu cầu mỗi thread cần phải có một consumer riêng biệt. Để chạy nhiều consumer trong cùng một consumer group trong một ứng dụng, bạn cần tạo một thread riêng cho từng consumer. Cách làm này trong Java thường là gói gọn logic của consumer trong một đối tượng riêng lẻ, và sau đó sử dụng ExecutorService để khởi động nhiều threads.
3. Kết Luận Về Việc Sử Dụng Phương Thức Poll
Trong các phiên bản trước của Kafka, chữ ký đầy đủ của phương thức poll là poll(long); tuy nhiên, chữ ký này đã được ngừng sử dụng và hiện tại, API mới sử dụng poll(Duration). Sự thay đổi này không chỉ là về loại tham số mà còn làm thay đổi hành vi block của phương thức. Phương thức cũ poll(long) sẽ chặn cho đến khi nhận được metadata cần thiết từ Kafka, bất kể thời gian chờ có thể kéo dài hơn thời gian timeout. Trong khi đó, phương thức mới poll(Duration) sẽ tuân thủ giới hạn thời gian chờ và không chờ để nhận metadata.
Nếu mã nguồn consumer hiện tại sử dụng poll(0) để buộc Kafka lấy metadata mà không tiêu thụ bất kỳ bản ghi nào (một mẹo khá phổ biến), việc chỉ đơn giản là thay đổi sang poll(Duration.ofMillis(0)) sẽ không dẫn đến hành vi tương tự. Bạn sẽ cần tìm một cách tiếp cận mới để đạt được mục tiêu. Giải pháp thường được khuyến nghị là đặt logic trong phương thức rebalanceListener.onPartitionAssignment(), đảm bảo rằng phương thức này sẽ được gọi sau khi nhận được metadata cho các partition đã được gán nhưng trước khi nhận các message đầu tiên.
4. Thông Tin Kết Nối và Trao Đổi Ý Kiến
Nếu bạn muốn thảo luận thêm về bài viết này, đừng ngần ngại kết nối với mình qua LinkedIn và Facebook:
- LinkedIn: Nguyễn Trung Nam
- Facebook: Trung Nam Nguyễn
Rất mong được kết nối và cùng nhau trao đổi thông tin!
source: viblo