Giới thiệu
Trong phần tiếp theo của series tìm hiểu về Debezium, chúng ta sẽ cùng nhau đi sâu vào Debezium Engine và cách tích hợp nó với cơ sở dữ liệu PostgreSQL. Bài viết này sẽ giúp bạn thiết lập và cấu hình Debezium để theo dõi sự thay đổi dữ liệu trong cơ sở dữ liệu một cách hiệu quả.
Debezium Engine
Debezium cung cấp một bộ module có tên là debezium-api được phát triển bằng Java. Module này cho phép bạn kết nối tới cơ sở dữ liệu để đọc logs và theo dõi các biến đổi dữ liệu.
Thêm dependencies vào file POM
Đầu tiên, bạn cần thêm các dependencies vào file POM của dự án Maven như sau:
xml
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium.version}</version>
</dependency>
Nếu bạn sử dụng các cơ sở dữ liệu khác như MySQL, bạn chỉ cần thêm connector tương ứng:
xml
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
Khởi tạo Connector
Sau khi cấu hình dependencies, tiếp theo bạn có thể khởi tạo connector như sau:
java
this.engine = DebeziumEngine
.create(ChangeEventFormat.of(Connect.class))
.using(setConfig())
.notifying(new CdcSummaryBatchHandler2())
.build();
Thực thi Connector
Để thực thi connector, bạn cần tạo một luồng mới và chạy nó:
java
private final Executor executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void start() {
this.executor.execute(engine);
}
@PreDestroy
private void stop() {
if (this.engine != null) {
try {
this.engine.close();
} catch (IOException e) {
log.error("Không thể đóng engine Debezium");
}
}
}
Trong đó, hai đối tượng setConfig
và class CdcSummaryBatchHandler2
là nơi mà bạn sẽ khai báo cấu hình và lắng nghe các sự kiện từ cơ sở dữ liệu.
Cấu hình Debezium Engine
java
private Properties setConfig() {
Properties configProperties = new Properties();
configProperties.put("plugin.name", "pgoutput");
configProperties.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
// Cấu hình Kafka
configProperties.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
configProperties.put("key.converter.schemas.enable", "false");
configProperties.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
configProperties.put("value.converter.schemas.enable", "false");
configProperties.put("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
configProperties.put("offset.storage.file.filename", "/Users/vanhanhchu/Documents/file.DAT");
configProperties.put("offset.flush.interval.ms", "60000");
configProperties.put("name", "connectorName");
configProperties.put("database.server.name", applicationProperties.getInventorySummaryBranchCdc().getDatabaseServerName());
configProperties.put("database.hostname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseHostname());
configProperties.put("database.port", applicationProperties.getInventorySummaryBranchCdc().getDatabasePort());
configProperties.put("database.user", applicationProperties.getInventorySummaryBranchCdc().getDatabaseUser());
configProperties.put("database.password", applicationProperties.getInventorySummaryBranchCdc().getDatabasePassword());
configProperties.put("database.dbname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseDbname());
configProperties.put("database.tcpKeepAlive", 600);
configProperties.put("table.include.list", "item-services.inventory_summary_branch,item-services.cdc_update_es,item-services.item_model,item-services.cdc_heart_beat");
// Cấu hình mở rộng
configProperties.put("skipped.operations", "d,u");
configProperties.put("provide.transaction.metadata", "true");
configProperties.put("max.batch.size", "500");
configProperties.put("slot.name", "etl_replication_1");
configProperties.put("snapshot.mode", "never");
configProperties.put("heartbeat.interval.ms", "30000");
configProperties.put("heartbeat.action.query", "UPDATE \"item-services\".cdc_heart_beat SET ts_time = now() WHERE id = 1;");
return configProperties;
}
Giải thích các tham số
- skipped.operations: Đây là các event sẽ bị bỏ qua (d=delete, u=update).
- slot.name: Tên slot replication sẽ được tạo trong cơ sở dữ liệu của bạn.
- max.batch.size: Số lượng tối đa trong một batch.
- offset.storage.file.filename: Đường dẫn file lưu trữ offset.
- table.include.list: Danh sách các bảng mà Debezium sẽ đọc logs.
- snapshot.mode: Chế độ snapshot trong lần khởi tạo đầu tiên. Mặc định là
initial
, Debezium sẽ tiến hành snapshot toàn bộ dữ liệu từ logs. Nếu để giá trị lànever
, Debezium sẽ chỉ đọc logs từ thời điểm tạo replication. - heartbeat.interval.ms: Thời gian thực thi câu truy vấn
heartbeat.action.query
sau mỗi X ms. - heartbeat.action.query: Câu truy vấn được thực hiện, giữ cho kết nối ổn định.
Implementing CdcSummaryBatchHandler2
java
public class CdcSummaryBatchHandler2 implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
private final Logger log = LoggerFactory.getLogger(CdcSummaryBatchHandler2.class);
@Override
public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
try {
Map<String, List<RecordChangeEvent<SourceRecord>>> group = records.stream().collect(Collectors.groupingBy(x -> x.record().topic()));
for (Map.Entry<String, List<RecordChangeEvent<SourceRecord>>> data : group.entrySet()) {
log.debug("record value" + data.getValue());
committer.markBatchFinished();
}
} catch (Throwable e) {
log.error("ETL_ERROR " + e.getMessage());
e.printStackTrace();
}
}
}
Kết quả nhận được
Khi nhận được RecordChangeEvent<SourceRecord>
, bạn có thể chuyển đổi nó về các đối tượng tương ứng mà bạn muốn:
java
public static <T> Optional<T> mapToModel(SourceRecord sourceRecord, Class<T> clasz, Operation... opeAction) {
Optional<Map<String, Object>> map = sourceRecordToMap(sourceRecord, opeAction);
if (map.isPresent()) {
return Optional.of(mapper.convertValue(map.get(), clasz));
}
return Optional.empty();
}
Tài liệu tham khảo
- T5K - Truyện chữ online (nhà cung cấp nguồn)
- debezium.io - bản hướng dẫn sử dụng Debezium Engine
Series bài viết
- Phần 1: Debezium là gì? Ứng dụng thực tế.
- Phần 2: Cấu hình sử dụng Debezium Engine với PostgreSQL.
- Phần 3: Cấu hình sử dụng Debezium với PostgreSQL và Kafka Connect.
source: viblo