Giới Thiệu
Bài viết này sẽ cung cấp cho bạn một phiên bản cải tiến của cấu hình Logstash, giúp tối ưu hóa quá trình thu thập và xử lý dữ liệu từ Kafka. Qua đó, bạn sẽ hiểu rõ hơn về cách sử dụng Logstash để tương tác với các nguồn dữ liệu quan trọng.
Cấu Hình Logstash Đã Được Sửa Đổi
Phần Input
plaintext
input {
kafka {
bootstrap_servers => "10.100.30.32:9092"
topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
group_id => "log_consumer_group"
auto_offset_reset => "earliest"
}
}
Trong phần cấu hình input
, chúng ta đang kết nối với Kafka để nhận dữ liệu từ các topic cụ thể. Điều này giúp chúng ta thu thập log từ hai nguồn quan trọng là ESMART-CATEGORY-LOGS và ESMART-GATEWAY-LOGS.
Phần Filter
plaintext
filter {
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec},"nanoOfSecond":%{NUMBER:nano_sec},"thread":"%{DATA:thread}","level":"%{WORD:level}","loggerName":"%{DATA:logger_name}","message":"%{GREEDYDATA:log_message}","endOfBatch":%{GREEDYDATA:end_of_batch},"loggerFqcn":"%{DATA:logger_fqcn}","contextMap":{"traceId":"%{DATA:trace_id}","spanId":"%{DATA:span_id}","className":"%{DATA:class_name}","clientIp":"%{IP:client_ip}","clientMessageId":"%{DATA:client_message_id}","clientTime":"%{TIMESTAMP_ISO8601:client_time}","duration":%{NUMBER:duration},"methodName":"%{DATA:method_name}","path":"%{DATA:path}","stackTrace":"%{GREEDYDATA:stack_trace}"},"threadId":%{NUMBER:thread_id},"threadPriority":%{NUMBER:thread_priority},"logType":"%{WORD:log_type}","application":"%{DATA:application}","localIp":"%{IP:local_ip}"}
"""
}
}
mutate {
add_field => {
"ts" => "%{epoch_sec}"
"ip" => "%{client_ip}"
"msgId" => "%{client_message_id}"
"duration" => "%{duration}"
"method" => "%{method_name}"
"path" => "%{path}"
"app" => "%{application}"
"localIp" => "%{local_ip}"
"logType" => "%{log_type}"
}
remove_field => [
"instant", "thread", "level", "logger_name", "message",
"endOfBatch", "logger_fqcn", "contextMap",
"threadId", "threadPriority"
]
}
date {
match => [ "client_time", "ISO8601" ] # Thay đổi để khớp với trường chính xác
timezone => "+07:00"
target => "@timestamp"
}
ruby {
code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
}
}
Trong phần này, chúng tôi sử dụng grok
để trích xuất dữ liệu từ các log message và mutate
để cải thiện cấu trúc dữ liệu đầu ra. Chúng tôi giữ lại các trường thông tin quan trọng và loại bỏ các trường không cần thiết.
Phần Output
plaintext
output {
elasticsearch {
hosts => ["http://10.152.183.57:9200"]
template => "/usr/share/logstash/templates/logstash_template.json"
template_name => "logstash"
template_overwrite => true
index => "logstash-%{indexDay}"
document_type => "_doc"
}
stdout {
codec => rubydebug
}
}
Cuối cùng, dữ liệu sau khi được xử lý sẽ được gửi đến Elasticsearch và hiển thị trên terminal để theo dõi.
Các Điểm Cần Lưu Ý
- Đảm bảo rằng các đường dẫn và thông số kết nối là chính xác trước khi triển khai.
- Theo dõi hiệu suất của Logstash để tối ưu hóa cho các khối lượng dữ liệu lớn.
Bằng cách áp dụng cấu hình này, bạn sẽ có được một hệ thống thu thập log hiệu quả hơn, giúp việc phân tích và giám sát trở nên dễ dàng và chính xác hơn.
source: viblo