با استفاده از Gretel و Apache Airflow یک خط لوله داده مصنوعی بسازید

گره منبع: 1068200

با استفاده از Gretel و Apache Airflow یک خط لوله داده مصنوعی بسازید

در این پست وبلاگ، ما یک خط لوله ETL ایجاد می کنیم که داده های مصنوعی را از پایگاه داده PostgreSQL با استفاده از APIهای داده مصنوعی Gretel و Apache Airflow تولید می کند.


By درو نیوبری، مهندس نرم افزار در Gretel.ai

با استفاده از Gretel و Apache Airflow یک خط لوله داده مصنوعی بسازید

سلام دوستان، اسم من درو است و من یک مهندس نرم افزار اینجا در گرتل هستم. من اخیراً به الگوهایی برای ادغام APIهای گرتل در ابزارهای موجود فکر کرده‌ام، به‌گونه‌ای که ساخت خطوط لوله داده‌ای که در آن امنیت و حریم خصوصی مشتری ویژگی‌های درجه یک هستند، آسان باشد، نه صرفاً یک فکر یا جعبه برای بررسی.

یکی از ابزارهای مهندسی داده که در میان مهندسان و مشتریان گرتل محبوب است، Apache Airflow است. همچنین اتفاقاً با گرتل عالی کار می کند. در این پست وبلاگ، ما به شما نشان خواهیم داد که چگونه با استفاده از Airflow، Gretel و PostgreSQL یک خط لوله داده مصنوعی بسازید. بیایید بپریم داخل!

جریان هوا چیست؟

 
 
جریان هوا یک ابزار اتوماسیون گردش کار است که معمولاً برای ساخت خطوط لوله داده استفاده می شود. مهندسان داده یا دانشمندان داده را قادر می سازد تا به صورت برنامه نویسی این خطوط لوله را با استفاده از پایتون و سایر سازه های آشنا تعریف و مستقر کنند. در هسته جریان هوا مفهوم DAG یا گراف غیر چرخه ای جهت دار وجود دارد. Airflow DAG مدل و مجموعه ای از APIها را برای تعریف اجزای خط لوله، وابستگی های آنها و ترتیب اجرا ارائه می دهد.

ممکن است خطوط لوله جریان هوا را پیدا کنید که داده ها را از یک پایگاه داده محصول در یک انبار داده تکثیر می کنند. خطوط لوله دیگر ممکن است پرس و جوهایی را اجرا کنند که داده های نرمال شده را به یک مجموعه داده واحد مناسب برای تجزیه و تحلیل یا مدل سازی می پیوندند. با این حال، خط لوله دیگری ممکن است گزارش روزانه ای را منتشر کند که معیارهای کلیدی کسب و کار را جمع آوری می کند. موضوع مشترکی که در بین این موارد استفاده مشترک است: هماهنگ کردن حرکت داده ها در سیستم ها. اینجاست که Airflow می درخشد.

استفاده از جریان هوا و اکوسیستم غنی آن یکپارچگیمهندسان داده و دانشمندان می توانند هر تعداد ابزار یا خدمات متفاوت را در یک خط لوله واحد که نگهداری و کارکرد آن آسان است، هماهنگ کنند. با درک این قابلیت‌های یکپارچه‌سازی، اکنون شروع به صحبت در مورد چگونگی ادغام گرتل در خط لوله Airflow برای بهبود جریان‌های کاری رایج داده‌ها خواهیم کرد.

گرتل چگونه جا می افتد؟

 
 
در گرتل، مأموریت ما این است که کار با داده‌ها را آسان‌تر و ایمن‌تر کنیم. در صحبت با مشتریان، یکی از نکات دردناکی که اغلب در مورد آن می شنویم، زمان و تلاش لازم برای دسترسی دانشمندان داده به داده های حساس است. استفاده كردن گرتل سنتتیک، می توانیم با ایجاد یک کپی مصنوعی از مجموعه داده، خطر کار با داده های حساس را کاهش دهیم. با ادغام گرتل با Airflow، امکان ایجاد خطوط لوله خود خدمتی وجود دارد که به دانشمندان داده آسان می‌شود تا به سرعت داده‌های مورد نیاز خود را بدون نیاز به مهندس داده برای هر درخواست داده جدید دریافت کنند.

برای نشان دادن این قابلیت‌ها، یک خط لوله ETL ایجاد می‌کنیم که ویژگی‌های فعالیت کاربر را از پایگاه داده استخراج می‌کند، یک نسخه مصنوعی از مجموعه داده تولید می‌کند و مجموعه داده را در S3 ذخیره می‌کند. با مجموعه داده مصنوعی ذخیره شده در S3، می‌توان از آن توسط دانشمندان داده برای مدل‌سازی یا تحلیل پایین‌دستی بدون به خطر انداختن حریم خصوصی مشتری استفاده کرد.

برای شروع کار، اجازه دهید ابتدا یک نمای پرنده از خط لوله بیندازیم. هر گره در این نمودار یک مرحله خط لوله یا "وظیفه" را در اصطلاح جریان هوا نشان می دهد.



نمونه خط لوله سنتتیک گرتل در جریان هوا.

 

ما می توانیم خط لوله را به 3 مرحله تقسیم کنیم، مشابه آنچه ممکن است در خط لوله ETL پیدا کنید:

  • عصاره – وظیفه extract_features یک پایگاه داده را پرس و جو می کند و داده ها را به مجموعه ای از ویژگی هایی تبدیل می کند که می تواند توسط دانشمندان داده برای ساخت مدل ها استفاده شود.
  • ترکیب کردن – generate_synthetic_features ویژگی های استخراج شده را به عنوان ورودی دریافت می کند، یک مدل مصنوعی را آموزش می دهد و سپس مجموعه ای ترکیبی از ویژگی ها را با استفاده از API های گرتل و خدمات ابری تولید می کند.
  • theبار – upload_synthetic_features مجموعه ترکیبی ویژگی‌ها را در S3 ذخیره می‌کند، جایی که می‌توان آن را در هر مدل یا تحلیل پایین‌دستی وارد کرد.

در چند بخش بعدی با جزئیات بیشتر به هر یک از این سه مرحله خواهیم پرداخت. اگر می‌خواهید هر کد کد را دنبال کنید، می‌توانید به ادامه مطلب بروید خطوط لوله gretelai/gretel-airflow و تمام کدهای استفاده شده در این پست وبلاگ را دانلود کنید. مخزن همچنین حاوی دستورالعمل هایی است که می توانید برای شروع یک نمونه جریان هوا و اجرای خط لوله از انتها به انتها دنبال کنید.

علاوه بر این، قبل از تجزیه هر جزء، مشاهده کامل خط لوله جریان هوا ممکن است مفید باشد. dags/airbnb_user_bookings.py. قطعه کد در بخش‌های زیر از خط لوله رزرو کاربر مرتبط استخراج می‌شود.

استخراج ویژگی ها

 
 
اولین وظیفه، extract_features است که وظیفه استخراج داده های خام از پایگاه داده منبع و تبدیل آن به مجموعه ای از ویژگی ها را بر عهده دارد. این یک امر رایج است مهندسی ویژگی مشکلی که ممکن است در هر خط لوله یادگیری ماشینی یا تجزیه و تحلیل پیدا کنید.

در خط لوله مثال خود ما یک پایگاه داده PostgreSQL ارائه می کنیم و آن را با داده های رزرو از یک بارگذاری می کنیم مسابقه Airbnb Kaggle.

این مجموعه داده شامل دو جدول Users و Sessions است. Sessions حاوی یک مرجع کلید خارجی، user_id است. با استفاده از این رابطه، مجموعه‌ای از ویژگی‌های حاوی معیارهای رزرو مختلف را که توسط کاربر جمع‌آوری شده‌اند ایجاد می‌کنیم. شکل زیر پرس و جوی SQL مورد استفاده برای ساخت ویژگی ها را نشان می دهد.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count (DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds(s)seelasession_time_seconds(s)pselassion_time_seconds (s)xmine min_session_time_seconds، ( SELECT count(*) FROM sessions WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u. .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds, s.max_session_time_seconds, u.max_session_time_seconds, s.max_session_time_seconds, s.max_session_time_seconds, u.max_session_time_seconds, s.max_session_time_seconds, u.max_session_time_seconds, u.max_session_time_seconds


پرس و جوی SQL سپس از خط لوله جریان هوا ما اجرا می شود و با استفاده از تعریف وظیفه زیر در یک مکان S3 میانی نوشته می شود.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_porarylemedcTeatures. (mode="r+"، پسوند=".csv") به عنوان tmp_csv: postgres.copy_expert(f"copy ({sql_query}) to stdout با هدر csv، tmp_csv.name) s3.load_file( filename=tmp_csv.name, کلید = کلید، ) کلید بازگشت


ورودی وظیفه، sql_file، تعیین می کند که چه کوئری در پایگاه داده اجرا شود. این کوئری در وظیفه خوانده می شود و سپس در پایگاه داده اجرا می شود. سپس نتایج پرس و جو در S3 نوشته می شود و کلید فایل راه دور به عنوان خروجی کار برگردانده می شود.

اسکرین شات زیر یک مجموعه نتیجه نمونه از کوئری استخراج را از بالا نشان می دهد. در بخش بعدی نحوه ایجاد یک نسخه مصنوعی از این مجموعه داده را شرح خواهیم داد.



پیش نمایش نتیجه پرس و جو

ترکیب ویژگی ها با استفاده از گرتل API

 
 
برای تولید یک نسخه مصنوعی از هر ویژگی، ابتدا باید یک مدل مصنوعی را آموزش دهیم و سپس مدل را برای تولید رکوردهای مصنوعی اجرا کنیم. گرتل دارای مجموعه ای از SDK های پایتون است که ادغام آن را در وظایف Airflow آسان می کند.

ما علاوه بر SDK های کلاینت پایتون، یک را ایجاد کرده ایم گرتل قلاب جریان هوا که ارتباطات و اسرار Gretel API را مدیریت می کند. پس از راه اندازی Gretel Airflow Connection، اتصال به Gretel API به همین سادگی است

از واردات hooks.gretel GretelHook gretel = پروژه GretelHook() = gretel.get_project()


برای اطلاعات بیشتر در مورد نحوه پیکربندی اتصالات جریان هوا، لطفاً به مخزن Github ما مراجعه کنید README.

متغیر پروژه در مثال بالا می تواند به عنوان نقطه ورود اصلی برای آموزش و اجرای مدل های مصنوعی با استفاده از API گرتل استفاده شود. برای جزئیات بیشتر، می توانید ما را بررسی کنید اسناد API پایتون.

با اشاره به خط لوله رزرو، اکنون وظیفهgene_synthetic_features را بررسی می کنیم. این مرحله وظیفه آموزش مدل مصنوعی با استفاده از ویژگی های استخراج شده در کار قبلی را بر عهده دارد.

@task() defgene_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) cloud()submit_ poll(model) return model.get_artifact_link("data_preview")


با نگاه کردن به امضای متد، خواهید دید که مسیری را طی می کند، data_source. این مقدار به ویژگی های S3 استخراج شده در مرحله قبل اشاره دارد. در بخش بعدی نحوه اتصال همه این ورودی‌ها و خروجی‌ها با هم را بررسی خواهیم کرد.

هنگام ایجاد مدل با استفاده از project.create_model_obj، پارامتر model_config پیکربندی مدل مصنوعی مورد استفاده برای تولید مدل را نشان می‌دهد. در این خط لوله، ما از خود استفاده می کنیم پیکربندی مدل پیش فرض، اما بسیاری دیگر گزینه های پیکربندی در دسترس هستند.

پس از پیکربندی مدل، مدل () model.submit_cloud را فراخوانی می کنیم. این مدل را برای آموزش و تولید رکورد با استفاده از Gretel Cloud ارسال می کند. فراخوانی نظرسنجی (مدل) تا زمانی که مدل آموزش را کامل نکند، کار را مسدود می کند.

اکنون که مدل آموزش داده شده است، از get_artifact_link برای بازگرداندن پیوندی برای دانلود ویژگی های مصنوعی تولید شده استفاده می کنیم.



پیش نمایش داده از مجموعه مصنوعی ویژگی ها.

 

این پیوند مصنوع به عنوان ورودی برای مرحله نهایی upload_synthetic_features استفاده خواهد شد.

بارگذاری ویژگی های مصنوعی

 
 
ویژگی های اصلی استخراج شده است و یک نسخه مصنوعی ایجاد شده است. اکنون زمان آپلود ویژگی های مصنوعی فرا رسیده است تا مصرف کنندگان پایین دستی بتوانند به آنها دسترسی داشته باشند. در این مثال، ما از یک سطل S3 به عنوان مقصد نهایی مجموعه داده استفاده می کنیم.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() با open(data_set, "rb") به عنوان synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features, "rb. )


این کار بسیار ساده است. مقدار ورودی data_set حاوی یک پیوند HTTP امضا شده برای دانلود مجموعه داده مصنوعی از API گرتل است. این کار آن فایل را در Airflow worker می خواند و سپس از قلاب S3 که از قبل پیکربندی شده است برای آپلود فایل ویژگی مصنوعی در یک سطل S3 استفاده می کند، جایی که مصرف کنندگان یا مدل های پایین دستی می توانند به آن دسترسی داشته باشند.

هماهنگ کردن خط لوله

 
 
در طول سه بخش گذشته، همه کدهای مورد نیاز برای استخراج، ترکیب و بارگذاری یک مجموعه داده را بررسی کرده‌ایم. آخرین مرحله این است که هر یک از این وظایف را در یک خط لوله جریان هوا به هم متصل کنید.

اگر به ابتدای این پست به خاطر بیاورید، به طور خلاصه به مفهوم DAG اشاره کردیم. با استفاده از API TaskFlow Airflow، می‌توانیم این سه روش پایتون را در یک DAG بسازیم که ورودی‌ها، خروجی‌ها و ترتیب اجرای هر مرحله را تعریف می‌کند.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data =gene_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


اگر مسیر این فراخوانی متدها را دنبال کنید، در نهایت نموداری خواهید داشت که شبیه خط لوله اصلی ویژگی ما است.



خط لوله مصنوعی گرتل در جریان هوا.

 

اگر می‌خواهید این خط لوله را اجرا کنید، و آن را در عمل ببینید، به آن بروید همراه با مخزن Github. در آنجا دستورالعمل هایی در مورد نحوه شروع یک نمونه جریان هوا و اجرای خط لوله از انتها به انتها خواهید یافت.

بسته شدن همه چیز

 
 
اگر تا اینجا پیش رفته اید، دیده اید که چگونه گرتل می تواند در خط لوله داده ای که بر روی Airflow ساخته شده است ادغام شود. با ترکیب APIهای سازگار با توسعه گرتل، و سیستم قدرتمند قلاب ها و اپراتورهای Airflow، ساخت خطوط لوله ETL که داده ها را در دسترس تر و استفاده ایمن تر می کند، آسان است.

ما همچنین در مورد یک مورد استفاده مهندسی ویژگی مشترک صحبت کردیم که در آن داده های حساس ممکن است به راحتی قابل دسترسی نباشند. با تولید یک نسخه مصنوعی از مجموعه داده، خطر افشای هرگونه داده حساس را کاهش می‌دهیم، اما همچنان کاربرد مجموعه داده را حفظ می‌کنیم و در عین حال آن را به سرعت در اختیار کسانی قرار می‌دهیم که به آن نیاز دارند.

با در نظر گرفتن خط لوله ویژگی به صورت انتزاعی تر، اکنون الگویی داریم که می تواند برای هر تعداد پرس و جوی جدید SQL تغییر کاربری دهد. با استقرار یک نسخه جدید از خط لوله، و جایگزینی پرس و جو اولیه SQL، می‌توانیم هر درخواست بالقوه حساس را با یک مجموعه داده مصنوعی که حریم خصوصی مشتری را حفظ می‌کند، مقابله کنیم. تنها خط کدی که باید تغییر کند، مسیر فایل sql است. نیازی به مهندسی داده پیچیده نیست.

تشکر برای خواندن

 
 
برای ما ایمیل بفرستید hi@gretel.ai یا به ما بپیوندید شل اگر سوال یا نظری دارید ما دوست داریم بدانیم که شما چگونه از Airflow استفاده می کنید و چگونه می توانیم به بهترین وجه با خطوط لوله داده موجود شما یکپارچه شویم.

 
بیوگرافی: درو نیوبری مهندس نرم افزار در Gretel.ai است.

اصلی. مجدداً با اجازه دوباره ارسال شد.

مرتبط:

منبع: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

تمبر زمان:

بیشتر از kdnuggets