Giới thiệu
Trong những năm gần đây, với sự bùng nổ của công nghệ AI, lĩnh vực Big Data ngày càng thu hút sự chú ý. Trong số nhiều thành phần của luồng dữ liệu (Data Streaming), ETL, và các khía cạnh khác, Message Queue là một phần không thể thiếu. Trong đó, Apache Kafka đã trở thành lựa chọn hàng đầu cho nhiều tổ chức lớn như Netflix, Tesla và Meta. Bài viết này sẽ hướng dẫn bạn cách cài đặt Apache Kafka với cấu hình multi-broker trên nền tảng Docker.
Cài đặt
Các Yêu cầu
Trước tiên, bạn cần cài đặt Docker. Bạn có thể tìm thấy hướng dẫn chi tiết và dễ dàng trên tài liệu chính thức của Docker. Trong bài viết này, chúng ta sẽ cài đặt một cụm Kafka gồm 1 Zookeeper, 3 broker và 1 dịch vụ Kafdrop để trực quan hóa dữ liệu.
Sử dụng Docker Compose
Bật Docker lên và tạo một tệp tin docker-compose.yml
trong thư mục bất kỳ. Dưới đây là nội dung tệp tin mà bạn có thể sao chép và sử dụng. Sau đó, chạy lệnh docker compose up -d
để khởi tạo và khởi động các container (nhớ bật Docker trước nhé).
yaml
version: "3"
services:
zookeeper:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka1
container_name: kafka-broker-1
ports:
- "9091:9091"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://kafka1:9091
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
volumes:
- ./data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka2:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka2
container_name: kafka-broker-2
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:29092,LISTENER_DOCKER_EXTERNAL://kafka2:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:29092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
volumes:
- ./data/kafka2/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka3:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka3
container_name: kafka-broker-3
ports:
- "9093:9093"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:29093,LISTENER_DOCKER_EXTERNAL://kafka3:9093
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:29093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
volumes:
- ./data/kafka3/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafdrop:
image: obsidiandynamics/kafdrop:latest
depends_on:
- kafka1
- kafka2
- kafka3
ports:
- "9123:9123"
environment:
SERVER_PORT: 9123
MANAGEMENT_SERVER_PORT: 9123
KAFKA_BROKERCONNECT: kafka-broker-1:19091,kafka-broker-2:29092,kafka-broker-3:29093
Zookeeper
Zookeeper là component đầu tiên mà chúng ta cài đặt, phục vụ cho việc điều phối các broker. Mặc dù đã có giải pháp cài đặt Kafka không cần Zookeeper, nhưng chúng tôi không đề cập đến trong bài viết này.
- Trường
image
xác định hình ảnh Docker được sử dụng, trong khihostname
là tên được đặt cho container để dễ dàng nhận diện trong một mạng. - Với
ports
, có hai loại port:HOST_PORT
(port bên ngoài) vàCONTAINER_PORT
(port bên trong container). Các container sẽ giao tiếp qua CONTAINER_PORT, trong khi các client bên ngoài sẽ sử dụng HOST_PORT để kết nối với dịch vụ. environment
dùng để cấu hình các thiết lập cho container.volumes
được sử dụng để ánh xạ thư mục từ máy chủ vào container, đảm bảo dữ liệu không bị mất khi khởi động lại hoặc dừng container. Định dạng là[SOURCE]:[TARGET]:[MODE]
, trong đóSOURCE
là thư mục trên máy chủ vàTARGET
là thư mục trong container.
Kafka Broker
Các trường như port
, image
,... trong các broker giống với Zookeeper, do đó chúng tôi sẽ không đi sâu vào chi tiết ở đây.
- Trường
depends_on
xác định thứ tự container sẽ khởi động, đảm bảo rằng Zookeeper được khởi động trước. Dưới đây là ý nghĩa của các biến trong phầnenvironment
:KAFKA_ZOOKEEPER_CONNECT
: chỉ định địa chỉ Zookeeper.KAFKA_BROKER_ID
: ID duy nhất cho mỗi broker.KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
: số lượng sao chép cho các topic, phải nhỏ hơn hoặc bằng số lượng broker.KAFKA_LISTENERS
: chỉ định giao diện mạng mà Kafka lắng nghe.KAFKA_ADVERTISED_LISTENERS
: cấu hình cách mà client sẽ kết nối.KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
: ánh xạ giữa các listener và giao thức bảo mật.KAFKA_INTER_BROKER_LISTENER_NAME
: chỉ định listener cho các kết nối nội bộ giữa các broker.
Ví dụ với broker 1, các client bên trong mạng Docker sẽ kết nối với LISTENER_DOCKER_INTERNAL
qua port 19091 và hostname là kafka1. Ngược lại, client bên ngoài sẽ kết nối với broker 1 thông qua LISTENER_DOCKER_EXTERNAL
qua port 9091 và hostname là localhost.
Kafdrop
Kafdrop là dịch vụ giúp bạn dễ dàng trực quan hóa các thông tin trong Kafka. Cấu hình Kafdrop rất đơn giản, bạn chỉ cần chỉ định các broker trong biến KAFKA_BROKERCONNECT
. Các trường SERVER_PORT
và MANAGEMENT_SERVER_PORT
cho phép tùy chỉnh port cho Kafdrop, bởi vì port mặc định của nó là 9000 trong khi chúng ta đang sử dụng một dịch vụ khác ở trên cùng một port. Thông qua Kafdrop, bạn có thể xem các message trong các partition của các broker, kiểm tra các topic và các tham số liên quan như replication factor và leader, hoặc tạo topic mới một cách đơn giản.
Kiểm tra Cụm Kafka
Sau khi cài đặt và khởi động Docker, bạn hãy kiểm tra xem các container đã hoạt động đúng hay chưa. Để gửi message vào Kafka, chúng ta sẽ dùng Python. Bạn cần cài đặt thư viện kafka-python
. Dưới đây là mã ví dụ để gửi message:
python
from kafka import KafkaProducer
import json
import datetime
producer = KafkaProducer(bootstrap_servers=['localhost:9091', 'localhost:9092', 'localhost:9093'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
topic = 'tuan'
msg = {
"current_time": str(datetime.datetime.now())
}
producer.send(topic, value=msg)
producer.flush()
# print(msg)
Chạy tệp tin này bằng lệnh python kafka_prod.py
. Nếu bạn thấy message in ra terminal, điều đó có nghĩa là bạn đã gửi thành công. Sau đó, bạn có thể kiểm tra Kafdrop để xác nhận rằng message đã được gửi lên hệ thống thành công.
Tài liệu Tham khảo
source: viblo