Sử Dụng Hàm Bậc Cao Trong PySpark Hiệu Quả
Giới thiệu
Nếu bạn là một lập trình viên đang làm việc với PySpark và đã từng cần áp dụng logic trong một cột kiểu mảng, có thể bạn đã nghĩ ngay đến việc tạo một UDF (User-Defined Function). Đây là một giải pháp nhanh và linh hoạt, nhưng nó ẩn chứa một vấn đề lớn về hiệu suất.
Trong bài viết này, chúng ta sẽ khám phá cách sử dụng Hàm Bậc Cao (Higher-Order Functions - HOFs) để thực hiện công việc này một cách hiệu quả hơn.
Chi phí ẩn của UDF trong Python
Khi sử dụng UDF trong Python, chúng ta thường không nhận ra các bước tốn kém mà Spark phải thực hiện:
- Xử lý tuần tự: Dữ liệu trong cột phải ra khỏi môi trường tối ưu của Spark (JVM).
- Chuyển giao: Dữ liệu được gửi đến một tiến trình Python.
- Thực thi: Hàm Python của bạn được thực thi.
- Trả về: Kết quả được gửi lại từ Python về JVM.
Việc này tạo ra một gáo chắn lớn, đặc biệt khi bạn xử lý tập dữ liệu với hàng triệu hoặc hàng tỷ dòng, có thể làm tăng thời gian thực thi của job hàng giờ hoặc thậm chí gây lỗi.
Giải pháp: Hàm Bậc Cao (HOFs)
HOFs là các hàm tích hợp sẵn trong Spark SQL cho phép nhận các hàm khác (thường là lambda) như đối số để xử lý dữ liệu phức tạp, chẳng hạn như mảng và bản đồ.
Lợi ích của HOFs
- Tối ưu hóa hiệu suất: Tất cả các phép toán xảy ra trong JVM mà không cần chi phí tuần tự/hồi tiếp.
- Dễ dàng sử dụng: HOFs cho phép bạn thực hiện các thao tác phức tạp chỉ với một hàm mà không cần phải viết mã dài dòng.
Sử dụng HOFs khi bạn cần:
- Chuyển đổi mỗi phần tử trong một mảng (
transform). - Lọc các phần tử của một mảng dựa trên điều kiện (
filter). - Kiểm tra sự tồn tại của một phần tử thỏa mãn điều kiện trong mảng (
exists). - Tính toán tổng các phần tử trong một mảng (
aggregate).
Thực hành: Ví dụ cụ thể
Chúng ta sẽ tạo một DataFrame đơn giản để minh họa các ví dụ. Cột scores sẽ là một mảng các số nguyên.
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
# Khởi tạo Spark Session
spark = SparkSession.builder.appName("HOF_Examples").getOrCreate()
# DataFrame mẫu
data = [
(1, "aluno_a", [80, 92, 75, 88]),
(2, "aluno_b", [60, 70, 65, 58]),
(3, "aluno_c", [95, 98, 100, 92]),
]
columns = ["id", "aluno", "scores"]
df_scores = spark.createDataFrame(data, columns)
1. transform: Áp dụng chuyển đổi cho mỗi phần tử
python
df_bonus = df_scores.withColumn(
"scores_com_bonus",
expr("transform(scores, x -> x + 10)")
)
df_bonus.show(truncate=False)
2. filter: Lọc các phần tử trong mảng
python
df_aprovados = df_scores.withColumn(
"notas_altas",
expr("filter(scores, nota -> nota >= 90)")
)
df_aprovados.show(truncate=False)
3. exists: Kiểm tra sự tồn tại của một phần tử
python
df_nota_max = df_scores.withColumn(
"tirou_100",
expr("exists(scores, nota -> nota = 100)")
)
df_nota_max.show()
Hướng dẫn tham khảo nhanh: Hàm Bậc Cao
Dưới đây là danh sách các HOFs phổ biến để bạn tham khảo.
Đối với Mảng
-
transform(array, function)
Mô tả: Áp dụng một hàm cho mỗi phần tử của mảng và trả về một mảng mới với kết quả.
Ví dụ: Chuyển đổi tất cả tên trong một mảng thành chữ hoa.transform(nomes, nome -> upper(nome)) -
filter(array, function)
Mô tả: Trả về một mảng mới chứa chỉ các phần tử thỏa mãn điều kiện.
Ví dụ: Giữ lại chỉ các số chẵn trong mảng.filter(numeros, n -> n % 2 == 0) -
exists(array, function)
Mô tả: Trả vềtruenếu ít nhất một phần tử trong mảng thỏa mãn điều kiện.
Ví dụ: Kiểm tra xem có sản phẩm nào có trạng thái "URGENT" trong mảng trạng thái không.exists(status, s -> s = 'URGENTE') -
forall(array, function)
Mô tả: Trả vềtruenếu tất cả các phần tử trong mảng thỏa mãn điều kiện.
Ví dụ: Kiểm tra xem tất cả các nhiệm vụ trong một dự án đều có trạng thái "COMPLETED" hay không.forall(tarefas, t -> t.status = 'CONCLUÍDO') -
aggregate(array, start, merge [, finish])
Mô tả: Giảm các phần tử của một mảng thành một giá trị duy nhất.
Ví dụ: Tính tổng tất cả các giá trị trong một mảng số.aggregate(valores, 0, (acumulador, valor) -> acumulador + valor) -
zip_with(array1, array2, function)
Mô tả: Kết hợp hai mảng, phần tử theo phần tử, bằng cách áp dụng một hàm kết hợp.
Ví dụ: Tính tổng của mỗi mặt hàng bằng cách nhân một mảngquantitiesvới một mảngprices.zip_with(quantidades, precos, (q, p) -> q * p)
Đối với Bản đồ
-
transform_keys(map, function)
Mô tả: Áp dụng một hàm cho mỗi khóa trong bản đồ và trả về một bản đồ mới.
Ví dụ: Chuẩn hóa tất cả các khóa của bản đồ thành chữ thường.transform_keys(mapa, (k, v) -> lower(k)) -
transform_values(map, function)
Mô tả: Áp dụng một hàm cho mỗi giá trị trong bản đồ và trả về một bản đồ mới.
Ví dụ: Áp dụng giảm giá 10% cho tất cả các giá trị trong bản đồproduct -> price.transform_values(mapa, (k, v) -> v * 0.9) -
map_filter(map, function)
Mô tả: Trả về một bản đồ mới chứa chỉ các mục thỏa mãn điều kiện.
Ví dụ: Lọc một bản đồ từproduct -> stockchỉ giữ lại các sản phẩm có tồn kho lớn hơn 0.map_filter(mapa, (k, v) -> v > 0)
Thử nghiệm ngay!
Một cách tuyệt vời để thử nghiệm tất cả những gì chúng ta đã học mà không cần phải cấu hình một môi trường Spark cục bộ là thông qua Databricks Free Edition, nơi cung cấp một cụm miễn phí để học tập và phát triển.
Để dễ dàng hơn, tôi đã để lại một notebook công khai với toàn bộ mã của bài viết này sẵn sàng để bạn xem và nhập vào tài khoản của bạn:
Kết luận
Lần tới khi bạn cần thao tác trên các phần tử trong một mảng (hoặc bản đồ) trong PySpark, hãy nhớ đến Hàm Bậc Cao. Với các ví dụ thực tiễn và hướng dẫn tham khảo ở trên, bạn đã có tất cả những gì cần thiết để bắt đầu. Hãy tự hỏi: "Tôi có thể giải quyết điều này bằng transform, filter, exists hoặc HOF khác không?". Câu trả lời gần như luôn là "có", và quy trình dữ liệu của bạn sẽ được cải thiện hiệu suất đáng kể.