0
0
Lập trình
TT

Hướng Dẫn Sử Dụng Spark Streaming Kết Nối Với Kafka

Đăng vào 3 ngày trước

• 4 phút đọc

Chủ đề:

SparkkafkaPyspark

Giới thiệu về Apache Spark

Apache Spark là một framework mạnh mẽ cho xử lý dữ liệu lớn, được ra đời nhằm khắc phục những hạn chế của Hadoop. Trong khi Hadoop thực hiện các tác vụ theo hướng batch và có thời gian xử lý lâu, Spark hỗ trợ cả xử lý theo lô và theo dòng (streaming), đặc biệt là cho các thuật toán Machine Learning có tính chất lặp đi lặp lại, giúp tiết kiệm thời gian đáng kể.

Spark bao gồm 5 thành phần chính:

  • Spark Core: Là thành phần cốt lõi của Spark, thực hiện các công việc như quản lý bộ nhớ, thao tác I/O, lập lịch.
  • Spark SQL: Module xử lý dữ liệu có cấu trúc, cho phép truy vấn dữ liệu bằng SQL.
  • Spark Streaming: Thành phần hỗ trợ xử lý dữ liệu streaming, với đầu vào từ nhiều nguồn như Kafka, Flume.
  • Spark MLlib: Thư viện tích hợp cho Machine Learning.
  • Spark GraphX: Cung cấp API cho các tính toán đồ thị.

Trong bài viết này, chúng ta sẽ tìm hiểu cách cài đặt và sử dụng Spark Streaming để xử lý dữ liệu từ Kafka.

Cài Đặt Apache Spark Standalone

Để đơn giản hóa, chúng ta sẽ cài đặt Spark standalone trực tiếp trên máy tính (cụ thể là Ubuntu). Bạn cần cài đặt JDK, Scala và Python trước.

  • Cài đặt JDK: Bạn có thể tham khảo tài liệu tại đây.
  • Cài đặt Scala: Xem tài liệu tại đây.
  • Cài đặt Python: Chỉ cần chạy lệnh sudo apt install python3 trong terminal.

Sau đó, bạn truy cập trang tải xuống Spark để tải về phiên bản phù hợp. Sau khi tải về, bạn giải nén file với lệnh sau:

Copy
tar xvzf <file_name>.tgz

Tiếp theo, cấu hình môi trường cho Spark bằng cách mở file .profile tại thư mục home:

Copy
sudo nano ~/.profile

Thêm các dòng sau vào cuối file:

Copy
export SPARK_HOME=<absolute_spark_dir>
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3

Sau khi cấu hình xong, khởi động Spark với lệnh:

Copy
$SPARK_HOME/sbin/start-all.sh

Truy cập vào http://localhost:8080/ để mở giao diện web UI của Spark.

Hướng Dẫn Kết Nối Spark Streaming Với Kafka

Chúng ta sẽ tạo một folder và file code để triển khai luồng dữ liệu từ Kafka tới Spark Streaming và sau đó đến ClickHouse.

Cấu Hình Biến Môi Trường

Bạn cần tạo một file .env với nội dung xác định các biến môi trường như sau:

Copy
CLICKHOUSE_HOST=127.0.0.1
CLICKHOUSE_PORT=9000
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=tuandeptrai

KAFKA_HOST=localhost
KAFKA_BROKER1_PORT=9091
KAFKA_BROKER2_PORT=9092
KAFKA_BROKER3_PORT=9093

Lập Trình Với PySpark

Đầu tiên, cài đặt pyspark bằng lệnh:

Copy
pip install pyspark

Sau đó, trong file Python của bạn, cần import các thư viện và tải biến từ .env:

python Copy
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, IntegerType
from pyspark.sql.functions import from_json, current_timestamp
import os
from dotenv import load_dotenv

load_dotenv()

CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST')
CLICKHOUSE_PORT = os.getenv('CLICKHOUSE_PORT')
CLICKHOUSE_USER = os.getenv('CLICKHOUSE_USER')
CLICKHOUSE_PASSWORD = os.getenv('CLICKHOUSE_PASSWORD')

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_BROKER1_PORT = os.getenv('KAFKA_BROKER1_PORT')
KAFKA_BROKER2_PORT = os.getenv('KAFKA_BROKER2_PORT')
KAFKA_BROKER3_PORT = os.getenv('KAFKA_BROKER3_PORT')

Định Nghĩa Schema cho Dữ Liệu

Sử dụng StructType để định nghĩa schema cho bảng dữ liệu mà bạn sẽ đẩy vào ClickHouse:

python Copy
json_schema = StructType([
    StructField('sslsni', StringType(), True),
    StructField('subscriberid', StringType(), True),
    StructField('hour_key', IntegerType(), True),
    StructField('count', IntegerType(), True),
    StructField('up', IntegerType(), True),
    StructField('down', IntegerType(), True)
])

Khởi Tạo Spark Session

Khởi tạo Spark session và cấu hình các thông số cần thiết:

python Copy
spark = SparkSession.builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("spark://<your_spark_master>:7077") \
    .getOrCreate()

Đọc Dữ Liệu từ Kafka

python Copy
# Đọc dữ liệu từ Kafka

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f"{KAFKA_HOST}:{KAFKA_BROKER1_PORT},{KAFKA_HOST}:{KAFKA_BROKER2_PORT},{KAFKA_HOST}:{KAFKA_BROKER3_PORT}") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", "test-url-1204") \
    .load()

json_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as msg_value")
json_expanded_df = json_df.withColumn("msg_value", from_json(json_df["msg_value"], json_schema)).select("msg_value.*")
exploded_df = json_expanded_df.select("sslsni", "subscriberid", "hour_key", "count", "up", "down") 
df_with_date = exploded_df.withColumn("inserted_time", current_timestamp())

Chèn Dữ Liệu vào ClickHouse

Chúng ta sẽ định nghĩa hàm foreach_batch_function để chèn dữ liệu vào ClickHouse:

python Copy
def foreach_batch_function(df, epoch_id):
    df.write \
        .format("jdbc") \
        .mode("append") \
        .option("driver", "com.github.housepower.jdbc.ClickHouseDriver") \
        .option("url", "jdbc:clickhouse://" + CLICKHOUSE_HOST + ":" + CLICKHOUSE_PORT) \
        .option("user", CLICKHOUSE_USER) \
        .option("password", CLICKHOUSE_PASSWORD) \
        .option("dbtable", "default.raw_url") \
        .save()

writing_df = df_with_date \
    .writeStream \
    .foreachBatch(foreach_batch_function) \
    .start()

writing_df.awaitTermination()

Chạy Ứng Dụng Spark

Cuối cùng, để chạy ứng dụng Spark, sử dụng lệnh sau:

Copy
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,com.github.housepower:clickhouse-integration-spark_2.12:2.7.1,com.github.housepower:clickhouse-native-jdbc-shaded:2.7.1 streaming.py

Lưu ý rằng bạn cần kiểm tra định dạng và phiên bản của các package để đảm bảo tương thích với Scala và Java mà bạn đang sử dụng.

Với hướng dẫn trên, bạn đã có thể dễ dàng triển khai một ứng dụng Spark Streaming cơ bản sử dụng Kafka làm nguồn dữ liệu. Nếu bạn có thắc mắc nào, đừng ngần ngại để lại câu hỏi trong phần bình luận. Chúc bạn thành công!
source: viblo

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào