Automatiser replikering af relationelle kilder til en transaktionsdatasø med Apache Iceberg og AWS Glue

Automatiser replikering af relationelle kilder til en transaktionsdatasø med Apache Iceberg og AWS Glue

Kildeknude: 1958466

Organisationer har valgt at bygge datasøer ovenpå Amazon Simple Storage Service (Amazon S3) i mange år. En datasø er det mest populære valg for organisationer til at gemme alle deres organisationsdata genereret af forskellige teams, på tværs af forretningsdomæner, fra alle forskellige formater og endda gennem historien. Ifølge et studie, ser den gennemsnitlige virksomhed mængden af ​​deres data vokse med en hastighed, der overstiger 50 % om året, og administrerer normalt i gennemsnit 33 unikke datakilder til analyse.

Teams forsøger ofte at replikere tusindvis af job fra relationelle databaser med det samme udtræk, transformation og indlæsning (ETL) mønster. Der er en stor indsats i at opretholde jobtilstandene og planlægge disse individuelle job. Denne tilgang hjælper teamene med at tilføje tabeller med få ændringer og opretholder også jobstatus med minimal indsats. Dette kan føre til en enorm forbedring af udviklingstidslinjen og let sporing af job.

I dette indlæg viser vi dig, hvordan du nemt kan replikere alle dine relationelle datalagre til en transaktionsdatasø på en automatiseret måde med et enkelt ETL-job ved hjælp af Apache Iceberg og AWS Lim.

Løsningsarkitektur

Datasøer er normalt organiseret ved at bruge separate S3-buckets til tre datalag: rålaget, der indeholder data i sin oprindelige form, faselaget, der indeholder mellemliggende behandlede data, der er optimeret til forbrug, og analyselaget, der indeholder aggregerede data til specifikke anvendelsestilfælde. I rålaget er tabeller normalt organiseret baseret på deres datakilder, hvorimod tabeller i faselaget er organiseret baseret på de forretningsdomæner, de tilhører.

Dette indlæg giver en AWS CloudFormation skabelon, der implementerer et AWS Glue-job, der læser en Amazon S3-sti for én datakilde i datasøens rålag og indtager dataene i Apache Iceberg-tabeller på scenelaget ved hjælp af AWS Glue-understøttelse til data lake frameworks. Jobbet forventer, at tabeller i rålaget er struktureret på den måde AWS Database Migration Service (AWS DMS) indtager dem: skema, derefter tabel og derefter datafiler.

Denne løsning bruger AWS Systems Manager Parameter Store til tabelkonfiguration. Du bør ændre denne parameter for at specificere de tabeller, du vil behandle og hvordan, herunder oplysninger såsom primærnøgle, partitioner og det tilknyttede forretningsdomæne. Jobbet bruger disse oplysninger til automatisk at oprette en database (hvis den ikke allerede eksisterer) for hvert forretningsdomæne, oprette Iceberg-tabellerne og udføre dataindlæsningen.

Endelig kan vi bruge Amazonas Athena for at forespørge dataene i Iceberg-tabellerne.

Følgende diagram illustrerer denne arkitektur.

Løsningsarkitektur

Denne implementering har følgende overvejelser:

  • Alle tabeller fra datakilden skal have en primær nøgle for at blive replikeret ved hjælp af denne løsning. Den primære nøgle kan være en enkelt kolonne eller en sammensat nøgle med mere end én kolonne.
  • Hvis datasøen indeholder tabeller, der ikke behøver upserts eller ikke har en primær nøgle, kan du ekskludere dem fra parameterkonfigurationen og implementere traditionelle ETL-processer for at indlæse dem i datasøen. Det er uden for rammerne af dette indlæg.
  • Hvis der er yderligere datakilder, der skal indlæses, kan du implementere flere CloudFormation-stakke, en til at håndtere hver datakilde.
  • AWS Glue-jobbet er designet til at behandle data i to faser: den indledende belastning, der kører efter AWS DMS afslutter den fulde load-opgave, og den trinvise belastning, der kører efter en tidsplan, der anvender ændringsdatafangst-filer (CDC) optaget af AWS DMS. Inkrementel behandling udføres ved hjælp af en AWS Lim job bogmærke.

