0
0
Lập trình
Admin Team
Admin Teamtechmely

Truyền dữ liệu Binance thời gian thực vào Confluent Cloud và PostgreSQL

Đăng vào 1 tháng trước

• 10 phút đọc

Chủ đề:

KungFuTech

1. Giới thiệu

Các đường dẫn dữ liệu thời gian thực đã trở nên quan trọng trong dịch vụ tài chính, đặc biệt là trong lĩnh vực tiền điện tử, nơi mà sự biến động giá xảy ra trong vài mili giây. Trong bài viết này, chúng ta sẽ xây dựng một đường dẫn streaming hoàn chỉnh, thu thập dữ liệu ticker trực tiếp từ Binance REST API, phát dữ liệu vào Confluent Cloud (Kafka) và lưu trữ nó vào PostgreSQL để phân tích và trực quan hóa sau này.

Đường dẫn này minh họa một kiến trúc hướng sự kiện cổ điển:

  • Producer → Lấy giá tiền điện tử từ Binance và xuất bản tin nhắn vào Kafka.
  • Consumer → Đọc từ Kafka, phân tích payload và chèn vào PostgreSQL.
  • Cơ sở dữ liệu + Views → Cho phép chúng ta xác thực và phân tích dữ liệu đã thu thập bằng các công cụ như DBeaver.

2. Cài đặt Dự án

Thư mục dự án của bạn chứa:
producer.py → trích xuất dữ liệu ticker 24h từ Binance & phát dữ liệu vào Kafka.
consumer.py → tiêu thụ từ Kafka & chèn vào PostgreSQL.
.env → chứa thông tin bảo mật (Kafka, Binance, PostgreSQL).
requirements.txt → các phụ thuộc Python.
.gitignore → giữ cho .env và các tệp không cần thiết ra khỏi kiểm soát phiên bản.

Phụ thuộc(requirements.txt)

Copy
confluent-kafka==2.11.1
psycopg2-binary==2.9.10
websockets==15.0.1
sqlalchemy
python-dotenv

Biến môi trường(.env)

Copy
BOOTSTRAP_SERVERS=pkc-xxxx.gcp.confluent.cloud:9092
CONFLUENT_API_KEY=xxxx
CONFLUENT_SECRET_KEY=xxxx
KAFKA_TOPIC=binance_topic
DATABASE_URL=postgresql://user:password@host:port/db?sslmode=require

3. Phát dữ liệu Binance vào Kafka

Kịch bản producer lấy dữ liệu ticker thời gian thực từ REST API của Binance và đẩy nó vào chủ đề Kafka binance_topic.

python Copy
import requests
import json, os, time, logging
from confluent_kafka import Producer

from dotenv import load_dotenv
from typing import Dict, Any, List
load_dotenv()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("binance_producer")

binance_url = "https://api.binance.com/api/v3/ticker/24hr"

symbols: List[str] = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
kafka_config = {
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("CONFLUENT_API_KEY"),
    "sasl.password": os.getenv("CONFLUENT_SECRET_KEY"),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}

topic = os.getenv("KAFKA_TOPIC")
producer = Producer(kafka_config)

def binance_extract(symbols: str) -> Dict[str, Any]:
    params = {"symbol": symbols}
    response = requests.get(binance_url, params=params)
    response.raise_for_status()
    data = response.json()
    data["extracted_symbol"] = symbols
    data["source"] = "binance24hr_ticker"
    return data

def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def data_streaming():
    for symbol in symbols:
        try:
            data = binance_extract(symbol)
            producer.produce(
                topic,
                key = symbol,
                value = json.dumps(data),
                callback = delivery_report
            )
        except Exception as e:
            logger.error(f"Error fetching data for {symbol}: {e}")
    producer.flush()


if __name__ == "__main__":
    while True:
        data_streaming()
        logger.info("Batch sent")
        time.sleep(10)

Kịch bản này chạy trong một vòng lặp mỗi 10 giây, gửi một lô dữ liệu ticker mới.

4. Tiêu thụ và Lưu trữ vào PostgreSQL

Consumer đọc các sự kiện từ chủ đề Kafka, phân tích chúng và chèn các hàng vào PostgreSQL bằng SQLAlchemy.

python Copy
import os
from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from json import loads
from datetime import datetime, timezone
import psycopg2
from sqlalchemy import create_engine, text

# Tải các biến môi trường
load_dotenv()

conn_string = os.getenv("DATABASE_URL")
if not conn_string:
    raise ValueError("DATABASE_URL environment variable not set")

# Tạo động cơ SQLAlchemy
engine = create_engine(conn_string, pool_pre_ping=True)

# Kiểm tra kết nối
with engine.connect() as conn:
    print("Database connection successful")

# Tạo bảng
with engine.begin() as conn:
    conn.execute(text("""
                CREATE TABLE IF NOT EXISTS binance_ticker_24h (
                    id BIGSERIAL PRIMARY KEY,
                    symbol TEXT NOT NULL,
                    price_change NUMERIC(38, 18),
                    price_change_percent NUMERIC(38, 18),
                    open_price NUMERIC(38, 18),
                    close_price NUMERIC(38, 18),
                    high_price NUMERIC(38, 18),
                    low_price NUMERIC(38, 18),
                    volume NUMERIC(38, 18),
                    ask_price NUMERIC(38, 18),
                    bid_price NUMERIC(38, 18),
                    ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
            """))
    print("PostgreSQL table created successfully")

# Cấu hình Consumer Confluent Kafka
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'binance-group-id',
    'auto.offset.reset': 'earliest'
}

# Khởi tạo consumer Kafka
consumer = Consumer(conf)
topic = os.getenv("KAFKA_TOPIC")
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Tiêu thụ & chèn ---
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        try:
            payload = loads(msg.value().decode('utf-8'))
            symbol = payload.get("symbol") or payload.get("extracted_symbol")

            price_change = payload.get("priceChange")
            price_change_percent = payload.get("priceChangePercent")
            open_price = payload.get("openPrice")
            close_price = payload.get("lastPrice")
            high_price = payload.get("highPrice")
            low_price = payload.get("lowPrice")
            volume = payload.get("volume")
            ask_price = payload.get("askPrice")
            bid_price = payload.get("bidPrice")

            # Chèn dữ liệu vào PostgreSQL
            insert_sql = text("""
                INSERT INTO binance_ticker_24h (
                    symbol, price_change, price_change_percent, open_price, close_price,
                    high_price, low_price, volume, ask_price, bid_price
                ) VALUES (
                    :symbol, :price_change, :price_change_percent, :open_price, :close_price,
                    :high_price, :low_price, :volume, :ask_price, :bid_price
                )
            """)
            with engine.begin() as conn:
                conn.execute(insert_sql, {
                    "symbol": symbol,
                    "price_change": price_change,
                    "price_change_percent": price_change_percent,
                    "open_price": open_price,
                    "close_price": close_price,
                    "high_price": high_price,
                    "low_price": low_price,
                    "volume": volume,
                    "ask_price": ask_price,
                    "bid_price": bid_price
                })
            print(f"Inserted {symbol}: close={close_price} high={high_price} low={low_price} ")
        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    print("Consumer stopped manually")
finally:
    consumer.close()
    print("Consumer closed")

Khi tôi lần đầu kết nối với psycopg2, kết nối liên tục thất bại vì các biến .env của tôi không khớp chính xác (nhạy cảm với chữ hoa chữ thường và nhiều tham số phân mảnh như HOST, USER, PORT), vì vậy psycopg2 đã mặc định sử dụng người dùng hệ thống cục bộ của tôi (oliver) thay vì thông tin xác thực trên đám mây. Ngược lại, với SQLAlchemy, tôi chỉ cần một biến môi trường duy nhất (DATABASE_URL) bao gồm tất cả thông tin kết nối (người dùng, mật khẩu, máy chủ, cổng, cơ sở dữ liệu, chế độ SSL) trong một chuỗi, loại bỏ các sự cố khớp và đơn giản hóa xác thực. Sự trừu tượng này không chỉ giải quyết vấn đề tải thông tin xác thực mà còn cung cấp cho tôi khả năng xử lý kết nối sạch hơn, quản lý kết nối và tính di động, khiến SQLAlchemy trở thành lựa chọn đáng tin cậy và dễ bảo trì hơn cho đường dẫn streaming của tôi.

5. Xác thực dữ liệu trong PostgreSQL

Khi consumer đang chạy, bạn có thể truy vấn bảng trực tiếp. Sử dụng DBeaver hoặc psql:

Kết quả truy vấn DBeaver cho thấy giá Binance mới nhất từ view.

6. Kiến trúc Đầu-cuối

Toàn bộ đường dẫn trông như sau:

Binance API → Producer → Confluent Cloud (Kafka) → Consumer → PostgreSQL → DBeaver

7. Kết luận

Với chỉ một vài kịch bản, chúng ta đã xây dựng một đường dẫn streaming tiền điện tử thời gian thực:

  • Dữ liệu Binance được thu thập mỗi 10 giây.
  • Được phát vào Kafka trên Confluent Cloud.
  • Được lưu trữ vào PostgreSQL (Aiven).
  • Được truy vấn và xác thực trong DBeaver.

Kiến trúc này nhẹ nhưng có thể mở rộng:

  • Thêm nhiều ký hiệu vào producer để mở rộng độ bao phủ.
  • Sử dụng ksqlDB trong Confluent Cloud cho các biến đổi thời gian thực.
  • Kết nối PostgreSQL với các công cụ BI như Metabase hoặc Power BI cho bảng điều khiển.

Các đường dẫn như vậy tạo thành xương sống của các hệ thống giao dịch hiện đại, nền tảng phân tích thị trường và động cơ rủi ro thời gian thực.

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào