Giới thiệu về Apache Spark
Apache Spark là một framework mạnh mẽ cho xử lý dữ liệu lớn, được ra đời nhằm khắc phục những hạn chế của Hadoop. Trong khi Hadoop thực hiện các tác vụ theo hướng batch và có thời gian xử lý lâu, Spark hỗ trợ cả xử lý theo lô và theo dòng (streaming), đặc biệt là cho các thuật toán Machine Learning có tính chất lặp đi lặp lại, giúp tiết kiệm thời gian đáng kể.
Spark bao gồm 5 thành phần chính:
- Spark Core: Là thành phần cốt lõi của Spark, thực hiện các công việc như quản lý bộ nhớ, thao tác I/O, lập lịch.
- Spark SQL: Module xử lý dữ liệu có cấu trúc, cho phép truy vấn dữ liệu bằng SQL.
- Spark Streaming: Thành phần hỗ trợ xử lý dữ liệu streaming, với đầu vào từ nhiều nguồn như Kafka, Flume.
- Spark MLlib: Thư viện tích hợp cho Machine Learning.
- Spark GraphX: Cung cấp API cho các tính toán đồ thị.
Trong bài viết này, chúng ta sẽ tìm hiểu cách cài đặt và sử dụng Spark Streaming để xử lý dữ liệu từ Kafka.
Cài Đặt Apache Spark Standalone
Để đơn giản hóa, chúng ta sẽ cài đặt Spark standalone trực tiếp trên máy tính (cụ thể là Ubuntu). Bạn cần cài đặt JDK, Scala và Python trước.
- Cài đặt JDK: Bạn có thể tham khảo tài liệu tại đây.
- Cài đặt Scala: Xem tài liệu tại đây.
- Cài đặt Python: Chỉ cần chạy lệnh
sudo apt install python3
trong terminal.
Sau đó, bạn truy cập trang tải xuống Spark để tải về phiên bản phù hợp. Sau khi tải về, bạn giải nén file với lệnh sau:
tar xvzf <file_name>.tgz
Tiếp theo, cấu hình môi trường cho Spark bằng cách mở file .profile
tại thư mục home:
sudo nano ~/.profile
Thêm các dòng sau vào cuối file:
export SPARK_HOME=<absolute_spark_dir>
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3
Sau khi cấu hình xong, khởi động Spark với lệnh:
$SPARK_HOME/sbin/start-all.sh
Truy cập vào http://localhost:8080/
để mở giao diện web UI của Spark.
Hướng Dẫn Kết Nối Spark Streaming Với Kafka
Chúng ta sẽ tạo một folder và file code để triển khai luồng dữ liệu từ Kafka tới Spark Streaming và sau đó đến ClickHouse.
Cấu Hình Biến Môi Trường
Bạn cần tạo một file .env
với nội dung xác định các biến môi trường như sau:
CLICKHOUSE_HOST=127.0.0.1
CLICKHOUSE_PORT=9000
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=tuandeptrai
KAFKA_HOST=localhost
KAFKA_BROKER1_PORT=9091
KAFKA_BROKER2_PORT=9092
KAFKA_BROKER3_PORT=9093
Lập Trình Với PySpark
Đầu tiên, cài đặt pyspark
bằng lệnh:
pip install pyspark
Sau đó, trong file Python của bạn, cần import các thư viện và tải biến từ .env
:
python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, IntegerType
from pyspark.sql.functions import from_json, current_timestamp
import os
from dotenv import load_dotenv
load_dotenv()
CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST')
CLICKHOUSE_PORT = os.getenv('CLICKHOUSE_PORT')
CLICKHOUSE_USER = os.getenv('CLICKHOUSE_USER')
CLICKHOUSE_PASSWORD = os.getenv('CLICKHOUSE_PASSWORD')
KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_BROKER1_PORT = os.getenv('KAFKA_BROKER1_PORT')
KAFKA_BROKER2_PORT = os.getenv('KAFKA_BROKER2_PORT')
KAFKA_BROKER3_PORT = os.getenv('KAFKA_BROKER3_PORT')
Định Nghĩa Schema cho Dữ Liệu
Sử dụng StructType
để định nghĩa schema cho bảng dữ liệu mà bạn sẽ đẩy vào ClickHouse:
python
json_schema = StructType([
StructField('sslsni', StringType(), True),
StructField('subscriberid', StringType(), True),
StructField('hour_key', IntegerType(), True),
StructField('count', IntegerType(), True),
StructField('up', IntegerType(), True),
StructField('down', IntegerType(), True)
])
Khởi Tạo Spark Session
Khởi tạo Spark session và cấu hình các thông số cần thiết:
python
spark = SparkSession.builder \
.appName("Streaming from Kafka") \
.config("spark.streaming.stopGracefullyOnShutdown", True) \
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
.config("spark.sql.shuffle.partitions", 4) \
.master("spark://<your_spark_master>:7077") \
.getOrCreate()
Đọc Dữ Liệu từ Kafka
python
# Đọc dữ liệu từ Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", f"{KAFKA_HOST}:{KAFKA_BROKER1_PORT},{KAFKA_HOST}:{KAFKA_BROKER2_PORT},{KAFKA_HOST}:{KAFKA_BROKER3_PORT}") \
.option("failOnDataLoss", "false") \
.option("subscribe", "test-url-1204") \
.load()
json_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as msg_value")
json_expanded_df = json_df.withColumn("msg_value", from_json(json_df["msg_value"], json_schema)).select("msg_value.*")
exploded_df = json_expanded_df.select("sslsni", "subscriberid", "hour_key", "count", "up", "down")
df_with_date = exploded_df.withColumn("inserted_time", current_timestamp())
Chèn Dữ Liệu vào ClickHouse
Chúng ta sẽ định nghĩa hàm foreach_batch_function
để chèn dữ liệu vào ClickHouse:
python
def foreach_batch_function(df, epoch_id):
df.write \
.format("jdbc") \
.mode("append") \
.option("driver", "com.github.housepower.jdbc.ClickHouseDriver") \
.option("url", "jdbc:clickhouse://" + CLICKHOUSE_HOST + ":" + CLICKHOUSE_PORT) \
.option("user", CLICKHOUSE_USER) \
.option("password", CLICKHOUSE_PASSWORD) \
.option("dbtable", "default.raw_url") \
.save()
writing_df = df_with_date \
.writeStream \
.foreachBatch(foreach_batch_function) \
.start()
writing_df.awaitTermination()
Chạy Ứng Dụng Spark
Cuối cùng, để chạy ứng dụng Spark, sử dụng lệnh sau:
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,com.github.housepower:clickhouse-integration-spark_2.12:2.7.1,com.github.housepower:clickhouse-native-jdbc-shaded:2.7.1 streaming.py
Lưu ý rằng bạn cần kiểm tra định dạng và phiên bản của các package để đảm bảo tương thích với Scala và Java mà bạn đang sử dụng.
Với hướng dẫn trên, bạn đã có thể dễ dàng triển khai một ứng dụng Spark Streaming cơ bản sử dụng Kafka làm nguồn dữ liệu. Nếu bạn có thắc mắc nào, đừng ngần ngại để lại câu hỏi trong phần bình luận. Chúc bạn thành công!
source: viblo