בנה צינור נתונים סינתטי באמצעות גרטל ואפצ'י זרימת אוויר

צומת המקור: 1068200

בנה צינור נתונים סינתטי באמצעות גרטל ואפצ'י זרימת אוויר

בפוסט זה בבלוג, אנו בונים צינור ETL ​​שיוצר נתונים סינתטיים ממסד נתונים PostgreSQL באמצעות ממשקי ה-API של נתונים סינתטיים של Gretel ו-Apache Airflow.


By דרו ניוברי, מהנדס תוכנה ב-Gretel.ai

בנה צינור נתונים סינתטי באמצעות גרטל ואפצ'י זרימת אוויר

היי אנשים, שמי דרו, ואני מהנדס תוכנה כאן בגרטל. לאחרונה חשבתי על דפוסים לשילוב ממשקי API של Gretel בכלים קיימים, כך שיהיה קל לבנות צינורות נתונים שבהם אבטחה ופרטיות לקוחות הם תכונות מהמעלה הראשונה, לא רק מחשבה שלאחר מכן או תיבה לבדיקה.

כלי הנדסת נתונים אחד פופולרי בקרב מהנדסי ולקוחות גרטל הוא Apache Airflow. זה קורה גם עובד מצוין עם גרטל. בפוסט זה בבלוג, נראה לך כיצד לבנות צינור נתונים סינתטי באמצעות Airflow, Gretel ו-PostgreSQL. בואו נקפוץ פנימה!

מהי זרימת אוויר

 
 
זרימת אוויר הוא כלי אוטומציה של זרימת עבודה בשימוש נפוץ לבניית צינורות נתונים. זה מאפשר למהנדסי נתונים או מדעני נתונים להגדיר ולפרוס צינורות אלה באופן פרוגרמטי באמצעות Python ומבנים מוכרים אחרים. בבסיסה של Airflow הוא הרעיון של DAG, או גרף א-ציקלי מכוון. Airflow DAG מספק מודל וקבוצה של ממשקי API להגדרת רכיבי צינור, התלות שלהם וסדר הביצוע שלהם.

ייתכן שתמצא צינורות Airflow המשכפלים נתונים ממסד נתונים של מוצרים לתוך מחסן נתונים. צינורות אחרים עשויים להפעיל שאילתות המצטרפות נתונים מנורמלים למערך נתונים יחיד המתאים לניתוח או מודלים. צינור נוסף עשוי לפרסם דוח יומי המצטבר מדדי מפתח עסקיים. נושא משותף משותף בין מקרי השימוש הללו: תיאום תנועת הנתונים בין מערכות. זה המקום בו Airflow זורח.

מינוף זרימת האוויר והמערכת האקולוגית העשירה שלה של ואינטגרציות, מהנדסי נתונים ומדענים יכולים לתזמר כל מספר של כלים או שירותים שונים לתוך צינור מאוחד אחד שקל לתחזק ולתפעול. עם הבנה של יכולות האינטגרציה הללו, כעת נתחיל לדבר על האופן שבו Gretel עשויה להשתלב בצינור Airflow כדי לשפר את זרימות העבודה הנפוצות של פעולות נתונים.

איך גרטל משתלבת?

 
 
ב-Gretel, המשימה שלנו היא להפוך את הנתונים לקלים יותר ובטוחים יותר לעבוד איתם. כשמדברים עם לקוחות, נקודת כאב אחת שאנו שומעים עליה לעתים קרובות היא הזמן והמאמץ הנדרשים כדי לקבל גישה למדעני נתונים לנתונים רגישים. באמצעות גרטל סינתטי, נוכל להפחית את הסיכון של עבודה עם נתונים רגישים על ידי יצירת עותק סינתטי של מערך הנתונים. על ידי שילוב Gretel עם Airflow, ניתן ליצור צינורות בשירות עצמי המקלים על מדעני נתונים לקבל במהירות את הנתונים הדרושים להם מבלי לדרוש מהנדס נתונים עבור כל בקשת נתונים חדשה.

כדי להדגים את היכולות הללו, נבנה צינור ETL ​​שמחלץ תכונות פעילות משתמש ממסד נתונים, יוצר גרסה סינתטית של מערך הנתונים ושומר את מערך הנתונים ב-S3. עם מערך הנתונים הסינטטי שנשמר ב-S3, הוא יכול לשמש מדעני נתונים לצורך מודלים או ניתוח במורד הזרם מבלי לפגוע בפרטיות הלקוח.

כדי להתניע את העניינים, בוא נתבונן תחילה במבט ממעוף הציפור של הצינור. כל צומת בתרשים זה מייצג שלב בצינור, או "משימה" במונחי זרימת אוויר.



דוגמה לצינור הסינטטי של Gretel ב-Airflow.

 

אנו יכולים לחלק את הצינור ל-3 שלבים, בדומה למה שאתה עשוי למצוא בצינור ETL:

  • להוציא – המשימה extract_features תבצע שאילתות על מסד נתונים, ותהפוך את הנתונים לסט של תכונות שיכולות לשמש מדעני נתונים לבניית מודלים.
  • לסנתז - gener_synthetic_features ייקח את התכונות שחולצו כקלט, תאמנו מודל סינתטי, ולאחר מכן ייצרו סט סינתטי של תכונות באמצעות ממשקי API של Gretel ושירותי ענן.
  • לִטעוֹן - upload_synthetic_features שומר את קבוצת התכונות הסינתטית ב-S3 שם ניתן להטמיע אותה בכל מודל או ניתוח במורד הזרם.

בחלקים הבאים נצלול לכל אחד משלושת השלבים הללו בפירוט רב יותר. אם ברצונך לעקוב אחר כל דגימת קוד, תוכל לגשת אל gretelai/gretel-airflow-pipelines והורד את כל הקוד המשמש בפוסט זה בבלוג. ה-repo מכיל גם הוראות שתוכל לבצע כדי להתחיל מופע Airflow ולהפעיל את הצינור מקצה לקצה.

בנוסף, זה עשוי להיות מועיל לראות את צינור זרימת האוויר במלואו, לפני שננתח כל רכיב, dags/airbnb_user_bookings.py. קטעי הקוד בקטעים הבאים נחלצים מצינור הזמנת המשתמש המקושר.

חלץ תכונות

 
 
המשימה הראשונה, extract_features אחראית לחילוץ נתונים גולמיים ממסד הנתונים של המקור והפיכתם לסט של תכונות. זהו נפוץ הנדסת תכונות בעיה שאתה עשוי למצוא בכל צינור למידת מכונה או ניתוח.

בצינור לדוגמה שלנו נספק מסד נתונים PostgreSQL ונטען אותו עם נתוני הזמנה מ-an תחרות Airbnb Kaggle.

מערך נתונים זה מכיל שתי טבלאות, משתמשים ופעילויות באתר. Sessions מכיל הפניה למפתח זר, user_id. באמצעות קשר זה, ניצור קבוצת תכונות המכילה מדדי הזמנה שונים המצטברים לפי משתמש. האיור הבא מייצג את שאילתת SQL המשמשת לבניית התכונות.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max_secsrounds_seconds)session min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_secondsO


שאילתת ה-SQL מבוצעת לאחר מכן מצינור זרימת האוויר שלנו ונכתבת למיקום S3 ביניים באמצעות הגדרת המשימה הבאה.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" עם NamedTemporaryFi (mode="r+", suffix=".csv") בתור tmp_csv: postgres.copy_expert( f"copy ({sql_query}) ל-stdout עם כותרת csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, מפתח=מפתח, ) מפתח החזרה


הקלט למשימה, sql_file, קובע איזו שאילתה להפעיל על מסד הנתונים. שאילתה זו תוקרא למשימה ולאחר מכן תבוצע כנגד מסד הנתונים. לאחר מכן תוצאות השאילתה ייכתבו ל-S3 ומפתח הקובץ המרוחק יוחזר כפלט של המשימה.

צילום המסך שלהלן מציג סט תוצאות לדוגמה של שאילתת החילוץ מלמעלה. נתאר כיצד ליצור גרסה סינתטית של מערך נתונים זה בסעיף הבא.



תצוגה מקדימה של תוצאת שאילתה.

סנתז תכונות באמצעות ממשקי API של Gretel

 
 
כדי ליצור גרסה סינתטית של כל תכונה, עלינו תחילה לאמן מודל סינתטי, ולאחר מכן להפעיל את המודל כדי ליצור רשומות סינתטיות. לגרטל יש קבוצה של ערכות פיתוח התוכנה של Python שמקלות על ההשתלבות במשימות Airflow.

בנוסף ל- Python Client SDK, יצרנו א וו זרימת אוויר גרטל שמנהלת חיבורים וסודות של Gretel API. לאחר הגדרת חיבור Gretel Airflow, החיבור ל-Gretel API הוא קל כמו

מאת hooks.gretel ייבוא ​​GretelHook gretel = GretelHook() project = gretel.get_project()


למידע נוסף על אופן התצורה של חיבורי Airflow, עיין במאגר Github שלנו README.

משתנה הפרויקט בדוגמה למעלה יכול לשמש כנקודת הכניסה הראשית לאימון והרצת מודלים סינתטיים באמצעות ה-API של Gretel. לפרטים נוספים, אתה יכול לבדוק את שלנו מסמכי Python API.

בהתייחסות לצינור ההזמנות, נסקור כעת את משימת gener_synthetic_features. שלב זה אחראי לאימון המודל הסינתטי תוך שימוש בתכונות שחולצו במשימה הקודמת.

@task() def gener_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) return model.get_artifact_link("data_preview")


בהסתכלות על חתימת השיטה, תראה שהיא לוקחת נתיב, data_source. ערך זה מצביע על תכונות S3 שחולצו בשלב הקודם. בסעיף מאוחר יותר נעבור על האופן שבו כל הכניסות והיציאות הללו מחוברות יחד.

בעת יצירת המודל באמצעות project.create_model_obj, הפראם model_config מייצג את תצורת המודל הסינתטי המשמשת ליצירת המודל. בצינור הזה, אנחנו משתמשים ב- תצורת דגם ברירת המחדל, אבל רבים אחרים אפשרויות תצורה זמינים.

לאחר הגדרת המודל, אנו קוראים model.submit_cloud(). זה יגיש את המודל להדרכה ויצירת רשומות באמצעות Gretel Cloud. קריאה לסקר(מודל) תחסום את המשימה עד שהמודל ישלים אימון.

כעת, לאחר שהמודל הוכשר, נשתמש ב-get_artifact_link כדי להחזיר קישור להורדת התכונות הסינתטיות שנוצרו.



תצוגה מקדימה של נתונים של קבוצת התכונות הסינתטית.

 

קישור חפץ זה ישמש כקלט לשלב ה- upload_synthetic_features הסופי.

טען תכונות סינתטיות

 
 
התכונות המקוריות חולצו, ונוצרה גרסה סינתטית. עכשיו הגיע הזמן להעלות את התכונות הסינתטיות כך שיוכלו לגשת אליהם לצרכנים במורד הזרם. בדוגמה זו, אנו הולכים להשתמש בדלי S3 כיעד הסופי עבור מערך הנתונים.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() עם open(data_set, "rb") בתור synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


משימה זו היא די פשוטה. ערך הקלט של data_set מכיל קישור HTTP חתום להורדת מערך הנתונים הסינטטי מה-API של Gretel. המשימה תקרא את הקובץ הזה לתוך ה-Airflow Worker, ולאחר מכן תשתמש ב-S3 שהוגדר כבר כדי להעלות את קובץ התכונות הסינתטי לדלי S3 שבו צרכנים או דגמים במורד הזרם יכולים לגשת אליו.

תזמורת הצינור

 
 
במהלך שלושת הסעיפים האחרונים עברנו על כל הקוד הנדרש כדי לחלץ, לסנתז ולטעון מערך נתונים. השלב האחרון הוא לקשור כל אחת מהמשימות הללו יחד לצינור זרימת אוויר יחיד.

אם תזכרו בחזרה לתחילת הפוסט הזה, הזכרנו בקצרה את הרעיון של DAG. באמצעות ה-TaskFlow API של Airflow נוכל לחבר את שלוש שיטות ה-Python הללו ל-DAG שמגדיר את הכניסות, הפלטים והסדר שכל שלב יופעל.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = gener_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


אם תעקבו אחר הנתיבים של קריאות השיטה הללו, בסופו של דבר תקבלו גרף שנראה כמו צינור התכונות המקורי שלנו.



צינור סינטטי גרטל ב-Airflow.

 

אם אתה רוצה להפעיל את הצינור הזה, ולראות אותו בפעולה, עבור אל מאגר Github הנלווה. שם תמצא הוראות כיצד להתחיל מופע Airflow ולהפעיל את הצינור מקצה לקצה.

עוטפת את הדברים

 
 
אם הגעתם עד לכאן, ראיתם כיצד ניתן לשלב את גרטל בצינור נתונים שנבנה על Airflow. על ידי שילוב של ממשקי ה-API הידידותיים למפתחים של Gretel, ומערכת ה-hooks והמפעילים החזקה של Airflow, קל לבנות צינורות ETL שהופכים את הנתונים לנגישים ובטוחים יותר לשימוש.

דיברנו גם על מקרה שימוש נפוץ בהנדסת תכונה שבה נתונים רגישים עשויים שלא להיות נגישים בקלות. על ידי יצירת גרסה סינתטית של מערך הנתונים, אנו מפחיתים את הסיכון לחשיפת נתונים רגישים כלשהם, אך עדיין שומרים על התועלת של מערך הנתונים תוך ביצוע מהיר של זמינותם למי שזקוק לו.

כשחושבים על צינור התכונות במונחים מופשטים יותר, יש לנו כעת דפוס שניתן ליישם מחדש עבור כל מספר של שאילתות SQL חדשות. על ידי פריסת גרסה חדשה של הצינור, והחלפת שאילתת ה-SQL הראשונית, נוכל להציג כל שאילתה רגישה עם מערך נתונים סינתטי ששומר על פרטיות הלקוח. שורת הקוד היחידה שצריך לשנות היא הנתיב לקובץ sql. אין צורך בהנדסת נתונים מורכבת.

תודה על הקריאה

 
 
שלחו לנו דוא"ל בשעה hi@gretel.ai או בואו להצטרף אלינו רפוי אם יש לך שאלות או הערות. נשמח לשמוע כיצד אתה משתמש ב-Airflow וכיצד נוכל להשתלב בצורה הטובה ביותר עם צינורות הנתונים הקיימים שלך.

 
Bio you דרו ניוברי הוא מהנדס תוכנה ב-Gretel.ai.

מְקוֹרִי. פורסם מחדש באישור.

מידע נוסף:

מקור: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

בול זמן:

עוד מ KDnuggets