Khi làm việc với hệ thống log phức tạp, việc xử lý log từ Kafka một cách hiệu quả là rất quan trọng. Trong bài viết này, chúng ta sẽ tìm hiểu cách cấu hình tệp logstash.conf
để thu thập và phân tích log từ Kafka. Chúng ta sẽ sử dụng grok
để phân tích cú pháp log và mutate
để loại bỏ những trường không cần thiết.
Cấu Hình Logstash Để Xử Lý Log Từ Kafka
Phần Nhập Dữ Liệu
Đầu tiên, chúng ta sẽ định nghĩa phần đầu vào để kết nối với Kafka:
plaintext
input {
kafka {
bootstrap_servers => "10.100.30.32:9092"
topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
group_id => "log_consumer_group"
auto_offset_reset => "earliest"
}
}
Phần Lọc Dữ Liệu
Sau đó, chúng ta sẽ phân tích cú pháp log và thực hiện các biến đổi cần thiết:
- Sử dụng
gsub
trong phầnmutate
để loại bỏ các ký tự không hợp lệ. - Sử dụng
grok
để phân tích các trường trong log. - Thêm và loại bỏ các trường không cần thiết.
plaintext
filter {
mutate {
gsub => ["message", "\u001b|\n", ""]
}
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec},
"nanoOfSecond":%{NUMBER:nano_sec},
"thread":"%{DATA:thread},
"level":"%{WORD:level},
"loggerName":"%{DATA:logger_name},
"message":"%{GREEDYDATA:log_message},
"endOfBatch":%{GREEDYDATA:end_of_batch},
"loggerFqcn":"%{DATA:logger_fqcn},
"contextMap":{"traceId":"%{DATA:trace_id},
"spanId":"%{DATA:span_id},
"className":"%{DATA:class_name},
"clientIp":"%{IP:client_ip},
"clientMessageId":"%{DATA:client_message_id},
"clientTime":"%{TIMESTAMP_ISO8601:client_time},
"duration":%{NUMBER:duration},
"methodName":"%{DATA:method_name},
"path":"%{DATA:path},
"stackTrace":"%{GREEDYDATA:stack_trace}"},
"threadId":%{NUMBER:thread_id},
"threadPriority":%{NUMBER:thread_priority},
"logType":"%{WORD:log_type},
"application":"%{DATA:application},
"localIp":"%{IP:local_ip}"}
"""
}
}
mutate {
add_field => {
"ts" => "%{epoch_sec}"
"ip" => "%{client_ip}"
"msgId" => "%{client_message_id}"
"duration" => "%{duration}"
"method" => "%{method_name}"
"path" => "%{path}"
"app" => "%{application}"
"localIp" => "%{local_ip}"
"logType" => "%{log_type}"
}
remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"]
}
# Chuyển đổi timestamp
date {
match => [ "client_time", "ISO8601" ]
timezone => "+07:00"
target => "@timestamp"
}
# Thêm trường ngày tháng
ruby {
code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
}
}
Phần Xuất Dữ Liệu
Cuối cùng, định nghĩa phần xuất để gửi log đến Elasticsearch:
plaintext
output {
elasticsearch {
hosts => ["http://10.152.183.57:9200"]
template => "/usr/share/logstash/templates/logstash_template.json"
template_name => "logstash"
template_overwrite => true
index => "logstash-%{indexDay}"
document_type => "_doc"
codec => json
}
stdout {
codec => rubydebug
}
}
Kết Luận
Bằng cách áp dụng cấu hình trên, bạn có thể dễ dàng xử lý log từ Kafka với Logstash. Điều này không chỉ giúp bạn phân tích log một cách hiệu quả mà còn giúp loại bỏ những thông tin không cần thiết, từ đó tối ưu hóa quy trình thu thập dữ liệu của bạn.
source: viblo