สร้างไปป์ไลน์ข้อมูลสังเคราะห์โดยใช้ Gretel และ Apache Airflow

โหนดต้นทาง: 1068200

สร้างไปป์ไลน์ข้อมูลสังเคราะห์โดยใช้ Gretel และ Apache Airflow

ในบล็อกโพสต์นี้ เราสร้างไปป์ไลน์ ETL ที่สร้างข้อมูลสังเคราะห์จากฐานข้อมูล PostgreSQL โดยใช้ Synthetic Data API ของ Gretel และ Apache Airflow


By ดรูว์ นิวเบอร์รี, Software Engineer ที่ Gretel.ai

สร้างไปป์ไลน์ข้อมูลสังเคราะห์โดยใช้ Gretel และ Apache Airflow

สวัสดีทุกคน ฉันชื่อดรูว์ เป็นวิศวกรซอฟต์แวร์ที่เกรเทล เมื่อเร็ว ๆ นี้ฉันได้คิดเกี่ยวกับรูปแบบในการผสานรวม Gretel API เข้ากับเครื่องมือที่มีอยู่ เพื่อให้ง่ายต่อการสร้างไปป์ไลน์ข้อมูลซึ่งความปลอดภัยและความเป็นส่วนตัวของลูกค้าเป็นคุณสมบัติชั้นหนึ่ง ไม่ใช่แค่การคิดภายหลังหรือช่องที่ต้องตรวจสอบ

เครื่องมือวิศวกรรมข้อมูลอย่างหนึ่งที่ได้รับความนิยมในหมู่วิศวกรและลูกค้าของ Gretel คือ Apache Airflow นอกจากนี้ยังทำงานได้ดีกับ Gretel ในบล็อกโพสต์นี้ เราจะแสดงวิธีสร้างไปป์ไลน์ข้อมูลสังเคราะห์โดยใช้ Airflow, Gretel และ PostgreSQL กระโดดเข้าไปกันเถอะ!

กระแสลมคืออะไร

 
 
Airflow เป็นเครื่องมืออัตโนมัติเวิร์กโฟลว์ที่ใช้ทั่วไปในการสร้างไปป์ไลน์ข้อมูล ช่วยให้วิศวกรข้อมูลหรือนักวิทยาศาสตร์ข้อมูลสามารถกำหนดและปรับใช้ไปป์ไลน์เหล่านี้โดยทางโปรแกรมโดยใช้ Python และโครงสร้างอื่นๆ ที่คุ้นเคย แกนหลักของการไหลของอากาศคือแนวคิดของ DAG หรือกราฟ acyclic แบบกำกับทิศทาง Airflow DAG จัดเตรียมโมเดลและชุดของ API สำหรับการกำหนดส่วนประกอบไปป์ไลน์ การขึ้นต่อกัน และลำดับการดำเนินการ

คุณอาจพบไปป์ไลน์ Airflow ที่จำลองข้อมูลจากฐานข้อมูลผลิตภัณฑ์ไปยังคลังข้อมูล ไปป์ไลน์อื่นอาจดำเนินการค้นหาที่รวมข้อมูลที่เป็นมาตรฐานไว้ในชุดข้อมูลเดียวที่เหมาะสำหรับการวิเคราะห์หรือการสร้างแบบจำลอง ไปป์ไลน์อื่นอาจเผยแพร่รายงานรายวันที่รวบรวมตัวชี้วัดทางธุรกิจที่สำคัญ ชุดรูปแบบทั่วไปที่ใช้ร่วมกันระหว่างกรณีการใช้งานเหล่านี้: การประสานงานการเคลื่อนไหวของข้อมูลข้ามระบบ นี่คือจุดที่ Airflow เปล่งประกาย

ใช้ประโยชน์จากกระแสลมและระบบนิเวศอันอุดมสมบูรณ์ของ การผสานรวมวิศวกรข้อมูลและนักวิทยาศาสตร์สามารถประสานเครื่องมือหรือบริการที่แตกต่างกันจำนวนเท่าใดก็ได้ไว้ในไปป์ไลน์เดียวที่บำรุงรักษาและดำเนินการได้ง่าย ด้วยความเข้าใจในความสามารถในการผสานรวมเหล่านี้ ตอนนี้เราจะเริ่มพูดถึงวิธีที่ Gretel อาจรวมเข้ากับไปป์ไลน์ Airflow เพื่อปรับปรุงเวิร์กโฟลว์ทั่วไปของข้อมูล

เกรเทลเข้ากันได้อย่างไร?

 
 
ที่ Gretel ภารกิจของเราคือทำให้ข้อมูลง่ายขึ้นและปลอดภัยยิ่งขึ้นในการทำงานด้วย เมื่อเราพูดคุยกับลูกค้า ประเด็นปัญหาหนึ่งที่เรามักได้ยินคือเวลาและความพยายามที่จำเป็นในการให้นักวิทยาศาสตร์ข้อมูลเข้าถึงข้อมูลที่ละเอียดอ่อนได้ โดยใช้ เกรเทล ซินธิติกส์เราสามารถลดความเสี่ยงในการทำงานกับข้อมูลที่ละเอียดอ่อนโดยการสร้างสำเนาสังเคราะห์ของชุดข้อมูล การผสานรวม Gretel กับ Airflow ทำให้สามารถสร้างไปป์ไลน์แบบบริการตนเองที่ช่วยให้นักวิทยาศาสตร์ข้อมูลสามารถรับข้อมูลที่ต้องการได้อย่างรวดเร็วโดยไม่ต้องมีวิศวกรข้อมูลสำหรับคำขอข้อมูลใหม่ทุกครั้ง

เพื่อแสดงความสามารถเหล่านี้ เราจะสร้างไปป์ไลน์ ETL ที่แยกคุณลักษณะกิจกรรมของผู้ใช้ออกจากฐานข้อมูล สร้างชุดข้อมูลเวอร์ชันสังเคราะห์ และบันทึกชุดข้อมูลไปยัง S3 ด้วยชุดข้อมูลสังเคราะห์ที่บันทึกไว้ใน S3 จากนั้นนักวิทยาศาสตร์ข้อมูลจะสามารถใช้ชุดข้อมูลดังกล่าวสำหรับการสร้างแบบจำลองหรือการวิเคราะห์แบบดาวน์สตรีมโดยไม่กระทบต่อความเป็นส่วนตัวของลูกค้า

อย่างแรกเลยคือใช้มุมมองตานกของท่อส่งน้ำมัน แต่ละโหนดในไดอะแกรมนี้แสดงถึงขั้นตอนไปป์ไลน์หรือ "งาน" ในแง่ของการไหลของอากาศ



ตัวอย่างท่อใยสังเคราะห์ Gretel บน Airflow

 

เราสามารถแบ่งไปป์ไลน์ออกเป็น 3 ขั้นตอน คล้ายกับที่คุณอาจพบในไปป์ไลน์ ETL:

  • สารสกัด – งาน extract_features จะทำการสืบค้นฐานข้อมูล และแปลงข้อมูลเป็นชุดของคุณสมบัติที่นักวิทยาศาสตร์ข้อมูลสามารถใช้สำหรับการสร้างแบบจำลอง
  • สังเคราะห์ – generate_synthetic_features จะนำฟีเจอร์ที่แยกออกมาเป็นอินพุต ฝึกโมเดลสังเคราะห์ แล้วสร้างชุดฟีเจอร์สังเคราะห์โดยใช้ Gretel API และบริการคลาวด์
  • โหลด – upload_synthetic_features บันทึกชุดคุณสมบัติสังเคราะห์ไปยัง S3 ซึ่งสามารถนำเข้าไปยังโมเดลดาวน์สตรีมหรือการวิเคราะห์ใดๆ

ในสองสามส่วนถัดไป เราจะเจาะลึกลงไปในแต่ละขั้นตอนทั้งสามนี้โดยละเอียดยิ่งขึ้น หากคุณต้องการติดตามตัวอย่างโค้ดแต่ละรายการ คุณสามารถตรงไปที่ gretelai/gretel-ท่อส่งลม และดาวน์โหลดรหัสทั้งหมดที่ใช้ในโพสต์บล็อกนี้ repo ยังมีคำแนะนำที่คุณสามารถทำตามเพื่อเริ่มอินสแตนซ์ Airflow และเรียกใช้ไปป์ไลน์แบบจุดต่อจุด

นอกจากนี้ การดูไปป์ไลน์ Airflow อย่างครบถ้วนอาจเป็นประโยชน์ ก่อนที่เราจะแยกส่วนประกอบแต่ละส่วน dags/airbnb_user_bookings.py. ข้อมูลโค้ดในส่วนต่อไปนี้จะแยกจากไปป์ไลน์การจองของผู้ใช้ที่เชื่อมโยง

สารสกัดจากคุณสมบัติ

 
 
งานแรก extract_features มีหน้าที่ในการดึงข้อมูลดิบจากฐานข้อมูลต้นทางและแปลงเป็นชุดของคุณสมบัติ เป็นเรื่องธรรมดา วิศวกรรมคุณลักษณะ ปัญหาที่คุณอาจพบในการเรียนรู้ของเครื่องหรือไปป์ไลน์การวิเคราะห์

ในไปป์ไลน์ตัวอย่างของเรา เราจะจัดเตรียมฐานข้อมูล PostgreSQL และโหลดด้วยข้อมูลการจองจาก an การแข่งขัน Airbnb Kaggle.

ชุดข้อมูลนี้ประกอบด้วยสองตาราง ผู้ใช้และเซสชัน เซสชันมีการอ้างอิงคีย์ภายนอก user_id เมื่อใช้ความสัมพันธ์นี้ เราจะสร้างชุดคุณลักษณะที่มีเมตริกการจองต่างๆ ที่รวบรวมโดยผู้ใช้ รูปต่อไปนี้แสดงถึงแบบสอบถาม SQL ที่ใช้ในการสร้างคุณลักษณะ

C session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) เวลา AS_second_session_ min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds_idle โดยผู้ใช้ FT LI เซสชัน _ คุณสมบัติ ON


จากนั้น เคียวรี SQL จะถูกดำเนินการจากไปป์ไลน์ Airflow ของเราและเขียนไปยังตำแหน่ง S3 ระดับกลางโดยใช้คำจำกัดความของงานต่อไปนี้

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" with NamedTemporaryFile (mode="r+", suffix=".csv") เป็น tmp_csv: postgres.copy_expert( f"copy ({sql_query}) เป็น stdout พร้อมส่วนหัว csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, คีย์=คีย์, ) คีย์ส่งคืน


อินพุตของงาน sql_file เป็นตัวกำหนดว่าคิวรีใดที่จะรันบนฐานข้อมูล แบบสอบถามนี้จะถูกอ่านในงานแล้วดำเนินการกับฐานข้อมูล ผลลัพธ์ของการสืบค้นจะถูกเขียนไปยัง S3 และคีย์ไฟล์ระยะไกลจะถูกส่งคืนเป็นผลลัพธ์ของงาน

ภาพหน้าจอด้านล่างแสดงตัวอย่างชุดผลลัพธ์ของข้อความค้นหาการแยกจากด้านบน เราจะอธิบายวิธีสร้างเวอร์ชันสังเคราะห์ของชุดข้อมูลนี้ในหัวข้อถัดไป



ดูตัวอย่างผลการค้นหา

สังเคราะห์คุณสมบัติโดยใช้ Gretel APIs

 
 
ในการสร้างเวอร์ชันสังเคราะห์ของแต่ละคุณลักษณะ ขั้นแรกเราต้องฝึกแบบจำลองสังเคราะห์ จากนั้นเรียกใช้แบบจำลองเพื่อสร้างเรกคอร์ดสังเคราะห์ Gretel มีชุด Python SDK ที่ทำให้ง่ายต่อการรวมเข้ากับงาน Airflow

นอกจาก Python Client SDK แล้ว เราได้สร้าง a ตะขอเกี่ยวการไหลของอากาศ Gretel ที่จัดการการเชื่อมต่อและความลับของ Gretel API หลังจากตั้งค่า Gretel Airflow Connection แล้ว การเชื่อมต่อกับ Gretel API นั้นง่ายพอๆ กับ

จาก hooks.gretel นำเข้า GretelHook gretel = GretelHook () โครงการ = gretel.get_project ()


สำหรับข้อมูลเพิ่มเติมเกี่ยวกับการกำหนดค่าการเชื่อมต่อ Airflow โปรดดูที่ที่เก็บ Github README.

ตัวแปรโปรเจ็กต์ในตัวอย่างด้านบนสามารถใช้เป็นจุดเริ่มต้นหลักสำหรับการฝึกและเรียกใช้โมเดลสังเคราะห์โดยใช้ API ของ Gretel สำหรับรายละเอียดเพิ่มเติม คุณสามารถตรวจสอบของเรา เอกสาร Python API.

อ้างอิงกลับไปที่ไปป์ไลน์การจอง ตอนนี้เราจะตรวจสอบงาน generate_synthetic_features ขั้นตอนนี้รับผิดชอบในการฝึกโมเดลสังเคราะห์โดยใช้คุณลักษณะที่แยกไว้ในงานก่อนหน้านี้

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() แบบสำรวจความคิดเห็น (model) ส่งคืน model.get_artifact_link("data_preview")


เมื่อดูที่ลายเซ็นเมธอด คุณจะเห็นว่ามันใช้พาธ data_source ค่านี้ชี้ไปที่ฟีเจอร์ S3 ที่แยกในขั้นตอนก่อนหน้า ในตอนต่อไป เราจะอธิบายวิธีการต่อสายอินพุตและเอาท์พุตเหล่านี้เข้าด้วยกัน

เมื่อสร้างโมเดลโดยใช้ project.create_model_obj พารามิเตอร์ model_config แสดงถึงการกำหนดค่าโมเดลสังเคราะห์ที่ใช้ในการสร้างโมเดล ในไพพ์ไลน์นี้ เราใช้ของเรา การกำหนดค่าโมเดลเริ่มต้นแต่อื่นๆอีกมากมาย ตัวเลือกการกำหนดค่า มี

หลังจากกำหนดค่าโมเดลแล้ว เราจะเรียก model.submit_cloud() สิ่งนี้จะส่งแบบจำลองสำหรับการฝึกอบรมและการสร้างบันทึกโดยใช้ Gretel Cloud การเรียกแบบสำรวจความคิดเห็น (แบบจำลอง) จะบล็อกงานจนกว่าแบบจำลองจะเสร็จสิ้นการฝึกอบรม

เมื่อโมเดลได้รับการฝึกอบรมแล้ว เราจะใช้ get_artifact_link เพื่อส่งคืนลิงก์เพื่อดาวน์โหลดคุณลักษณะการสังเคราะห์ที่สร้างขึ้น



การแสดงตัวอย่างข้อมูลของชุดคุณสมบัติสังเคราะห์

 

ลิงก์อาร์ติแฟกต์นี้จะใช้เป็นอินพุตของขั้นตอน upload_synthetic_features สุดท้าย

โหลดคุณสมบัติสังเคราะห์

 
 
ฟีเจอร์ดั้งเดิมถูกดึงออกมา และสร้างเวอร์ชันสังเคราะห์ขึ้น ตอนนี้ได้เวลาอัปโหลดฟีเจอร์สังเคราะห์เพื่อให้ผู้บริโภคดาวน์สตรีมเข้าถึงได้ ในตัวอย่างนี้ เราจะใช้บัคเก็ต S3 เป็นปลายทางสุดท้ายสำหรับชุดข้อมูล

@task() def upload_synthetic_features(data_set: str): context = get_current_context() พร้อม open(data_set, "rb") เป็น synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_featuresv_synthetic.cs )


งานนี้ค่อนข้างตรงไปตรงมา ค่าอินพุต data_set มีลิงก์ HTTP ที่ลงชื่อเพื่อดาวน์โหลดชุดข้อมูลสังเคราะห์จาก API ของ Gretel งานจะอ่านไฟล์นั้นในผู้ปฏิบัติงาน Airflow จากนั้นใช้เบ็ด S3 ที่กำหนดค่าไว้แล้วเพื่ออัปโหลดไฟล์คุณสมบัติสังเคราะห์ไปยังบัคเก็ต S3 ที่ผู้บริโภคหรือรุ่นดาวน์สตรีมสามารถเข้าถึงได้

การจัดวางท่อ

 
 
ในสามส่วนสุดท้าย เราได้อธิบายโค้ดทั้งหมดที่จำเป็นในการแยก สังเคราะห์ และโหลดชุดข้อมูล ขั้นตอนสุดท้ายคือการผูกแต่ละงานเหล่านี้เข้าด้วยกันเป็นไปป์ไลน์ Airflow เดียว

หากคุณย้อนกลับไปที่จุดเริ่มต้นของโพสต์นี้ เราได้กล่าวถึงแนวคิดของ DAG สั้นๆ การใช้ TaskFlow API ของ Airflow เราสามารถเขียนวิธี Python สามวิธีเหล่านี้ลงใน DAG ที่กำหนดอินพุต เอาต์พุต และลำดับแต่ละขั้นตอนจะทำงาน

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = สร้าง_synthetic_features(feature_path) upload_synthetic_features (synthetic_data)


หากคุณทำตามเส้นทางของการเรียกเมธอดเหล่านี้ ในที่สุด คุณจะได้กราฟที่ดูเหมือนไปป์ไลน์ฟีเจอร์ดั้งเดิมของเรา



ไปป์ไลน์สังเคราะห์ Gretel บน Airflow

 

หากคุณต้องการเรียกใช้ไปป์ไลน์นี้และเห็นการทำงานจริง ตรงไปที่ ที่มาพร้อมกับที่เก็บ Github. คุณจะพบคำแนะนำเกี่ยวกับวิธีการเริ่มต้นอินสแตนซ์ Airflow และเรียกใช้ไปป์ไลน์จากจุดสิ้นสุดไปยังจุดสิ้นสุด

การห่อของขึ้น

 
 
หากคุณมาไกลถึงขนาดนี้ คุณจะเห็นว่า Gretel สามารถรวมเข้ากับไปป์ไลน์ข้อมูลที่สร้างขึ้นบน Airflow ได้อย่างไร ด้วยการรวม API ที่เป็นมิตรกับนักพัฒนาของ Gretel เข้ากับระบบตะขอและตัวดำเนินการที่ทรงพลังของ Airflow ทำให้ง่ายต่อการสร้างไปป์ไลน์ ETL ที่ทำให้เข้าถึงข้อมูลได้มากขึ้นและปลอดภัยยิ่งขึ้นในการใช้งาน

นอกจากนี้เรายังได้พูดคุยเกี่ยวกับกรณีการใช้งานวิศวกรรมคุณลักษณะทั่วไปซึ่งข้อมูลที่ละเอียดอ่อนอาจไม่สามารถเข้าถึงได้ทันที ด้วยการสร้างชุดข้อมูลเวอร์ชันสังเคราะห์ เราลดความเสี่ยงในการเปิดเผยข้อมูลที่ละเอียดอ่อน แต่ยังคงรักษายูทิลิตี้ของชุดข้อมูลไว้ในขณะที่ทำให้ผู้ที่ต้องการใช้ได้อย่างรวดเร็ว

เมื่อนึกถึงไปป์ไลน์ฟีเจอร์ในแง่นามธรรมมากขึ้น ตอนนี้เรามีรูปแบบที่สามารถนำไปใช้ใหม่สำหรับการสืบค้น SQL ใหม่จำนวนเท่าใดก็ได้ ด้วยการปรับใช้ไปป์ไลน์เวอร์ชันใหม่และสลับการสืบค้น SQL เริ่มต้น เราสามารถจัดการกับการสืบค้นที่อาจมีความละเอียดอ่อนด้วยชุดข้อมูลสังเคราะห์ที่รักษาความเป็นส่วนตัวของลูกค้า โค้ดบรรทัดเดียวที่ต้องเปลี่ยนคือพาธไปยังไฟล์ sql ไม่จำเป็นต้องมีวิศวกรรมข้อมูลที่ซับซ้อน

ขอบคุณที่อ่าน

 
 
ส่งอีเมลถึงเราที่ สวัสดี@gretel.ai หรือมาร่วมกับเราใน หย่อน หากคุณมีคำถามหรือความคิดเห็น เราชอบที่จะได้ยินว่าคุณใช้ Airflow อย่างไรและเราจะผสานรวมกับไปป์ไลน์ข้อมูลที่มีอยู่ของคุณได้อย่างไร

 
Bio: ดรูว์ นิวเบอร์รี เป็นวิศวกรซอฟต์แวร์ที่ Gretel.ai

Original. โพสต์ใหม่โดยได้รับอนุญาต

ที่เกี่ยวข้อง



เรื่องเด่นใน 30 วันที่ผ่านมา
เป็นที่นิยม
  1. ความแตกต่างระหว่างนักวิทยาศาสตร์ข้อมูลและวิศวกร ML
  2. 3 เหตุผลที่คุณควรใช้แบบจำลองการถดถอยเชิงเส้นแทนโครงข่ายประสาทเทียม
  3. คำถามและคำตอบในการสัมภาษณ์วิทยาศาสตร์ข้อมูลทั่วไปส่วนใหญ่
  4. GitHub Copilot โอเพ่นซอร์สทางเลือก
  5. คำแนะนำสำหรับการเรียนรู้ Data Science จากผู้อำนวยการฝ่ายวิจัยของ Google
แบ่งปันมากที่สุด
  1. ความแตกต่างระหว่างนักวิทยาศาสตร์ข้อมูลและวิศวกร ML
  2. วิธีค้นหาดาต้าเฟรมของ Pandas
  3. ทำไมคุณควรเรียนรู้ "Productive Data Science" และอย่างไร
  4. ไม่เพียงแต่สำหรับ Deep Learning เท่านั้น: GPUs เร่งความเร็ว Data Science & Data Analytics ได้อย่างไร
  5. การเขียนแอปพลิเคชัน Python แบบกระจายครั้งแรกของคุณด้วย Ray

ที่มา: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

ประทับเวลา:

เพิ่มเติมจาก KD นักเก็ต