Der er ni trin til at fuldføre denne øvelse:

  1. Konfigurer et kildeslutpunkt for AWS DMS.
  2. Implementer løsningen ved hjælp af AWS CloudFormation.
  3. Gennemgå AWS DMS-replikeringsopgaven.
  4. Tilføj eventuelt tilladelser til kryptering og dekryptering eller AWS søformation.
  5. Gennemgå tabelkonfigurationen i Parameter Store.
  6. Udfør indledende dataindlæsning.
  7. Udfør trinvis dataindlæsning.
  8. Overvåg bordindtagelse.
  9. Planlæg trinvis batchdataindlæsning.

Forudsætninger

Før du starter denne tutorial, bør du allerede være bekendt med Iceberg. Hvis du ikke er det, kan du komme i gang ved at replikere en enkelt tabel ved at følge instruktionerne i Implementer en CDC-baseret UPSERT i en datasø ved hjælp af Apache Iceberg og AWS Glue. Derudover skal du konfigurere følgende:

Konfigurer et kildeslutpunkt for AWS DMS

Før vi opretter vores AWS DMS-opgave, skal vi konfigurere et kildeslutpunkt for at oprette forbindelse til kildedatabasen:

  1. På AWS DMS-konsollen skal du vælge Endpoints i navigationsruden.
  2. Vælg Opret slutpunkt.
  3. Hvis din database kører på Amazon RDS, skal du vælge Vælg RDS DB-instans, og vælg derefter forekomsten fra listen. Ellers skal du vælge kildemotoren og angive forbindelsesoplysningerne enten igennem AWS Secrets Manager eller manuelt.
  4. Til Endpoint identifier, indtast et navn til slutpunktet; for eksempel source-postgresql.
  5. Vælg Opret slutpunkt.

Implementer løsningen ved hjælp af AWS CloudFormation

Opret en CloudFormation-stak ved hjælp af den medfølgende skabelon. Udfør følgende trin:

  1. Vælg Start stak:
  2. Vælg Næste.
  3. Angiv et staknavn, som f.eks transactionaldl-postgresql.
  4. Indtast de nødvendige parametre:
    1. DMSS3EndpointIAMRoleARN – IAM-rollen ARN for AWS DMS til at skrive data ind i Amazon S3.
    2. ReplicationInstanceArn – AWS DMS-replikeringsinstansen ARN.
    3. S3BucketStage – Navnet på den eksisterende spand, der bruges til datasøens scenelag.
    4. S3BucketLim – Navnet på den eksisterende S3-spand til opbevaring af AWS Glue-scripts.
    5. S3BucketRaw – Navnet på den eksisterende spand, der bruges til datasøens rålag.
    6. SourceEndpointArn – AWS DMS-slutpunktet ARN, som du oprettede tidligere.
    7. Kildenavn – Den vilkårlige identifikator for den datakilde, der skal replikeres (f.eks. postgres). Dette bruges til at definere S3-stien for datasøen (rålaget), hvor data vil blive lagret.
  5. Rediger ikke følgende parametre:
    1. SourceS3BucketBlog – Bøttens navn, hvor det medfølgende AWS Glue-script er gemt.
    2. SourceS3BucketPrefix – Bøttepræfiksnavnet, hvor det medfølgende AWS Glue-script er gemt.
  6. Vælg Næste to gange.
  7. Type Jeg anerkender, at AWS CloudFormation kan skabe IAM-ressourcer med brugerdefinerede navne.
  8. Vælg Opret stak.

Efter cirka 5 minutter er CloudFormation-stakken implementeret.

Gennemgå AWS DMS-replikeringsopgaven

AWS CloudFormation-implementeringen skabte et AWS DMS-målslutpunkt for dig. På grund af to specifikke slutpunktsindstillinger vil dataene blive indtaget, efterhånden som vi har brug for dem på Amazon S3.

  1. På AWS DMS-konsollen skal du vælge Endpoints i navigationsruden.
  2. Søg efter og vælg det slutpunkt, der begynder med dmsIcebergs3endpoint.
  3. Gennemgå slutpunktindstillingerne:
    1. DataFormat er angivet som parquet.
    2. TimestampColumnName vil tilføje kolonnen last_update_time med datoen for oprettelse af posterne på Amazon S3.

AWS DMS-endepunktsindstillinger

Implementeringen opretter også en AWS DMS-replikeringsopgave, der begynder med dmsicebergtask.

  1. Vælg Replikeringsopgaver i navigationsruden og søg efter opgaven.

Du vil se, at Opgavetype er markeret som Fuld belastning, løbende replikering. AWS DMS vil udføre en indledende fuld belastning af eksisterende data og derefter oprette trinvise filer med ændringer udført i kildedatabasen.

Kortlægningsregler fanen, er der to typer regler:

  • En udvælgelsesregel med navnet på kildeskemaet og tabeller, der indlæses fra kildedatabasen. Som standard bruger den prøvedatabasen, der er angivet i forudsætningerne, dms_sample, og alle tabeller med søgeordet %.
  • To transformationsregler, der inkluderer i målfilerne på Amazon S3 skemanavnet og tabelnavnet som kolonner. Dette bruges af vores AWS Glue-job til at vide, hvilke tabeller filerne i datasøen svarer til.

For at lære mere om, hvordan du tilpasser dette til dine egne datakilder, se Udvælgelsesregler og handlinger.

AWS kortlægningsregler

Lad os ændre nogle konfigurationer for at afslutte vores opgaveforberedelse.

  1. handlinger menu, vælg Ændre.
  2. I Opgaveindstillinger afsnit under Stop opgaven efter fuld belastning, vælg Stop efter at have anvendt cachelagrede ændringer.

På denne måde kan vi kontrollere den indledende belastning og den trinvise filgenerering som to forskellige trin. Vi bruger denne to-trins tilgang til at køre AWS Glue-jobbet én gang for hvert trin.

  1. Under Opgavelogs, vælg Slå CloudWatch-logfiler til.
  2. Vælg Gem.
  3. Vent ca. 1 minut, indtil status for databasemigreringsopgaven vises som Ready.

Tilføj tilladelser til kryptering og dekryptering eller Lake Formation

Du kan eventuelt tilføje tilladelser til kryptering og dekryptering eller Lake Formation.

Tilføj kryptering og dekrypteringstilladelser

Hvis dine S3-bøtter, der bruges til rå- og scenelagene, er krypteret vha AWS Key Management Service (AWS KMS) kundeadministrerede nøgler, skal du tilføje tilladelser for at tillade AWS Glue-jobbet at få adgang til dataene:

Tilføj Lake Formation-tilladelser

Hvis du administrerer tilladelser ved hjælp af Lake Formation, skal du tillade dit AWS Glue-job at oprette dit domænes databaser og tabeller gennem IAM-rollen GlueJobRole.

  1. Giv tilladelser til at oprette databaser (for instruktioner, se Oprettelse af en database).
  2. Giv SUPER-tilladelser til default databasen.
  3. Giv dataplaceringstilladelser.
  4. Hvis du opretter databaser manuelt, skal du give alle databaser tilladelse til at oprette tabeller. Henvise til Tildeling af tabeltilladelser ved hjælp af Lake Formation-konsollen og den navngivne ressourcemetode or Tildeling af datakatalogtilladelser ved hjælp af LF-TBAC-metoden i henhold til din brugssituation.

Når du har gennemført det senere trin med at udføre den indledende dataindlæsning, skal du sørge for også at tilføje tilladelser for forbrugere til at forespørge i tabellerne. Jobrollen bliver ejer af alle de oprettede tabeller, og datasø-administratoren kan derefter udføre bevillinger til yderligere brugere.

Gennemgå tabelkonfigurationen i Parameter Store

AWS Glue-jobbet, der udfører dataindtagelsen i Iceberg-tabeller, bruger tabelspecifikationen i Parameter Store. Udfør følgende trin for at gennemgå parameterlageret, der blev konfigureret automatisk for dig. Hvis det er nødvendigt, ændres efter dine egne behov.

  1. På Parameter Store-konsollen skal du vælge Mine parametre i navigationsruden.

CloudFormation-stakken skabte to parametre:

  • iceberg-config til jobkonfigurationer
  • iceberg-tables til tabelkonfiguration
  1. Vælg parameteren isbjerg-borde.

JSON-strukturen indeholder information, som AWS Glue bruger til at læse data og skrive Iceberg-tabellerne på måldomænet:

  • Et objekt pr. bord – Navnet på objektet oprettes ved hjælp af skemanavnet, et punktum og tabelnavnet; for eksempel, schema.table.
  • primærnøgle – Dette bør angives for hver kildetabel. Du kan angive en enkelt kolonne eller en kommasepareret liste over kolonner (uden mellemrum).
  • partitionCols – Dette opdeler valgfrit kolonner for måltabeller. Hvis du ikke ønsker at oprette partitionerede tabeller, skal du angive en tom streng. Ellers skal du angive en enkelt kolonne eller en kommasepareret liste over kolonner, der skal bruges (uden mellemrum).
  1. Hvis du vil bruge din egen datakilde, skal du bruge følgende JSON-kode og erstatte teksten i CAPS fra den medfølgende skabelon. Hvis du bruger den medfølgende eksempeldatakilde, skal du beholde standardindstillingerne:
{ "SCHEMA_NAME.TABLE_NAME_1": { "primaryKey": "ONLY_PRIMARY_KEY", "domain": "TARGET_DOMAIN", "partitionCols": "" }, "SCHEMA_NAME.TABLE_NAME_2": { "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY", "domain": "TARGET_DOMAIN", "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO" }
}
  1. Vælg Gem ændringer.

Udfør indledende dataindlæsning

Nu hvor den nødvendige konfiguration er færdig, indtager vi de indledende data. Dette trin omfatter tre dele: indlæsning af data fra den relationelle kildedatabase i datasøens rå lag, oprettelse af Iceberg-tabellerne på datasøens faselag og verifikation af resultater ved hjælp af Athena.

Indtag data i datasøens rå lag

For at indlæse data fra den relationelle datakilde (PostgreSQL, hvis du bruger den leverede prøve) til vores transaktionsdatasø ved hjælp af Iceberg, skal du udføre følgende trin:

  1. På AWS DMS-konsollen skal du vælge Databasemigreringsopgaver i navigationsruden.
  2. Vælg den replikeringsopgave, du har oprettet, og på handlinger menu, vælg Genstart/Genoptag.
  3. Vent ca. 5 minutter på, at replikeringsopgaven er fuldført. Du kan overvåge tabellerne indtaget på Statistik fanen i replikeringsopgaven.

AWS DMS fuld load statistik

Efter nogle minutter afsluttes opgaven med beskeden Fuld belastning færdig.

  1. På Amazon S3-konsollen skal du vælge den bøtte, du definerede som rålaget.

Under S3-præfikset defineret på AWS DMS (f.eks. postgres), bør du se et hierarki af mapper med følgende struktur:

  • Planlæg
    • Tabelnavn
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

AWS DMS fuld load objekter oprettet på S3

Hvis din S3-spand er tom, skal du gennemgå den Fejlfinding af migreringsopgaver i AWS Database Migration Service før du kører AWS Glue-jobbet.

Opret og indtag data i Iceberg-tabeller

Før vi kører jobbet, lad os navigere i scriptet til AWS Glue-jobbet, der er leveret som en del af CloudFormation-stakken, for at forstå dets adfærd.

  1. På AWS Glue Studio-konsollen skal du vælge Karriere i navigationsruden.
  2. Søg efter det job, der starter med IcebergJob- og et suffiks af dit CloudFormation-staknavn (f.eks. IcebergJob-transactionaldl-postgresql).
  3. Vælg jobbet.

AWS Glue ETL jobanmeldelse

Jobscriptet får den konfiguration, det har brug for, fra Parameter Store. Funktionen getConfigFromSSM() returnerer jobrelaterede konfigurationer såsom kilde- og målinddelinger, hvorfra dataene skal læses og skrives. Variablen ssmparam_table_values indeholde tabelrelaterede oplysninger såsom datadomæne, tabelnavn, partitionskolonner og primærnøgle for de tabeller, der skal indlæses. Se følgende Python-kode:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables" # Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

Scriptet bruger et vilkårligt katalognavn for Iceberg, der er defineret som mit_katalog. Dette er implementeret på AWS Glue Data Catalog ved hjælp af Spark-konfigurationer, så en SQL-handling, der peger på my_catalog, vil blive anvendt på Data Catalog. Se følgende kode:

catalog_name = 'my_catalog'
errored_table_list = [] # Iceberg configuration
spark = SparkSession.builder .config('spark.sql.warehouse.dir', warehouse_path) .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') .getOrCreate()

Scriptet itererer over de tabeller, der er defineret i Parameter Store og udfører logikken til at detektere, om tabellen eksisterer, og om de indkommende data er en indledende indlæsning eller en upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values: # Get table data isTableExists = False schemaName, tableName = key.split('.') logger.info(f'Processing table : {tableName}')

initialLoadRecordsSparkSQL() funktionen indlæser startdata, når der ikke er nogen operationskolonne i S3-filerne. AWS DMS tilføjer kun denne kolonne til Parket-datafiler produceret af den kontinuerlige replikering (CDC). Dataindlæsningen udføres ved hjælp af kommandoen INSERT INTO med SparkSQL. Se følgende kode:

sqltemp = Template(""" INSERT INTO $catalog_name.$dbName.$tableName ($insertTableColumnList) SELECT $insertTableColumnList FROM insertTable $partitionStrSQL """)
SQLQUERY = sqltemp.substitute( catalog_name = catalog_name, dbName = dbName, tableName = tableName, insertTableColumnList = insertTableColumnList[ : -1], partitionStrSQL = partitionStrSQL) logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)

Nu kører vi AWS Glue-jobbet for at indlæse de indledende data i Iceberg-tabellerne. CloudFormation-stakken tilføjer --datalake-formats parameter ved at tilføje de nødvendige Iceberg-biblioteker til jobbet.

  1. Vælg Kør job.
  2. Vælg Job kører at overvåge status. Vent til status er Kørsel lykkedes.

Bekræft de indlæste data

For at bekræfte, at jobbet behandlede dataene som forventet, skal du udføre følgende trin:

  1. Vælg på Athena-konsollen Query Editor i navigationsruden.
  2. Bekræft AwsDataCatalog er valgt som datakilde.
  3. Under Database, vælg det datadomæne, du vil udforske, baseret på den konfiguration, du definerede i parameterlageret. Hvis du bruger den medfølgende eksempeldatabase, skal du bruge sports.

Under Tabeller og udsigter, kan vi se listen over tabeller, der blev oprettet af AWS Glue-jobbet.

  1. Vælg indstillingsmenuen (tre prikker) ud for det første tabelnavn, og vælg derefter Forhåndsvis data.

Du kan se dataene indlæst i Iceberg-tabeller. Amazon Athena gennemgang indledende data indlæst

Udfør trinvis dataindlæsning

Nu begynder vi at fange ændringer fra vores relationelle database og anvende dem på transaktionsdatasøen. Dette trin er også opdelt i tre dele: at fange ændringerne, anvende dem på Iceberg-tabellerne og verificere resultaterne.

Hent ændringer fra relationsdatabasen

På grund af den konfiguration, vi specificerede, stoppede replikeringsopgaven efter at have kørt hele indlæsningsfasen. Nu genstarter vi opgaven for at tilføje trinvise filer med ændringer i datasøens rå lag.

  1. På AWS DMS-konsollen skal du vælge den opgave, vi oprettede og kørte før.
  2. handlinger menu, vælg CV.
  3. Vælg Start opgave for at begynde at fange ændringer.
  4. For at udløse oprettelse af nye filer på datasøen skal du udføre indsættelser, opdateringer eller sletninger på tabellerne i din kildedatabase ved hjælp af dit foretrukne databaseadministrationsværktøj. Hvis du bruger den medfølgende eksempeldatabase, kan du køre følgende SQL-kommandoer:
UPDATE dms_sample.nfl_stadium_data_upd
SET seatin_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31'; update dms_sample.mlb_data set bats = 'R'
where mlb_id=506560 and bats='L'; update dms_sample.sporting_event set start_date = current_date where id=11 and sold_out=0;
  1. På siden med AWS DMS-opgaveoplysninger skal du vælge Tabel statistik fanen for at se de registrerede ændringer.
    AWS DMS CDC statistik
  2. Åbn det rå lag af datasøen for at finde en ny fil, der indeholder de trinvise ændringer i hver tabels præfiks, for eksempel under sporting_event præfiks.

Rekorden med ændringer for sporting_event tabel ser ud som følgende skærmbillede.

AWS DMS-objekter migrerede til S3 med CDC

Læg mærke til Op kolonne i begyndelsen identificeret med en opdatering (U). Den anden dato/tidsværdi er også kontrolkolonnen tilføjet af AWS DMS med det tidspunkt, hvor ændringen blev registreret.

CDC-filskema på Amazon S3

Anvend ændringer på Iceberg-bordene ved hjælp af AWS-lim

Nu kører vi AWS Glue-jobbet igen, og det vil automatisk kun behandle de nye inkrementelle filer, da jobbogmærket er aktiveret. Lad os gennemgå, hvordan det virker.

dedupCDCRecords() funktion udfører deduplikering af data, fordi flere ændringer af et enkelt registrerings-id kunne fanges i den samme datafil på Amazon S3. Deduplikering udføres baseret på last_update_time kolonne tilføjet af AWS DMS, der angiver tidsstemplet for, hvornår ændringen blev registreret. Se følgende Python-kode:

def dedupCDCRecords(inputDf, keylist): IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize) inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF)) NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'") UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')") finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf) return finalInputDF

På linje 99 upsertRecordsSparkSQL() funktion udfører upsert på samme måde som den oprindelige load, men denne gang med en SQL MERGE-kommando.

Gennemgå de anvendte ændringer

Åbn Athena-konsollen, og kør en forespørgsel, der vælger de ændrede poster i kildedatabasen. Hvis du bruger den medfølgende eksempeldatabase, skal du bruge en af ​​følgende SQL-forespørgsler:

SELECT * FROM "sports"."nfl_stadiu_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Amazon Athena anmeldelse cdc data indlæst

Overvåg bordindtagelse

AWS Glue job scriptet er kodet med enkel Python undtagelseshåndtering at fange fejl under behandling af en specifik tabel. Jobbogmærket gemmes, efter at hver tabel er færdigbehandlet, for at undgå genbearbejdning af tabeller, hvis jobkørsel prøves igen for tabellerne med fejl.

AWS kommandolinjegrænseflade (AWS CLI) giver en get-job-bookmark kommando til AWS Glue, der giver indsigt i status for bogmærket for hver behandlet tabel.

  1. På AWS Glue Studio-konsollen skal du vælge ETL-jobbet.
  2. Vælg den Job kører fanen og kopier jobkørsels-id'et.
  3. Kør følgende kommando på en terminal, der er godkendt til AWS CLI, og erstatter <GLUE_JOB_RUN_ID> på linje 1 med den værdi, du kopierede. Hvis din CloudFormation-stak ikke er navngivet transactionaldl-postgresql, angiv navnet på dit job på linje 2 i scriptet:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name jobname --run-id $jobrun

I denne løsning, når en tabelbehandling forårsager en undtagelse, vil AWS Glue-jobbet ikke fejle ifølge denne logik. I stedet vil tabellen blive tilføjet til en matrix, der udskrives, når jobbet er fuldført. I et sådant scenarie vil jobbet blive markeret som mislykket, efter at det forsøger at behandle resten af ​​de tabeller, der er fundet på den rå datakilde. På denne måde behøver tabeller uden fejl ikke at vente, indtil brugeren identificerer og løser problemet på de modstridende tabeller. Brugeren kan hurtigt opdage jobkørsler, der havde problemer ved hjælp af AWS Glue-jobkørselsstatus, og identificere, hvilke specifikke tabeller der forårsager problemet ved hjælp af CloudWatch-logfilerne for jobkørslen.

  1. Jobscriptet implementerer denne funktion med følgende Python-kode:
# Performed for every table try: # Table processing logic except Exception as e: logger.info(f'There is an issue with table: {tableName}') logger.info(f'The exception is : {e}') errored_table_list.append(tableName) continue job.commit()
if (len(errored_table_list)): logger.info('Total number of errored tables are ',len(errored_table_list)) logger.info('Tables that failed during processing are ', *errored_table_list, sep=', ') raise Exception(f'***** Some tables failed to process.')

Følgende skærmbillede viser, hvordan CloudWatch-logfilerne ser ud efter tabeller, der forårsager fejl under behandlingen.

AWS Limjobovervågning med logs

Afstemt med AWS Well-Architected Framework Data Analytics Lens praksis, kan du tilpasse mere sofistikerede kontrolmekanismer for at identificere og underrette interessenter, når der opstår fejl på datapipelines. Du kan f.eks. bruge en Amazon DynamoDB kontroltabel til at gemme alle tabeller og jobkørsler med fejl, eller vha Amazon Simple Notification Service (Amazon SNS) til sende alarmer til operatører når visse kriterier er opfyldt.

Planlæg trinvis batchdataindlæsning

CloudFormation-stakken implementerer en Amazon Eventbridge regel (deaktiveret som standard), der kan udløse AWS Glue-jobbet til at køre efter en tidsplan. For at angive din egen tidsplan og aktivere reglen skal du udføre følgende trin:

  1. Vælg på EventBridge-konsollen Regler i navigationsruden.
  2. Søg efter reglen foran med navnet på din CloudFormation-stak efterfulgt af JobTrigger (for eksempel, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Vælg reglen.
  4. Under Arrangementsplan, vælg Redigere.

Standardplanen er konfigureret til at udløse hver time.

  1. Angiv den tidsplan, du vil udføre jobbet.
  2. Derudover kan du bruge en EventBridge cron udtryk ved at vælge En finmasket tidsplan.
    Amazon EventBridge tidsplan ETL job
  3. Når du er færdig med at opsætte cron-udtrykket, skal du vælge Næste tre gange, og til sidst vælge Opdater regel for at gemme ændringer.

Reglen oprettes deaktiveret som standard for at give dig mulighed for at køre den indledende dataindlæsning først.

  1. Aktiver reglen ved at vælge Aktiver.

Du kan bruge Overvågning fanen for at se regelpåkaldelser eller direkte på AWS-limen Job Run detaljer.

Konklusion

Efter at have implementeret denne løsning, har du automatiseret indlæsningen af ​​dine tabeller på en enkelt relationel datakilde. Organisationer, der bruger en datasø som deres centrale dataplatform, skal normalt håndtere flere, nogle gange endda snesevis af datakilder. Også flere og flere use cases kræver, at organisationer implementerer transaktionsevner til datasøen. Du kan bruge denne løsning til at fremskynde overtagelsen af ​​sådanne muligheder på tværs af alle dine relationelle datakilder for at muliggøre nye business use cases, automatisere implementeringsprocessen for at få mere værdi ud af dine data.


Om forfatterne

Luis Gerardo BaezaLuis Gerardo Baeza er en Big Data Architect i Amazon Web Services (AWS) Data Lab. Han har 12 års erfaring med at hjælpe organisationer inden for sundheds-, finans- og uddannelsessektoren med at indføre virksomhedsarkitekturprogrammer, cloud computing og dataanalysefunktioner. Luis hjælper i øjeblikket organisationer på tværs af Latinamerika med at accelerere strategiske datainitiativer.

SaiKiran Reddy AenuguSaiKiran Reddy Aenugu er en dataarkitekt i Amazon Web Services (AWS) Data Lab. Han har 10 års erfaring med implementering af dataindlæsning, transformation og visualiseringsprocesser. SaiKiran hjælper i øjeblikket organisationer i Nordamerika med at indføre moderne dataarkitekturer såsom datasøer og datamesh. Han har erfaring i detail-, fly- og finanssektoren.

Narendra MerlaNarendra Merla er en dataarkitekt i Amazon Web Services (AWS) Data Lab. Han har 12 års erfaring med at designe og produktionsalisere både realtids- og batch-orienterede datapipelines og bygge datasøer i både cloud- og lokale miljøer. Narendra hjælper i øjeblikket organisationer i Nordamerika med at bygge og designe robuste dataarkitekturer og har erfaring inden for telekom- og finanssektoren.

Tidsstempel:

Mere fra AWS Big Data