Xây Dựng Hệ Thống Phát Hiện Mối Đe Dọa AI với AWS Bedrock và Pinecone
Tôi đã xây dựng một hệ thống phát hiện mối đe dọa sẵn sàng sản xuất, phân tích các cuộc tấn công honeypot theo thời gian thực với độ chính xác 90/100.
Demo: Xem video 5 phút
Mã nguồn: Xem kho GitHub
Thách Thức
Các nhóm bảo mật thường sử dụng phân tích mối đe dọa thủ công, nhưng vấn đề là nó không thể mở rộng và các hệ thống dựa trên quy tắc truyền thống thường bỏ lỡ những cuộc tấn công tinh vi.
Do đó, tôi đã muốn tạo ra một hệ thống có khả năng:
- Phân tích mối đe dọa theo thời gian thực (< 3 giây)
- Học từ các mẫu tấn công lịch sử bằng cách sử dụng AI
- Tự động mở rộng với kiến trúc serverless
- Cung cấp thông tin có thể hành động, không chỉ là cảnh báo tĩnh
Xem demo 5 phút: Video Demo
Tổng Quan Kiến Trúc
Dưới đây là quy trình tổng thể:
Honeypot → S3 → Lambda → Bedrock (Claude) → Pinecone + DynamoDB + SNS
Tại Sao Chọn Thiết Kế Này?
- Xử Lý Dựa Trên Sự Kiện: Lambda chỉ kích hoạt khi có nhật ký mới trong S3, dẫn đến chi phí không có trong thời gian nhàn rỗi.
- Phân Tích AI Trước Tiên: AWS Bedrock với Claude cung cấp phân tích mối đe dọa tinh vi, hiểu ngữ cảnh chứ không chỉ mẫu.
- Tìm Kiếm Vector: Pinecone cho phép kiểm tra sự tương đồng giữa các cuộc tấn công trước đó để phát hiện các chiến dịch hoặc mẫu lặp lại.
- Chiến Lược Lưu Trữ Đa Dạng:
- DynamoDB cho các truy vấn có cấu trúc
- Pinecone cho tìm kiếm ngữ nghĩa
- S3 để lưu trữ dữ liệu nhật ký thô.
Các Điểm Nổi Bật Trong Triển Khai
Tích Hợp AWS Bedrock
Một prompt có cấu trúc đảm bảo kết quả AI nhất quán và dễ phân tích, như dưới đây:
python
import json
def invoke_bedrock(logs):
prompt_text = f"""Phân tích nhật ký bảo mật này và cung cấp đánh giá mối đe dọa.
Dữ liệu nhật ký: {json.dumps(log_data, indent=2)}
Cung cấp phản hồi của bạn theo định dạng này:
Điểm Mối Đe Dọa: [0-100]
Nhãn Mối Đe Dọa: [loại mối đe dọa]
Giải Thích: [giải thích ngắn gọn]"""
payload = {
"prompt": f"\\n\\nCon người: {prompt_text}\\n\\nTrợ lý:",
"max_tokens_to_sample": MAX_TOKENS,
"temperature": 0.1,
"top_p": 0.9
}
resp = client.invoke_model(
modelId="anthropic.claude-v2",
body=json.dumps(payload).encode(),
contentType="application/json"
)
Xử Lý Định Dạng JSONL
Nhật ký honeypot Cowrie được lưu trữ ở định dạng JSONL (nhiều đối tượng JSON trên một dòng). Tôi đã viết một bộ phân tích để xử lý hiệu quả nhiều đối tượng JSON trên mỗi dòng và xử lý các trường hợp ngoại lệ một cách gọn gàng.
Dưới đây là một phần của mã, bạn có thể xem kho GitHub để biết triển khai đầy đủ:
python
import json
def parse_jsonl(body):
logs = []
for line in body.strip().split('\\n'):
if line.strip():
remaining = line.strip()
while remaining:
try:
obj, idx = json.JSONDecoder().raw_decode(remaining)
logs.append(obj)
remaining = remaining[idx:].strip()
except json.JSONDecodeError:
break
return logs
Nhúng Vector
Để thực hiện các truy vấn tương đồng có khả năng mở rộng, tôi đã tạo ra các nhúng 1536-dimension xác định và lưu trữ chúng trong Pinecone để tìm kiếm nhanh chóng:
python
import json
import hashlib
def invoke_embedding(log):
text = json.dumps(log, separators=(',', ':'))
hash_obj = hashlib.sha256(text.encode())
hash_bytes = hash_obj.digest()
embedding = []
for i in range(0, len(hash_bytes), 2):
if i+1 < len(hash_bytes):
val = (hash_bytes[i] * 256 + hash_bytes[i+1]) / 65535.0
embedding.append(val)
# Đệm để có 1536 chiều cho Pinecone
while len(embedding) < 1536:
embedding.extend(embedding[:min(len(embedding), 1536-len(embedding))])
return embedding[:1536]
Cân Nhắc Trong Sản Xuất
Tối Ưu Hóa Chi Phí
AWS Bedrock tính phí theo token, vì vậy tôi đã triển khai theo dõi chi phí:
python
def estimate_cost(input_tokens, output_tokens):
INPUT_RATE = 0.0008 # mỗi 1K token
OUTPUT_RATE = 0.0016 # mỗi 1K token
return (input_tokens / 1000) * INPUT_RATE + (output_tokens / 1000) * OUTPUT_RATE
Giám Sát & Cảnh Báo
Bảng điều khiển CloudWatch theo dõi các chỉ số sau:
- Thời gian gọi
- Chi phí ước tính cho mỗi phân tích
- Tỷ lệ lỗi
Xử Lý Lỗi
Logic dự phòng đảm bảo tính khả dụng cao: nếu Bedrock không khả dụng, một bộ phân loại dựa trên quy tắc nhẹ sẽ hoạt động, ngăn ngừa sự chậm trễ trong xử lý hoặc tăng chi phí.
Kết Quả
Sau 2 tuần phát triển và thử nghiệm:
- Độ Chính Xác Phát Hiện: 90/100 trong các kịch bản thực thi malware
- Thời Gian Phản Hồi: <3 giây cho quá trình xử lý toàn bộ
- Khả Năng Mở Rộng: Kiến trúc serverless xử lý các đỉnh lưu lượng một cách liền mạch
- Hiệu Quả Chi Phí: ~$0.01 cho mỗi mối đe dọa đã xử lý
Bài Học Rút Ra
- Kỹ Thuật Prompting là Tất Cả: Các prompt được cấu trúc tốt thúc đẩy kết quả AI nhất quán và chất lượng cao.
- Xử Lý Lỗi Tiết Kiệm Chi Phí: Thời gian timeout của Lambda và lỗi Bedrock có thể tốn kém. Xử lý lỗi mạnh mẽ với các phương án dự phòng giữ cho chi phí có thể dự đoán.
- Kiểm Tra Sớm và Thường Xuyên: Các bài kiểm tra đơn vị và tích hợp toàn diện đã phát hiện ra các lỗi tinh vi trước khi đưa vào sản xuất.
- Giám Sát Ngay Từ Ngày Đầu: Các chỉ số CloudWatch đã cung cấp cái nhìn giúp tối ưu hóa hiệu suất.
Kế Hoạch Tương Lai
Những cải tiến trong tương lai mà tôi đang xem xét:
- Đào tạo một mô hình ML tùy chỉnh với dữ liệu tấn công lịch sử
- Xây dựng bảng điều khiển SOC theo thời gian thực
- Tích hợp các nguồn và định dạng nhật ký bổ sung
- Hành động phản hồi tự động, chẳng hạn như chặn IP động
Thử Nghiệm
Mã nguồn hoàn chỉnh có sẵn trên GitHub: GitHub Repository
Xem demo: Video Demo
Kết Nối
Bạn quan tâm đến ứng dụng AI trong an ninh mạng?
Kết nối với tôi trên LinkedIn hoặc tham gia Nhóm Người Dùng AWS London, Ontario, để thảo luận về GenAI, serverless, và phát hiện mối đe dọa hiện đại.
Được xây dựng với AWS Bedrock, Pinecone, Lambda, và niềm đam mê giải quyết các thách thức an ninh thực sự.