0
0
Lập trình
Sơn Tùng Lê
Sơn Tùng Lê103931498422911686980

Chương 5: Người tiêu dùng trong hệ thống Kafka - Khám phá và ứng dụng

Đăng vào 1 ngày trước

• 6 phút đọc

🔑 Chương 5: Người Tiêu Dùng Trong Hệ Thống Kafka

🔍 Vai Trò Của Consumer

Trong hệ thống Kafka, người tiêu dùng (consumer) có vai trò vô cùng quan trọng, bao gồm:

  • Đọc dữ liệu từ các topic: Thu thập thông điệp từ các log phân tán trong Kafka.
  • Cung cấp dữ liệu cho ứng dụng: Sử dụng cho các bảng điều khiển hoặc các công cụ phân tích dữ liệu.
  • Lưu trữ dữ liệu vào hệ thống khác: Đảm bảo việc truy cập lâu dài hoặc xử lý thêm dữ liệu khi cần thiết.

Quản Lý Tốc Độ Tiêu Thụ

Một lợi thế nổi bật của consumer trong Kafka là khả năng kiểm soát tốc độ tiêu thụ dữ liệu. Điều này cho phép:

  • Quyết định lượng dữ liệu cần lấy và thời điểm lấy dữ liệu.
  • Thiết kế ứng dụng để xử lý tải động một cách hiệu quả, tránh tình trạng quá tải.

5.1.1. Tùy Chọn Consumer

  • Sử dụng các deserializer phù hợp với khóa và giá trị (như StringDeserializer hoặc LongDeserializer).
  • Đảm bảo cấu hình đầy đủ như bootstrap.servers, group.id, và các tham số timeout như (heartbeat.interval.ms).
  1. Xử Lý Dữ Liệu Từ Topic:
    • Poll dữ liệu từ topic kinaction_promos.
    • Áp dụng công thức xử lý giá trị từ các sự kiện với magic number (vd: nhân 1.543).

📋 Tổng Quan Về Cấu Hình Consumer (Bảng 5.1)

Khóa Mô Tả
bootstrap.servers Danh sách một hoặc nhiều Kafka broker để kết nối khi khởi động client.
value.deserializer Sử dụng để giải mã (deserialization) giá trị từ topic.
key.deserializer Sử dụng để giải mã khóa từ topic.
group.id Tên nhóm để tham gia vào consumer group.
client.id ID để định danh người dùng (chương 10 sẽ sử dụng).
heartbeat.interval.ms Khoảng thời gian giữa các lần consumer gửi tín hiệu (ping) đến group coordinator.

🖥️ Mã Nguồn: Consumer Xử Lý Khuyến Mãi

java Copy
public class KinactionStopConsumer implements Runnable {
     private final KafkaConsumer<String, String> consumer;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     ...
    public KinactionStopConsumer(KafkaConsumer<String, String> consumer) {
      this.consumer = consumer;
    }
    public void run() {
         try {
             consumer.subscribe(List.of("kinaction_promos"));
             while (!stopping.get()) {                         ❶
                 ConsumerRecords<String, String> records =
                   consumer.poll(Duration.ofMillis(250));
                 ...
             }
         } catch (WakeupException e) {                         ❷
             if (!stopping.get()) throw e;
         } finally {
             consumer.close();                                 ❸
         }
     }
     public void shutdown() {                                  ❹
         stopping.set(true);
         consumer.wakeup();
     }
}
  ❶ Biến stopping xác định xem có tiếp tục xử lý hay không.

  ❷ Móc thoát khi client ngừng hoạt động.

  ❸ Dừng client và thông báo cho broker về việc dừng.

  ❹ Gọi shutdown từ một thread khác để dừng client một cách chính xác.

5.1.2 Hiểu Về Offset Trong Kafka

🔢 Offset và Cách Hoạt Động

  • Offset là chỉ mục xác định vị trí của thông điệp trong log.
  • Offset luôn tăng dần và không tái sử dụng.
  • Mỗi partition có chuỗi offset riêng, giảm thiểu nguy cơ vượt giới hạn kiểu dữ liệu.

📌 Cấu Hình auto.offset.reset

  • Mặc định: auto.offset.reset = latest, chỉ nhận các thông điệp mới sau khi consumer khởi động.
  • Chế độ đọc từ đầu: Sử dụng flag --from-beginning để thiết lập auto.offset.reset = earliest, cho phép đọc toàn bộ dữ liệu, kể cả thông điệp cũ.

🌍 Ảnh Hưởng Của Số Lượng Partition

  • Nhiều partition tăng khả năng xử lý song song nhưng đi kèm chi phí:
    • Tăng độ trễ khi đồng bộ giữa các broker.
    • Tốn tài nguyên bộ nhớ nếu consumer phải xử lý nhiều partition.
  • Khuyến nghị: Lựa chọn số lượng partition phù hợp với luồng dữ liệu và yêu cầu ứng dụng.

5.2 Tương Tác Của Các Consumer

Consumer Group là nhóm gồm một hoặc nhiều consumer cùng nhau đọc dữ liệu từ một topic trong Kafka. Cùng group: Các consumer phối hợp làm việc như một hệ thống duy nhất. Khác group: Các nhóm hoạt động độc lập, phù hợp với các logic xử lý khác nhau.

5.3 Theo Dõi

Khác với các hệ thống như RabbitMQ, trong Kafka, các record sẽ vẫn được lưu lại sau khi consumer xác nhận. Kafka consumer sẽ poll dữ liệu từ Kafka thay vì nhận dữ liệu qua push như các hệ thống khác. Điều này giúp giải quyết các vấn đề phát sinh trong khi tiêu thụ thông điệp.

5.4 Đánh Dấu Chỗ Dừng Của Chúng Ta

Mã Nguồn: Waiting on a Commit

java Copy
consumer.commitSync(); // ❶
java Copy
public static void commitOffset(long offset, int partition, String topic, KafkaConsumer<String, String> consumer) {
  OffsetAndMetadata offsetMeta = new OffsetAndMetadata(++offset, "");
  Map<TopicPartition, OffsetAndMetadata> kaOffsetMap = new HashMap<>();
  kaOffsetMap.put(new TopicPartition(topic, partition), offsetMeta);
  consumer.commitAsync(kaOffsetMap, (map, e) -> { // ❶
    if (e != null) {
      for (TopicPartition key : map.keySet()) {
        log.info("kinaction_error: offset {}", map.get(key).offset());
      }
    } else {
      for (TopicPartition key : map.keySet()) {
        log.info("kinaction_info: offset {}", map.get(key).offset());
      }
    }
  });
}

5.5 Topic Compacted

Kafka tiến hành compact log partition trong một quy trình nền, loại bỏ các bản ghi với cùng một khóa, giữ lại duy nhất bản ghi mới nhất. Điều này hữu ích khi yêu cầu không cần lưu trữ lịch sử dữ liệu, chỉ cần trạng thái mới nhất.

5.6 Thời Gian Lập Trình

Mã Nguồn: Cấu Hình Offset Cũ Nhất

java Copy
Properties kaProperties = new Properties();
kaProperties.put("group.id", UUID.randomUUID().toString()); // ❶
kaProperties.put("auto.offset.reset", "earliest"); // ❷

Mã Nguồn: Cấu Hình Offset Mới Nhất

java Copy
Properties kaProperties = new Properties();
kaProperties.put("group.id", UUID.randomUUID().toString()); // ❶
kaProperties.put("auto.offset.reset", "latest"); // ❷

Mã Nguồn: Tìm Đến Offset Theo Timestamp

java Copy
Map<TopicPartition, OffsetAndTimestamp> kaOffsetMap = consumer.offsetsForTimes(timeStampMapper); // ❶
consumer.seek(partitionOne, kaOffsetMap.get(partitionOne).offset()); // ❷

Mã Nguồn: Logic Consumer Kiểm Tra

java Copy
kaProperties.put("enable.auto.commit", "false"); // ❶
...

Mã Nguồn: Consumer Cảnh Báo Xu Hướng

java Copy
kaProperties.put("enable.auto.commit", "true"); // ❶
...

Mã Nguồn: Consumer Cảnh Báo

java Copy
kaProperties.put("enable.auto.commit", "false");
...

source: viblo

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào