Створіть синтетичний конвеєр даних за допомогою Gretel та Apache Airflow

Вихідний вузол: 1068200

Створіть синтетичний конвеєр даних за допомогою Gretel та Apache Airflow

У цій публікації блогу ми створюємо конвеєр ETL, який генерує синтетичні дані з бази даних PostgreSQL за допомогою API синтетичних даних Gretel і Apache Airflow.


By Дрю Ньюберрі, інженер-програміст Gretel.ai

Створіть синтетичний конвеєр даних за допомогою Gretel та Apache Airflow

Привіт, друзі, мене звуть Дрю, і я інженер-програміст у Gretel. Нещодавно я думав про шаблони для інтеграції Gretel API в існуючі інструменти, щоб було легко створювати конвеєри даних, у яких безпека та конфіденційність клієнтів є першокласними функціями, а не лише запізнілою думкою чи полем для перевірки.

Одним із популярних серед інженерів і клієнтів Gretel інструментів розробки даних є Apache Airflow. Він також чудово працює з Гретель. У цій публікації блогу ми покажемо вам, як створити синтетичний конвеєр даних за допомогою Airflow, Gretel і PostgreSQL. Давайте стрибати!

Що таке повітряний потік

 
 
Повітряний потік це інструмент автоматизації робочого процесу, який зазвичай використовується для створення конвеєрів даних. Це дає змогу розробникам даних або дослідникам даних програмно визначати та розгортати ці конвеєри за допомогою Python та інших знайомих конструкцій. В основі Airflow лежить концепція DAG, або орієнтований ациклічний граф. Airflow DAG надає модель і набір API для визначення компонентів конвеєра, їхніх залежностей і порядку виконання.

Ви можете виявити конвеєри Airflow, які копіюють дані з бази даних продуктів у сховище даних. Інші конвеєри можуть виконувати запити, які об’єднують нормалізовані дані в єдиний набір даних, придатний для аналітики чи моделювання. Ще один конвеєр може публікувати щоденний звіт із сукупними ключовими показниками бізнесу. Спільна тема для цих випадків використання: координація руху даних між системами. Ось де сяє Airflow.

Використовуючи Airflow і його багату екосистему інтеграцій, інженери з обробки даних і вчені можуть організовувати будь-яку кількість різнорідних інструментів або служб в єдиний уніфікований конвеєр, який простий в обслуговуванні та експлуатації. Розуміючи ці можливості інтеграції, ми зараз почнемо говорити про те, як Gretel можна інтегрувати в конвеєр Airflow для покращення загальних робочих процесів обробки даних.

Як Гретель вписується?

 
 
Наша місія Gretel — зробити роботу з даними легшою та безпечнішою. Розмовляючи з клієнтами, ми часто чуємо одну проблему, про яку ми часто чуємо, це час і зусилля, необхідні для того, щоб спеціалісти з обробки даних отримали доступ до конфіденційних даних. Використання Гретель Синтетика, ми можемо зменшити ризик роботи з конфіденційними даними, створивши синтетичну копію набору даних. Завдяки інтеграції Gretel з Airflow можна створити конвеєри самообслуговування, які спрощують науковців швидко отримувати потрібні дані, не вимагаючи інженера даних для кожного нового запиту даних.

Щоб продемонструвати ці можливості, ми створимо конвеєр ETL, який витягує функції активності користувача з бази даних, генерує синтетичну версію набору даних і зберігає набір даних у S3. Завдяки синтетичному набору даних, збереженому в S3, його потім можуть використовувати спеціалісти з обробки даних для подальшого моделювання або аналізу без шкоди для конфіденційності клієнтів.

Щоб почати, давайте спочатку подивимося на трубопровід з висоти пташиного польоту. Кожен вузол на цій діаграмі представляє крок конвеєра або «завдання» в термінах Airflow.



Приклад синтетичного трубопроводу Gretel на Airflow.

 

Ми можемо розбити конвеєр на 3 етапи, подібно до того, що ви можете знайти в конвеєрі ETL:

  • Витяг – Завдання extract_features надсилатиме запит до бази даних і перетворюватиме дані в набір функцій, які можуть використовуватися дослідниками даних для створення моделей.
  • Синтезувати – generate_synthetic_features візьме витягнуті функції як вхідні дані, навчить синтетичну модель, а потім згенерує синтетичний набір функцій за допомогою Gretel API і хмарних служб.
  • TheНавантаження – upload_synthetic_features зберігає синтетичний набір функцій у S3, де його можна ввести в будь-яку наступну модель або аналіз.

У наступних кількох розділах ми детальніше розглянемо кожен із цих трьох кроків. Якщо ви бажаєте ознайомитися з кожним зразком коду, ви можете перейти до gretelai/gretel-airflow-pipelines і завантажте весь код, використаний у цій публікації блогу. Сховище також містить інструкції, дотримуючись яких можна запустити екземпляр Airflow і запустити конвеєр від кінця до кінця.

Крім того, може бути корисно переглянути трубопровід повітряного потоку повністю, перш ніж розбирати кожен компонент, dags/airbnb_user_bookings.py. Фрагменти коду в наступних розділах витягнуті зі зв’язаного каналу бронювання користувачів.

Функції вилучення

 
 
Перше завдання, extract_features, відповідає за вилучення необроблених даних із вихідної бази даних і перетворення їх у набір функцій. Це поширене явище інженерія функцій проблема, яку ви можете знайти в будь-якому конвеєрі машинного навчання чи аналітики.

У нашому прикладі конвеєра ми підготуємо базу даних PostgreSQL і завантажимо в неї дані бронювання з Конкурс Airbnb Kaggle.

Цей набір даних містить дві таблиці «Користувачі» та «Сеанси». Сесії містять посилання на зовнішній ключ user_id. Використовуючи цей зв’язок, ми створимо набір функцій, що містить різні показники бронювання, агреговані за користувачами. На наступному малюнку показано SQL-запит, який використовується для створення функцій.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS max_session_time_seconds, round(mined(secs_elapsed) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features_by_user s LEFT JOIN users u ON u.id = s.user_id5000 XNUMX


Потім SQL-запит виконується з нашого конвеєра Airflow і записується в проміжне розташування S3 за допомогою наступного визначення завдання.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" з NamedTemporaryFile (mode="r+", suffix=".csv") як tmp_csv: postgres.copy_expert( f"копіювати ({sql_query}) до stdout із заголовком csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, ключ=ключ, ) повернути ключ


Вхідні дані завдання, sql_file, визначають, який запит виконувати в базі даних. Цей запит буде зчитано до завдання, а потім виконано для бази даних. Потім результати запиту будуть записані в S3, а ключ віддаленого файлу буде повернено як результат завдання.

На знімку екрана нижче показано зразок набору результатів запиту на вилучення згори. У наступному розділі ми опишемо, як створити синтетичну версію цього набору даних.



Попередній перегляд результатів запиту.

Синтезуйте функції за допомогою Gretel API

 
 
Щоб створити синтетичну версію кожної функції, ми повинні спочатку навчити синтетичну модель, а потім запустити модель для створення синтетичних записів. Gretel має набір Python SDK, який полегшує інтеграцію в завдання Airflow.

На додаток до Python Client SDK, ми створили a Гачок Gretel Airflow який керує підключеннями та секретами Gretel API. Після налаштування Gretel Airflow Connection підключитися до Gretel API так само просто

from hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


Щоб дізнатися більше про те, як налаштувати підключення Airflow, зверніться до нашого репозиторію Github README.

Змінну проекту в наведеному вище прикладі можна використовувати як основну точку входу для навчання та запуску синтетичних моделей за допомогою API Gretel. Для отримання додаткової інформації ви можете перевірити наш Документація Python API.

Повертаючись до конвеєра бронювання, зараз ми розглянемо завдання generate_synthetic_features. Цей крок відповідає за навчання синтетичної моделі з використанням функцій, отриманих у попередньому завданні.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) повертає model.get_artifact_link("data_preview")


Дивлячись на сигнатуру методу, ви побачите, що він використовує шлях, data_source. Це значення вказує на функції S3, отримані на попередньому кроці. У наступному розділі ми розповімо, як усі ці входи та виходи з’єднані разом.

Під час створення моделі за допомогою project.create_model_obj параметр model_config представляє конфігурацію синтетичної моделі, яка використовується для створення моделі. У цьому конвеєрі ми використовуємо наші конфігурація моделі за замовчуванням, але багато інших параметри конфігурації маються.

Після налаштування моделі ми викликаємо model.submit_cloud(). Це надішле модель для навчання та створення записів за допомогою Gretel Cloud. Виклик опитування (моделі) заблокує завдання, доки модель не завершить навчання.

Тепер, коли модель навчена, ми використаємо get_artifact_link для повернення посилання для завантаження згенерованих синтетичних функцій.



Попередній перегляд даних синтетичного набору функцій.

 

Це посилання на артефакт буде використано як вхідні дані для останнього кроку upload_synthetic_features.

Завантажити синтетичні функції

 
 
Було вилучено оригінальні функції та створено синтетичну версію. Тепер настав час завантажити синтетичні функції, щоб до них могли отримати доступ наступні споживачі. У цьому прикладі ми будемо використовувати сегмент S3 як кінцевий пункт призначення для набору даних.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") as synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Це завдання досить просте. Вхідне значення data_set містить підписане посилання HTTP для завантаження синтетичного набору даних з API Gretel. Завдання зчитує цей файл у Worker Airflow, а потім використовує вже налаштований хук S3, щоб завантажити синтетичний файл функції до сегмента S3, де користувачі або моделі можуть отримати до нього доступ.

Оркестрування трубопроводу

 
 
Протягом останніх трьох розділів ми розглянули весь код, необхідний для вилучення, синтезу та завантаження набору даних. Останній крок — об’єднати кожне з цих завдань в єдиний конвеєр Airflow.

Якщо ви пам’ятаєте, на початку цієї публікації ми коротко згадували концепцію DAG. Використовуючи API TaskFlow Airflow, ми можемо скомпонувати ці три методи Python у DAG, який визначає входи, виходи та порядок виконання кожного кроку.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql") synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Якщо ви дотримуєтеся шляху цих викликів методів, ви зрештою отримаєте графік, який виглядає як наш оригінальний конвеєр функцій.



Синтетичний трубопровід Gretel на Airflow.

 

Якщо ви хочете запустити цей конвеєр і побачити його в дії, перейдіть до супроводжує репозиторій Github. Там ви знайдете інструкції щодо того, як запустити екземпляр Airflow і запустити конвеєр від кінця до кінця.

Обгортання речей

 
 
Якщо ви зайшли так далеко, ви бачили, як Gretel можна інтегрувати в конвеєр даних, побудований на Airflow. Поєднуючи зручні для розробників API Gretel і потужну систему перехоплювачів і операторів Airflow, можна легко створювати конвеєри ETL, які роблять дані доступнішими та безпечнішими для використання.

Ми також говорили про загальний варіант використання інженерних функцій, коли конфіденційні дані можуть бути недоступними. Створюючи синтетичну версію набору даних, ми зменшуємо ризик розголошення будь-яких конфіденційних даних, але зберігаємо корисність набору даних, роблячи його швидко доступним для тих, хто його потребує.

Розглядаючи конвеєр функцій у більш абстрактних термінах, ми тепер маємо шаблон, який можна перепрофілювати для будь-якої кількості нових запитів SQL. Розгорнувши нову версію конвеєра та замінивши початковий запит SQL, ми можемо передати будь-який потенційно конфіденційний запит синтетичним набором даних, який зберігає конфіденційність клієнтів. Єдиний рядок коду, який потрібно змінити, це шлях до файлу sql. Не потрібна складна інженерія даних.

Дякую за читання

 
 
Надішліть нам електронний лист за адресою hi@gretel.ai або приєднуйтеся до нас Млявий якщо у вас є запитання чи коментарі. Ми хотіли б почути, як ви використовуєте Airflow і як ми можемо найкраще інтегрувати ваші існуючі канали даних.

 
Біо: Дрю Ньюберрі є інженером-програмістом у Gretel.ai.

Оригінал. Повідомлено з дозволу.

За темою:

Джерело: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Часова мітка:

Більше від KDnuggets