0
0
Lập trình
TT

Tất Tần Tật Về Change Data Capture (CDC)

Đăng vào 7 tháng trước

• 11 phút đọc

Giới thiệu về Change Data Capture (CDC)

Change Data Capture (CDC) là một phương pháp giúp phát hiện, ghi lại và chuyển tiếp chỉ những dữ liệu đã thay đổi từ hệ thống nguồn vào các hệ thống hạ nguồn như kho dữ liệu, bảng điều khiển hay ứng dụng phát trực tuyến. Trong thời đại dữ liệu lớn, việc duy trì tính chính xác và kịp thời của dữ liệu là vô cùng quan trọng, và CDC chính là giải pháp tối ưu cho bài toán này.

Nguyên tắc cốt lõi của CDC

  • Ghi lại dữ liệu: Phát hiện các thay đổi trong dữ liệu nguồn mà không làm ảnh hưởng đến hiệu suất của hệ thống nguồn.
  • Cập nhật gia tăng: Chỉ truyền tải dữ liệu đã thay đổi để giảm thiểu tải trọng.
  • Xử lý thời gian thực hoặc gần thời gian thực: Duy trì dữ liệu mới nhất trong các hệ thống mục tiêu.
  • Idempotency: Đảm bảo rằng các thay đổi được áp dụng nhiều lần không làm hỏng dữ liệu.
  • Theo dõi dựa trên nhật ký: Sử dụng nhật ký giao dịch của cơ sở dữ liệu để ghi lại dữ liệu một cách chính xác và có thể mở rộng.

Các phương pháp triển khai CDC

A) CDC dựa trên nhật ký (Log-based CDC)

Log-based CDC là phương pháp mạnh mẽ nhất, nó đọc trực tiếp các nhật ký giao dịch của cơ sở dữ liệu (ví dụ: PostgreSQL WAL, MySQL binlogs) để phát hiện các sự kiện thay đổi với độ trễ tối thiểu và khả năng mở rộng cao.
Ưu điểm: Tải trọng hệ thống thấp và hiệu suất gần thời gian thực, phù hợp cho các môi trường có khối lượng dữ liệu cao.
Nhược điểm: Cần quyền truy cập đặc quyền đến các nhật ký giao dịch và phụ thuộc vào các cài đặt lưu giữ nhật ký thích hợp.

Ví dụ: Sao chép logic với psql

sql Copy
-- Kích hoạt sao chép logic
ALTER SYSTEM SET wal_level = logical;

-- Tạo một slot sao chép logic để ghi lại các thay đổi
SELECT pg_create_logical_replication_slot('cdc_slot', 'pgoutput');

-- Lấy các thay đổi gần đây từ WAL
SELECT * FROM pg_logical_slot_changes('cdc_slot', NULL, NULL);

B) CDC dựa trên Trigger (Trigger-based CDC)

Sử dụng các trigger của cơ sở dữ liệu để ghi lại các thay đổi. Cung cấp độ trễ thấp nhưng có thể ảnh hưởng đến hiệu suất do tải trọng từ các trigger.

Ưu điểm: Dễ triển khai trên các cơ sở dữ liệu hỗ trợ trigger và đảm bảo ghi lại thay đổi ngay lập tức.
Nhược điểm: Có thể làm tăng tải cho cơ sở dữ liệu và có thể làm phức tạp các thay đổi về cấu trúc nếu không được quản lý cẩn thận.

sql Copy
-- Tạo một bảng audit để lưu trữ các thay đổi
CREATE TABLE customers_audit (
    audit_id SERIAL PRIMARY KEY,
    operation_type TEXT,
    customer_id INT,
    customer_name TEXT,
    modified_at TIMESTAMP DEFAULT now()
);

-- Tạo một hàm để chèn các bản ghi thay đổi
CREATE OR REPLACE FUNCTION capture_customer_changes()
RETURNS TRIGGER AS $
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO customers_audit (operation_type, customer_id, customer_name)
        VALUES ('INSERT', NEW.id, NEW.name);
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO customers_audit (operation_type, customer_id, customer_name)
        VALUES ('UPDATE', NEW.id, NEW.name);
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO customers_audit (operation_type, customer_id, customer_name)
        VALUES ('DELETE', OLD.id, OLD.name);
    END IF;
    RETURN NULL; -- Không cần sửa đổi dữ liệu bảng gốc
END;
$ LANGUAGE plpgsql;

-- Gắn trigger vào bảng customers
CREATE TRIGGER customer_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON customers
FOR EACH ROW EXECUTE FUNCTION capture_customer_changes();

C) CDC dựa trên Polling (Polling-based/Query-based CDC)

Phương pháp này định kỳ truy vấn cơ sở dữ liệu nguồn để kiểm tra các thay đổi dựa trên cột timestamp hoặc version.

Ví dụ: Một bảng sản phẩm với cột version_number mà tăng lên mỗi khi có cập nhật.

Ưu điểm: Dễ triển khai khi không có quyền truy cập nhật ký hoặc trigger.
Nhược điểm: Có thể làm chậm việc ghi lại thay đổi và tăng tải nếu polling quá thường xuyên.

D) CDC dựa trên Timestamp (Timestamp-based CDC)

Phương pháp này dựa vào một cột dành riêng ghi lại thời gian sửa đổi cuối cùng cho mỗi bản ghi. Bằng cách so sánh các timestamp này, hệ thống có thể xác định các bản ghi đã thay đổi kể từ lần kiểm tra trước đó.

Các công cụ và công nghệ CDC chính

Debezium

Debezium là nền tảng CDC mã nguồn mở ghi lại các thay đổi theo hàng từ nhiều cơ sở dữ liệu khác nhau, bao gồm PostgreSQL, MySQL, SQL Server và MongoDB, và phát hành chúng dưới dạng các luồng sự kiện thay đổi thường vào Apache Kafka.

json Copy
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "inventory",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "database.server.name": "dbserver1",
    "include.schema.changes": "true"
  }
}

Debezium hỗ trợ các snapshot gia tăng và chặn, đáp ứng các thay đổi về cấu trúc, cung cấp khả năng chịu lỗi với việc theo dõi offset và hỗ trợ các bảng tín hiệu cho các snapshot theo yêu cầu.

Apache Kafka & Kafka Connect

Kafka phục vụ như một nền tảng phát sự kiện bền vững, có thể mở rộng lý tưởng cho việc vận chuyển các sự kiện CDC. Kafka Connect cung cấp các connector có thể mở rộng để nhập các sự kiện CDC từ các nguồn (như các connector Debezium) và cung cấp cho các hệ thống hạ nguồn.

python Copy
from kafka import KafkaProducer
import json

# Khởi tạo producer Kafka với các máy chủ bootstrap và bộ tuần tự hóa JSON cho giá trị.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Định nghĩa một sự kiện CDC bao gồm chi tiết của hoạt động.
cdc_event = {
    "table": "orders",
    "operation": "update",
    "data": {"order_id": 123, "status": "shipped"}
}

# Gửi sự kiện CDC đến 'cdc-topic' và flush để đảm bảo truyền tải.
producer.send('cdc-topic', cdc_event)
producer.flush()
print("Sự kiện CDC đã được gửi thành công!")

Các sự kiện CDC được công bố dưới dạng các chủ đề Kafka, cho phép các ứng dụng tiêu thụ thực hiện phân tích thời gian thực, caching và các tác vụ sao chép.

Các loại connector CDC chính trong Kafka Connect:

  • Source connectors: Ghi lại và truyền tải các sự kiện thay đổi vào Kafka.
  • Sink connectors: Tiêu thụ các sự kiện CDC từ Kafka và ghi chúng vào các kho dữ liệu khác.

Confluent Cloud CDC Connectors

Confluent Cloud cung cấp các connector CDC được quản lý, bao gồm Oracle CDC Source Connector, cho phép dễ dàng ghi lại từ các nhật ký redo của Oracle và công bố lên các chủ đề Kafka với khả năng chịu lỗi tích hợp và hỗ trợ cho ACL bảo mật, quản lý offset và phân vùng chủ đề.

AWS Database Migration Service (DMS)

Sử dụng CDC dựa trên nhật ký để liên tục sao chép dữ liệu từ các hệ thống tại chỗ sang đám mây AWS với thời gian ngừng hoạt động tối thiểu.

Talend và Informatica

TalendInformatica là các nền tảng ETL toàn diện cung cấp chức năng CDC tích hợp để ghi lại và xử lý các thay đổi dữ liệu, giảm thiểu cấu hình thủ công. Chúng đặc biệt có lợi trong các tình huống chuyển đổi dữ liệu phức tạp, nơi các giải pháp tích hợp có thể đơn giản hóa hoạt động.

Giải pháp CDC tự nhiên trong cơ sở dữ liệu

Nhiều cơ sở dữ liệu quan hệ cung cấp các tính năng CDC tự nhiên, giảm thiểu nhu cầu sử dụng các công cụ bên ngoài:

  • Sao chép logic PostgreSQL: Ghi lại các thay đổi trong WAL và phát chúng đến các người đăng ký.
  • CDC SQL Server: Sử dụng nhật ký giao dịch để theo dõi các thay đổi tự động.
  • Sao chép nhật ký nhị phân MySQL: Ghi lại các thay đổi cho mục đích sao chép.

Chiến lược triển khai CDC trong thực tế

1. Snapshot ban đầu

Bất kỳ pipeline CDC nào cũng bắt đầu bằng việc ghi lại một snapshot nhất quán của cơ sở dữ liệu nguồn để các hệ thống hạ nguồn bắt đầu với một cơ sở chính xác. Debezium thực hiện các snapshot bằng cách sử dụng các truy vấn SQL trong các mức độ cách ly giao dịch tối ưu. Snapshot sẽ chạy một lần tại thời điểm khởi động hoặc theo yêu cầu, ghi lại trạng thái hiện tại trước khi chuyển sang chế độ phát trực tuyến.

2. Phát trực tuyến các thay đổi

Các thay đổi tiếp theo (INSERT, UPDATE, DELETE) được phát trực tiếp dưới dạng các sự kiện nguồn gốc trực tiếp từ các nhật ký cơ sở dữ liệu. Kafka cung cấp thông điệp bền vững và đảm bảo thứ tự. Các ứng dụng tiêu thụ sự kiện tái tạo hoặc duy trì các đại diện cập nhật của dữ liệu nguồn một cách hiệu quả.

3. Các mẫu phi chuẩn hóa

CDC thường phản ánh các sơ đồ nguồn có cấu trúc cao, điều này có thể khó tiêu thụ cho phân tích. Các phương pháp phi chuẩn hóa bao gồm:

  • Không phi chuẩn hóa: Sao chép đơn giản với các phép nối hạ nguồn.
  • Views vật liệu: Tạo các view cơ sở dữ liệu để kết hợp/enrich dữ liệu trước khi ghi lại CDC.
  • Mẫu Outbox: Ứng dụng ghi các sự kiện thay đổi vào một bảng outbox không thay đổi từ đó thực hiện CDC.
  • Xử lý luồng: Sử dụng Kafka Streams hoặc ksqlDB để làm phong phú và phi chuẩn hóa các sự kiện ở hạ nguồn.
  • Phi chuẩn hóa tại điểm đến: Thực hiện các chuyển đổi trong kho dữ liệu hoặc lake.

Việc lựa chọn nơi phi chuẩn hóa xảy ra phụ thuộc vào độ trễ, độ phức tạp và sở thích kiến trúc.

Thách thức và giải pháp CDC

  • Sự tiến hóa của sơ đồ: Thay đổi sơ đồ cơ sở dữ liệu nguồn (thêm/xóa cột, loại dữ liệu) có thể làm hỏng các pipeline CDC
    Giải pháp: Sử dụng registry sơ đồ và phiên bản; Debezium hỗ trợ một số quản lý thay đổi sơ đồ; thực hiện các cập nhật sơ đồ tương thích ngược; các snapshot gia tăng có thể xử lý một số thay đổi một cách duyên dáng.
  • Thứ tự sự kiện: Các thay đổi đến không theo thứ tự có thể gây ra trạng thái dữ liệu sai.
    Giải pháp: Sử dụng các đảm bảo phân vùng và thứ tự của Kafka; Debezium đệm các sự kiện snapshot và phát trực tuyến để giải quyết các va chạm; thiết kế các ứng dụng tiêu thụ idempotent.
  • Dữ liệu trễ: Các thay đổi dữ liệu bị trì hoãn do gián đoạn luồng hoặc độ trễ sao chép.
    Giải pháp: Sử dụng các chiến lược cửa sổ và watermark trong xử lý luồng; hỗ trợ phát lại bằng cách sử dụng lưu trữ nhật ký và quản lý offset của Kafka.
  • Khả năng chịu lỗi: Các lỗi mạng, hệ thống có thể ngắt quãng hoạt động của pipeline.
    Giải pháp: Theo dõi offset của Debezium để tiếp tục; độ bền của Kafka; ghi idempotent tại điểm đến; các bảng tín hiệu cho việc khởi động lại snapshot có kiểm soát.

Ví dụ về Kafka Connect Debezium Sink Connector để ghi dữ liệu CDC vào kho dữ liệu

json Copy
{
  "name": "dw-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "jdbc:postgresql://datawarehouse:5432/dw",
    "connection.user": "dw_user",
    "connection.password": "password",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id"
  }
}

Các connector nguồn Kafka đọc dữ liệu từ một hệ thống bên ngoài và ghi vào các chủ đề Kafka, trong khi các connector sink Kafka đọc dữ liệu từ các chủ đề Kafka và ghi vào một hệ thống bên ngoài.

Kết luận

Change Data Capture (CDC) là một giải pháp hiệu quả cho việc quản lý và tối ưu hóa quy trình xử lý dữ liệu trong thời gian thực. Với các công nghệ như Debezium và Kafka, các nhà phát triển có thể dễ dàng triển khai CDC để đảm bảo dữ liệu luôn được cập nhật và chính xác. Hãy bắt đầu khám phá các giải pháp CDC ngay hôm nay để tối ưu hóa quy trình dữ liệu của bạn!

Câu hỏi thường gặp (FAQ)

1. CDC có thể sử dụng cho những loại cơ sở dữ liệu nào?
CDC có thể được sử dụng với nhiều loại cơ sở dữ liệu khác nhau như PostgreSQL, MySQL, SQL Server, và MongoDB.
2. Có cần phải thay đổi cấu trúc cơ sở dữ liệu khi triển khai CDC không?
Không nhất thiết, nhưng một số thay đổi có thể yêu cầu cập nhật sơ đồ để đảm bảo CDC hoạt động hiệu quả.
3. Làm thế nào để xử lý các thay đổi lịch sử trong CDC?
Các thay đổi lịch sử có thể được quản lý bằng cách lưu trữ các phiên bản dữ liệu hoặc sử dụng các bảng audit để ghi lại các thay đổi.

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào