Создайте конвейер синтетических данных с помощью Gretel и Apache Airflow

Исходный узел: 1068200

Создайте конвейер синтетических данных с помощью Gretel и Apache Airflow

В этом сообщении блога мы создаем конвейер ETL, который генерирует синтетические данные из базы данных PostgreSQL, используя API синтетических данных Gretel и Apache Airflow.


By Дрю Ньюберри, Инженер-программист Gretel.ai

Создайте конвейер синтетических данных с помощью Gretel и Apache Airflow

Привет, ребята, меня зовут Дрю, я программист в Gretel. Недавно я подумал о шаблонах для интеграции API-интерфейсов Gretel в существующие инструменты, чтобы можно было легко создавать конвейеры данных, в которых безопасность и конфиденциальность клиентов являются первоклассными функциями, а не просто второстепенной мыслью или флажком для проверки.

Одним из инструментов проектирования данных, который пользуется популярностью среди инженеров и заказчиков Gretel, является Apache Airflow. Это также прекрасно работает с Гретель. В этом сообщении блога мы покажем вам, как построить конвейер синтетических данных с помощью Airflow, Gretel и PostgreSQL. Давайте прыгнем!

Что такое воздушный поток

 
 
Воздушный поток - это инструмент автоматизации рабочего процесса, обычно используемый для построения конвейеров данных. Он позволяет инженерам данных или специалистам по обработке данных программно определять и развертывать эти конвейеры с использованием Python и других знакомых конструкций. В основе Airflow лежит концепция DAG или ориентированного ациклического графа. Airflow DAG предоставляет модель и набор API для определения компонентов конвейера, их зависимостей и порядка выполнения.

Вы можете обнаружить, что конвейеры Airflow реплицируют данные из базы данных продукта в хранилище данных. Другие конвейеры могут выполнять запросы, которые объединяют нормализованные данные в единый набор данных, подходящий для аналитики или моделирования. Еще один конвейер может публиковать ежедневный отчет, объединяющий ключевые бизнес-показатели. Общая тема, разделяемая этими вариантами использования: координация перемещения данных между системами. Вот где сияет Airflow.

Использование Airflow и его богатой экосистемы интеграций, специалисты по обработке данных и ученые могут объединить любое количество разрозненных инструментов или сервисов в единый унифицированный конвейер, который легко поддерживать и эксплуатировать. Понимая эти возможности интеграции, мы теперь начнем говорить о том, как Gretel можно интегрировать в конвейер Airflow для улучшения общих рабочих процессов обработки данных.

Какое место занимает Гретель?

 
 
В Gretel наша миссия - сделать работу с данными проще и безопаснее. Говоря с клиентами, мы часто слышим о проблеме, о которой мы часто слышим, - это время и усилия, необходимые для того, чтобы специалисты по обработке данных получили доступ к конфиденциальным данным. С использованием Гретель Синтетика, мы можем снизить риск работы с конфиденциальными данными, создав синтетическую копию набора данных. Интегрируя Gretel с Airflow, можно создавать конвейеры самообслуживания, которые позволяют специалистам по обработке данных быстро получать необходимые данные, не требуя от специалиста по обработке данных для каждого нового запроса данных.

Чтобы продемонстрировать эти возможности, мы создадим конвейер ETL, который извлекает функции пользовательской активности из базы данных, генерирует синтетическую версию набора данных и сохраняет набор данных в S3. Синтетический набор данных, сохраненный в S3, может быть использован специалистами по обработке данных для последующего моделирования или анализа без ущерба для конфиденциальности клиентов.

Для начала давайте сначала взглянем на трубопровод с высоты птичьего полета. Каждый узел на этой диаграмме представляет собой этап конвейера или «задачу» в терминах Airflow.



Пример синтетического трубопровода Gretel на Airflow.

 

Мы можем разбить конвейер на 3 этапа, аналогично тому, что вы можете найти в конвейере ETL:

  • Выписка - Задача extract_features запрашивает базу данных и преобразует данные в набор функций, которые могут использоваться специалистами по обработке данных для построения моделей.
  • Синтезировать - generate_synthetic_features примет извлеченные функции в качестве входных данных, обучит синтетическую модель, а затем сгенерирует синтетический набор функций с использованием API-интерфейсов Gretel и облачных сервисов.
  • нагрузка - upload_synthetic_features сохраняет синтетический набор функций в S3, где его можно использовать в любой последующей модели или анализе.

В следующих нескольких разделах мы более подробно рассмотрим каждый из этих трех шагов. Если вы хотите следить за каждым примером кода, вы можете перейти к gretelai / gretel-airflow-трубопроводы и загрузите весь код, использованный в этом сообщении в блоге. Репо также содержит инструкции, которым вы можете следовать, чтобы запустить экземпляр Airflow и запустить конвейер от начала до конца.

Кроме того, может быть полезно просмотреть конвейер воздушного потока целиком, прежде чем мы рассмотрим каждый компонент, dags / airbnb_user_bookings.py. Фрагменты кода в следующих разделах извлекаются из связанного конвейера бронирования пользователей.

Особенности извлечения

 
 
Первая задача, extract_features, отвечает за извлечение необработанных данных из исходной базы данных и преобразование их в набор функций. Это обычное особенность техники проблема, которую вы можете найти в любом конвейере машинного обучения или аналитики.

В нашем примере конвейера мы подготовим базу данных PostgreSQL и загрузим в нее данные бронирования из Конкурс Airbnb Kaggle.

Этот набор данных содержит две таблицы: Пользователи и Сеансы. Сеансы содержат ссылку на внешний ключ user_id. Используя это отношение, мы создадим набор функций, содержащий различные показатели бронирования, агрегированные по пользователям. На следующем рисунке представлен SQL-запрос, используемый для создания функций.

WITH session_features_by_user AS (SELECT user_id, count (*) AS number_of_action_taken, count (DISTINCT action_type) AS number_of_unique_actions, round (avg (secs_elapsed)) AS avg_session_time_seconds, round (max (secs_minelapsed)) AS_aped_seconds min_session_time_seconds, (SELECT count (*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_account_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_time_seconds, s.max_session_time_seconds от LIMOMTuser_features от LIMOMTuser.INSUINS


Затем SQL-запрос выполняется из нашего конвейера Airflow и записывается в промежуточное расположение S3 с использованием следующего определения задачи.

@task () def extract_features (sql_file: str) -> str: context = get_current_context () sql_query = Path (sql_file) .read_text () key = f "{context ['dag_run']. run_id} _booking_features.csv" с NamedTemporaryFile (mode = "r +", suffix = ". csv") as tmp_csv: postgres.copy_expert (f "copy ({sql_query}) to stdout with csv header", tmp_csv.name) s3.load_file (filename = tmp_csv.name, ключ = ключ,) ключ возврата


Входные данные для задачи sql_file определяют, какой запрос выполнять в базе данных. Этот запрос будет считан в задаче, а затем выполнен в базе данных. Затем результаты запроса будут записаны в S3, а ключ удаленного файла будет возвращен как результат выполнения задачи.

На снимке экрана ниже показан пример набора результатов запроса на извлечение сверху. В следующем разделе мы опишем, как создать синтетическую версию этого набора данных.



Предварительный просмотр результатов запроса.

Синтезируйте функции с помощью API-интерфейсов Gretel

 
 
Чтобы создать синтетическую версию каждой функции, мы должны сначала обучить синтетическую модель, а затем запустить модель для создания синтетических записей. У Gretel есть набор SDK для Python, которые упрощают интеграцию с задачами Airflow.

В дополнение к клиентским SDK Python мы создали Крючок Gretel Airflow который управляет соединениями и секретами API Gretel. После настройки Gretel Airflow Connection подключиться к Gretel API так же просто, как

from hooks.gretel import GretelHook gretel = GretelHook () project = gretel.get_project ()


Для получения дополнительной информации о настройке соединений Airflow, пожалуйста, обратитесь к нашему репозиторию Github. README.

Переменная проекта в приведенном выше примере может использоваться в качестве основной точки входа для обучения и запуска синтетических моделей с использованием API Gretel. Для получения более подробной информации вы можете ознакомиться с нашими Документы по API Python.

Возвращаясь к конвейеру бронирования, мы рассмотрим задачу generate_synthetic_features. Этот шаг отвечает за обучение синтетической модели с использованием функций, извлеченных в предыдущей задаче.

@task () def generate_synthetic_features (data_source: str) -> str: project = gretel.get_project () model = project.create_model_obj (model_config = "synthetics / default", data_source = s3.download_file (data_source)) model.submit_cloud () опрос (модель) return model.get_artifact_link ("data_preview")


Посмотрев на подпись метода, вы увидите, что он принимает путь data_source. Это значение указывает на функции S3, извлеченные на предыдущем шаге. В следующем разделе мы рассмотрим, как все эти входы и выходы соединены вместе.

При создании модели с использованием project.create_model_obj параметр model_config представляет конфигурацию синтетической модели, используемую для создания модели. В этом конвейере мы используем наши конфигурация модели по умолчанию, но многие другие параметры конфигурации имеются.

После настройки модели мы вызываем model.submit_cloud (). Это отправит модель для обучения и создания записи с помощью Gretel Cloud. Вызов опроса (модели) заблокирует задачу до тех пор, пока модель не завершит обучение.

Теперь, когда модель обучена, мы будем использовать get_artifact_link, чтобы вернуть ссылку для загрузки сгенерированных синтетических функций.



Предварительный просмотр данных синтетического набора функций.

 

Эта ссылка на артефакт будет использоваться в качестве входных данных для последнего шага upload_synthetic_features.

Загрузить синтетические элементы

 
 
Исходные черты были извлечены, и была создана синтетическая версия. Теперь пришло время загрузить синтетические функции, чтобы они могли быть доступны нижестоящим потребителям. В этом примере мы собираемся использовать корзину S3 в качестве конечного пункта назначения для набора данных.

@task () def upload_synthetic_features (data_set: str): context = get_current_context () с open (data_set, "rb") как synth_features: s3.load_file_obj (file_obj = synth_features, key = f "{..." booking_features_synthetic.cs )


Это довольно простая задача. Входное значение data_set содержит подписанную HTTP-ссылку для загрузки синтетического набора данных из API Gretel. Задача считывает этот файл в Airflow worker, а затем использует уже настроенную ловушку S3 для загрузки синтетического файла функций в корзину S3, где нижестоящие потребители или модели могут получить к нему доступ.

Организация конвейера

 
 
В последних трех разделах мы рассмотрели весь код, необходимый для извлечения, синтеза и загрузки набора данных. Последний шаг - связать каждую из этих задач в один конвейер Airflow.

Если вы вспомните начало этого поста, мы кратко упомянули концепцию DAG. Используя API-интерфейс Airflow TaskFlow, мы можем объединить эти три метода Python в группу DAG, которая определяет входные и выходные данные и порядок выполнения каждого шага.

feature_path = extract_features ("/opt/airflow/dags/sql/session_rollups__by_user.sql") Synthetic_data = generate_synthetic_features (feature_path) upload_synthetic_features (синтетические_данные)


Если вы проследуете путь этих вызовов методов, вы в конечном итоге получите график, похожий на наш исходный конвейер функций.



Трубопровод синтетики Gretel от Airflow.

 

Если вы хотите запустить этот конвейер и увидеть его в действии, перейдите к сопровождающий репозиторий Github. Там вы найдете инструкции о том, как запустить экземпляр Airflow и запустить конвейер от начала до конца.

Обертывание вещей

 
 
Если вы зашли так далеко, то уже видели, как Gretel можно интегрировать в конвейер данных, построенный на Airflow. Комбинируя удобные для разработчиков API-интерфейсы Gretel и мощную систему хуков и операторов Airflow, легко создавать конвейеры ETL, которые делают данные более доступными и безопасными в использовании.

Мы также говорили о типичном варианте использования при проектировании функций, когда доступ к конфиденциальным данным может быть затруднен. Создавая синтетическую версию набора данных, мы снижаем риск раскрытия любых конфиденциальных данных, но при этом сохраняем полезность набора данных, делая его быстро доступным для тех, кто в нем нуждается.

Если рассматривать конвейер функций в более абстрактных терминах, то теперь у нас есть шаблон, который можно использовать для любого количества новых SQL-запросов. Развернув новую версию конвейера и заменив исходный SQL-запрос, мы можем передать любой потенциально конфиденциальный запрос синтетическим набором данных, сохраняющим конфиденциальность клиентов. Единственная строка кода, которую необходимо изменить, - это путь к файлу sql. Не требуется сложной инженерии данных.

Спасибо за прочтение

 
 
Отправить нам письмо по адресу привет@gretel.ai или присоединяйся к нам Вялый если у вас есть вопросы или комментарии. Мы хотели бы услышать, как вы используете Airflow и как мы можем наилучшим образом интегрироваться с вашими существующими конвейерами данных.

 
Bio: Дрю Ньюберри является инженером-программистом в Gretel.ai.

Оригинал, Перемещено с разрешения.

Связанный:

Источник: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Отметка времени:

Больше от КДнаггетс