Zbuduj syntetyczny potok danych przy użyciu Gretel i Apache Airflow

Węzeł źródłowy: 1068200

Zbuduj syntetyczny potok danych przy użyciu Gretel i Apache Airflow

W tym poście na blogu budujemy potok ETL, który generuje dane syntetyczne z bazy danych PostgreSQL przy użyciu interfejsów API danych syntetycznych Gretel i Apache Airflow.


By Drewa Newberry’ego, inżynier oprogramowania w Gretel.ai

Zbuduj syntetyczny potok danych przy użyciu Gretel i Apache Airflow

Hej ludzie, mam na imię Drew i jestem inżynierem oprogramowania w Gretel. Ostatnio zastanawiałem się nad wzorcami integracji interfejsów API Gretel z istniejącymi narzędziami, aby łatwo było budować potoki danych, w których bezpieczeństwo i prywatność klientów są funkcjami pierwszej klasy, a nie tylko refleksją lub polem do sprawdzenia.

Jednym z narzędzi inżynierii danych, które jest popularne wśród inżynierów i klientów Gretel, jest Apache Airflow. Świetnie sprawdza się również w przypadku Gretel. W tym poście na blogu pokażemy, jak zbudować syntetyczny potok danych za pomocą Airflow, Gretel i PostgreSQL. Wskakujmy!

Co to jest przepływ powietrza

 
 
Airflow to narzędzie do automatyzacji przepływu pracy, powszechnie używane do budowania potoków danych. Umożliwia inżynierom danych lub analitykom danych programistyczne definiowanie i wdrażanie tych potoków przy użyciu języka Python i innych znanych konstrukcji. U podstaw Airflow leży koncepcja DAG, czyli ukierunkowanego wykresu acyklicznego. Airflow DAG zapewnia model i zestaw interfejsów API do definiowania komponentów potoku, ich zależności i kolejności wykonywania.

Możesz znaleźć potoki Airflow replikujące dane z bazy danych produktów do hurtowni danych. Inne potoki mogą wykonywać zapytania, które łączą znormalizowane dane w jeden zestaw danych odpowiedni do analizy lub modelowania. Jeszcze inny potok może publikować dzienny raport agregujący kluczowe wskaźniki biznesowe. Wspólny temat wspólny dla tych przypadków użycia: koordynacja przepływu danych między systemami. Tutaj Airflow błyszczy.

Wykorzystując Airflow i jego bogaty ekosystem integracje, inżynierowie danych i naukowcy mogą skoordynować dowolną liczbę różnych narzędzi lub usług w jednym ujednoliconym potoku, który jest łatwy w utrzymaniu i obsłudze. Po zrozumieniu tych możliwości integracji zaczniemy teraz mówić o tym, jak Gretel można zintegrować z potokiem Airflow, aby usprawnić wspólne przepływy pracy związane z obsługą danych.

Jak pasuje Gretel?

 
 
Naszą misją w Gretel jest uczynienie pracy z danymi łatwiejszą i bezpieczniejszą. W rozmowach z klientami często słyszymy o problemie, który wymaga czasu i wysiłku, aby zapewnić analitykom danych dostęp do wrażliwych danych. Za pomocą Syntetyki Gretel, możemy zmniejszyć ryzyko pracy z danymi wrażliwymi, generując syntetyczną kopię zestawu danych. Dzięki integracji Gretel z Airflow możliwe jest tworzenie samoobsługowych potoków, które ułatwiają analitykom danych szybkie uzyskiwanie potrzebnych danych bez konieczności zatrudniania inżyniera danych do każdego nowego żądania danych.

Aby zademonstrować te możliwości, zbudujemy potok ETL, który wyodrębnia funkcje aktywności użytkownika z bazy danych, generuje syntetyczną wersję zestawu danych i zapisuje zestaw danych w S3. Syntetyczny zestaw danych zapisany w S3 może być następnie wykorzystany przez analityków danych do modelowania lub analizy na dalszych etapach procesu bez narażania prywatności klientów.

Na dobry początek spójrzmy na rurociąg z lotu ptaka. Każdy węzeł na tym diagramie reprezentuje krok potoku lub „zadanie” w kategoriach przepływu powietrza.



Przykład rurociągu syntetycznego Gretel na Airflow.

 

Możemy podzielić potok na 3 etapy, podobnie jak w przypadku potoku ETL:

  • Wyciąg – Zadanie extract_features prześle zapytanie do bazy danych i przekształci dane w zestaw funkcji, które mogą być wykorzystane przez analityków danych do budowania modeli.
  • Syntezuj – generate_synthetic_features pobierze wyodrębnione funkcje jako dane wejściowe, wytrenuje model syntetyczny, a następnie wygeneruje syntetyczny zestaw funkcji przy użyciu interfejsów API Gretel i usług w chmurze.
  • Aukcje internetowe dla Twojej strony!Załadować – upload_synthetic_features zapisuje syntetyczny zestaw funkcji w S3, gdzie można go wprowadzić do dowolnego modelu lub analizy.

W kilku następnych sekcjach bardziej szczegółowo omówimy każdy z tych trzech kroków. Jeśli chcesz śledzić każdą próbkę kodu, możesz przejść do gretelai/gretel-rurociągi-przepływu powietrza i pobierz cały kod użyty w tym poście na blogu. Repozytorium zawiera również instrukcje, których możesz użyć, aby uruchomić instancję Airflow i uruchomić cały potok.

Ponadto pomocne może być obejrzenie całego potoku przepływu powietrza, zanim przeanalizujemy każdy komponent, dags/airbnb_user_bookings.py. Fragmenty kodu w poniższych sekcjach są pobierane z połączonego potoku rezerwacji użytkownika.

Wyodrębnij funkcje

 
 
Pierwsze zadanie, extract_features, odpowiada za wyodrębnienie surowych danych ze źródłowej bazy danych i przekształcenie ich w zestaw cech. To jest powszechne inżynieria funkcji problem, który można znaleźć w każdym potoku uczenia maszynowego lub analizy.

W naszym przykładowym potoku udostępnimy bazę danych PostgreSQL i załadujemy ją danymi rezerwacji z pliku an Konkurs Airbnb Kaggle.

Ten zestaw danych zawiera dwie tabele, Użytkownicy i Sesje. Sesje zawierają odwołanie do klucza obcego, user_id. Korzystając z tej relacji, stworzymy zestaw funkcji zawierających różne metryki rezerwacji zagregowane według użytkownika. Poniższy rysunek przedstawia zapytanie SQL użyte do zbudowania funkcji.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS max_session_time_seconds, round(min(secs_elapsed)) AS min_session_time_seconds, ( SELECT count(*) FROM session s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM session GROUP BY user_id ) SELECT u.id AS user_id, u.płeć, u.wiek, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features_by_user s LEWO DOŁĄCZ użytkowników u ON u.id = s.user_id LIMIT


Zapytanie SQL jest następnie wykonywane z naszego potoku Airflow i zapisywane w pośredniej lokalizacji S3 przy użyciu następującej definicji zadania.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" z NamedTemporaryFile (mode="r+", suffix=".csv") as tmp_csv: postgres.copy_expert( f"copy ({sql_query}) to stdout with csv header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, klucz=klucz, ) klawisz powrotu


Dane wejściowe do zadania sql_file określają, jakie zapytanie należy uruchomić w bazie danych. To zapytanie zostanie wczytane do zadania, a następnie wykonane względem bazy danych. Wyniki zapytania zostaną następnie zapisane w S3, a klucz pliku zdalnego zostanie zwrócony jako dane wyjściowe zadania.

Poniższy zrzut ekranu pokazuje przykładowy zestaw wyników zapytania wyodrębniającego z góry. W następnej sekcji opiszemy, jak utworzyć syntetyczną wersję tego zestawu danych.



Podgląd wyniku zapytania.

Syntetyzuj funkcje za pomocą interfejsów API Gretel

 
 
Aby wygenerować syntetyczną wersję każdej funkcji, musimy najpierw wytrenować model syntetyczny, a następnie uruchomić model w celu wygenerowania syntetycznych rekordów. Gretel ma zestaw SDK Pythona, który ułatwia integrację z zadaniami Airflow.

Oprócz zestawów SDK klienta Pythona utworzyliśmy plik Hak Airflow Gretel który zarządza połączeniami i sekretami Gretel API. Po skonfigurowaniu Gretel Airflow Connection połączenie z API Gretel jest tak proste, jak to tylko możliwe

z hooks.gretel import GretelHook gretel = GretelHook() projekt = gretel.get_project()


Aby uzyskać więcej informacji na temat konfigurowania połączeń Airflow, zapoznaj się z naszym repozytorium Github README.

Zmienna projektu w powyższym przykładzie może służyć jako główny punkt wejścia do trenowania i uruchamiania modeli syntetycznych przy użyciu API Gretel. Aby uzyskać więcej informacji, możesz sprawdzić nasze Dokumentacja API Pythona.

Wracając do potoku rezerwacji, przejrzymy teraz zadanie generate_synthetic_features. Ten krok jest odpowiedzialny za uczenie modelu syntetycznego przy użyciu funkcji wyodrębnionych w poprzednim zadaniu.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) return model.get_artifact_link("data_preview")


Patrząc na sygnaturę metody, zobaczysz, że pobiera ona ścieżkę data_source. Ta wartość wskazuje funkcje S3 wyodrębnione w poprzednim kroku. W dalszej części omówimy, w jaki sposób wszystkie te wejścia i wyjścia są ze sobą połączone.

Podczas tworzenia modelu za pomocą project.create_model_obj parametr model_config reprezentuje konfigurację modelu syntetycznego używaną do generowania modelu. W tym potoku używamy naszego domyślna konfiguracja modelu, ale wiele innych opcje konfiguracyjne są dostępne.

Po skonfigurowaniu modelu wywołujemy model.submit_cloud(). Spowoduje to przesłanie modelu do trenowania i generowania rekordów przy użyciu Gretel Cloud. Wywołanie poll(model) zablokuje zadanie, dopóki model nie zakończy szkolenia.

Teraz, gdy model został przeszkolony, użyjemy get_artifact_link, aby zwrócić link do pobrania wygenerowanych funkcji syntetycznych.



Podgląd danych syntetycznego zestawu cech.

 

To łącze artefaktu zostanie użyte jako dane wejściowe w ostatnim kroku upload_synthetic_features.

Załaduj funkcje syntetyczne

 
 
Oryginalne cechy zostały wyodrębnione i stworzono wersję syntetyczną. Teraz nadszedł czas, aby przesłać funkcje syntetyczne, aby mogli uzyskać do nich dostęp dalsi konsumenci. W tym przykładzie użyjemy wiadra S3 jako ostatecznego miejsca docelowego dla zestawu danych.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") as synth_features: s3.load_file_obj(file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


To zadanie jest dość proste. Wartość wejściowa data_set zawiera podpisany link HTTP do pobrania syntetycznego zestawu danych z API Gretel. Zadanie odczyta ten plik do procesu roboczego Airflow, a następnie użyje już skonfigurowanego haka S3 do przesłania pliku funkcji syntetycznych do zasobnika S3, gdzie dalsi konsumenci lub modele będą mieli do niego dostęp.

Orkiestrowanie potoku

 
 
W ostatnich trzech sekcjach omówiliśmy cały kod wymagany do wyodrębnienia, syntezy i załadowania zestawu danych. Ostatnim krokiem jest powiązanie każdego z tych zadań w jeden potok przepływu powietrza.

Jeśli przypomnisz sobie początek tego posta, krótko wspomnieliśmy o koncepcji DAG. Korzystając z API TaskFlow Airflow, możemy połączyć te trzy metody Pythona w DAG, który definiuje dane wejściowe, wyjściowe i kolejność wykonywania każdego kroku.

Feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) Synthetic_data = generowanie_syntetycznych_features(feature_path) upload_synthetic_features(synthetic_data)


Jeśli podążysz ścieżką tych wywołań metod, ostatecznie otrzymasz wykres, który wygląda jak nasz oryginalny potok funkcji.



Rurociąg syntetyczny Gretel w Airflow.

 

Jeśli chcesz uruchomić ten potok i zobaczyć go w akcji, przejdź do towarzyszące repozytorium Github. Znajdziesz tam instrukcje, jak uruchomić instancję Airflow i uruchomić potok od końca do końca.

Owijanie rzeczy

 
 
Jeśli dotarłeś tak daleko, widziałeś, jak Gretel można zintegrować z potokiem danych zbudowanym na Airflow. Łącząc przyjazne dla programistów interfejsy API Gretel oraz potężny system haków i operatorów Airflow, łatwo jest budować potoki ETL, które sprawiają, że dane są bardziej dostępne i bezpieczniejsze w użyciu.

Rozmawialiśmy również o typowym przypadku użycia inżynierii funkcji, w którym wrażliwe dane mogą nie być łatwo dostępne. Generując syntetyczną wersję zestawu danych, zmniejszamy ryzyko ujawnienia jakichkolwiek wrażliwych danych, ale nadal zachowujemy użyteczność zestawu danych, jednocześnie udostępniając go tym, którzy go potrzebują.

Myśląc o potoku funkcji w bardziej abstrakcyjny sposób, mamy teraz wzorzec, który można wykorzystać w dowolnej liczbie nowych zapytań SQL. Wdrażając nową wersję potoku i zamieniając początkowe zapytanie SQL, możemy poprzedzić każde potencjalnie wrażliwe zapytanie syntetycznym zestawem danych, który chroni prywatność klientów. Jedynym wierszem kodu, który należy zmienić, jest ścieżka do pliku sql. Nie jest wymagana złożona inżynieria danych.

Dziękuje za przeczytanie

 
 
Wyślij do nas e-mail na adres cześć@gretel.ai lub dołącz do nas Slack jeśli masz jakieś pytania lub uwagi. Chętnie dowiemy się, jak korzystasz z Airflow i jak najlepiej możemy zintegrować się z Twoimi istniejącymi potokami danych.

 
Bio: Drewa Newberry’ego jest inżynierem oprogramowania w Gretel.ai.

Oryginalny. Przesłane za zgodą.

Związane z:

Źródło: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Znak czasu:

Więcej z Knuggety