Gretel ve Apache Airflow kullanarak sentetik bir veri hattı oluşturun

Kaynak Düğüm: 1068200

Gretel ve Apache Airflow kullanarak sentetik bir veri hattı oluşturun

Bu blog gönderisinde, Gretel'in Sentetik Veri API'lerini ve Apache Airflow'u kullanarak bir PostgreSQL veritabanından sentetik veriler üreten bir ETL ardışık düzeni oluşturuyoruz.


By yenibahar, Gretel.ai şirketinde Yazılım Mühendisi

Gretel ve Apache Airflow kullanarak sentetik bir veri hattı oluşturun

Hey millet, benim adım Drew ve ben burada Gretel'de yazılım mühendisiyim. Son zamanlarda Gretel API'lerini mevcut araçlara entegre etme kalıpları üzerinde düşünüyordum, böylece güvenlik ve müşteri gizliliğinin yalnızca sonradan düşünülmüş veya kontrol edilecek bir kutu değil, birinci sınıf özellikler olduğu veri boru hatları oluşturmak kolaydı.

Gretel mühendisleri ve müşterileri arasında popüler olan bir veri mühendisliği aracı Apache Airflow'dur. Ayrıca Gretel ile harika çalışıyor. Bu blog gönderisinde size Airflow, Gretel ve PostgreSQL kullanarak sentetik bir veri hattının nasıl oluşturulacağını göstereceğiz. Hadi atlayalım!

Hava Akışı Nedir?

 
 
Hava akışı veri işlem hatları oluşturmak için yaygın olarak kullanılan bir iş akışı otomasyon aracıdır. Veri mühendislerinin veya veri bilimcilerinin Python ve diğer tanıdık yapıları kullanarak bu boru hatlarını programlı olarak tanımlamasına ve dağıtmasına olanak tanır. Airflow'un merkezinde, bir DAG veya yönlendirilmiş asiklik grafik kavramı bulunur. Bir Airflow DAG, boru hattı bileşenlerini, bağımlılıklarını ve yürütme sırasını tanımlamak için bir model ve API kümesi sağlar.

Verileri bir ürün veritabanından bir veri ambarına kopyalayan Airflow işlem hatları bulabilirsiniz. Diğer işlem hatları, normalleştirilmiş verileri analitik veya modelleme için uygun tek bir veri kümesinde birleştiren sorguları yürütebilir. Yine başka bir işlem hattı, temel iş ölçümlerini bir araya getiren günlük bir rapor yayınlayabilir. Bu kullanım durumları arasında paylaşılan ortak bir tema: verilerin sistemler arasında hareketini koordine etmek. Airflow'un parladığı yer burasıdır.

Airflow ve zengin ekosisteminden yararlanarak entegrasyonlar, veri mühendisleri ve bilim adamları, herhangi bir sayıda farklı aracı veya hizmeti, bakımı ve çalıştırılması kolay tek bir birleşik işlem hattında düzenleyebilir. Bu entegrasyon yeteneklerini anlayarak, şimdi Gretel'in ortak veri operasyonları iş akışlarını iyileştirmek için bir Airflow boru hattına nasıl entegre edilebileceği hakkında konuşmaya başlayacağız.

Gretel nasıl uyum sağlar?

 
 
Gretel'de misyonumuz, verilerle çalışmayı daha kolay ve daha güvenli hale getirmektir. Müşterilerle konuşurken, sık sık duyduğumuz bir sorun, veri bilimcilerinin hassas verilere erişmesini sağlamak için gereken zaman ve çabadır. kullanma Gretel Sentetik, veri kümesinin sentetik bir kopyasını oluşturarak hassas verilerle çalışma riskini azaltabiliriz. Gretel'i Airflow ile entegre ederek, veri bilimcilerin her yeni veri talebi için bir veri mühendisine ihtiyaç duymadan ihtiyaç duydukları verileri hızlı bir şekilde almalarını kolaylaştıran self servis işlem hatları oluşturmak mümkündür.

Bu yetenekleri göstermek için, bir veritabanından kullanıcı etkinliği özelliklerini çıkaran, veri kümesinin sentetik bir sürümünü oluşturan ve veri kümesini S3'e kaydeden bir ETL ardışık düzen oluşturacağız. S3'te kaydedilen sentetik veri kümesiyle, veri bilimcileri tarafından müşteri gizliliğinden ödün vermeden aşağı yönlü modelleme veya analiz için kullanılabilir.

Başlamak için, önce boru hattına kuş bakışı bakalım. Bu şemadaki her düğüm, bir boru hattı adımını veya Hava Akışı terimleriyle "görevi" temsil eder.



Airflow'ta örnek Gretel sentetik boru hattı.

 

Bir ETL ardışık düzeninde bulabileceklerinize benzer şekilde, ardışık düzeni 3 aşamaya ayırabiliriz:

  • Çıkarmak – Extract_features görevi bir veritabanını sorgulayacak ve verileri, model oluşturmak için veri bilimcileri tarafından kullanılabilecek bir dizi özelliğe dönüştürecektir.
  • sentez – create_synthetic_features, çıkarılan özellikleri girdi olarak alacak, sentetik bir model eğitecek ve ardından Gretel API'lerini ve bulut hizmetlerini kullanarak sentetik bir dizi özellik üretecektir.
  • Yük – upload_synthetic_features, sentetik özellik setini herhangi bir aşağı akış modeline veya analize alınabileceği S3'e kaydeder.

Sonraki birkaç bölümde, bu üç adımın her birine daha ayrıntılı olarak dalacağız. Her bir kod örneğini takip etmek isterseniz, şuraya gidebilirsiniz: gretelai/gretel-hava akımı-boru hatları ve bu blog gönderisinde kullanılan tüm kodu indirin. Depo ayrıca bir Airflow örneğini başlatmak ve ardışık düzeni uçtan uca çalıştırmak için izleyebileceğiniz yönergeler içerir.

Ek olarak, her bir bileşeni incelemeden önce Airflow boru hattını bütünüyle incelemek faydalı olabilir. dags/airbnb_user_bookings.py. Aşağıdaki bölümlerdeki kod parçacıkları, bağlantılı kullanıcı ayırma işlem hattından çıkarılır.

Özellikleri Çıkart

 
 
İlk görev olan Extract_features, kaynak veritabanından ham verileri çıkarmaktan ve bir dizi özelliğe dönüştürmekten sorumludur. Bu yaygın bir özellik mühendisliği Herhangi bir makine öğrenimi veya analitik işlem hattında bulabileceğiniz sorun.

Örnek işlem hattımızda bir PostgreSQL veritabanı sağlayacağız ve onu bir Airbnb Kaggle Yarışması.

Bu veri kümesi, Kullanıcılar ve Oturumlar olmak üzere iki tablo içerir. Oturumlar, user_id adlı bir yabancı anahtar başvurusu içerir. Bu ilişkiyi kullanarak, kullanıcı tarafından toplanan çeşitli rezervasyon ölçümlerini içeren bir dizi özellik oluşturacağız. Aşağıdaki şekil, özellikleri oluşturmak için kullanılan SQL sorgusunu temsil etmektedir.

İLE session_features_by_user AS ( user_id, sayım(*) AS_of_actions_taken, count(DISTINCT action_type) AS Number_of_unique_actions, round(ort(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS_saniye_relapsed) AS_of_session min_session_time_seconds, ( WHERE s.user_id = user_id AND s.action_type = 'booking_request') FROM oturumlardan SEÇİN oturumlar GROUP BY user_id'ye göre total_bookings ) u.id'yi user_id, u.gender, u.age, u olarak SEÇİN .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_saniye, s.min_session_time_saniye, s.max_session_time_saniye, s.max_session_time_saniye, s.user_features 'den kullanıcı kimliğine göre 5000 MİT_kullanıcı kimliğine göre.


SQL sorgusu daha sonra Airflow boru hattımızdan yürütülür ve aşağıdaki görev tanımı kullanılarak bir ara S3 konumuna yazılır.

@task() def Extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" NamedTemporaryFile ile (mode="r+", suffix=".csv") tmp_csv olarak: postgres.copy_expert( f"kopyala ({sql_query}) csv başlığıyla stdout'a", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, anahtar=anahtar, ) dönüş anahtarı


Görevin girişi, sql_file, veritabanında hangi sorgunun çalıştırılacağını belirler. Bu sorgu göreve okunacak ve ardından veritabanına karşı yürütülecektir. Sorgunun sonuçları daha sonra S3'e yazılacak ve uzak dosya anahtarı görevin bir çıktısı olarak döndürülecektir.

Aşağıdaki ekran görüntüsü, yukarıdan çıkarma sorgusunun örnek bir sonuç kümesini göstermektedir. Bir sonraki bölümde bu veri setinin sentetik versiyonunun nasıl oluşturulacağını anlatacağız.



Sorgu sonucu önizlemesi.

Gretel API'lerini kullanarak Özellikleri sentezleyin

 
 
Her özelliğin sentetik bir versiyonunu oluşturmak için önce sentetik bir modeli eğitmeli ve ardından modeli sentetik kayıtlar oluşturmak için çalıştırmalıyız. Gretel, Airflow görevlerine entegrasyonu kolaylaştıran bir dizi Python SDK'ya sahiptir.

Python İstemci SDK'larına ek olarak, bir Gretel Hava Akış Kancası Gretel API bağlantılarını ve sırlarını yöneten. Bir Gretel Hava Akışı Bağlantısı kurduktan sonra, Gretel API'sine bağlanmak kadar kolaydır.

hooks.gretel'den GretelHook'u içe aktar gretel = GretelHook() projesi = gretel.get_project()


Airflow bağlantılarının nasıl yapılandırılacağı hakkında daha fazla bilgi için lütfen Github depomuza bakın. README.

Yukarıdaki örnekteki proje değişkeni, Gretel'in API'sini kullanarak sentetik modelleri eğitmek ve çalıştırmak için ana giriş noktası olarak kullanılabilir. Daha fazla ayrıntı için, bizim kontrol edebilirsiniz Python API belgeleri.

Ayırma ardışık düzenine geri dönersek, şimdi create_synthetic_features görevini gözden geçireceğiz. Bu adım, önceki görevde çıkarılan özellikleri kullanarak sentetik modelin eğitiminden sorumludur.

@task() def create_synthetic_features(data_source: str) -> str: proje = gretel.get_project() model = proje.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() anket(model) dönüş modeli.get_artifact_link("data_preview")


Metot imzasına baktığınızda, data_source bir yol izlediğini göreceksiniz. Bu değer, önceki adımda çıkarılan S3 özelliklerine işaret eder. Daha sonraki bir bölümde, tüm bu giriş ve çıkışların birbirine nasıl bağlandığını inceleyeceğiz.

Modeli project.create_model_obj kullanarak oluştururken, model_config parametresi, modeli oluşturmak için kullanılan sentetik model yapılandırmasını temsil eder. Bu boru hattında, bizim varsayılan model yapılandırması, ancak diğer birçok yapılandırma seçenekleri kullanılabilir.

Model yapılandırıldıktan sonra model.submit_cloud()'u çağırırız. Bu, modeli Gretel Cloud kullanarak eğitim ve kayıt üretimi için sunacaktır. poll(model) öğesinin çağrılması, model eğitimi tamamlayana kadar görevi engeller.

Artık model eğitildiğine göre, oluşturulan sentetik özellikleri indirmek için bir bağlantı döndürmek için get_artifact_link'i kullanacağız.



Sentetik özellikler kümesinin veri önizlemesi.

 

Bu yapıt bağlantısı, son upload_synthetic_features adımına girdi olarak kullanılacaktır.

Sentetik Özellikleri Yükle

 
 
Orijinal özellikler çıkarıldı ve sentetik bir versiyon oluşturuldu. Şimdi, alt tüketiciler tarafından erişilebilmesi için sentetik özellikleri yükleme zamanı. Bu örnekte, veri kümesi için son hedef olarak bir S3 paketini kullanacağız.

@task() def upload_synthetic_features(data_set: str): bağlam = get_current_context() ile open(data_set, "rb") synth_features olarak: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_v",synthetic. )


Bu görev oldukça basittir. data_set giriş değeri, sentetik veri kümesini Gretel'in API'sinden indirmek için imzalanmış bir HTTP bağlantısı içerir. Görev, bu dosyayı Airflow çalışanına okuyacak ve ardından sentetik özellik dosyasını alt tüketicilerin veya modellerin erişebileceği bir S3 klasörüne yüklemek için önceden yapılandırılmış S3 kancasını kullanacak.

Boru Hattını Düzenlemek

 
 
Son üç bölümde, bir veri kümesini ayıklamak, sentezlemek ve yüklemek için gereken tüm kodları inceledik. Son adım, bu görevlerin her birini tek bir Hava Akışı boru hattına bağlamaktır.

Bu yazının başına geri dönerseniz, DAG kavramından kısaca bahsettik. Airflow'un TaskFlow API'sini kullanarak, bu üç Python yöntemini girdileri, çıktıları ve her adımın çalıştırılacağı sırayı tanımlayan bir DAG'de birleştirebiliriz.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) sentetik_data = create_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Bu yöntem çağrılarının yolunu izlerseniz, sonunda orijinal özellik ardışık düzenimize benzeyen bir grafik elde edersiniz.



Airflow'ta Gretel sentetik boru hattı.

 

Bu boru hattını çalıştırmak ve eylem halinde görmek istiyorsanız, şuraya gidin: eşlik eden Github deposu. Orada bir Airflow örneğinin nasıl başlatılacağına ve ardışık düzenin uçtan uca nasıl çalıştırılacağına ilişkin talimatları bulacaksınız.

Bir şeyler sarmak

 
 
Buraya kadar geldiyseniz, Gretel'in Airflow üzerine kurulu bir veri hattına nasıl entegre edilebileceğini gördünüz. Gretel'in geliştirici dostu API'leri ile Airflow'un güçlü kanca ve operatör sistemini birleştirerek, verileri daha erişilebilir ve kullanımı daha güvenli hale getiren ETL boru hatları oluşturmak kolaydır.

Hassas verilere kolayca erişilemeyebilecek ortak bir özellik mühendisliği kullanım örneğinden de bahsettik. Veri kümesinin sentetik bir sürümünü oluşturarak, herhangi bir hassas verinin açığa çıkma riskini azaltır, ancak yine de veri kümesinin faydasını korurken, ihtiyaç duyanlar için hızlı bir şekilde kullanılabilir hale getiririz.

Özellik ardışık düzenini daha soyut terimlerle düşünürsek, artık herhangi bir sayıda yeni SQL sorgusu için yeniden kullanılabilecek bir modelimiz var. İşlem hattının yeni bir sürümünü dağıtarak ve ilk SQL sorgusunu değiştirerek, potansiyel olarak hassas herhangi bir sorguyu müşteri gizliliğini koruyan sentetik bir veri kümesiyle önleyebiliriz. Değiştirilmesi gereken tek kod satırı, sql dosyasının yoludur. Karmaşık veri mühendisliği gerekmez.

Okurken teşekkürler

 
 
Adresinden bize bir e-posta gönderin merhaba@gretel.ai ya da gel bize katıl Gevşeklik herhangi bir sorunuz veya yorumunuz varsa. Airflow'u nasıl kullandığınızı ve mevcut veri ardışık düzenlerinizle en iyi şekilde nasıl entegre olabileceğimizi öğrenmek isteriz.

 
Bio: yenibahar Gretel.ai'de Yazılım Mühendisi.

orijinal. İzinle yeniden yayınlandı.

İlgili:

Kaynak: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Zaman Damgası:

Den fazla KDNuggets