Automatizați replicarea surselor relaționale într-un lac de date tranzacționale cu Apache Iceberg și AWS Glue

Automatizați replicarea surselor relaționale într-un lac de date tranzacționale cu Apache Iceberg și AWS Glue

Nodul sursă: 1958466

Organizațiile au ales să construiască lacuri de date pe deasupra Serviciul Amazon de stocare simplă (Amazon S3) de mulți ani. Un lac de date este cea mai populară alegere pentru organizații pentru a stoca toate datele organizaționale generate de diferite echipe, în domeniile de afaceri, din toate formatele diferite și chiar de-a lungul istoriei. Conform un studiu, compania obișnuită vede că volumul datelor lor crește cu o rată care depășește 50% pe an, gestionând de obicei o medie de 33 de surse de date unice pentru analiză.

Echipele încearcă adesea să reproducă mii de joburi din baze de date relaționale cu același model de extragere, transformare și încărcare (ETL). Există mult efort în menținerea stărilor de muncă și în programarea acestor locuri de muncă individuale. Această abordare ajută echipele să adauge tabele cu puține modificări și, de asemenea, menține starea postului cu un efort minim. Acest lucru poate duce la o îmbunătățire uriașă a cronologiei de dezvoltare și la urmărirea cu ușurință a joburilor.

În această postare, vă arătăm cum să replicați cu ușurință toate depozitele de date relaționale într-un lac de date tranzacționale într-un mod automat, cu o singură lucrare ETL folosind Apache Iceberg și AWS Adeziv.

Arhitectura soluțiilor

Lacurile de date sunt de obicei organizate folosind compartimente S3 separate pentru trei straturi de date: stratul brut care conține date în forma sa originală, stratul de etapă care conține date intermediare procesate optimizate pentru consum și stratul de analiză care conține date agregate pentru cazuri de utilizare specifice. În stratul brut, tabelele sunt de obicei organizate pe baza surselor lor de date, în timp ce tabelele din stratul de etapă sunt organizate pe baza domeniilor de afaceri cărora le aparțin.

Această postare oferă un Formarea AWS Cloud șablon care implementează o sarcină AWS Glue care citește o cale Amazon S3 pentru o sursă de date a stratului brut al lacului de date și ingerează datele în tabelele Apache Iceberg de pe stratul de scenă folosind Suport AWS Glue pentru cadrele lacurilor de date. Lucrarea se așteaptă ca tabelele din stratul brut să fie structurate astfel Serviciul de migrare a bazelor de date AWS (AWS DMS) le ingerează: schema, apoi tabelul, apoi fișierele de date.

Această soluție folosește Magazin de parametri AWS Systems Manager pentru configurarea tabelului. Ar trebui să modificați acest parametru specificând tabelele pe care doriți să le procesați și cum, inclusiv informații precum cheia primară, partițiile și domeniul de afaceri asociat. Lucrarea folosește aceste informații pentru a crea automat o bază de date (dacă nu există deja) pentru fiecare domeniu de afaceri, pentru a crea tabelele Iceberg și pentru a efectua încărcarea datelor.

În sfârșit, putem folosi Amazon Atena pentru a interoga datele din tabelele Iceberg.

Următoarea diagramă ilustrează această arhitectură.

Arhitectura soluțiilor

Această implementare are următoarele considerații:

  • Toate tabelele din sursa de date trebuie să aibă o cheie primară pentru a fi replicate folosind această soluție. Cheia primară poate fi o singură coloană sau o cheie compusă cu mai mult de o coloană.
  • Dacă lacul de date conține tabele care nu au nevoie de upsers sau nu au o cheie primară, le puteți exclude din configurația parametrilor și puteți implementa procese ETL tradiționale pentru a le ingera în lacul de date. Asta este în afara domeniului de aplicare al acestei postări.
  • Dacă există surse de date suplimentare care trebuie ingerate, puteți implementa mai multe stive CloudFormation, câte unul pentru a gestiona fiecare sursă de date.
  • Lucrarea AWS Glue este concepută pentru a procesa datele în două faze: încărcarea inițială care rulează după ce AWS DMS termină sarcina de încărcare completă și încărcarea incrementală care rulează conform unui program care aplică fișierele de captare a datelor de modificare (CDC) capturate de AWS DMS. Prelucrarea incrementală se realizează folosind un Marcaj de job AWS Glue.

Există nouă pași pentru a finaliza acest tutorial:

  1. Configurați un punct final sursă pentru AWS DMS.
  2. Implementați soluția folosind AWS CloudFormation.
  3. Examinați sarcina de replicare AWS DMS.
  4. Opțional, adăugați permisiuni pentru criptare și decriptare sau Formația lacului AWS.
  5. Examinați configurația tabelului în Parameter Store.
  6. Efectuați încărcarea inițială a datelor.
  7. Efectuați încărcarea incrementală a datelor.
  8. Monitorizați ingerarea în masă.
  9. Programați încărcarea incrementală a datelor în lot.

Cerințe preliminare

Înainte de a începe acest tutorial, ar trebui să fiți deja familiarizați cu Iceberg. Dacă nu, puteți începe prin a replica un singur tabel urmând instrucțiunile din Implementați un UPSERT bazat pe CDC într-un lac de date folosind Apache Iceberg și AWS Glue. În plus, configurați următoarele:

Configurați un punct final sursă pentru AWS DMS

Înainte de a crea sarcina noastră AWS DMS, trebuie să setăm un punct final sursă pentru a se conecta la baza de date sursă:

  1. Pe consola AWS DMS, alegeți Puncte finale în panoul de navigare.
  2. Alege Creați punct final.
  3. Dacă baza de date rulează pe Amazon RDS, alegeți Selectați instanța RDS DB, apoi alegeți instanța din listă. În caz contrar, alegeți motorul sursă și furnizați informațiile de conectare fie prin Manager de secrete AWS sau manual.
  4. Pentru Identificatorul punctului final, introduceți un nume pentru punctul final; de exemplu, source-postgresql.
  5. Alege Creați punct final.

Implementați soluția folosind AWS CloudFormation

Creați o stivă CloudFormation folosind șablonul furnizat. Parcurgeți următorii pași:

  1. Alege Lansați stiva:
  2. Alege Pagina Următoare →.
  3. Furnizați un nume de stivă, cum ar fi transactionaldl-postgresql.
  4. Introduceți parametrii necesari:
    1. DMSS3EndpointIAMRoleARN – Rolul IAM ARN pentru AWS DMS pentru a scrie date în Amazon S3.
    2. ReplicationInstanceArn – ARN-ul instanței de replicare AWS DMS.
    3. S3BucketStage – Numele găleții existente utilizată pentru stratul de scenă al lacului de date.
    4. S3BucketGlue – Numele compartimentului S3 existent pentru stocarea scripturilor AWS Glue.
    5. S3BucketRaw – Numele găleții existente utilizată pentru stratul brut al lacului de date.
    6. SourceEndpointArn – ARN-ul punctului final AWS DMS pe care l-ați creat mai devreme.
    7. Numele sursei – Identificatorul arbitrar al sursei de date de replicat (de exemplu, postgres). Acesta este folosit pentru a defini calea S3 a lacului de date (stratul brut) unde vor fi stocate datele.
  5. Nu modificați următorii parametri:
    1. SourceS3BucketBlog – Numele compartimentului în care este stocat scriptul AWS Glue furnizat.
    2. SourceS3BucketPrefix – Numele prefixului compartimentului în care este stocat scriptul AWS Glue furnizat.
  6. Alege Pagina Următoare → de două ori.
  7. Selectați Recunosc că AWS CloudFormation ar putea crea resurse IAM cu nume personalizate.
  8. Alege Creați stivă.

După aproximativ 5 minute, stiva CloudFormation este implementată.

Examinați sarcina de replicare AWS DMS

Implementarea AWS CloudFormation a creat un punct final țintă AWS DMS pentru dvs. Din cauza a două setări specifice punctului final, datele vor fi ingerate pe măsură ce avem nevoie de ele pe Amazon S3.

  1. Pe consola AWS DMS, alegeți Puncte finale în panoul de navigare.
  2. Căutați și alegeți punctul final care începe cu dmsIcebergs3endpoint.
  3. Examinați setările punctului final:
    1. DataFormat este specificat ca parquet.
    2. TimestampColumnName va adăuga coloana last_update_time cu data creării înregistrărilor pe Amazon S3.

Setările punctului final AWS DMS

Implementarea creează, de asemenea, o sarcină de replicare AWS DMS care începe cu dmsicebergtask.

  1. Alege Sarcini de replicare în panoul de navigare și căutați sarcina.

Veți vedea că Tip de sarcină este marcat ca Încărcare completă, replicare continuă. AWS DMS va efectua o încărcare completă inițială a datelor existente, apoi va crea fișiere incrementale cu modificări efectuate în baza de date sursă.

Pe Reguli de cartografiere fila, există două tipuri de reguli:

  • O regulă de selecție cu numele schemei sursă și al tabelelor care vor fi ingerate din baza de date sursă. În mod implicit, utilizează baza de date eșantion furnizată în cerințele preliminare, dms_sample, și toate tabelele cu cuvântul cheie %.
  • Două reguli de transformare care includ în fișierele țintă de pe Amazon S3 numele schemei și numele tabelului ca coloane. Acesta este folosit de jobul nostru AWS Glue pentru a ști căror tabele corespund fișierele din lacul de date.

Pentru a afla mai multe despre cum să personalizați acest lucru pentru propriile surse de date, consultați Reguli și acțiuni de selecție.

Reguli de cartografiere AWS

Să modificăm câteva configurații pentru a finaliza pregătirea sarcinilor noastre.

  1. Pe Acţiuni meniu, alegeți Modifica.
  2. În Setări sarcini secțiune, sub Opriți sarcina după finalizarea încărcării complete, alege Opriți după aplicarea modificărilor din cache.

În acest fel, putem controla încărcarea inițială și generarea incrementală a fișierelor în doi pași diferiți. Folosim această abordare în doi pași pentru a rula jobul AWS Glue o dată la fiecare pas.

  1. În Jurnalele de sarcini, alege Activați jurnalele CloudWatch.
  2. Alege Economisiți.
  3. Așteptați aproximativ 1 minut pentru ca starea sarcinii de migrare a bazei de date să se afișeze ca Gata.

Adăugați permisiuni pentru criptare și decriptare sau Lake Formation

Opțional, puteți adăuga permisiuni pentru criptare și decriptare sau Lake Formation.

Adăugați permisiuni de criptare și decriptare

Dacă gălețile dvs. S3 utilizate pentru straturile brute și de scenă sunt criptate folosind AWS Service Management Service (AWS KMS) chei gestionate de client, trebuie să adăugați permisiuni pentru a permite jobului AWS Glue să acceseze datele:

Adăugați permisiuni Lake Formation

Dacă gestionați permisiunile folosind Lake Formation, trebuie să permiteți jobului dvs. AWS Glue să creeze bazele de date și tabelele domeniului dvs. prin rolul IAM GlueJobRole.

  1. Acordați permisiuni pentru a crea baze de date (pentru instrucțiuni, consultați Crearea unei baze de date).
  2. Acordați permisiuni SUPER pentru default Bază de date.
  3. Acordați permisiuni privind locația datelor.
  4. Dacă creați baze de date manual, acordați permisiuni pentru toate bazele de date pentru a crea tabele. A se referi la Acordarea permisiunilor de tabel utilizând consola Lake Formation și metoda resurselor numite or Acordarea de permisiuni Data Catalog folosind metoda LF-TBAC în funcție de cazul dvs. de utilizare.

După ce finalizați etapa ulterioară de a efectua încărcarea inițială a datelor, asigurați-vă că adăugați și permisiuni pentru consumatori pentru a interoga tabelele. Rolul locului de muncă va deveni proprietarul tuturor tabelelor create, iar administratorul lacului de date poate apoi acorda granturi pentru utilizatori suplimentari.

Examinați configurația tabelului în Parameter Store

Lucrarea AWS Glue care realizează ingerarea datelor în tabelele Iceberg utilizează specificația tabelului furnizată în Parameter Store. Parcurgeți următorii pași pentru a examina magazinul de parametri care a fost configurat automat pentru dvs. Dacă este necesar, modificați în funcție de nevoile dvs.

  1. Pe consola Parameter Store, alegeți Parametrii mei în panoul de navigare.

Stiva CloudFormation a creat doi parametri:

  • iceberg-config pentru configurații de locuri de muncă
  • iceberg-tables pentru configurarea tabelului
  1. Alegeți parametrul mese-aisberg.

Structura JSON conține informații pe care AWS Glue le folosește pentru a citi datele și a scrie tabelele Iceberg pe domeniul țintă:

  • Un obiect pe masă – Numele obiectului este creat folosind numele schemei, un punct și numele tabelului; de exemplu, schema.table.
  • cheia principala – Acest lucru ar trebui specificat pentru fiecare tabel sursă. Puteți furniza o singură coloană sau o listă de coloane separate prin virgulă (fără spații).
  • partitionCols – Opțional, aceasta parțiază coloanele pentru tabelele țintă. Dacă nu doriți să creați tabele partiționate, furnizați un șir gol. În caz contrar, furnizați o singură coloană sau o listă de coloane separate prin virgulă de utilizat (fără spații).
  1. Dacă doriți să utilizați propria sursă de date, utilizați următorul cod JSON și înlocuiți textul în CAPS din șablonul furnizat. Dacă utilizați sursa de date eșantion furnizată, păstrați setările implicite:
{ "SCHEMA_NAME.TABLE_NAME_1": { "primaryKey": "ONLY_PRIMARY_KEY", "domain": "TARGET_DOMAIN", "partitionCols": "" }, "SCHEMA_NAME.TABLE_NAME_2": { "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY", "domain": "TARGET_DOMAIN", "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO" }
}
  1. Alege Salvează modificările.

Efectuați încărcarea inițială a datelor

Acum că configurarea necesară este terminată, ingerăm datele inițiale. Acest pas include trei părți: ingerarea datelor din baza de date relațională sursă în stratul brut al lacului de date, crearea tabelelor Iceberg pe stratul scenic al lacului de date și verificarea rezultatelor folosind Athena.

Ingerați date în stratul brut al lacului de date

Pentru a ingera date din sursa de date relaționale (PostgreSQL dacă utilizați eșantionul furnizat) în lacul nostru de date tranzacționale folosind Iceberg, parcurgeți următorii pași:

  1. Pe consola AWS DMS, alegeți Sarcini de migrare a bazei de date în panoul de navigare.
  2. Selectați sarcina de replicare pe care ați creat-o și pe Acţiuni meniu, alegeți Reporniți/Reluați.
  3. Așteptați aproximativ 5 minute pentru finalizarea sarcinii de replicare. Puteți monitoriza tabelele ingerate pe Statistici fila sarcinii de replicare.

Statistici de încărcare completă AWS DMS

După câteva minute, sarcina se termină cu mesajul Încărcare completă completă.

  1. Pe consola Amazon S3, alegeți găleata pe care ați definit-o ca strat brut.

Sub prefixul S3 definit pe AWS DMS (de exemplu, postgres), ar trebui să vedeți o ierarhie de foldere cu următoarea structură:

  • Schemă
    • Numele tabelului
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

Obiecte de încărcare completă AWS DMS create pe S3

Dacă găleata S3 este goală, verificați Depanarea sarcinilor de migrare în AWS Database Migration Service înainte de a rula sarcina AWS Glue.

Creați și introduceți date în tabelele Iceberg

Înainte de a rula jobul, să navigăm în scriptul jobului AWS Glue furnizat ca parte a stivei CloudFormation pentru a înțelege comportamentul acestuia.

  1. Pe consola AWS Glue Studio, alegeți Locuri de munca în panoul de navigare.
  2. Căutați jobul care începe cu IcebergJob- și un sufix al numelui stivei dvs. CloudFormation (de exemplu, IcebergJob-transactionaldl-postgresql).
  3. Alege jobul.

Evaluare a postului AWS Glue ETL

Scriptul de job primește configurația de care are nevoie din Parameter Store. Functia getConfigFromSSM() returnează configurații legate de job, cum ar fi compartimentele sursă și țintă de unde datele trebuie citite și scrise. Variabila ssmparam_table_values conțin informații legate de tabel, cum ar fi domeniul de date, numele tabelului, coloanele de partiție și cheia primară a tabelelor care trebuie ingerate. Vedeți următorul cod Python:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables" # Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

Scriptul folosește un nume de catalog arbitrar pentru Iceberg, care este definit ca my_catalog. Acest lucru este implementat în AWS Glue Data Catalog folosind configurațiile Spark, astfel încât o operație SQL care indică my_catalog va fi aplicată în Data Catalog. Vezi următorul cod:

catalog_name = 'my_catalog'
errored_table_list = [] # Iceberg configuration
spark = SparkSession.builder .config('spark.sql.warehouse.dir', warehouse_path) .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') .getOrCreate()

Scriptul iterează peste tabelele definite în Parameter Store și efectuează logica pentru detectarea dacă tabelul există și dacă datele primite sunt o încărcare inițială sau un upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values: # Get table data isTableExists = False schemaName, tableName = key.split('.') logger.info(f'Processing table : {tableName}')

initialLoadRecordsSparkSQL() funcția încarcă datele inițiale atunci când nu este prezentă nicio coloană de operare în fișierele S3. AWS DMS adaugă această coloană numai la fișierele de date Parquet produse de replicarea continuă (CDC). Încărcarea datelor se realizează folosind comanda INSERT INTO cu SparkSQL. Vezi următorul cod:

sqltemp = Template(""" INSERT INTO $catalog_name.$dbName.$tableName ($insertTableColumnList) SELECT $insertTableColumnList FROM insertTable $partitionStrSQL """)
SQLQUERY = sqltemp.substitute( catalog_name = catalog_name, dbName = dbName, tableName = tableName, insertTableColumnList = insertTableColumnList[ : -1], partitionStrSQL = partitionStrSQL) logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)

Acum rulăm lucrarea AWS Glue pentru a ingera datele inițiale în tabelele Iceberg. Stiva CloudFormation adaugă --datalake-formats parametrul, adăugând bibliotecile Iceberg necesare la job.

  1. Alege Rulați jobul.
  2. Alege Job Runs pentru a monitoriza starea. Așteptați până când starea este Run Succeeded.

Verificați datele încărcate

Pentru a confirma că sarcina a procesat datele conform așteptărilor, parcurgeți următorii pași:

  1. Pe consola Athena, alegeți Editorul interogărilor în panoul de navigare.
  2. Verifica AwsDataCatalog este selectat ca sursă de date.
  3. În Baza de date, alegeți domeniul de date pe care doriți să îl explorați, pe baza configurației pe care ați definit-o în magazinul de parametri. Dacă utilizați baza de date exemplu furnizată, utilizați sports.

În Tabele și vederi, putem vedea lista de tabele care au fost create de jobul AWS Glue.

  1. Alegeți meniul de opțiuni (trei puncte) de lângă numele primului tabel, apoi alegeți Previzualizarea datelor.

Puteți vedea datele încărcate în tabelele Iceberg. Datele inițiale de revizuire Amazon Athena au fost încărcate

Efectuați încărcarea incrementală a datelor

Acum începem să captăm modificările din baza noastră de date relațională și să le aplicăm lacului de date tranzacționale. Acest pas este, de asemenea, împărțit în trei părți: capturarea modificărilor, aplicarea lor pe tabelele Iceberg și verificarea rezultatelor.

Capturați modificările din baza de date relațională

Datorită configurației pe care am specificat-o, sarcina de replicare s-a oprit după rularea fazei de încărcare completă. Acum repornim sarcina de a adăuga fișiere incrementale cu modificări în stratul brut al lacului de date.

  1. Pe consola AWS DMS, selectați sarcina creată și executată înainte.
  2. Pe Acţiuni meniu, alegeți Relua.
  3. Alege Începeți sarcina pentru a începe să capteze modificări.
  4. Pentru a declanșa crearea de noi fișiere pe lacul de date, efectuați inserări, actualizări sau ștergeri în tabelele bazei de date sursă folosind instrumentul preferat de administrare a bazei de date. Dacă utilizați baza de date exemplu furnizată, puteți rula următoarele comenzi SQL:
UPDATE dms_sample.nfl_stadium_data_upd
SET seatin_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31'; update dms_sample.mlb_data set bats = 'R'
where mlb_id=506560 and bats='L'; update dms_sample.sporting_event set start_date = current_date where id=11 and sold_out=0;
  1. Pe pagina cu detaliile sarcinii AWS DMS, alegeți Statistici de tabel pentru a vedea modificările capturate.
    Statistici AWS DMS CDC
  2. Deschideți stratul brut al lacului de date pentru a găsi un fișier nou care conține modificările incrementale în interiorul prefixului fiecărui tabel, de exemplu sub sporting_event prefix.

Înregistrarea cu modificări pentru sporting_event tabelul arată ca următoarea captură de ecran.

Obiectele AWS DMS au migrat în S3 cu CDC

Observați Op coloană la început identificată cu o actualizare (U). De asemenea, a doua valoare de dată/oră este coloana de control adăugată de AWS DMS cu ora la care a fost capturată modificarea.

Schema de fișiere CDC pe Amazon S3

Aplicați modificări pe tabelele Iceberg folosind AWS Glue

Acum rulăm din nou jobul AWS Glue și va procesa automat doar noile fișiere incrementale, deoarece marcajul jobului este activat. Să revizuim cum funcționează.

dedupCDCRecords() funcția efectuează deduplicarea datelor deoarece mai multe modificări la un singur ID de înregistrare pot fi capturate în același fișier de date pe Amazon S3. Deduplicarea se realizează pe baza last_update_time coloană adăugată de AWS DMS care indică marcajul de timp al când a fost capturată modificarea. Vedeți următorul cod Python:

def dedupCDCRecords(inputDf, keylist): IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize) inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF)) NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'") UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')") finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf) return finalInputDF

Pe linia 99, the upsertRecordsSparkSQL() funcția efectuează upsertarea într-un mod similar cu încărcarea inițială, dar de data aceasta cu o comandă SQL MERGE.

Examinați modificările aplicate

Deschideți consola Athena și rulați o interogare care selectează înregistrările modificate din baza de date sursă. Dacă utilizați baza de date exemplu furnizată, utilizați una dintre următoarele interogări SQL:

SELECT * FROM "sports"."nfl_stadiu_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Datele CDC de revizuire Amazon Athena au fost încărcate

Monitorizați ingerarea în masă

Scriptul de job AWS Glue este codificat cu simplu Gestionarea excepțiilor Python pentru a detecta erorile în timpul procesării unui anumit tabel. Marcajul jobului este salvat după ce fiecare tabel termină procesarea cu succes, pentru a evita reprocesarea tabelelor dacă executarea jobului este reîncercată pentru tabelele cu erori.

Interfața liniei de comandă AWS (AWS CLI) oferă a get-job-bookmark comandă pentru AWS Glue care oferă informații despre starea marcajului pentru fiecare tabel procesat.

  1. Pe consola AWS Glue Studio, alegeți lucrarea ETL.
  2. Alege Job Runs fila și copiați ID-ul de rulare a lucrării.
  3. Rulați următoarea comandă pe un terminal autentificat pentru AWS CLI, înlocuind <GLUE_JOB_RUN_ID> pe linia 1 cu valoarea pe care ați copiat-o. Dacă stiva dvs. CloudFormation nu este numită transactionaldl-postgresql, furnizați numele jobului dvs. pe linia 2 din script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name jobname --run-id $jobrun

În această soluție, atunci când o procesare a unui tabel provoacă o excepție, jobul AWS Glue nu va eșua în conformitate cu această logică. În schimb, tabelul va fi adăugat într-o matrice care este tipărită după finalizarea lucrării. Într-un astfel de scenariu, jobul va fi marcat ca eșuat după ce încearcă să proceseze restul tabelelor detectate pe sursa de date brute. În acest fel, tabelele fără erori nu trebuie să aștepte până când utilizatorul identifică și rezolvă problema de pe tabelele aflate în conflict. Utilizatorul poate detecta rapid rulările de job care au avut probleme utilizând starea de rulare a jobului AWS Glue și poate identifica tabelele specifice care cauzează problema folosind jurnalele CloudWatch pentru executarea jobului.

  1. Scriptul de job implementează această caracteristică cu următorul cod Python:
# Performed for every table try: # Table processing logic except Exception as e: logger.info(f'There is an issue with table: {tableName}') logger.info(f'The exception is : {e}') errored_table_list.append(tableName) continue job.commit()
if (len(errored_table_list)): logger.info('Total number of errored tables are ',len(errored_table_list)) logger.info('Tables that failed during processing are ', *errored_table_list, sep=', ') raise Exception(f'***** Some tables failed to process.')

Următoarea captură de ecran arată cum caută jurnalele CloudWatch tabele care cauzează erori la procesare.

Monitorizarea lucrărilor AWS Glue cu jurnalele

Aliniat cu Lentila de analiză a datelor cadru bine arhitecturat AWS practicilor, puteți adapta mecanisme de control mai sofisticate pentru a identifica și a notifica părțile interesate atunci când apar erori pe conductele de date. De exemplu, puteți utiliza un Amazon DynamoDB tabel de control pentru a stoca toate tabelele și lucrările rulează cu erori sau utilizând Serviciul de notificare simplă Amazon (Amazon SNS) la trimite alerte operatorilor când sunt îndeplinite anumite criterii.

Programați încărcarea incrementală a datelor în lot

Stiva CloudFormation implementează un Amazon EventBridge regulă (dezactivată în mod implicit) care poate declanșa operația AWS Glue să ruleze conform unui program. Pentru a vă oferi propriul program și pentru a activa regula, parcurgeți următorii pași:

  1. Pe consola EventBridge, alegeți Reguli în panoul de navigare.
  2. Căutați regula cu prefixul numelui stivei dvs. CloudFormation urmat de JobTrigger (de exemplu, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Alegeți regula.
  4. În Program de evenimente, alege Editati.

Programul implicit este configurat să se declanșeze în fiecare oră.

  1. Furnizați programul pe care doriți să rulați lucrarea.
  2. În plus, puteți utiliza un Expresia cron EventBridge prin selectarea Un program fin.
    Amazon EventBridge program ETL job
  3. Când terminați de configurat expresia cron, alegeți Pagina Următoare → de trei ori și în cele din urmă alege Actualizați regula pentru a salva modificările.

Regula este creată implicit dezactivată pentru a vă permite să rulați mai întâi încărcarea inițială a datelor.

  1. Activați regula alegând Permite.

Aveți posibilitatea să utilizați Monitorizarea pentru a vedea invocări de reguli sau direct pe AWS Glue Job Run Detalii.

Concluzie

După implementarea acestei soluții, ați automatizat asimilarea tabelelor pe o singură sursă de date relaționale. Organizațiile care folosesc un lac de date ca platformă centrală de date trebuie de obicei să gestioneze mai multe, uneori chiar zeci de surse de date. De asemenea, din ce în ce mai multe cazuri de utilizare impun organizațiilor să implementeze capabilități tranzacționale în lacul de date. Puteți folosi această soluție pentru a accelera adoptarea unor astfel de capabilități în toate sursele de date relaționale pentru a permite noi cazuri de utilizare în afaceri, automatizând procesul de implementare pentru a obține mai multă valoare din datele dumneavoastră.


Despre Autori

Luis Gerardo BaezaLuis Gerardo Baeza este arhitect Big Data în Laboratorul de date Amazon Web Services (AWS). Are 12 ani de experiență în a ajuta organizații din sectoarele de sănătate, financiar și educație să adopte programe de arhitectură pentru întreprinderi, cloud computing și capabilități de analiză a datelor. Luis ajută în prezent organizațiile din America Latină să accelereze inițiativele strategice de date.

SaiKiran Reddy AenuguSaiKiran Reddy Aenugu este arhitect de date în laboratorul de date Amazon Web Services (AWS). Are 10 ani de experiență în implementarea proceselor de încărcare, transformare și vizualizare a datelor. În prezent, SaiKiran ajută organizațiile din America de Nord să adopte arhitecturi moderne de date, cum ar fi data lake și data mesh. Are experiență în sectorul retail, al companiilor aeriene și al finanțelor.

Narendra MerlaNarendra Merla este arhitect de date în laboratorul de date Amazon Web Services (AWS). Are 12 ani de experiență în proiectarea și producția de conducte de date atât în ​​timp real, cât și orientate pe loturi și în construirea lacurilor de date atât în ​​medii cloud, cât și în medii locale. În prezent, Narendra ajută organizațiile din America de Nord să construiască și să proiecteze arhitecturi de date robuste și are experiență în sectoarele telecomunicațiilor și finanțelor.

Timestamp-ul:

Mai mult de la AWS Big Data