Δημιουργήστε έναν αγωγό συνθετικών δεδομένων χρησιμοποιώντας το Gretel και το Apache Airflow

Κόμβος πηγής: 1068200

Δημιουργήστε έναν αγωγό συνθετικών δεδομένων χρησιμοποιώντας το Gretel και το Apache Airflow

Σε αυτήν την ανάρτηση ιστολογίου, χτίζουμε μια διοχέτευση ETL που δημιουργεί συνθετικά δεδομένα από μια βάση δεδομένων PostgreSQL χρησιμοποιώντας τα API Synthetic Data και το Apache Airflow της Gretel.


By Ντρου Νιούμπερι, Μηχανικός Λογισμικού στο Gretel.ai

Δημιουργήστε έναν αγωγό συνθετικών δεδομένων χρησιμοποιώντας το Gretel και το Apache Airflow

Γεια σας παιδιά, με λένε Ντρου και είμαι μηχανικός λογισμικού εδώ στην Γκρέτελ. Πρόσφατα σκέφτηκα μοτίβα για την ενσωμάτωση των Gretel API σε υπάρχοντα εργαλεία, έτσι ώστε να είναι εύκολο να δημιουργηθούν αγωγοί δεδομένων όπου η ασφάλεια και το απόρρητο των πελατών είναι χαρακτηριστικά πρώτης κατηγορίας, όχι απλώς μια μεταγενέστερη σκέψη ή πλαίσιο προς έλεγχο.

Ένα εργαλείο μηχανικής δεδομένων που είναι δημοφιλές μεταξύ των μηχανικών και των πελατών της Gretel είναι το Apache Airflow. Τυχαίνει επίσης να λειτουργεί τέλεια με την Γκρέτελ. Σε αυτήν την ανάρτηση ιστολογίου, θα σας δείξουμε πώς να δημιουργήσετε έναν αγωγό συνθετικών δεδομένων χρησιμοποιώντας Airflow, Gretel και PostgreSQL. Ας πηδήξουμε!

Τι είναι η ροή αέρα

 
 
Ροής αέρα είναι ένα εργαλείο αυτοματισμού ροής εργασιών που χρησιμοποιείται συνήθως για τη δημιουργία αγωγών δεδομένων. Επιτρέπει σε μηχανικούς δεδομένων ή επιστήμονες δεδομένων να ορίσουν και να αναπτύξουν μέσω προγραμματισμού αυτούς τους αγωγούς χρησιμοποιώντας Python και άλλες γνωστές δομές. Στον πυρήνα της ροής αέρα βρίσκεται η έννοια ενός DAG, ή κατευθυνόμενου ακυκλικού γραφήματος. Το Airflow DAG παρέχει ένα μοντέλο και ένα σύνολο API για τον καθορισμό των στοιχείων του αγωγού, τις εξαρτήσεις τους και τη σειρά εκτέλεσης.

Μπορεί να βρείτε αγωγούς Airflow που αναπαράγουν δεδομένα από μια βάση δεδομένων προϊόντων σε μια αποθήκη δεδομένων. Άλλοι αγωγοί ενδέχεται να εκτελούν ερωτήματα που ενώνουν κανονικοποιημένα δεδομένα σε ένα ενιαίο σύνολο δεδομένων κατάλληλο για ανάλυση ή μοντελοποίηση. Ωστόσο, ένας άλλος αγωγός μπορεί να δημοσιεύει μια ημερήσια αναφορά που συγκεντρώνει βασικές επιχειρηματικές μετρήσεις. Ένα κοινό θέμα που μοιράζεται μεταξύ αυτών των περιπτώσεων χρήσης: ο συντονισμός της κίνησης δεδομένων στα συστήματα. Εδώ λάμπει το Airflow.

Αξιοποιώντας το Airflow και το πλούσιο οικοσύστημά του ολοκληρώσεις, οι μηχανικοί δεδομένων και οι επιστήμονες μπορούν να ενορχηστρώσουν οποιονδήποτε αριθμό διαφορετικών εργαλείων ή υπηρεσιών σε έναν ενιαίο ενοποιημένο αγωγό που είναι εύκολο να συντηρηθεί και να λειτουργήσει. Με την κατανόηση αυτών των δυνατοτήτων ολοκλήρωσης, θα αρχίσουμε τώρα να μιλάμε για το πώς η Gretel μπορεί να ενσωματωθεί σε έναν αγωγό Airflow για να βελτιώσει τις κοινές ροές εργασίας λειτουργιών δεδομένων.

Πώς ταιριάζει η Γκρέτελ;

 
 
Στην Gretel, η αποστολή μας είναι να κάνουμε τα δεδομένα ευκολότερα και ασφαλέστερα στην εργασία. Μιλώντας με πελάτες, ένα επίπονο σημείο για το οποίο ακούμε συχνά είναι ο χρόνος και η προσπάθεια που απαιτείται για να αποκτήσουν οι επιστήμονες δεδομένων πρόσβαση σε ευαίσθητα δεδομένα. Χρησιμοποιώντας Gretel Synthetics, μπορούμε να μειώσουμε τον κίνδυνο εργασίας με ευαίσθητα δεδομένα δημιουργώντας ένα συνθετικό αντίγραφο του συνόλου δεδομένων. Με την ενσωμάτωση της Gretel με το Airflow, είναι δυνατό να δημιουργηθούν αγωγοί αυτοεξυπηρέτησης που διευκολύνουν τους επιστήμονες δεδομένων να λάβουν γρήγορα τα δεδομένα που χρειάζονται χωρίς να απαιτείται μηχανικός δεδομένων για κάθε νέο αίτημα δεδομένων.

Για να δείξουμε αυτές τις δυνατότητες, θα δημιουργήσουμε μια διοχέτευση ETL που εξάγει χαρακτηριστικά δραστηριότητας χρήστη από μια βάση δεδομένων, δημιουργεί μια συνθετική έκδοση του συνόλου δεδομένων και αποθηκεύει το σύνολο δεδομένων στο S3. Με το συνθετικό σύνολο δεδομένων που είναι αποθηκευμένο στο S3, μπορεί στη συνέχεια να χρησιμοποιηθεί από επιστήμονες δεδομένων για μεταγενέστερη μοντελοποίηση ή ανάλυση χωρίς να διακυβεύεται το απόρρητο των πελατών.

Για να ξεκινήσουμε τα πράγματα, ας ρίξουμε πρώτα μια πανοραμική θέα του αγωγού. Κάθε κόμβος σε αυτό το διάγραμμα αντιπροσωπεύει ένα βήμα αγωγού ή «εργασία» με όρους ροής αέρα.



Παράδειγμα αγωγού συνθετικών Gretel στο Airflow.

 

Μπορούμε να χωρίσουμε τον αγωγό σε 3 στάδια, παρόμοια με αυτά που μπορεί να βρείτε σε έναν αγωγό ETL:

  • Εκχύλισμα – Η εργασία extract_features θα υποβάλει ερώτημα σε μια βάση δεδομένων και θα μετατρέψει τα δεδομένα σε ένα σύνολο χαρακτηριστικών που μπορούν να χρησιμοποιηθούν από επιστήμονες δεδομένων για την κατασκευή μοντέλων.
  • Συνθέτω – Το generate_synthetic_features θα λάβει τα εξαγόμενα χαρακτηριστικά ως είσοδο, θα εκπαιδεύσει ένα συνθετικό μοντέλο και στη συνέχεια θα δημιουργήσει ένα συνθετικό σύνολο χαρακτηριστικών χρησιμοποιώντας Gretel API και υπηρεσίες cloud.
  • Φορτίο – Το upload_synthetic_features αποθηκεύει το συνθετικό σύνολο χαρακτηριστικών στο S3, όπου μπορεί να ενσωματωθεί σε οποιοδήποτε μοντέλο ή ανάλυση κατάντη.

Στις επόμενες ενότητες θα βουτήξουμε σε καθένα από αυτά τα τρία βήματα με περισσότερες λεπτομέρειες. Εάν θέλετε να ακολουθήσετε μαζί με κάθε δείγμα κώδικα, μπορείτε να μεταβείτε στο gretelai/gretel-airflow-pipelines και κατεβάστε όλο τον κώδικα που χρησιμοποιείται σε αυτήν την ανάρτηση ιστολογίου. Το αποθετήριο περιέχει επίσης οδηγίες που μπορείτε να ακολουθήσετε για να ξεκινήσετε μια παρουσία ροής αέρα και να εκτελέσετε τον αγωγό από άκρη σε άκρη.

Επιπλέον, μπορεί να είναι χρήσιμο να προβάλετε ολόκληρο τον αγωγό ροής αέρα, προτού αναλύσουμε κάθε στοιχείο, dags/airbnb_user_bookings.py. Τα αποσπάσματα κώδικα στις ακόλουθες ενότητες εξάγονται από τη διοχέτευση κρατήσεων συνδεδεμένου χρήστη.

Εξαγωγή Χαρακτηριστικά

 
 
Η πρώτη εργασία, το extract_features είναι υπεύθυνη για την εξαγωγή ακατέργαστων δεδομένων από τη βάση δεδομένων πηγής και τη μετατροπή τους σε ένα σύνολο χαρακτηριστικών. Αυτό είναι κοινό μηχανική χαρακτηριστικών πρόβλημα που μπορεί να βρείτε σε οποιαδήποτε διοχέτευση μηχανικής εκμάθησης ή αναλυτικών στοιχείων.

Στο παράδειγμά μας θα παρέχουμε μια βάση δεδομένων PostgreSQL και θα τη φορτώσουμε με δεδομένα κράτησης από ένα Διαγωνισμός Kaggle Airbnb.

Αυτό το σύνολο δεδομένων περιέχει δύο πίνακες, Χρήστες και Συνόδους. Το Sessions περιέχει μια αναφορά ξένου κλειδιού, user_id. Χρησιμοποιώντας αυτήν τη σχέση, θα δημιουργήσουμε ένα σύνολο λειτουργιών που θα περιέχουν διάφορες μετρήσεις κρατήσεων συγκεντρωμένες ανά χρήστη. Το παρακάτω σχήμα αντιπροσωπεύει το ερώτημα SQL που χρησιμοποιείται για τη δημιουργία των δυνατοτήτων.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count (DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_lapsed)) AS avg_session_time_seconds(s)session_time_seconds round(min(secs_lapsed)) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u. γλώσσα χρήστες u ON u.id = s.user_id LIMIT 5000


Στη συνέχεια, το ερώτημα SQL εκτελείται από τη γραμμή ροής αέρα μας και γράφεται σε μια ενδιάμεση θέση S3 χρησιμοποιώντας τον ακόλουθο ορισμό εργασιών.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_bookingsvarylemedc. (mode="r+", επίθημα=".csv") ως tmp_csv: postgres.copy_expert( f"copy ({sql_query}) στο stdout με κεφαλίδα csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, κλειδί=κλειδί, ) κλειδί επιστροφής


Η είσοδος στην εργασία, sql_file, καθορίζει ποιο ερώτημα θα εκτελεστεί στη βάση δεδομένων. Αυτό το ερώτημα θα αναγνωσθεί στην εργασία και στη συνέχεια θα εκτελεστεί στη βάση δεδομένων. Τα αποτελέσματα του ερωτήματος θα γραφτούν στη συνέχεια στο S3 και το κλειδί απομακρυσμένου αρχείου θα επιστραφεί ως έξοδος της εργασίας.

Το παρακάτω στιγμιότυπο οθόνης δείχνει ένα δείγμα συνόλου αποτελεσμάτων του ερωτήματος εξαγωγής από πάνω. Θα περιγράψουμε πώς να δημιουργήσετε μια συνθετική έκδοση αυτού του συνόλου δεδομένων στην επόμενη ενότητα.



Προεπισκόπηση αποτελεσμάτων ερωτήματος.

Σύνθεση δυνατοτήτων χρησιμοποιώντας Gretel API

 
 
Για να δημιουργήσουμε μια συνθετική έκδοση κάθε χαρακτηριστικού, πρέπει πρώτα να εκπαιδεύσουμε ένα συνθετικό μοντέλο και μετά να εκτελέσουμε το μοντέλο για να δημιουργήσουμε συνθετικές εγγραφές. Η Gretel διαθέτει ένα σύνολο Python SDK που καθιστούν εύκολη την ενσωμάτωση σε εργασίες Airflow.

Εκτός από τα Python Client SDK, δημιουργήσαμε ένα Γάντζος ροής αέρα Gretel που διαχειρίζεται τις συνδέσεις και τα μυστικά του Gretel API. Μετά τη ρύθμιση μιας Gretel Airflow Connection, η σύνδεση στο Gretel API είναι τόσο εύκολη όσο

από hooks.gretel εισαγωγή GretelHook gretel = GretelHook() project = gretel.get_project()


Για περισσότερες πληροφορίες σχετικά με τον τρόπο διαμόρφωσης των συνδέσεων Airflow, ανατρέξτε στο αποθετήριο Github README.

Η μεταβλητή έργου στο παραπάνω παράδειγμα μπορεί να χρησιμοποιηθεί ως το κύριο σημείο εισόδου για εκπαίδευση και εκτέλεση συνθετικών μοντέλων χρησιμοποιώντας το API της Gretel. Για περισσότερες λεπτομέρειες, μπορείτε να ανατρέξετε στο δικό μας Έγγραφα Python API.

Αναφερόμενοι στο πρόγραμμα κρατήσεων, θα εξετάσουμε τώρα την εργασία generate_synthetic_features. Αυτό το βήμα είναι υπεύθυνο για την εκπαίδευση του συνθετικού μοντέλου χρησιμοποιώντας τα χαρακτηριστικά που εξήχθησαν στην προηγούμενη εργασία.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) cloud()submit_ poll(model) return model.get_artifact_link("data_preview")


Κοιτάζοντας την υπογραφή της μεθόδου, θα δείτε ότι παίρνει μια διαδρομή, data_source. Αυτή η τιμή δείχνει τα χαρακτηριστικά S3 που εξήχθησαν στο προηγούμενο βήμα. Σε επόμενη ενότητα θα δούμε πώς όλες αυτές οι είσοδοι και οι έξοδοι συνδέονται μεταξύ τους.

Κατά τη δημιουργία του μοντέλου χρησιμοποιώντας το project.create_model_obj, η παράμετρος model_config αντιπροσωπεύει τη διαμόρφωση συνθετικού μοντέλου που χρησιμοποιείται για τη δημιουργία του μοντέλου. Σε αυτόν τον αγωγό, χρησιμοποιούμε το δικό μας προεπιλεγμένη διαμόρφωση μοντέλου, αλλά πολλά άλλα επιλογές διαμόρφωσης είναι διαθέσιμες.

Αφού διαμορφωθεί το μοντέλο, καλούμε model.submit_cloud(). Αυτό θα υποβάλει το μοντέλο για εκπαίδευση και δημιουργία ρεκόρ χρησιμοποιώντας το Gretel Cloud. Η κλήση δημοσκόπησης(μοντέλο) θα μπλοκάρει την εργασία έως ότου το μοντέλο ολοκληρώσει την εκπαίδευση.

Τώρα που το μοντέλο έχει εκπαιδευτεί, θα χρησιμοποιήσουμε το get_artifact_link για να επιστρέψουμε έναν σύνδεσμο για τη λήψη των συνθετικών χαρακτηριστικών που δημιουργούνται.



Προεπισκόπηση δεδομένων του συνθετικού συνόλου χαρακτηριστικών.

 

Αυτός ο σύνδεσμος τεχνουργήματος θα χρησιμοποιηθεί ως είσοδος στο τελικό βήμα upload_synthetic_features.

Φόρτωση συνθετικών χαρακτηριστικών

 
 
Τα αρχικά χαρακτηριστικά έχουν εξαχθεί και έχει δημιουργηθεί μια συνθετική έκδοση. Τώρα ήρθε η ώρα να ανεβάσετε τα συνθετικά χαρακτηριστικά, ώστε να είναι προσβάσιμα από μεταγενέστερους καταναλωτές. Σε αυτό το παράδειγμα, θα χρησιμοποιήσουμε έναν κάδο S3 ως τελικό προορισμό για το σύνολο δεδομένων.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() με open(data_set, "rb") ως synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._svynthetic. )


Αυτή η εργασία είναι αρκετά απλή. Η τιμή εισαγωγής του συνόλου δεδομένων περιέχει έναν υπογεγραμμένο σύνδεσμο HTTP για λήψη του συνθετικού δεδομένων από το API της Γκρέτελ. Η εργασία θα διαβάσει αυτό το αρχείο στο Airflow Worker και, στη συνέχεια, θα χρησιμοποιήσει το ήδη διαμορφωμένο άγκιστρο S3 για να ανεβάσει το αρχείο συνθετικών χαρακτηριστικών σε έναν κάδο S3 όπου οι μεταγενέστεροι καταναλωτές ή μοντέλα μπορούν να έχουν πρόσβαση σε αυτό.

Ενορχηστρώνοντας τον αγωγό

 
 
Κατά τις τρεις τελευταίες ενότητες, διαβάσαμε όλο τον κώδικα που απαιτείται για την εξαγωγή, τη σύνθεση και τη φόρτωση ενός συνόλου δεδομένων. Το τελευταίο βήμα είναι να συνδέσετε καθεμία από αυτές τις εργασίες σε έναν ενιαίο αγωγό ροής αέρα.

Αν θυμάστε στην αρχή αυτής της ανάρτησης, αναφέραμε εν συντομία την έννοια του DAG. Χρησιμοποιώντας το API TaskFlow του Airflow, μπορούμε να συνθέσουμε αυτές τις τρεις μεθόδους Python σε ένα DAG που ορίζει τις εισόδους, τις εξόδους και τη σειρά που θα εκτελεστεί κάθε βήμα.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Εάν ακολουθήσετε τη διαδρομή αυτών των κλήσεων μεθόδων, θα λάβετε τελικά ένα γράφημα που μοιάζει με την αρχική γραμμή χαρακτηριστικών μας.



Συνθετικός αγωγός Gretel στο Airflow.

 

Εάν θέλετε να εκτελέσετε αυτόν τον αγωγό και να τον δείτε σε δράση, κατευθυνθείτε στο που συνοδεύει το αποθετήριο Github. Εκεί θα βρείτε οδηγίες για το πώς να ξεκινήσετε μια παρουσία ροής αέρα και να εκτελέσετε τον αγωγό από άκρη σε άκρη.

Τυλίξτε τα πράγματα

 
 
Εάν το έχετε φτάσει μέχρι εδώ, έχετε δει πώς η Gretel μπορεί να ενσωματωθεί σε έναν αγωγό δεδομένων που βασίζεται στο Airflow. Συνδυάζοντας τα φιλικά προς τους προγραμματιστές API της Gretel και το ισχυρό σύστημα αγκίστρων και χειριστών της Airflow, είναι εύκολο να δημιουργηθούν αγωγοί ETL που κάνουν τα δεδομένα πιο προσβάσιμα και ασφαλέστερα στη χρήση.

Μιλήσαμε επίσης για μια κοινή περίπτωση χρήσης μηχανικής χαρακτηριστικών όπου τα ευαίσθητα δεδομένα ενδέχεται να μην είναι εύκολα προσβάσιμα. Δημιουργώντας μια συνθετική έκδοση του συνόλου δεδομένων, μειώνουμε τον κίνδυνο έκθεσης οποιωνδήποτε ευαίσθητων δεδομένων, αλλά διατηρούμε τη χρησιμότητα του συνόλου δεδομένων ενώ το καθιστούμε γρήγορα διαθέσιμο σε όσους το χρειάζονται.

Σκεφτόμενοι τη διοχέτευση χαρακτηριστικών με πιο αφηρημένους όρους, έχουμε τώρα ένα μοτίβο που μπορεί να επαναπροσδιοριστεί για οποιοδήποτε αριθμό νέων ερωτημάτων SQL. Με την ανάπτυξη μιας νέας έκδοσης του pipeline και την εναλλαγή του αρχικού ερωτήματος SQL, μπορούμε να αντιμετωπίσουμε οποιοδήποτε δυνητικά ευαίσθητο ερώτημα με ένα συνθετικό σύνολο δεδομένων που διατηρεί το απόρρητο των πελατών. Η μόνη γραμμή κώδικα που πρέπει να αλλάξει είναι η διαδρομή προς το αρχείο sql. Δεν απαιτείται πολύπλοκη μηχανική δεδομένων.

Ευχαριστώ για την ανάγνωση

 
 
Στείλτε μας ένα email στο hi@gretel.ai ή ελάτε μαζί μας Χαλαρότητα εάν έχετε ερωτήσεις ή σχόλια. Θα θέλαμε να μάθουμε πώς χρησιμοποιείτε το Airflow και πώς μπορούμε να ενσωματωθούμε καλύτερα με τις υπάρχουσες σωληνώσεις δεδομένων σας.

 
Bio: Ντρου Νιούμπερι είναι Μηχανικός Λογισμικού στο Gretel.ai.

Πρωτότυπο. Αναδημοσιεύτηκε με άδεια.

Συγγενεύων:

Πηγή: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Σφραγίδα ώρας:

Περισσότερα από KDnuggets