Giới thiệu
Quản lý log ở quy mô lớn có thể là một thách thức, đặc biệt khi tất cả log đều được gửi đến cùng một topic Kafka. Nhưng nếu bạn có thể định tuyến log vào các topic riêng cho từng pod (hoặc bất kỳ topic tùy chỉnh nào) trực tiếp từ Fluent Bit? Điều này giúp việc lọc và truy vấn trở nên dễ dàng hơn, đặc biệt trong các môi trường Kubernetes đa người dùng hoặc quy mô lớn. Trong bài viết này, chúng ta sẽ khám phá cách thiết lập topic Kafka động trong Fluent Bit bằng cách sử dụng metadata Kubernetes.
Vấn đề
Plugin đầu ra Kafka của Fluent Bit hỗ trợ việc gán topic động bằng cách sử dụng Dynamic_topic on và Topic_key. Tuy nhiên, tài liệu mô tả về việc sử dụng các trường lồng nhau khá mơ hồ, điều này xảy ra với các trường metadata Kubernetes khi sử dụng làm khóa topic.
Ví dụ:
json
{
"timestamp":"2025-05-13T19:27:31.954980134Z",
"log":"Server is listening on port 5000",
"kubernetes":{
"namespace_name":"default",
"pod_id":"de70b4a7-2803-485b-9015-3d17791969ae",
"labels":{
"app":"dy-7d8eeb3d4fb6da81d88da56280de7d42",
"pod-template-hash":"5744dfc46c"
},
"container_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42",
"annotations":{
"kubectl.kubernetes.io/restartedAt":"2025-05-13T22:27:29+03:00"
},
"container_image":"docker.io/library/dy-7d8eeb3d4fb6da81d88da56280de7d42:latest",
"container_hash":"sha256:d110c044a3e9fed6c3b3c9222a1b4d5cf4bc5f128a1d5f47980abee7ff98f745",
"host":"kind-control-plane",
"docker_id":"da27432083a32dada6ccac359d72233a4e93d98d1a139e6b2d4bee7d167c49cc",
"pod_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf"
}
}
Nhìn qua, có vẻ như chúng ta có thể sử dụng $kubernetes['pod_name'] làm Topic_key, vì tài liệu cho thấy cú pháp này trong các ví dụ lọc. Nhưng thực tế, điều này không hoạt động cho đầu ra — chỉ dành cho các bộ lọc.
❌ Cấu hình không hoạt động:
ini
[OUTPUT]
Name kafka
Match *
Brokers kafka:9093
Topics kube
Topic_key $kubernetes['pod_name'] # Cú pháp này không hoạt động
Dynamic_topic on
Giải pháp
Sau một vài lần thử nghiệm, tôi phát hiện ra lý do tại sao cấu hình trong tài liệu không hoạt động:
Plugin đầu ra Kafka của Fluent Bit chỉ hỗ trợ các khóa cấp cao nhất trong Topic_key.
Điều này có nghĩa là nếu trường của bạn lồng nhau (như kubernetes.pod_name), Fluent Bit sẽ không giải quyết nó trực tiếp.
Mẹo là nâng cấp trường lồng nhau lên cấp cao nhất trước khi nó đến plugin đầu ra.
Chúng ta có thể làm điều này bằng cách sử dụng một bộ lọc Lua.
Bước 1: Log gốc (trường lồng nhau bên trong kubernetes)
json
{
"timestamp":"2025-05-13T19:27:31.954980134Z",
"log":"Server is listening on port 5000",
"kubernetes": {
"namespace_name":"default",
"pod_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf"
}
}
Bước 2: Bộ lọc Lua để nâng cấp trường lồng nhau
lua
function promote_pod_name(tag, timestamp, record)
local k8s = record["kubernetes"]
if k8s and k8s["pod_name"] then
record["topic_name"] = k8s["pod_name"]
else
record["topic_name"] = "default-topic" -- fallback
end
return 1, timestamp, record
end
Lưu ý nhanh:
- 1 có nghĩa là "giữ lại bản ghi"
timestampđược truyền qua bộ lọc không thay đổirecordlà log đã được cập nhật với topic_name ở cấp cao nhất
Bước 3: Log sau bộ lọc Lua
json
{
"timestamp":"2025-05-13T19:27:31.954980134Z",
"log":"Server is listening on port 5000",
"topic_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf",
"kubernetes": { ... }
}
Bây giờ Fluent Bit có thể an toàn sử dụng topic_name làm Topic_key.
Bước 4: Cấu hình Fluent Bit
ini
[FILTER]
Name lua
Match kube.*
script /fluent-bit/scripts/promote_pod_name.lua
call promote_pod_name
[OUTPUT]
Name kafka
Match *
Brokers kafka:9093
Topics kube
Topic_key topic_name
Dynamic_topic on
Format json
Cấu hình ConfigMap cho Fluent Bit
Dưới đây là một ví dụ về cấu hình ConfigMap cho Fluent Bit để xử lý log từ các pod Kubernetes:
yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-config
namespace: logging
data:
fluent-bit.conf: |
[INPUT]
Name tail
Path /var/log/containers/*.log
multiline.parser docker, cri
Tag kube.*
Mem_Buf_Limit 5MB
Skip_Long_Lines On
[FILTER]
Name kubernetes
Match kube.*
Merge_Log On
Keep_Log Off
Kube_Tag_Prefix kube.var.log.containers.
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
K8S-Logging.Exclude On
[FILTER]
Name lua
Match kube.*
script /fluent-bit/scripts/promote_pod_name.lua
call promote_pod_name
[OUTPUT]
Name kafka
Match *
Brokers kafka:9093
Topics kube
Topic_key topic_name
Dynamic_topic on
Format json
Các ứng dụng khác
Cách tiếp cận này không chỉ giới hạn ở pod_name — bạn có thể nâng cấp bất kỳ trường hoặc nhãn Kubernetes lồng nhau nào lên trường cấp cao nhất và sử dụng nó làm khóa topic Kafka của bạn. Điều này hữu ích cho:
- Phân tách log theo namespace
- Tạo topic Kafka riêng cho từng ứng dụng
- Đảm bảo phân lập log cho đa người dùng
Nếu bạn thử nghiệm điều này trong môi trường của mình hoặc gặp phải các trường hợp biên, tôi rất muốn nghe từ bạn! Hãy kết nối với tôi trên LinkedIn hoặc gửi cho tôi một tin nhắn.