Gretel 및 Apache Airflow를 사용하여 합성 데이터 파이프라인 구축

소스 노드 : 1068200

Gretel 및 Apache Airflow를 사용하여 합성 데이터 파이프라인 구축

이 블로그 게시물에서는 Gretel의 합성 데이터 API 및 Apache Airflow를 사용하여 PostgreSQL 데이터베이스에서 합성 데이터를 생성하는 ETL 파이프라인을 구축합니다.


By 드류 뉴베리, Gretel.ai의 소프트웨어 엔지니어

Gretel 및 Apache Airflow를 사용하여 합성 데이터 파이프라인 구축

안녕하세요 여러분, 제 이름은 Drew이고 여기 Gretel의 소프트웨어 엔지니어입니다. 저는 최근에 Gretel API를 기존 도구에 통합하는 패턴에 대해 생각해 보았습니다. 따라서 보안 및 고객 개인 정보 보호가 사후 고려 사항이나 확인해야 할 상자가 아니라 일급 기능인 데이터 파이프라인을 쉽게 구축할 수 있습니다.

Gretel 엔지니어와 고객 사이에서 인기 있는 데이터 엔지니어링 도구 중 하나는 Apache Airflow입니다. Gretel과도 잘 작동합니다. 이 블로그 게시물에서는 Airflow, Gretel 및 PostgreSQL을 사용하여 합성 데이터 파이프라인을 구축하는 방법을 보여줍니다. 뛰어들자!

에어플로우란?

 
 
기류 데이터 파이프라인을 구축하는 데 일반적으로 사용되는 워크플로 자동화 도구입니다. 이를 통해 데이터 엔지니어 또는 데이터 과학자는 Python 및 기타 친숙한 구성을 사용하여 이러한 파이프라인을 프로그래밍 방식으로 정의하고 배포할 수 있습니다. Airflow의 핵심에는 DAG 또는 방향성 비순환 그래프의 개념이 있습니다. Airflow DAG는 파이프라인 구성 요소, 해당 종속성 및 실행 순서를 정의하기 위한 모델 및 API 집합을 제공합니다.

제품 데이터베이스에서 데이터 웨어하우스로 데이터를 복제하는 Airflow 파이프라인을 찾을 수 있습니다. 다른 파이프라인은 정규화된 데이터를 분석 또는 모델링에 적합한 단일 데이터 세트로 결합하는 쿼리를 실행할 수 있습니다. 또 다른 파이프라인은 주요 비즈니스 지표를 집계하는 일일 보고서를 게시할 수 있습니다. 이러한 사용 사례에서 공유되는 공통 주제는 시스템 간 데이터 이동 조정입니다. 이것이 바로 Airflow가 빛나는 곳입니다.

Airflow와 풍부한 생태계를 활용하여 통합, 데이터 엔지니어와 과학자는 유지 및 운영하기 쉬운 단일 통합 파이프라인으로 다양한 도구 또는 서비스를 오케스트레이션할 수 있습니다. 이러한 통합 기능을 이해하고 이제 Gretel을 Airflow 파이프라인에 통합하여 일반적인 데이터 운영 워크플로를 개선할 수 있는 방법에 대해 이야기하기 시작합니다.

그레텔은 어떻게 어울리나요?

 
 
Gretel의 임무는 데이터를 보다 쉽고 안전하게 작업할 수 있도록 하는 것입니다. 고객과 이야기할 때 자주 듣는 문제 중 하나는 데이터 과학자가 민감한 데이터에 액세스할 수 있도록 하는 데 필요한 시간과 노력입니다. 사용 그레텔 합성, 데이터 세트의 합성 사본을 생성하여 민감한 데이터로 작업할 위험을 줄일 수 있습니다. Gretel을 Airflow와 통합하면 데이터 과학자가 모든 새로운 데이터 요청에 대해 데이터 엔지니어를 필요로 하지 않고도 필요한 데이터를 신속하게 얻을 수 있도록 하는 셀프 서비스 파이프라인을 생성할 수 있습니다.

이러한 기능을 시연하기 위해 데이터베이스에서 사용자 활동 기능을 추출하고, 데이터 세트의 합성 버전을 생성하고, 데이터 세트를 S3에 저장하는 ETL 파이프라인을 구축합니다. S3에 저장된 합성 데이터 세트를 사용하면 데이터 과학자가 고객 개인 정보를 손상시키지 않고 다운스트림 모델링 또는 분석에 사용할 수 있습니다.

일을 시작하기 위해 먼저 파이프라인을 조감도로 살펴보겠습니다. 이 다이어그램의 각 노드는 파이프라인 단계 또는 Airflow 용어로 "작업"을 나타냅니다.



Airflow의 Gretel 합성 파이프라인의 예.

 

ETL 파이프라인에서 볼 수 있는 것과 유사하게 파이프라인을 3단계로 나눌 수 있습니다.

  • 발췌 – extract_features 태스크는 데이터베이스를 쿼리하고 데이터 과학자가 모델을 구축하는 데 사용할 수 있는 기능 세트로 데이터를 변환합니다.
  • 종합 – generate_synthetic_features는 추출된 기능을 입력으로 사용하고 합성 모델을 훈련한 다음 Gretel API 및 클라우드 서비스를 사용하여 합성 기능 세트를 생성합니다.
  • 하중 – upload_synthetic_features는 합성 기능 세트를 S3에 저장하여 다운스트림 모델 또는 분석에 수집할 수 있습니다.

다음 몇 섹션에서 이 세 단계 각각에 대해 더 자세히 알아보겠습니다. 각 코드 샘플을 따르려면 다음으로 이동하십시오. 그레텔라이/그레텔-기류-파이프라인 이 블로그 게시물에 사용된 모든 코드를 다운로드합니다. 리포지토리에는 Airflow 인스턴스를 시작하고 파이프라인을 끝까지 실행하기 위해 따를 수 있는 지침도 포함되어 있습니다.

또한 각 구성 요소를 분석하기 전에 Airflow 파이프라인 전체를 보는 것이 도움이 될 수 있습니다. dags/airbnb_user_bookings.py. 다음 섹션의 코드 조각은 연결된 사용자 예약 파이프라인에서 추출됩니다.

기능 추출

 
 
첫 번째 작업인 extract_features는 소스 데이터베이스에서 원시 데이터를 추출하고 이를 기능 세트로 변환하는 역할을 합니다. 이것은 일반적인 기능 엔지니어링 기계 학습 또는 분석 파이프라인에서 찾을 수 있는 문제.

예제 파이프라인에서는 PostgreSQL 데이터베이스를 프로비저닝하고 이를 예약 데이터와 함께 로드합니다. 에어비앤비 캐글 대회.

이 데이터 세트에는 두 개의 테이블(Users 및 Sessions)이 있습니다. 세션에는 외래 키 참조인 user_id가 포함되어 있습니다. 이 관계를 사용하여 사용자별로 집계된 다양한 예약 메트릭을 포함하는 기능 집합을 만듭니다. 다음 그림은 기능을 빌드하는 데 사용되는 SQL 쿼리를 나타냅니다.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)_seconds)초, rounde min_session_time_seconds, ( SELECT count(*) FROM 세션 s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM 세션 GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u = .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds, s.max_session_time_seconds FROM session_features_by_user


그런 다음 SQL 쿼리는 Airflow 파이프라인에서 실행되고 다음 작업 정의를 사용하여 중간 S3 위치에 작성됩니다.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" with NamedTemporaryFile (mode="r+", suffix=".csv") as tmp_csv: postgres.copy_expert( f"copy ({sql_query}) to stdout with csv header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, 키=키, ) 리턴 키


태스크에 대한 입력인 sql_file은 데이터베이스에서 실행할 쿼리를 결정합니다. 이 쿼리는 작업으로 읽어들인 다음 데이터베이스에 대해 실행됩니다. 그러면 쿼리 결과가 S3에 기록되고 원격 파일 키가 작업의 출력으로 반환됩니다.

아래 스크린샷은 위에서 추출한 쿼리의 샘플 결과 집합을 보여줍니다. 다음 섹션에서 이 데이터 세트의 합성 버전을 만드는 방법을 설명합니다.



쿼리 결과 미리보기.

Gretel API를 사용하여 기능 합성

 
 
각 기능의 합성 버전을 생성하려면 먼저 합성 모델을 훈련한 다음 모델을 실행하여 합성 레코드를 생성해야 합니다. Gretel에는 Airflow 작업에 쉽게 통합할 수 있는 Python SDK 세트가 있습니다.

Python 클라이언트 SDK 외에도 그레텔 에어플로우 후크 Gretel API 연결 및 비밀을 관리합니다. Gretel Airflow Connection을 설정한 후 Gretel API에 연결하는 것은 다음과 같이 쉽습니다.

hooks.gretel에서 가져오기 GretelHook gretel = GretelHook() 프로젝트 = gretel.get_project()


Airflow 연결을 구성하는 방법에 대한 자세한 내용은 Github 리포지토리를 참조하세요. README.

위 예의 프로젝트 변수는 Gretel의 API를 사용하여 합성 모델을 훈련하고 실행하기 위한 기본 진입점으로 사용할 수 있습니다. 자세한 내용은 당사의 파이썬 API 문서.

예약 파이프라인을 다시 참조하여 이제 generate_synthetic_features 작업을 검토하겠습니다. 이 단계는 이전 작업에서 추출한 특징을 사용하여 합성 모델을 훈련시키는 역할을 합니다.

@task() def generate_synthetic_features(data_source: str) -> str: 프로젝트 = gretel.get_project() 모델 = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() 투표(모델) 반환 model.get_artifact_link("data_preview")


메서드 서명을 보면 data_source라는 경로가 필요합니다. 이 값은 이전 단계에서 추출한 S3 특징을 가리킵니다. 이후 섹션에서 이러한 모든 입력과 출력이 함께 연결되는 방법을 살펴보겠습니다.

project.create_model_obj를 사용하여 모델을 생성할 때 model_config 매개변수는 모델을 생성하는 데 사용되는 합성 모델 구성을 나타냅니다. 이 파이프라인에서는 기본 모델 구성, 그러나 다른 많은 구성 옵션 사용할 수 있습니다.

모델이 구성되면 model.submit_cloud()를 호출합니다. 그러면 Gretel Cloud를 사용하여 훈련 및 레코드 생성을 위한 모델이 제출됩니다. poll(model)을 호출하면 모델이 훈련을 완료할 때까지 작업이 차단됩니다.

이제 모델이 학습되었으므로 get_artifact_link를 사용하여 생성된 합성 기능을 다운로드하는 링크를 반환합니다.



종합 기능 세트의 데이터 미리보기.

 

이 아티팩트 링크는 최종 upload_synthetic_features 단계에 대한 입력으로 사용됩니다.

합성 피처 로드

 
 
원본 기능이 추출되어 합성 버전이 생성되었습니다. 이제 다운스트림 소비자가 액세스할 수 있도록 합성 기능을 업로드해야 합니다. 이 예에서는 S3 버킷을 데이터 세트의 최종 대상으로 사용합니다.

@task() def upload_synthetic_features(data_set: str): context = get_current_context()와 open(data_set, "rb")을 synth_features로 사용: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.cs )


이 작업은 매우 간단합니다. data_set 입력 값에는 Gretel의 API에서 합성 데이터 세트를 다운로드하기 위한 서명된 HTTP 링크가 포함되어 있습니다. 작업은 해당 파일을 Airflow 작업자로 읽은 다음 이미 구성된 S3 후크를 사용하여 다운스트림 소비자 또는 모델이 액세스할 수 있는 S3 버킷에 합성 기능 파일을 업로드합니다.

파이프라인 조정

 
 
지난 세 섹션에서 데이터 세트를 추출, 합성 및 로드하는 데 필요한 모든 코드를 살펴보았습니다. 마지막 단계는 이러한 각 작업을 단일 Airflow 파이프라인으로 연결하는 것입니다.

이 게시물의 시작 부분으로 돌아가 보면 DAG의 개념에 대해 간략하게 언급했습니다. Airflow의 TaskFlow API를 사용하여 이 세 가지 Python 메서드를 입력, 출력 및 각 단계가 실행될 순서를 정의하는 DAG로 구성할 수 있습니다.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


이러한 메서드 호출의 경로를 따르면 결국 원래 기능 파이프라인처럼 보이는 그래프를 얻게 됩니다.



Airflow의 Gretel 합성 파이프라인.

 

이 파이프라인을 실행하고 작동하는 모습을 보려면 함께 제공되는 Github 저장소. 여기에서 Airflow 인스턴스를 시작하고 파이프라인을 끝까지 실행하는 방법에 대한 지침을 찾을 수 있습니다.

줄 바꿈

 
 
여기까지 했다면 Gretel을 Airflow에 구축된 데이터 파이프라인에 통합할 수 있는 방법을 확인했습니다. Gretel의 개발자 친화적인 API와 Airflow의 강력한 후크 및 연산자 시스템을 결합하여 데이터에 더 쉽게 액세스하고 더 안전하게 사용할 수 있도록 하는 ETL 파이프라인을 구축할 수 있습니다.

또한 민감한 데이터에 쉽게 액세스할 수 없는 일반적인 기능 엔지니어링 사용 사례에 대해서도 이야기했습니다. 데이터 세트의 합성 버전을 생성하여 민감한 데이터가 노출될 위험을 줄이면서 데이터 세트의 유용성을 유지하면서 필요한 사람들이 빠르게 사용할 수 있도록 합니다.

기능 파이프라인을 보다 추상적인 용어로 생각해보면 이제 새로운 SQL 쿼리의 수에 관계없이 용도를 변경할 수 있는 패턴이 있습니다. 파이프라인의 새 버전을 배포하고 초기 SQL 쿼리를 교체하여 잠재적으로 민감한 쿼리를 고객 개인 정보를 보호하는 합성 데이터 세트로 전면에 노출할 수 있습니다. 변경해야 하는 코드의 유일한 행은 sql 파일의 경로입니다. 복잡한 데이터 엔지니어링이 필요하지 않습니다.

읽어 주셔서 감사합니다

 
 
우리에게 이메일을 보내 안녕@gretel.ai 또는 우리와 함께 느슨하게 질문이나 의견이 있으시면. Airflow를 어떻게 사용하고 있으며 기존 데이터 파이프라인과 가장 잘 통합할 수 있는지 알고 싶습니다.

 
바이오 : 드류 뉴베리 Gretel.ai의 소프트웨어 엔지니어입니다.

실물. 허가를 받아 다시 게시했습니다.

관련 :

출처: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

타임 스탬프 :

더보기 너 겟츠