0
0
Lập trình
Harry Tran
Harry Tran106580903228332612117

Hướng Dẫn Cấu Hình Change Data Capture Connector Với Kafka

Đăng vào 2 ngày trước

• 4 phút đọc

1. Giới Thiệu

Change Data Capture (CDC) là một quy trình quan trọng trong quản lý cơ sở dữ liệu, cho phép ghi nhận và theo dõi các thay đổi dữ liệu trong thời gian thực bằng cách sử dụng log hoặc stream dữ liệu. Điều này mang lại cho các ứng dụng và hệ thống khả năng tiêu thụ và xử lý các thay đổi mới nhất trong dữ liệu.

Trong quá khứ, việc đồng bộ hóa dữ liệu giữa các cơ sở dữ liệu như MySQL, PostgreSQL hay MongoDB chỉ cần thực hiện việc xuất dữ liệu từ cơ sở dữ liệu này và nhập vào cơ sở dữ liệu khác. Tuy nhiên, phương pháp này yêu cầu thời gian dừng máy chủ (downtime) và có thể gây ra sự gián đoạn trong dịch vụ, đặc biệt khi khối lượng dữ liệu lớn, dễ dẫn đến treo máy chủ. Hơn nữa, việc đồng bộ giữa các loại cơ sở dữ liệu khác nhau cũng gặp rất nhiều khó khăn, chẳng hạn như đồng bộ từ MySQL sang Elasticsearch, PostgreSQL hay MongoDB.

Chính vì vậy, CDC đã ra đời như một giải pháp cho những vấn đề này. Một trong những công cụ CDC phổ biến hiện nay là Debezium, được sử dụng rộng rãi bởi nhiều công ty lớn, trong đó có TIKI. Debezium hoạt động kết hợp với Kafka, nơi mà các topic và partition của Kafka lưu trữ các message, và các service sử dụng worker để tiêu thụ các topic và xử lý nghiệp vụ.

Mô hình hoạt động của Debezium như sau: nó ghi nhận sự thay đổi trong cơ sở dữ liệu, gửi các message thay đổi vào các topic trong Kafka, các sink connector sẽ vào role tiêu thụ message và đồng bộ hóa dữ liệu sang các hệ thống thứ ba. Sink connector là một component trong hệ sinh thái Apache Kafka, chịu trách nhiệm tiêu thụ và xử lý message từ các topic.

Tuy nhiên, để hiểu rõ hơn về cách thiết lập CDC với Kafka, bạn chỉ cần chú ý đến việc đẩy message vào các topic trong Kafka và tự viết một worker để tiêu thụ các topic đó.

2. Cấu Hình

Dưới đây là một ví dụ về file Docker Compose để tạo các container cho Kafka, Kafka UI, MySQL, và Debezium Connect:

yaml Copy
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
     - KAFKA_CLUSTERS_0_NAME=local-kafka
     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
     - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
     - MYSQL_DATABASE=cdc-db
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

Sau khi tạo file xong, bạn có thể chuyển đến thư mục chứa file và chạy lệnh:

Copy
sudo DEBEZIUM_VERSION=1.9 docker-compose up -d

Sau khi thực thi lệnh trên, bạn sẽ thấy các container đang chạy. Tiếp theo, bạn có thể sử dụng lệnh CURL dưới đây để tạo một connector cho Debezium:

Copy
curl --location 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cdc-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "debezium",
        "tasks.max": "1",
        "database.server.name": "server_name",
        "database.whitelist": "cdc-db",
        "table.include.list": "cdc-db.users",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "history.users"
    }
}'

Thông qua CURL, bạn có thể cấu hình kết nối tới cơ sở dữ liệu MySQL và xác định danh sách các cơ sở dữ liệu và bảng cần thiết phải ghi nhận. Trong ví dụ này, chúng ta sẽ ghi nhận cơ sở dữ liệu cdc-db và bảng users, các message sẽ được gửi vào topic server_name.cdc-db.users.

Sau khi cấu hình xong, bạn chỉ cần insert một bản ghi mới vào bảng users và theo dõi các message được ghi nhận trong topic Kafka:

Copy
INSERT INTO `users` (`name`) VALUES
('Tuan test cdc');

Khi thực thi câu lệnh trên, bạn sẽ nhận được một message trong payload với dữ liệu trước và sau khi thay đổi (trong trường hợp này là before = nullafter sẽ chứa dữ liệu của bản ghi vừa chèn). Chú ý rằng trường hợp op = 'c' chỉ rõ rằng đây là một hành động create.

Khi bạn cập nhật bản ghi, payload sẽ được cập nhật với before chứa dữ liệu cũ và after chứa dữ liệu mới.

Copy
UPDATE `users` SET `name` = 'Tuan test update cdc' WHERE `id` = 9;

Kết Thúc! Cảm ơn bạn đã theo dõi bài viết.

3. Tài Liệu Tham Khảo

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào