0
0
Lập trình
Admin Team
Admin Teamtechmely

Hướng Dẫn Cấu Hình Logstash Hiệu Quả Để Xử Lý Dữ Liệu Từ Kafka

Đăng vào 1 tháng trước

• 2 phút đọc

Chủ đề:

Logstash

Giới Thiệu

Bài viết này sẽ cung cấp cho bạn một phiên bản cải tiến của cấu hình Logstash, giúp tối ưu hóa quá trình thu thập và xử lý dữ liệu từ Kafka. Qua đó, bạn sẽ hiểu rõ hơn về cách sử dụng Logstash để tương tác với các nguồn dữ liệu quan trọng.

Cấu Hình Logstash Đã Được Sửa Đổi

Phần Input

plaintext Copy
input {
  kafka {
    bootstrap_servers => "10.100.30.32:9092"
    topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
    group_id => "log_consumer_group"
    auto_offset_reset => "earliest"
  }
}

Trong phần cấu hình input, chúng ta đang kết nối với Kafka để nhận dữ liệu từ các topic cụ thể. Điều này giúp chúng ta thu thập log từ hai nguồn quan trọng là ESMART-CATEGORY-LOGS và ESMART-GATEWAY-LOGS.

Phần Filter

plaintext Copy
filter {
  grok {
    match => {
      "message" => """
        {"instant":{"epochSecond":%{NUMBER:epoch_sec},"nanoOfSecond":%{NUMBER:nano_sec},"thread":"%{DATA:thread}","level":"%{WORD:level}","loggerName":"%{DATA:logger_name}","message":"%{GREEDYDATA:log_message}","endOfBatch":%{GREEDYDATA:end_of_batch},"loggerFqcn":"%{DATA:logger_fqcn}","contextMap":{"traceId":"%{DATA:trace_id}","spanId":"%{DATA:span_id}","className":"%{DATA:class_name}","clientIp":"%{IP:client_ip}","clientMessageId":"%{DATA:client_message_id}","clientTime":"%{TIMESTAMP_ISO8601:client_time}","duration":%{NUMBER:duration},"methodName":"%{DATA:method_name}","path":"%{DATA:path}","stackTrace":"%{GREEDYDATA:stack_trace}"},"threadId":%{NUMBER:thread_id},"threadPriority":%{NUMBER:thread_priority},"logType":"%{WORD:log_type}","application":"%{DATA:application}","localIp":"%{IP:local_ip}"}
      """
    }
  }
  
  mutate {
    add_field => {
      "ts" => "%{epoch_sec}"
      "ip" => "%{client_ip}"
      "msgId" => "%{client_message_id}"
      "duration" => "%{duration}"
      "method" => "%{method_name}"
      "path" => "%{path}"
      "app" => "%{application}"
      "localIp" => "%{local_ip}"
      "logType" => "%{log_type}"
    }
    remove_field => [
      "instant", "thread", "level", "logger_name", "message", 
      "endOfBatch", "logger_fqcn", "contextMap", 
      "threadId", "threadPriority"
    ]
  }

  date {
    match => [ "client_time", "ISO8601" ]  # Thay đổi để khớp với trường chính xác
    timezone => "+07:00"
    target => "@timestamp"
  }
  
  ruby {
    code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
  }
}

Trong phần này, chúng tôi sử dụng grok để trích xuất dữ liệu từ các log message và mutate để cải thiện cấu trúc dữ liệu đầu ra. Chúng tôi giữ lại các trường thông tin quan trọng và loại bỏ các trường không cần thiết.

Phần Output

plaintext Copy
output {
  elasticsearch {
    hosts => ["http://10.152.183.57:9200"]
    template => "/usr/share/logstash/templates/logstash_template.json"
    template_name => "logstash"
    template_overwrite => true
    index => "logstash-%{indexDay}"
    document_type => "_doc"
  }
  stdout {
    codec => rubydebug
  }
}

Cuối cùng, dữ liệu sau khi được xử lý sẽ được gửi đến Elasticsearch và hiển thị trên terminal để theo dõi.

Các Điểm Cần Lưu Ý

  • Đảm bảo rằng các đường dẫn và thông số kết nối là chính xác trước khi triển khai.
  • Theo dõi hiệu suất của Logstash để tối ưu hóa cho các khối lượng dữ liệu lớn.

Bằng cách áp dụng cấu hình này, bạn sẽ có được một hệ thống thu thập log hiệu quả hơn, giúp việc phân tích và giám sát trở nên dễ dàng và chính xác hơn.
source: viblo

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào