Bangun saluran data sintetis menggunakan Gretel dan Apache Airflow

Node Sumber: 1068200

Bangun saluran data sintetis menggunakan Gretel dan Apache Airflow

Dalam posting blog ini, kami membangun saluran ETL yang menghasilkan data sintetis dari database PostgreSQL menggunakan API Data Sintetis Gretel dan Aliran Udara Apache.


By Menggambar Newberry, Insinyur Perangkat Lunak di Gretel.ai

Bangun saluran data sintetis menggunakan Gretel dan Apache Airflow

Hai teman-teman, nama saya Drew, dan saya seorang insinyur perangkat lunak di sini di Gretel. Baru-baru ini saya memikirkan pola untuk mengintegrasikan API Gretel ke dalam alat yang ada sehingga mudah untuk membangun saluran data di mana keamanan dan privasi pelanggan adalah fitur kelas satu, bukan hanya renungan atau kotak untuk diperiksa.

Salah satu alat rekayasa data yang populer di kalangan insinyur dan pelanggan Gretel adalah Apache Airflow. Itu juga bekerja dengan baik dengan Gretel. Dalam posting blog ini, kami akan menunjukkan kepada Anda cara membuat pipeline data sintetis menggunakan Airflow, Gretel, dan PostgreSQL. Ayo masuk!

Apa itu Aliran Udara?

 
 
Aliran udara adalah alat otomatisasi alur kerja yang biasa digunakan untuk membangun jalur pipa data. Ini memungkinkan insinyur data atau ilmuwan data untuk secara terprogram mendefinisikan dan menyebarkan saluran pipa ini menggunakan Python dan konstruksi familiar lainnya. Inti dari Aliran Udara adalah konsep DAG, atau grafik asiklik terarah. Airflow DAG menyediakan model dan set API untuk menentukan komponen pipeline, dependensi, dan urutan eksekusinya.

Anda mungkin menemukan saluran Airflow mereplikasi data dari database produk ke dalam gudang data. Pipeline lain mungkin menjalankan kueri yang menggabungkan data yang dinormalisasi menjadi satu set data yang cocok untuk analitik atau pemodelan. Namun saluran lain mungkin menerbitkan laporan harian yang menggabungkan metrik bisnis utama. Tema umum yang dibagikan di antara kasus penggunaan ini: mengoordinasikan pergerakan data lintas sistem. Di sinilah Aliran Udara bersinar.

Memanfaatkan Aliran Udara dan ekosistemnya yang kaya dari integrasi, insinyur data dan ilmuwan dapat mengatur sejumlah alat atau layanan yang berbeda ke dalam satu saluran terpadu yang mudah dirawat dan dioperasikan. Dengan pemahaman tentang kemampuan integrasi ini, sekarang kita akan mulai berbicara tentang bagaimana Gretel dapat diintegrasikan ke dalam pipa Airflow untuk meningkatkan alur kerja operasi data umum.

Bagaimana Gretel cocok?

 
 
Di Gretel, misi kami adalah membuat data lebih mudah dan aman untuk digunakan. Berbicara dengan pelanggan, satu masalah yang sering kita dengar adalah waktu dan upaya yang diperlukan untuk mendapatkan akses ilmuwan data ke data sensitif. Menggunakan Sintetis Gretel, kami dapat mengurangi risiko bekerja dengan data sensitif dengan membuat salinan sintetis dari kumpulan data. Dengan mengintegrasikan Gretel dengan Airflow, dimungkinkan untuk membuat saluran pipa swalayan yang memudahkan ilmuwan data untuk mendapatkan data yang mereka butuhkan dengan cepat tanpa memerlukan insinyur data untuk setiap permintaan data baru.

Untuk mendemonstrasikan kemampuan ini, kami akan membuat pipeline ETL yang mengekstrak fitur aktivitas pengguna dari database, membuat versi sintetis dari kumpulan data, dan menyimpan kumpulan data ke S3. Dengan set data sintetis yang disimpan di S3, data tersebut kemudian dapat digunakan oleh ilmuwan data untuk pemodelan atau analisis hilir tanpa mengorbankan privasi pelanggan.

Untuk memulainya, pertama-tama mari kita lihat sekilas tentang pipa. Setiap node dalam diagram ini mewakili langkah pipa, atau "tugas" dalam istilah Aliran Udara.



Contoh pipa sintetis Gretel di Airflow.

 

Kami dapat membagi jalur pipa menjadi 3 tahap, mirip dengan apa yang mungkin Anda temukan di jalur pipa ETL:

  • Ekstrak – Tugas extract_features akan menanyakan database, dan mengubah data menjadi sekumpulan fitur yang dapat digunakan oleh ilmuwan data untuk membangun model.
  • Mempersatukan – generate_synthetic_features akan mengambil fitur yang diekstraksi sebagai input, melatih model sintetis, dan kemudian menghasilkan serangkaian fitur sintetis menggunakan API Gretel dan layanan cloud.
  • Kami akan kembali.Beban – upload_synthetic_features menyimpan kumpulan fitur sintetis ke S3 di mana ia dapat diserap ke dalam model atau analisis hilir apa pun.

Dalam beberapa bagian berikutnya kita akan menyelami masing-masing dari tiga langkah ini secara lebih rinci. Jika Anda ingin mengikuti setiap contoh kode, Anda dapat menuju ke gretelai/gretel-aliran udara-pipa dan unduh semua kode yang digunakan dalam posting blog ini. Repo juga berisi instruksi yang dapat Anda ikuti untuk memulai instans Airflow dan menjalankan pipeline dari ujung ke ujung.

Selain itu, mungkin berguna untuk melihat pipa Aliran Udara secara keseluruhan, sebelum kita membedah setiap komponen, dags/airbnb_user_bookings.py. Cuplikan kode di bagian berikut diekstraksi dari saluran pemesanan pengguna yang ditautkan.

Ekstrak Fitur

 
 
Tugas pertama, extract_features bertanggung jawab untuk mengekstraksi data mentah dari database sumber dan mengubahnya menjadi sekumpulan fitur. Ini adalah hal yang umum rekayasa fitur masalah yang mungkin Anda temukan di pembelajaran mesin atau saluran analitik apa pun.

Dalam contoh pipa kami, kami akan menyediakan database PostgreSQL dan memuatnya dengan data pemesanan dari Kompetisi Kaggle Airbnb.

Dataset ini berisi dua tabel, Pengguna dan Sesi. Sesi berisi referensi kunci asing, user_id. Dengan menggunakan hubungan ini, kami akan membuat serangkaian fitur yang berisi berbagai metrik pemesanan yang dikumpulkan oleh pengguna. Gambar berikut mewakili kueri SQL yang digunakan untuk membangun fitur.

DENGAN session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)_) AS max_session_time_seconds min_session_time_seconds, ( SELECT count(*) FROM session s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM session GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .bahasa, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds pengguna LEFT_ONfitur_pengguna


Kueri SQL kemudian dieksekusi dari saluran Airflow kami dan ditulis ke lokasi S3 perantara menggunakan definisi tugas berikut.

@task() def extract_features(sql_file: str) -> str: konteks = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" dengan NamedTemporaryFile (mode="r+", suffix=".csv") sebagai tmp_csv: postgres.copy_expert( f"copy ({sql_query}) ke stdout dengan header csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, kunci=kunci, ) kunci kembali


Input ke tugas, sql_file, menentukan kueri apa yang akan dijalankan di database. Kueri ini akan dibacakan ke tugas dan kemudian dieksekusi terhadap database. Hasil kueri kemudian akan ditulis ke S3 dan kunci file jarak jauh akan dikembalikan sebagai output tugas.

Tangkapan layar di bawah ini menunjukkan kumpulan hasil sampel dari kueri ekstraksi dari atas. Kami akan menjelaskan cara membuat versi sintetis dari kumpulan data ini di bagian selanjutnya.



Pratinjau hasil kueri.

Sintesiskan Fitur menggunakan API Gretel

 
 
Untuk menghasilkan versi sintetis dari setiap fitur, pertama-tama kita harus melatih model sintetis, lalu menjalankan model untuk menghasilkan rekaman sintetis. Gretel memiliki seperangkat SDK Python yang memudahkan integrasi ke dalam tugas-tugas Airflow.

Selain SDK Klien Python, kami telah membuat Kait Aliran Udara Gretel yang mengelola koneksi dan rahasia API Gretel. Setelah menyiapkan Koneksi Aliran Udara Gretel, menghubungkan ke API Gretel semudah

dari hooks.gretel impor GretelHook gretel = GretelHook() project = gretel.get_project()


Untuk informasi lebih lanjut tentang cara mengonfigurasi koneksi Airflow, silakan merujuk ke repositori Github kami README.

Variabel proyek dalam contoh di atas dapat digunakan sebagai titik masuk utama untuk melatih dan menjalankan model sintetis menggunakan API Gretel. Untuk lebih jelasnya, Anda dapat memeriksa kami Dokumen API Python.

Merujuk kembali ke alur pemesanan, sekarang kita akan meninjau tugas generate_synthetic_features. Langkah ini bertanggung jawab untuk melatih model sintetis menggunakan fitur yang diekstraksi dalam tugas sebelumnya.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) mengembalikan model.get_artifact_link("data_preview")


Melihat tanda tangan metode, Anda akan melihatnya mengambil jalur, data_source. Nilai ini menunjukkan fitur S3 yang diekstraksi pada langkah sebelumnya. Di bagian selanjutnya kita akan membahas bagaimana semua input dan output ini dihubungkan bersama.

Saat membuat model menggunakan project.create_model_obj, parameter model_config mewakili konfigurasi model sintetis yang digunakan untuk menghasilkan model. Di saluran ini, kami menggunakan konfigurasi model default, tapi banyak lainnya opsi konfigurasi tersedia.

Setelah model dikonfigurasi, kita memanggil model.submit_cloud(). Ini akan mengirimkan model untuk pelatihan dan pembuatan catatan menggunakan Gretel Cloud. Memanggil poll(model) akan memblokir tugas hingga model menyelesaikan pelatihan.

Sekarang model telah dilatih, kami akan menggunakan get_artifact_link untuk mengembalikan tautan untuk mengunduh fitur sintetis yang dihasilkan.



Pratinjau data kumpulan fitur sintetis.

 

Tautan artefak ini akan digunakan sebagai masukan ke langkah upload_synthetic_features terakhir.

Muat Fitur Sintetis

 
 
Fitur asli telah diekstraksi, dan versi sintetis telah dibuat. Kini saatnya mengunggah fitur sintetis agar bisa diakses oleh konsumen hilir. Dalam contoh ini, kita akan menggunakan ember S3 sebagai tujuan akhir untuk kumpulan data.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() dengan open(data_set, "rb") sebagai synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Tugas ini cukup mudah. Nilai input data_set berisi tautan HTTP yang ditandatangani untuk mengunduh kumpulan data sintetis dari API Gretel. Tugas akan membaca file itu ke dalam pekerja Airflow, dan kemudian menggunakan kait S3 yang sudah dikonfigurasi untuk mengunggah file fitur sintetis ke bucket S3 tempat konsumen atau model hilir dapat mengaksesnya.

Mengorkestrasi Pipeline

 
 
Selama tiga bagian terakhir, kami telah mempelajari semua kode yang diperlukan untuk mengekstrak, mensintesis, dan memuat kumpulan data. Langkah terakhir adalah mengikat masing-masing tugas ini menjadi satu pipa Aliran Udara.

Jika Anda ingat kembali ke awal posting ini, kami secara singkat menyebutkan konsep DAG. Menggunakan API TaskFlow Airflow, kita dapat menyusun ketiga metode Python ini menjadi DAG yang mendefinisikan input, output, dan urutan setiap langkah yang akan dijalankan.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Jika Anda mengikuti jalur pemanggilan metode ini, pada akhirnya Anda akan mendapatkan grafik yang terlihat seperti saluran fitur asli kami.



Pipa sintetis Gretel di Airflow.

 

Jika Anda ingin menjalankan saluran ini, dan melihatnya beraksi, pergilah ke menyertai repositori Github. Di sana Anda akan menemukan petunjuk tentang cara memulai instans Airflow dan menjalankan pipeline dari ujung ke ujung.

Membungkus segalanya

 
 
Jika Anda telah sampai sejauh ini, Anda telah melihat bagaimana Gretel dapat diintegrasikan ke dalam pipa data yang dibangun di atas Airflow. Dengan menggabungkan API ramah pengembang Gretel, dan sistem kait dan operator Airflow yang kuat, mudah untuk membangun saluran pipa ETL yang membuat data lebih mudah diakses dan lebih aman untuk digunakan.

Kami juga berbicara tentang kasus penggunaan rekayasa fitur umum di mana data sensitif mungkin tidak mudah diakses. Dengan membuat versi sintetik dari kumpulan data, kami mengurangi risiko tereksposnya data sensitif apa pun, namun tetap mempertahankan kegunaan kumpulan data sekaligus membuatnya tersedia dengan cepat bagi mereka yang membutuhkannya.

Memikirkan saluran fitur dalam istilah yang lebih abstrak, kami sekarang memiliki pola yang dapat digunakan kembali untuk sejumlah kueri SQL baru. Dengan men-deploy versi baru dari pipeline, dan menukar kueri SQL awal, kami dapat menampilkan kueri yang berpotensi sensitif dengan set data sintetis yang menjaga privasi pelanggan. Satu-satunya baris kode yang perlu diubah adalah jalur ke file sql. Tidak diperlukan rekayasa data yang rumit.

Terima kasih sudah membaca

 
 
Kirim email di hai@gretel.ai atau datang bergabung bersama kami Kendur jika Anda memiliki pertanyaan atau komentar. Kami ingin mendengar bagaimana Anda menggunakan Airflow dan bagaimana kami dapat berintegrasi dengan baik dengan saluran data Anda yang ada.

 
Bio: Menggambar Newberry adalah Insinyur Perangkat Lunak di Gretel.ai.

Original. Diposting ulang dengan izin.

Terkait:

Sumber: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-Apache-airflow.html

Stempel Waktu:

Lebih dari KDnugget