Xây dựng đường ống dữ liệu tổng hợp bằng Gretel và Apache Airflow

Nút nguồn: 1068200

Xây dựng đường ống dữ liệu tổng hợp bằng Gretel và Apache Airflow

Trong bài đăng trên blog này, chúng tôi xây dựng đường dẫn ETL tạo dữ liệu tổng hợp từ cơ sở dữ liệu PostgreSQL bằng cách sử dụng API dữ liệu tổng hợp của Gretel và Apache Airflow.


By Đã vẽ Newberry, Kỹ sư phần mềm tại Gretel.ai

Xây dựng đường ống dữ liệu tổng hợp bằng Gretel và Apache Airflow

Hey folks, my name is Drew, and I’m a software engineer here at Gretel. I’ve recently been thinking about patterns for integrating Gretel APIs into existing tools so that it’s easy to build data pipelines where security and customer privacy are first-class features, not just an afterthought or box to check.

One data engineering tool that is popular amongst Gretel engineers and customers is Apache Airflow. It also happens to work great with Gretel. In this blog post, we’ll show you how to build a synthetic data pipeline using Airflow, Gretel and PostgreSQL. Let’s jump in!

Luồng không khí là gì

 
 
Luồng khí là một công cụ tự động hóa quy trình làm việc thường được sử dụng để xây dựng đường ống dẫn dữ liệu. Nó cho phép các kỹ sư dữ liệu hoặc nhà khoa học dữ liệu xác định theo chương trình và triển khai các đường ống này bằng Python và các cấu trúc quen thuộc khác. Cốt lõi của Luồng không khí là khái niệm về DAG, hoặc đồ thị xoay chiều có hướng. Một DAG luồng không khí cung cấp một mô hình và tập hợp các API để xác định các thành phần đường ống, sự phụ thuộc của chúng và thứ tự thực thi.

Bạn có thể thấy các đường ống Luồng khí sao chép dữ liệu từ cơ sở dữ liệu sản phẩm vào kho dữ liệu. Các đường ống khác có thể thực hiện các truy vấn kết hợp dữ liệu chuẩn hóa thành một tập dữ liệu duy nhất phù hợp cho phân tích hoặc mô hình hóa. Tuy nhiên, một quy trình khác có thể xuất bản một báo cáo hàng ngày tổng hợp các chỉ số kinh doanh chính. Một chủ đề chung được chia sẻ giữa các trường hợp sử dụng này: điều phối sự di chuyển của dữ liệu giữa các hệ thống. Đây là nơi Luồng không khí tỏa sáng.

Tận dụng Luồng không khí và hệ sinh thái phong phú của nó tích hợp, các kỹ sư dữ liệu và nhà khoa học có thể sắp xếp bất kỳ số lượng công cụ hoặc dịch vụ khác nhau thành một đường ống thống nhất duy nhất dễ bảo trì và vận hành. Với sự hiểu biết về các khả năng tích hợp này, bây giờ chúng ta sẽ bắt đầu nói về cách Gretel có thể được tích hợp vào đường ống Luồng khí để cải thiện quy trình hoạt động dữ liệu chung.

Làm thế nào để Gretel phù hợp?

 
 
Tại Gretel, sứ mệnh của chúng tôi là làm cho dữ liệu hoạt động dễ dàng hơn và an toàn hơn. Nói chuyện với khách hàng, một điểm khó khăn mà chúng tôi thường nghe nói đến là thời gian và nỗ lực cần thiết để các nhà khoa học dữ liệu truy cập vào dữ liệu nhạy cảm. Sử dụng Gretel tổng hợp, chúng tôi có thể giảm rủi ro khi làm việc với dữ liệu nhạy cảm bằng cách tạo bản sao tổng hợp của tập dữ liệu. Bằng cách tích hợp Gretel với Airflow, có thể tạo các đường ống tự phục vụ giúp các nhà khoa học dữ liệu dễ dàng nhanh chóng lấy dữ liệu họ cần mà không yêu cầu kỹ sư dữ liệu cho mọi yêu cầu dữ liệu mới.

Để chứng minh những khả năng này, chúng tôi sẽ xây dựng đường ống ETL trích xuất các tính năng hoạt động của người dùng từ cơ sở dữ liệu, tạo phiên bản tổng hợp của tập dữ liệu và lưu tập dữ liệu vào S3. Với tập dữ liệu tổng hợp được lưu trong S3, sau đó các nhà khoa học dữ liệu có thể sử dụng nó để lập mô hình hoặc phân tích hạ nguồn mà không ảnh hưởng đến quyền riêng tư của khách hàng.

Để bắt đầu mọi thứ, trước tiên chúng ta hãy nhìn toàn cảnh về đường ống. Mỗi nút trong sơ đồ này đại diện cho một bước đường ống, hoặc "nhiệm vụ" trong thuật ngữ Luồng không khí.



Ví dụ về đường ống tổng hợp Gretel trên Luồng không khí.

 

Chúng tôi có thể chia đường ống thành 3 giai đoạn, tương tự như những gì bạn có thể tìm thấy trong đường dẫn ETL:

  • Trích xuất – The extract_features task will query a database, and transform the data into a set of features that can be used by data scientists for building models.
  • Tổng hợp – generate_synthetic_features will take the extracted features as input, train a synthetic model, and then generate a synthetic set of features using Gretel APIs and cloud services.
  • Aukcje internetowe dla Twojej strony!Phụ tải – upload_synthetic_features saves the synthetic set of features to S3 where it can be ingested into any downstream model or analysis.

Trong một vài phần tiếp theo, chúng ta sẽ đi sâu vào từng bước trong ba bước này một cách chi tiết hơn. Nếu bạn muốn làm theo từng mẫu mã, bạn có thể đi tới gretelai / gretel-airflow-pipeways và tải xuống tất cả mã được sử dụng trong bài đăng trên blog này. Kho lưu trữ cũng chứa các hướng dẫn bạn có thể làm theo để bắt đầu một phiên bản Luồng không khí và chạy phần cuối đường ống để kết thúc.

Ngoài ra, có thể hữu ích khi xem toàn bộ đường ống Luồng khí, trước khi chúng tôi phân tích từng thành phần, dags / airbnb_user_bookings.py. Các đoạn mã trong các phần sau được trích xuất từ ​​quy trình đặt chỗ của người dùng được liên kết.

Tính năng trích xuất

 
 
Tác vụ đầu tiên, extract_features chịu trách nhiệm trích xuất dữ liệu thô từ cơ sở dữ liệu nguồn và chuyển đổi nó thành một tập hợp các tính năng. Đây là một phổ biến kỹ thuật tính năng vấn đề bạn có thể tìm thấy trong bất kỳ đường ống phân tích hoặc học máy nào.

Trong đường dẫn ví dụ của chúng tôi, chúng tôi sẽ cung cấp cơ sở dữ liệu PostgreSQL và tải nó cùng với dữ liệu đặt chỗ từ Cuộc thi Airbnb Kaggle.

Tập dữ liệu này chứa hai bảng, Người dùng và Phiên. Phiên chứa tham chiếu khóa ngoại, user_id. Sử dụng mối quan hệ này, chúng tôi sẽ tạo một tập hợp các tính năng chứa nhiều chỉ số đặt phòng khác nhau do người dùng tổng hợp. Hình sau đại diện cho truy vấn SQL được sử dụng để xây dựng các tính năng.

WITH session_features_by_user AS (SELECT user_id, count (*) AS number_of_actions_taken, count (DISTINCT action_type) AS number_of_unique_actions, round (avg (secs_elapsed)) AS avg_session_time_seconds, round (max (secsap_elapsed) AS) min_session_time_seconds, (SELECT count (*) FROM session s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM session GROUP BY user_id) CHỌN u.id AS user_id, u.uality, u.age, u .language, u.signup_method, u.date_account_create, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_server_seconds FROM session_features


Sau đó, truy vấn SQL được thực thi từ đường dẫn Luồng không khí của chúng tôi và được ghi vào vị trí S3 trung gian bằng cách sử dụng định nghĩa tác vụ sau.

@task () def extract_features (sql_file: str) -> str: context = get_current_context () sql_query = Đường dẫn (sql_file) .read_text () key = f "{context ['dag_run']. run_id} _booking_features.csv" với NamedTemporaryFile (mode = "r +", hậu tố = ". csv") dưới dạng tmp_csv: postgres.copy_expert (f "copy ({sql_query}) sang stdout với tiêu đề csv", tmp_csv.name) s3.load_file (filename = tmp_csv.name, key = key,) phím quay lại


Đầu vào cho tác vụ, sql_file, xác định truy vấn nào sẽ chạy trên cơ sở dữ liệu. Truy vấn này sẽ được đọc vào tác vụ và sau đó được thực thi dựa trên cơ sở dữ liệu. Sau đó, kết quả của truy vấn sẽ được ghi vào S3 và khóa tệp từ xa sẽ được trả về dưới dạng kết quả đầu ra của tác vụ.

Ảnh chụp màn hình bên dưới hiển thị tập hợp kết quả mẫu của truy vấn trích xuất từ ​​bên trên. Chúng tôi sẽ mô tả cách tạo phiên bản tổng hợp của tập dữ liệu này trong phần tiếp theo.



Xem trước kết quả truy vấn.

Tổng hợp các tính năng bằng API Gretel

 
 
Để tạo phiên bản tổng hợp của từng tính năng, trước tiên chúng ta phải đào tạo một mô hình tổng hợp, sau đó chạy mô hình đó để tạo các bản ghi tổng hợp. Gretel có một bộ Python SDK giúp bạn dễ dàng tích hợp vào các tác vụ Luồng không khí.

Ngoài các SDK ứng dụng Python, chúng tôi đã tạo Móc luồng gió Gretel quản lý các kết nối và bí mật của Gretel API. Sau khi thiết lập Kết nối luồng không khí Gretel, việc kết nối với API Gretel dễ dàng như

from hooks.gretel import GretelHook gretel = GretelHook () project = gretel.get_project ()


Để biết thêm thông tin về cách định cấu hình kết nối Luồng không khí, vui lòng tham khảo kho lưu trữ Github của chúng tôi README.

Biến dự án trong ví dụ trên có thể được sử dụng làm điểm nhập chính để đào tạo và chạy các mô hình tổng hợp bằng cách sử dụng API của Gretel. Để biết thêm chi tiết, bạn có thể kiểm tra Tài liệu API Python.

Đề cập lại quy trình đặt phòng, bây giờ chúng ta sẽ xem xét tác vụ create_synthetic_features. Bước này có nhiệm vụ đào tạo mô hình tổng hợp bằng cách sử dụng các tính năng được trích xuất trong tác vụ trước đó.

@task () def create_synthetic_features (data_source: str) -> str: project = gretel.get_project () model = project.create_model_obj (model_config = "syntilities / default", data_source = s3.download_file (data_source)) model.submit_cloud () thăm dò ý kiến ​​(mô hình) trả về model.get_artifact_link ("data_preview")


Nhìn vào chữ ký của phương thức, bạn sẽ thấy nó có một đường dẫn, data_source. Giá trị này trỏ đến các tính năng S3 được trích xuất trong bước trước. Trong phần sau, chúng ta sẽ xem xét cách tất cả các đầu vào và đầu ra này được kết nối với nhau.

Khi tạo mô hình bằng project.create_model_obj, tham số model_config đại diện cho cấu hình mô hình tổng hợp được sử dụng để tạo mô hình. Trong đường dẫn này, chúng tôi đang sử dụng cấu hình mô hình mặc định, nhưng nhiều thứ khác tùy chọn cấu hình có sẵn.

Sau khi mô hình đã được định cấu hình, chúng tôi gọi model.submit_cloud (). Điều này sẽ gửi mô hình để đào tạo và tạo bản ghi bằng Gretel Cloud. Cuộc gọi thăm dò ý kiến ​​(mô hình) sẽ chặn nhiệm vụ cho đến khi mô hình đã hoàn thành đào tạo.

Bây giờ mô hình đã được đào tạo, chúng tôi sẽ sử dụng get_artifact_link để trả về liên kết tải xuống các tính năng tổng hợp đã tạo.



Xem trước dữ liệu của tập hợp các tính năng tổng hợp.

 

Liên kết giả này sẽ được sử dụng làm đầu vào cho bước upload_synthetic_features cuối cùng.

Tải các tính năng tổng hợp

 
 
Các tính năng gốc đã được trích xuất và một phiên bản tổng hợp đã được tạo. Bây giờ đã đến lúc tải lên các tính năng tổng hợp để người tiêu dùng hạ lưu có thể truy cập chúng. Trong ví dụ này, chúng ta sẽ sử dụng nhóm S3 làm đích cuối cùng cho tập dữ liệu.

@task () def upload_synthetic_features (data_set: str): context = get_current_context () với mở (data_set, "rb") dưới dạng synth_features: s3.load_file_obj (file_obj = synth_features, key = f "{..._ booking_features_synthetic.csv" )


Nhiệm vụ này khá đơn giản. Giá trị đầu vào data_set chứa liên kết HTTP đã ký để tải xuống tập dữ liệu tổng hợp từ API của Gretel. Tác vụ sẽ đọc tệp đó vào Airflow worker, sau đó sử dụng hook S3 đã được định cấu hình để tải tệp tính năng tổng hợp lên nhóm S3 nơi người tiêu dùng hoặc người mẫu hạ cấp có thể truy cập.

Điều phối đường ống

 
 
Trong ba phần cuối cùng, chúng tôi đã xem qua tất cả các mã cần thiết để trích xuất, tổng hợp và tải tập dữ liệu. Bước cuối cùng là liên kết từng tác vụ này với nhau thành một đường ống Luồng khí duy nhất.

Nếu bạn nhớ lại phần đầu của bài đăng này, chúng tôi đã đề cập ngắn gọn đến khái niệm DAG. Sử dụng API TaskFlow của Airflow, chúng ta có thể biên soạn ba phương thức Python này thành một DAG xác định các đầu vào, đầu ra và thứ tự mỗi bước sẽ được chạy.

feature_path = extract_features ("/opt/airflow/dags/sql/session_rollups__by_user.sql") tổng hợp_data = create_synthetic_features (feature_path) upload_synthetic_features (tổng hợp_data)


Nếu bạn đi theo đường dẫn của các lệnh gọi phương thức này, cuối cùng bạn sẽ nhận được một biểu đồ trông giống như đường dẫn tính năng ban đầu của chúng tôi.



Đường ống tổng hợp Gretel trên Luồng không khí.

 

Nếu bạn muốn chạy đường dẫn này và thấy nó hoạt động, hãy chuyển đến kho lưu trữ Github đi kèm. Ở đó, bạn sẽ tìm thấy hướng dẫn về cách bắt đầu một phiên bản Luồng không khí và chạy phần cuối của đường ống để kết thúc.

Đóng gói đồ đạc

 
 
Nếu bạn đã làm được điều này đến nay, bạn đã thấy cách Gretel có thể được tích hợp vào một đường ống dữ liệu được xây dựng trên Airflow. Bằng cách kết hợp các API thân thiện với nhà phát triển của Gretel và hệ thống móc và nhà khai thác mạnh mẽ của Airflow, thật dễ dàng để xây dựng các đường ống ETL giúp dữ liệu dễ truy cập hơn và sử dụng an toàn hơn.

Chúng tôi cũng đã nói về một trường hợp sử dụng kỹ thuật tính năng phổ biến trong đó dữ liệu nhạy cảm có thể không dễ dàng truy cập được. Bằng cách tạo phiên bản tổng hợp của tập dữ liệu, chúng tôi giảm thiểu rủi ro để lộ bất kỳ dữ liệu nhạy cảm nào, nhưng vẫn giữ được tiện ích của tập dữ liệu trong khi cung cấp nhanh chóng cho những người cần.

Suy nghĩ về đường ống tính năng theo các thuật ngữ trừu tượng hơn, bây giờ chúng ta có một mẫu có thể được thay thế cho bất kỳ số lượng truy vấn SQL mới nào. Bằng cách triển khai phiên bản mới của đường ống và hoán đổi truy vấn SQL ban đầu, chúng tôi có thể xử lý bất kỳ truy vấn nhạy cảm tiềm năng nào bằng tập dữ liệu tổng hợp nhằm bảo vệ quyền riêng tư của khách hàng. Dòng mã duy nhất cần thay đổi là đường dẫn đến tệp sql. Không yêu cầu kỹ thuật dữ liệu phức tạp.

Cảm ơn vì đã đọc

 
 
Gửi email cho chúng tôi theo địa chỉ chào@gretel.ai hoặc đến tham gia với chúng tôi Slack nếu bạn có bất kỳ câu hỏi hoặc nhận xét nào. Chúng tôi muốn biết cách bạn đang sử dụng Airflow và cách chúng tôi có thể tích hợp tốt nhất với các đường ống dẫn dữ liệu hiện có của bạn.

 
Tiểu sử: Đã vẽ Newberry là Kỹ sư phần mềm tại Gretel.ai.

Nguyên. Đăng lại với sự cho phép.

Liên quan:

Nguồn: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Dấu thời gian:

Thêm từ Xe đẩy