0
0
Lập trình
Admin Team
Admin Teamtechmely

Hướng Dẫn Xử Lý Log Từ Kafka Bằng Logstash

Đăng vào 5 ngày trước

• 3 phút đọc

Chủ đề:

apache kafkalogst

Khi làm việc với hệ thống log phức tạp, việc xử lý log từ Kafka một cách hiệu quả là rất quan trọng. Trong bài viết này, chúng ta sẽ tìm hiểu cách cấu hình tệp logstash.conf để thu thập và phân tích log từ Kafka. Chúng ta sẽ sử dụng grok để phân tích cú pháp log và mutate để loại bỏ những trường không cần thiết.

Cấu Hình Logstash Để Xử Lý Log Từ Kafka

Phần Nhập Dữ Liệu

Đầu tiên, chúng ta sẽ định nghĩa phần đầu vào để kết nối với Kafka:

plaintext Copy
input {
  kafka {
    bootstrap_servers => "10.100.30.32:9092"
    topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
    group_id => "log_consumer_group"
    auto_offset_reset => "earliest"
  }
}

Phần Lọc Dữ Liệu

Sau đó, chúng ta sẽ phân tích cú pháp log và thực hiện các biến đổi cần thiết:

  1. Sử dụng gsub trong phần mutate để loại bỏ các ký tự không hợp lệ.
  2. Sử dụng grok để phân tích các trường trong log.
  3. Thêm và loại bỏ các trường không cần thiết.
plaintext Copy
filter {  
  mutate {
    gsub => ["message", "\u001b|\n", ""]
  }

  grok {
    match => {
      "message" => """
          {"instant":{"epochSecond":%{NUMBER:epoch_sec},
          "nanoOfSecond":%{NUMBER:nano_sec},
          "thread":"%{DATA:thread},
          "level":"%{WORD:level},
          "loggerName":"%{DATA:logger_name},
          "message":"%{GREEDYDATA:log_message},
          "endOfBatch":%{GREEDYDATA:end_of_batch},
          "loggerFqcn":"%{DATA:logger_fqcn},
          "contextMap":{"traceId":"%{DATA:trace_id},
          "spanId":"%{DATA:span_id},
          "className":"%{DATA:class_name},
          "clientIp":"%{IP:client_ip},
          "clientMessageId":"%{DATA:client_message_id},
          "clientTime":"%{TIMESTAMP_ISO8601:client_time},
          "duration":%{NUMBER:duration},
          "methodName":"%{DATA:method_name},
          "path":"%{DATA:path},
          "stackTrace":"%{GREEDYDATA:stack_trace}"},
          "threadId":%{NUMBER:thread_id},
          "threadPriority":%{NUMBER:thread_priority},
          "logType":"%{WORD:log_type},
          "application":"%{DATA:application},
          "localIp":"%{IP:local_ip}"}
      """
    }
  }

  mutate {
    add_field => {
      "ts" => "%{epoch_sec}"
      "ip" => "%{client_ip}"
      "msgId" => "%{client_message_id}"
      "duration" => "%{duration}"
      "method" => "%{method_name}"
      "path" => "%{path}"
      "app" => "%{application}"
      "localIp" => "%{local_ip}"
      "logType" => "%{log_type}"
    }
    remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"]
  }

  # Chuyển đổi timestamp
  date {
    match => [ "client_time", "ISO8601" ]
    timezone => "+07:00"
    target => "@timestamp"
  }

  # Thêm trường ngày tháng
  ruby {
    code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
  }
}

Phần Xuất Dữ Liệu

Cuối cùng, định nghĩa phần xuất để gửi log đến Elasticsearch:

plaintext Copy
output {
  elasticsearch {
    hosts => ["http://10.152.183.57:9200"]
    template => "/usr/share/logstash/templates/logstash_template.json"
    template_name => "logstash"
    template_overwrite => true
    index => "logstash-%{indexDay}"
    document_type => "_doc"
    codec => json
  }
  stdout {
    codec => rubydebug
  }
}

Kết Luận

Bằng cách áp dụng cấu hình trên, bạn có thể dễ dàng xử lý log từ Kafka với Logstash. Điều này không chỉ giúp bạn phân tích log một cách hiệu quả mà còn giúp loại bỏ những thông tin không cần thiết, từ đó tối ưu hóa quy trình thu thập dữ liệu của bạn.
source: viblo

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào