1. Giới Thiệu
Change Data Capture (CDC) là một quy trình quan trọng trong quản lý cơ sở dữ liệu, cho phép ghi nhận và theo dõi các thay đổi dữ liệu trong thời gian thực bằng cách sử dụng log hoặc stream dữ liệu. Điều này mang lại cho các ứng dụng và hệ thống khả năng tiêu thụ và xử lý các thay đổi mới nhất trong dữ liệu.
Trong quá khứ, việc đồng bộ hóa dữ liệu giữa các cơ sở dữ liệu như MySQL, PostgreSQL hay MongoDB chỉ cần thực hiện việc xuất dữ liệu từ cơ sở dữ liệu này và nhập vào cơ sở dữ liệu khác. Tuy nhiên, phương pháp này yêu cầu thời gian dừng máy chủ (downtime) và có thể gây ra sự gián đoạn trong dịch vụ, đặc biệt khi khối lượng dữ liệu lớn, dễ dẫn đến treo máy chủ. Hơn nữa, việc đồng bộ giữa các loại cơ sở dữ liệu khác nhau cũng gặp rất nhiều khó khăn, chẳng hạn như đồng bộ từ MySQL sang Elasticsearch, PostgreSQL hay MongoDB.
Chính vì vậy, CDC đã ra đời như một giải pháp cho những vấn đề này. Một trong những công cụ CDC phổ biến hiện nay là Debezium, được sử dụng rộng rãi bởi nhiều công ty lớn, trong đó có TIKI. Debezium hoạt động kết hợp với Kafka, nơi mà các topic và partition của Kafka lưu trữ các message, và các service sử dụng worker để tiêu thụ các topic và xử lý nghiệp vụ.
Mô hình hoạt động của Debezium như sau: nó ghi nhận sự thay đổi trong cơ sở dữ liệu, gửi các message thay đổi vào các topic trong Kafka, các sink connector sẽ vào role tiêu thụ message và đồng bộ hóa dữ liệu sang các hệ thống thứ ba. Sink connector là một component trong hệ sinh thái Apache Kafka, chịu trách nhiệm tiêu thụ và xử lý message từ các topic.
Tuy nhiên, để hiểu rõ hơn về cách thiết lập CDC với Kafka, bạn chỉ cần chú ý đến việc đẩy message vào các topic trong Kafka và tự viết một worker để tiêu thụ các topic đó.
2. Cấu Hình
Dưới đây là một ví dụ về file Docker Compose để tạo các container cho Kafka, Kafka UI, MySQL, và Debezium Connect:
yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
- KAFKA_CLUSTERS_0_NAME=local-kafka
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
mysql:
image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
- MYSQL_DATABASE=cdc-db
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- mysql
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
Sau khi tạo file xong, bạn có thể chuyển đến thư mục chứa file và chạy lệnh:
sudo DEBEZIUM_VERSION=1.9 docker-compose up -d
Sau khi thực thi lệnh trên, bạn sẽ thấy các container đang chạy. Tiếp theo, bạn có thể sử dụng lệnh CURL dưới đây để tạo một connector cho Debezium:
curl --location 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
"name": "cdc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "debezium",
"tasks.max": "1",
"database.server.name": "server_name",
"database.whitelist": "cdc-db",
"table.include.list": "cdc-db.users",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "history.users"
}
}'
Thông qua CURL, bạn có thể cấu hình kết nối tới cơ sở dữ liệu MySQL và xác định danh sách các cơ sở dữ liệu và bảng cần thiết phải ghi nhận. Trong ví dụ này, chúng ta sẽ ghi nhận cơ sở dữ liệu cdc-db và bảng users, các message sẽ được gửi vào topic server_name.cdc-db.users.
Sau khi cấu hình xong, bạn chỉ cần insert một bản ghi mới vào bảng users và theo dõi các message được ghi nhận trong topic Kafka:
INSERT INTO `users` (`name`) VALUES
('Tuan test cdc');
Khi thực thi câu lệnh trên, bạn sẽ nhận được một message trong payload với dữ liệu trước và sau khi thay đổi (trong trường hợp này là before = null
và after
sẽ chứa dữ liệu của bản ghi vừa chèn). Chú ý rằng trường hợp op = 'c' chỉ rõ rằng đây là một hành động create.
Khi bạn cập nhật bản ghi, payload sẽ được cập nhật với before
chứa dữ liệu cũ và after
chứa dữ liệu mới.
UPDATE `users` SET `name` = 'Tuan test update cdc' WHERE `id` = 9;
Kết Thúc! Cảm ơn bạn đã theo dõi bài viết.
3. Tài Liệu Tham Khảo
- Bài viết của tôi trên blog: https://tuannguyenhust.hashnode.dev/change-data-capture-cau-hinh-connector-voi-kafka
source: viblo