🔑 Chương 5: Người Tiêu Dùng Trong Hệ Thống Kafka
🔍 Vai Trò Của Consumer
Trong hệ thống Kafka, người tiêu dùng (consumer) có vai trò vô cùng quan trọng, bao gồm:
- Đọc dữ liệu từ các topic: Thu thập thông điệp từ các log phân tán trong Kafka.
- Cung cấp dữ liệu cho ứng dụng: Sử dụng cho các bảng điều khiển hoặc các công cụ phân tích dữ liệu.
- Lưu trữ dữ liệu vào hệ thống khác: Đảm bảo việc truy cập lâu dài hoặc xử lý thêm dữ liệu khi cần thiết.
⏱ Quản Lý Tốc Độ Tiêu Thụ
Một lợi thế nổi bật của consumer trong Kafka là khả năng kiểm soát tốc độ tiêu thụ dữ liệu. Điều này cho phép:
- Quyết định lượng dữ liệu cần lấy và thời điểm lấy dữ liệu.
- Thiết kế ứng dụng để xử lý tải động một cách hiệu quả, tránh tình trạng quá tải.
5.1.1. Tùy Chọn Consumer
- Sử dụng các deserializer phù hợp với khóa và giá trị (như
StringDeserializer
hoặcLongDeserializer
). - Đảm bảo cấu hình đầy đủ như
bootstrap.servers
,group.id
, và các tham số timeout như (heartbeat.interval.ms
).
- Xử Lý Dữ Liệu Từ Topic:
- Poll dữ liệu từ topic
kinaction_promos
. - Áp dụng công thức xử lý giá trị từ các sự kiện với magic number (vd: nhân 1.543).
- Poll dữ liệu từ topic
📋 Tổng Quan Về Cấu Hình Consumer (Bảng 5.1)
Khóa | Mô Tả |
---|---|
bootstrap.servers | Danh sách một hoặc nhiều Kafka broker để kết nối khi khởi động client. |
value.deserializer | Sử dụng để giải mã (deserialization) giá trị từ topic. |
key.deserializer | Sử dụng để giải mã khóa từ topic. |
group.id | Tên nhóm để tham gia vào consumer group. |
client.id | ID để định danh người dùng (chương 10 sẽ sử dụng). |
heartbeat.interval.ms | Khoảng thời gian giữa các lần consumer gửi tín hiệu (ping) đến group coordinator. |
🖥️ Mã Nguồn: Consumer Xử Lý Khuyến Mãi
java
public class KinactionStopConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean stopping = new AtomicBoolean(false);
...
public KinactionStopConsumer(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
public void run() {
try {
consumer.subscribe(List.of("kinaction_promos"));
while (!stopping.get()) { ❶
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(250));
...
}
} catch (WakeupException e) { ❷
if (!stopping.get()) throw e;
} finally {
consumer.close(); ❸
}
}
public void shutdown() { ❹
stopping.set(true);
consumer.wakeup();
}
}
❶ Biến stopping xác định xem có tiếp tục xử lý hay không.
❷ Móc thoát khi client ngừng hoạt động.
❸ Dừng client và thông báo cho broker về việc dừng.
❹ Gọi shutdown từ một thread khác để dừng client một cách chính xác.
5.1.2 Hiểu Về Offset Trong Kafka
🔢 Offset và Cách Hoạt Động
- Offset là chỉ mục xác định vị trí của thông điệp trong log.
- Offset luôn tăng dần và không tái sử dụng.
- Mỗi partition có chuỗi offset riêng, giảm thiểu nguy cơ vượt giới hạn kiểu dữ liệu.
📌 Cấu Hình auto.offset.reset
- Mặc định:
auto.offset.reset = latest
, chỉ nhận các thông điệp mới sau khi consumer khởi động. - Chế độ đọc từ đầu: Sử dụng flag
--from-beginning
để thiết lậpauto.offset.reset = earliest
, cho phép đọc toàn bộ dữ liệu, kể cả thông điệp cũ.
🌍 Ảnh Hưởng Của Số Lượng Partition
- Nhiều partition tăng khả năng xử lý song song nhưng đi kèm chi phí:
- Tăng độ trễ khi đồng bộ giữa các broker.
- Tốn tài nguyên bộ nhớ nếu consumer phải xử lý nhiều partition.
- Khuyến nghị: Lựa chọn số lượng partition phù hợp với luồng dữ liệu và yêu cầu ứng dụng.
5.2 Tương Tác Của Các Consumer
Consumer Group là nhóm gồm một hoặc nhiều consumer cùng nhau đọc dữ liệu từ một topic trong Kafka. Cùng group: Các consumer phối hợp làm việc như một hệ thống duy nhất. Khác group: Các nhóm hoạt động độc lập, phù hợp với các logic xử lý khác nhau.
5.3 Theo Dõi
Khác với các hệ thống như RabbitMQ, trong Kafka, các record sẽ vẫn được lưu lại sau khi consumer xác nhận. Kafka consumer sẽ poll dữ liệu từ Kafka thay vì nhận dữ liệu qua push như các hệ thống khác. Điều này giúp giải quyết các vấn đề phát sinh trong khi tiêu thụ thông điệp.
5.4 Đánh Dấu Chỗ Dừng Của Chúng Ta
Mã Nguồn: Waiting on a Commit
java
consumer.commitSync(); // ❶
java
public static void commitOffset(long offset, int partition, String topic, KafkaConsumer<String, String> consumer) {
OffsetAndMetadata offsetMeta = new OffsetAndMetadata(++offset, "");
Map<TopicPartition, OffsetAndMetadata> kaOffsetMap = new HashMap<>();
kaOffsetMap.put(new TopicPartition(topic, partition), offsetMeta);
consumer.commitAsync(kaOffsetMap, (map, e) -> { // ❶
if (e != null) {
for (TopicPartition key : map.keySet()) {
log.info("kinaction_error: offset {}", map.get(key).offset());
}
} else {
for (TopicPartition key : map.keySet()) {
log.info("kinaction_info: offset {}", map.get(key).offset());
}
}
});
}
5.5 Topic Compacted
Kafka tiến hành compact log partition trong một quy trình nền, loại bỏ các bản ghi với cùng một khóa, giữ lại duy nhất bản ghi mới nhất. Điều này hữu ích khi yêu cầu không cần lưu trữ lịch sử dữ liệu, chỉ cần trạng thái mới nhất.
5.6 Thời Gian Lập Trình
Mã Nguồn: Cấu Hình Offset Cũ Nhất
java
Properties kaProperties = new Properties();
kaProperties.put("group.id", UUID.randomUUID().toString()); // ❶
kaProperties.put("auto.offset.reset", "earliest"); // ❷
Mã Nguồn: Cấu Hình Offset Mới Nhất
java
Properties kaProperties = new Properties();
kaProperties.put("group.id", UUID.randomUUID().toString()); // ❶
kaProperties.put("auto.offset.reset", "latest"); // ❷
Mã Nguồn: Tìm Đến Offset Theo Timestamp
java
Map<TopicPartition, OffsetAndTimestamp> kaOffsetMap = consumer.offsetsForTimes(timeStampMapper); // ❶
consumer.seek(partitionOne, kaOffsetMap.get(partitionOne).offset()); // ❷
Mã Nguồn: Logic Consumer Kiểm Tra
java
kaProperties.put("enable.auto.commit", "false"); // ❶
...
Mã Nguồn: Consumer Cảnh Báo Xu Hướng
java
kaProperties.put("enable.auto.commit", "true"); // ❶
...
Mã Nguồn: Consumer Cảnh Báo
java
kaProperties.put("enable.auto.commit", "false");
...
source: viblo