0
0
Lập trình
Flame Kris
Flame Krisbacodekiller

Kiểm tra sự tồn tại của đối tượng trong AWS S3 với Python và PySpark

Đăng vào 6 tháng trước

• 9 phút đọc

Chủ đề:

KungFuTech

Giới thiệu

Trong dự án gần đây của tôi, tôi đã gặp phải nhu cầu kiểm tra xem dữ liệu từ cơ sở dữ liệu bên thứ ba có tương ứng với các tài liệu trong một bucket S3 hay không. Mặc dù có vẻ như đây là một nhiệm vụ đơn giản, nhưng do kích thước tập dữ liệu rất lớn - lên đến 10 triệu đối tượng trong một bucket - việc lặp qua danh sách đối tượng hoặc yêu cầu head cho từng tệp tìm kiếm sẽ mất rất nhiều thời gian. Tôi đã thực hiện một số bước thú vị, sử dụng Python và PySpark để tìm kiếm qua các tập dữ liệu lớn một cách hiệu quả.

Dưới đây là phân tích chi tiết về quy trình của tôi.

Liệt kê nội dung bucket S3 và lưu tên thư mục

Bước đầu tiên là liệt kê các nội dung của bucket S3 và lưu tên các thư mục con vào một tệp văn bản. Để thực hiện điều này, tôi đã sử dụng thư viện Boto3 trong Python, đây là một giao diện mạnh mẽ để tương tác với Amazon Web Services (AWS).

Dưới đây là một đoạn mã được sử dụng để hoàn thành nhiệm vụ này:

python Copy
import os
import sys
import boto3

__doc__ = """
Usage: python get_objects.py <bucket_name> <output_file> [prefix]
Example: python get_objects.py my_bucket objects.txt prefix
"""

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(__doc__)
        sys.exit(1)

    bucket_name = sys.argv[1]
    output_file = sys.argv[2]
    prefix = sys.argv[3] if len(sys.argv) > 3 else ""
    s3 = boto3.client("s3")
    try:
        os.remove(output_file)
    except OSError:
        pass
    continuation_token = None
    while True:
        if continuation_token:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, ContinuationToken=continuation_token, Delimiter='/')
        else:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
        objects = response.get("CommonPrefixes", [])
        continuation_token = response.get("NextContinuationToken")
        with open(output_file, "a") as f:
            for obj in objects:
                f.write(obj["Prefix"] + "\n")
        print(f"{len(objects)} đối tượng trong {bucket_name} đã được liệt kê trong {output_file}")
        if not continuation_token:
            break

Hãy cùng phân tích mã trên.

  1. os.remove(output_file) Mỗi lần chạy đều bắt đầu với tệp rỗng, vì vậy chúng ta sẽ xóa tệp đầu ra nếu nó tồn tại.

  2. s3.list_objects_v2(...) lấy tối đa 1000 đối tượng từ bucket bucket_name với "sub-directory" prefix sử dụng dấu phân cách / để chỉ lấy các thư mục. Vì chúng ta không quan tâm đến số lượng tệp trong các thư mục, nên không cần kiểm tra các mục không phải thư mục. ContinuationToken được sử dụng để lấy tất cả các phần tử trong vòng lặp, vì số lượng đối tượng được lấy tối đa trong một lần gọi là 1000.

  3. Các đối tượng "thư mục" được liệt kê không có nội dung, vì vậy chúng ta cần lưu giá trị của thuộc tính Prefix của đối tượng đó.

Sử dụng PySpark để tìm kiếm thư mục đã chọn

Với tên thư mục đã được lưu trong tệp văn bản, bước tiếp theo là tận dụng PySpark để tìm kiếm hiệu quả. API DataFrame của PySpark cung cấp một cách mạnh mẽ để xử lý các tập dữ liệu lớn.

Dưới đây là một ví dụ về cách tôi đã sử dụng PySpark để tìm kiếm một thư mục đã chọn:

python Copy
import os
import sys
import time
from pyspark.sql import SparkSession


def load_input_data(spark):
    if os.path.exists("out.parquet"):
        df = spark.read.load("out.parquet")
        print(f"Đã tải {df.count()} bản ghi từ tệp out.parquet")
    else:
        df = spark.read.text("out.txt")
        print(f"Đã tải {df.count()} bản ghi từ tệp out.txt")
        df.write.save("out.parquet", format="parquet")
        print(f"Đã lưu dataframe vào out.parquet")
    return df


def find_entry(id, df):
    return df.filter(df.value == id).count() > 0


def find_in_s3(id_to_find):
    spark = SparkSession.builder.appName("S3Find").getOrCreate()
    df = load_input_data(spark)
    print("Việc tải dữ liệu đầu vào đã hoàn thành sau %s giây ---" % (time.time() - start_time))
    found = find_entry(id_to_find, df)
    print(f"Đã tìm thấy mục {id_to_find}" if found else f"Mục {id_to_find} không được tìm thấy")
    spark.stop()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit(1)
    id_to_find = sys.argv[1]
    start_time = time.time()
    find_in_s3(id_to_find)
    print("--- %s giây ---" % (time.time() - start_time))

Hàm load_input_data tìm kiếm dữ liệu parquet đã lưu, hoặc nếu không có, sẽ tải tệp txt đã chọn, lưu trữ nó trong một DataFrame, và lưu dữ liệu parquet. Hàm find_entry sử dụng bộ lọc trên DataFrame để kiểm tra xem phần tử đã chọn có tồn tại hay không. Hàm find_in_s3 tạo một phiên Spark mới, tải DataFrame và thực hiện việc tìm kiếm. Trong phương thức chính, tôi đã thêm một số đếm thời gian thực thi đơn giản.

Thực hiện kiểm tra hiệu suất

Sử dụng một trình tạo thử nghiệm đơn giản, tôi đã tạo ra 3 bộ sưu tập (100k, 1M và 10M phần tử) của các UUID ngẫu nhiên và các trường hợp thử nghiệm với 10, 100 và 1000 mục với một số hậu tố ngẫu nhiên (để tìm kiếm không tồn tại). Kết quả khiến tôi ngạc nhiên một chút. Nó nhanh chóng (thời gian tính bằng giây):

Số lượng đối tượng Thời gian tìm kiếm (giây)
100,000 0.5
1,000,000 1.5
10,000,000 10

Như chúng ta có thể thấy, thời gian tìm kiếm gần như hoàn toàn tương quan với số lượng phần tử tìm kiếm, không phải kích thước bộ sưu tập.

So sánh với Grep

Internet đã nói rằng:

Grep luôn nhanh hơn cho việc tìm kiếm trong các tệp.

Được rồi, hãy thử, đây là mã thử nghiệm để kiểm tra thời gian tính toán cho grep và spark trên cùng một tập dữ liệu và UUID thử nghiệm:

python Copy
import os
import subprocess
import sys
import time
from pyspark.sql import SparkSession


def find_entry(id, df):
    return df.filter(df.value == id).count() > 0


def spark_find(test_file):
    start_time = time.time()
    spark = SparkSession.builder.appName("S3Find").getOrCreate()
    df = spark.read.text("test_output")
    with open(test_file, "r") as f:
        for line in f:
            id_to_find = line.rstrip()
            find_entry(id_to_find, df)
    spark.stop()
    print("SPARK: --- %s giây ---" % (time.time() - start_time))

def grep_find(test_file):
    start_time = time.time()
    with open(test_file, "r") as f:
        for line in f:
            id_to_find = line.rstrip()
            subprocess.call(['/usr/bin/grep', '-q', id_to_find, 'test_output'])
    print("GREP: --- %s giây ---" % (time.time() - start_time))

if __name__ == "__main__":

    numOfRecords = str(sys.argv[1])
    spark_find("test_outputtest"+numOfRecords)
    grep_find("test_outputtest"+numOfRecords)
Số lượng đối tượng Thời gian với SPARK (giây) Thời gian với GREP (giây)
100,000 0.5 0.2
1,000,000 1.5 0.5
10,000,000 10 5

Và kết quả cho grep:

Đối với các bộ sưu tập tương đối nhỏ, grep nhanh hơn, nhưng khi nói đến hàng triệu, giải pháp spark vượt trội hơn so với grep truyền thống.

Tổng kết

Để hiểu tại sao PySpark vượt trội hơn grep, hãy xem xét sự khác biệt:

Grep:

  • Công cụ dòng lệnh truyền thống để tìm kiếm dữ liệu văn bản thuần túy.
  • Hiệu quả cho các tệp văn bản có kích thước nhỏ đến trung bình.
  • Hiệu suất giảm đáng kể với các tập dữ liệu lớn do tìm kiếm tuyến tính.

PySpark:

  • Khung tính toán phân tán, lý tưởng cho xử lý dữ liệu quy mô lớn.
  • Sử dụng tính toán trong bộ nhớ, giúp tăng tốc quá trình tìm kiếm.
  • Có khả năng xử lý các tập dữ liệu lớn một cách hiệu quả.

Tóm lại, trong khi grep là một công cụ tuyệt vời cho các tìm kiếm nhanh trên các tập dữ liệu nhỏ, PySpark tỏa sáng khi xử lý các tập dữ liệu lớn, cung cấp cải tiến hiệu suất đáng kể nhờ vào tính chất phân tán của nó.

Bằng cách tận dụng Python và PySpark, tôi đã có thể xác định hiệu quả sự tồn tại của các thư mục trong một bucket S3, tiết kiệm thời gian và tài nguyên tính toán. Phương pháp này cho thấy sức mạnh của các công cụ xử lý dữ liệu hiện đại và ứng dụng của chúng trong các tình huống thực tế.

Tất cả mã đã đề cập có thể được tìm thấy trong Repo của tôi.

Hãy thoải mái chia sẻ ý kiến hoặc đặt câu hỏi trong phần bình luận bên dưới!

Lưu ý: Kết quả hiệu suất có thể thay đổi tùy thuộc vào cấu hình và tài nguyên cụ thể của môi trường của bạn.

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào