Kombiner AWS Glue og Amazon MWAA for å bygge avanserte VPC-utvalg og failover-strategier | Amazon Web Services

Kombiner AWS Glue og Amazon MWAA for å bygge avanserte VPC-utvalg og failover-strategier | Amazon Web Services

Kilde node: 2490910

AWS Lim er en serverløs dataintegrasjonstjeneste som gjør det enkelt å oppdage, forberede, flytte og integrere data fra flere kilder for analyse, maskinlæring (ML) og applikasjonsutvikling.

AWS Glue-kunder må ofte oppfylle strenge sikkerhetskrav, som noen ganger innebærer å låse ned nettverkstilkoblingen som er tillatt for jobben, eller løpe inne i en spesifikk VPC for å få tilgang til en annen tjeneste. For å kjøre inne i VPC-en, må jobbene tildeles et enkelt undernett, men det mest passende undernettverket kan endres over tid (for eksempel basert på bruk og tilgjengelighet), så det kan hende du foretrekker å ta den avgjørelsen under kjøring, basert på din egen strategi.

Amazon administrerte arbeidsflyter for Apache Airflow (Amazon MWAA) er en AWS-tjeneste for å kjøre administrerte Airflow-arbeidsflyter, som gjør det mulig å skrive tilpasset logikk for å koordinere hvordan oppgaver som AWS Glue-jobber kjører.

I dette innlegget viser vi hvordan du kjører en AWS Glue-jobb som en del av en Airflow-arbeidsflyt, med dynamisk konfigurerbart valg av VPC-undernettet som er tilordnet jobben under kjøring.

Løsningsoversikt

For å kjøre inne i en VPC, må en AWS Glue-jobb tildeles minst en tilkobling som inkluderer nettverkskonfigurasjon. Enhver tilkobling gjør det mulig å spesifisere en VPC, subnett og sikkerhetsgruppe, men for enkelhets skyld bruker dette innlegget tilkoblinger av typen: NETTVERK, som bare definerer nettverkskonfigurasjonen og ikke involverer eksterne systemer.

Hvis jobben har et fast undernett tilordnet av en enkelt tilkobling, i tilfelle tjenestebrudd på Tilgjengelighetssoner eller hvis subnettet ikke er tilgjengelig av andre årsaker, kan ikke jobben kjøres. Videre krever hver node (driver eller arbeider) i en AWS Glue-jobb en IP-adresse tildelt fra subnettet. Når du kjører mange store jobber samtidig, kan dette føre til mangel på IP-adresser og at jobben kjører med færre noder enn beregnet eller ikke kjører i det hele tatt.

AWS Glue extract, transform and load (ETL)-jobber lar flere tilkoblinger spesifiseres med flere nettverkskonfigurasjoner. Jobben vil imidlertid alltid prøve å bruke tilkoblingenes nettverkskonfigurasjon i den oppførte rekkefølgen og velge den første som passerer helsekontroller og har minst to IP-adresser for å få jobben i gang, noe som kanskje ikke er det optimale alternativet.

Med denne løsningen kan du forbedre og tilpasse atferden ved å omorganisere tilkoblingene dynamisk og definere utvalgsprioriteten. Hvis et nytt forsøk er nødvendig, omprioriteres forbindelsene på nytt basert på strategien, fordi forholdene kan ha endret seg siden siste kjøring.

Som et resultat bidrar det til å forhindre at jobben ikke kjører eller kjører under kapasitet på grunn av mangel på IP-adresse i undernettverket eller til og med et strømbrudd, samtidig som det oppfyller kravene til nettverkssikkerhet og tilkobling.

Følgende diagram illustrerer løsningsarkitekturen.

Forutsetninger

For å følge trinnene i innlegget trenger du en bruker som kan logge på AWS-administrasjonskonsoll og har tillatelse til å få tilgang til Amazon MWAA, Amazon Virtual Private Cloud (Amazon VPC), og AWS Glue. AWS-regionen der du velger å distribuere løsningen trenger kapasitet til å lage en VPC og to elastiske IP-adresser. Standard regional kvote for begge typer ressurser er fem, så du må kanskje be om en økning via konsollen.

Du trenger også en AWS identitets- og tilgangsadministrasjon (IAM) rolle egnet for å kjøre AWS Glue-jobber hvis du ikke allerede har en. For instruksjoner, se Opprett en IAM-rolle for AWS Glue.

Distribuer et Airflow-miljø og VPC

Først skal du distribuere et nytt Airflow-miljø, inkludert opprettelsen av en ny VPC med to offentlige undernett og to private. Dette er fordi Amazon MWAA krever Availability Zone-feiltoleranse, så den må kjøres på to subnett på to forskjellige Availability Zones i regionen. De offentlige undernettene brukes slik at NAT-gatewayen kan gi internettilgang for de private undernettene.

Fullfør følgende trinn:

  1. Lag en AWS skyformasjon malen på datamaskinen din ved å kopiere malen fra følgende Hurtigstartveiledning inn i en lokal tekstfil.
  2. Velg på AWS CloudFormation-konsollen Stabler i navigasjonsruten.
  3. Velg Lag stabel med alternativet Med nye ressurser (standard).
  4. Velg Last opp en malfil og velg den lokale malfilen.
  5. Velg neste.
  6. Fullfør oppsettstrinnene, skriv inn et navn for miljøet, og la resten av parameterne være standard.
  7. På det siste trinnet, anerkjenne at ressurser vil bli opprettet og velg Send.

Opprettelsen kan ta 20–30 minutter, til statusen til stabelen endres til CREATE_COMPLETE.

Ressursen som vil ta mest tid er luftstrømmiljøet. Mens den opprettes, kan du fortsette med følgende trinn, til du blir bedt om å åpne Airflow-grensesnittet.

  1. På stabelens Ressurser fanen, merk ID-ene for VPC-en og to private undernett (PrivateSubnet1 og PrivateSubnet2), for å bruke i neste trinn.

Lag AWS-limforbindelser

CloudFormation-malen distribuerer to private undernett. I dette trinnet oppretter du en AWS Glue-forbindelse til hver enkelt slik at AWS Glue-jobber kan kjøres i dem. Amazon MWAA la nylig til kapasiteten til å kjøre Airflow-klyngen på delte VPC-er, noe som reduserer kostnadene og forenkler nettverksadministrasjonen. For mer informasjon, se Vi introduserer delt VPC-støtte på Amazon MWAA.

Fullfør følgende trinn for å opprette forbindelsene:

  1. Velg på AWS Lim-konsollen Datatilkoblinger i navigasjonsruten.
  2. Velg Opprett forbindelse.
  3. Velg Network som datakilde.
  4. Velg VPC og privat undernett (PrivateSubnet1) opprettet av CloudFormation-stakken.
  5. Bruk standard sikkerhetsgruppe.
  6. Velg neste.
  7. For tilkoblingsnavnet, skriv inn MWAA-Glue-Blog-Subnet1.
  8. Se gjennom detaljene og fullfør opprettelsen.
  9. Gjenta disse trinnene med PrivateSubnet2 og navngi forbindelsen MWAA-Glue-Blog-Subnet2.

Opprett AWS Glue-jobben

Nå oppretter du AWS Glue-jobben som vil bli utløst senere av Airflow-arbeidsflyten. Jobben bruker tilkoblingene som ble opprettet i forrige seksjon, men i stedet for å tilordne dem direkte på jobben, som du vanligvis ville gjort, lar du jobbtilkoblingslisten stå tom i dette scenariet og lar arbeidsflyten bestemme hvilken som skal brukes under kjøring.

Jobbskriptet i dette tilfellet er ikke signifikant og er bare ment å demonstrere at jobben kjørte i et av undernettene, avhengig av tilkoblingen.

  1. Velg på AWS Lim-konsollen ETL jobb i navigasjonsruten, og velg deretter Skriptredaktør.
  2. La standardalternativene (Spark engine og Start frisk) og velg Lag script.
  3. Erstatt plassholderskriptet med følgende Python-kode:
    import ipaddress
    import socket subnets = { "PrivateSubnet1": "10.192.20.0/24", "PrivateSubnet2": "10.192.21.0/24"
    } ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items(): if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr): subnet_name = subnet print(f"The driver node has been assigned the ip: {ip}" + f" which belongs to the subnet: {subnet_name}")
    

  4. Gi jobben nytt navn til AirflowBlogJob.
  5. Jobbdetaljer fanen, for IAM-rolle, velg hvilken som helst rolle og skriv inn 2 for antall arbeidere (bare for nøysomhet).
  6. Lagre disse endringene slik at jobben opprettes.

Gi AWS Glue-tillatelser til Airflow-miljørollen

Rollen opprettet for Airflow av CloudFormation-malen gir de grunnleggende tillatelsene til å kjøre arbeidsflyter, men ikke til å samhandle med andre tjenester som AWS Glue. I et produksjonsprosjekt ville du definere dine egne maler med disse tilleggstillatelsene, men i dette innlegget legger du for enkelhets skyld til tilleggstillatelsene som en innebygd policy. Fullfør følgende trinn:

  1. Velg på IAM-konsollen Roller i navigasjonsruten.
  2. Finn rollen opprettet av malen; det vil starte med navnet du tildelte CloudFormation-stakken og deretter -MwaaExecutionRole-.
  3. På siden med rolledetaljer, på Legg til tillatelser meny, velg Lag inline policy.
  4. Bytt fra visuell til JSON-modus og skriv inn følgende JSON i tekstboksen. Den forutsetter at AWS Glue-rollen du har følger konvensjonen om å starte med AWSGlueServiceRole. For økt sikkerhet kan du erstatte jokertegnressursen på ec2:DescribeSubnets tillatelse med ARN-ene til de to private undernettene fra CloudFormation-stakken.
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetConnection" ], "Resource": [ "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*", "arn:aws:glue:*:*:catalog" ] }, { "Effect": "Allow", "Action": [ "glue:UpdateJob", "glue:GetJob", "glue:StartJobRun", "glue:GetJobRun" ], "Resource": [ "arn:aws:glue:*:*:job/AirflowBlogJob", "arn:aws:glue:*:*:job/BlogAirflow" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeSubnets" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*" } ]
    }
    

  5. Velg neste.
  6. Enter GlueRelatedPermissions som policynavn og fullfør opprettelsen.

I dette eksemplet bruker vi en ETL-skriptjobb; for en visuell jobb, fordi den genererer skriptet automatisk ved lagring, vil Airflow-rollen trenge tillatelse til å skrive til den konfigurerte skriptbanen på Amazon enkel lagringstjeneste (Amazon S3).

Opprett Airflow DAG

En Airflow-arbeidsflyt er basert på en Directed Acyclic Graph (DAG), som er definert av en Python-fil som programmatisk spesifiserer de forskjellige oppgavene som er involvert og dens gjensidige avhengigheter. Fullfør følgende skript for å lage DAG:

  1. Lag en lokal fil med navnet glue_job_dag.py ved hjelp av et tekstredigeringsprogram.

I hvert av de følgende trinnene gir vi en kodebit for å legge inn i filen og en forklaring på hva det gjør.

  1. Følgende kodebit legger til de nødvendige Python-modulimportene. Modulene er allerede installert på Airflow; hvis det ikke var tilfelle, må du bruke en requirements.txt fil for å indikere til Airflow hvilke moduler som skal installeres. Den definerer også Boto3-klientene som koden skal bruke senere. Som standard vil de bruke samme rolle og region som Airflow, det er derfor du konfigurerer før rollen med de ekstra tillatelsene som kreves.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. Følgende kodebit legger til tre funksjoner for å implementere tilkoblingsrekkefølgestrategien, som definerer hvordan du skal omorganisere tilkoblingene som er gitt for å etablere deres prioritet. Dette er bare et eksempel; du kan bygge din egendefinerte kode for å implementere din egen logikk, i henhold til dine behov. Koden sjekker først IP-ene som er tilgjengelige på hvert tilkoblingsundernett og skiller de som har nok IP-er tilgjengelig til å kjøre jobben med full kapasitet og de som kan brukes fordi de har minst to IP-er tilgjengelig, som er minimum en jobb trenger å start. Hvis strategien er satt til random, vil den randomisere rekkefølgen innenfor hver av tilkoblingsgruppene som er beskrevet tidligere og legge til eventuelle andre tilkoblinger. Hvis strategien er capacity, vil den bestille dem fra de fleste IP-er gratis til færrest.
    def get_available_ips_from_connection(glue_connection_name): conn_response = glue_client.get_connection(Name=glue_connection_name) connection_properties = conn_response['Connection']['PhysicalConnectionRequirements'] subnet_id = connection_properties['SubnetId'] subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id]) return subnet_response['Subnets'][0]['AvailableIpAddressCount'] def get_connections_free_ips(glue_connection_names, num_workers): good_connections = [] usable_connections = [] for connection_name in glue_connection_names: try: available_ips = get_available_ips_from_connection(connection_name) # Priority to connections that can hold the full cluster and we haven't just tried if available_ips >= num_workers: good_connections.append((connection_name, available_ips)) elif available_ips >= 2: # The bare minimum to start a Glue job usable_connections.append((connection_name, available_ips)) except Exception as e: print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}") return good_connections, usable_connections def prioritize_connections(connection_list, num_workers, strategy): (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers) print(f"Good connections: {good_connections}") print(f"Usable connections: {usable_connections}") all_conn = [] if strategy=="random": shuffle(good_connections) shuffle(usable_connections) # Good connections have priority all_conn = good_connections + usable_connections elif strategy=="capacity": # We can sort both at the same time all_conn = good_connections + usable_connections all_conn.sort(key=lambda x: -x[1]) else: raise ValueError(f"Unknown strategy specified: {strategy}") result = [c[0] for c in all_conn] # Just need the name # Keep at the end any other connections that could not be checked for ips result += [c for c in connection_list if c not in result] return result
    

  3. Følgende kode oppretter selve DAG med kjørejobboppgaven, som oppdaterer jobben med tilkoblingsrekkefølgen definert av strategien, kjører den og venter på resultatene. Jobbnavnet, tilkoblingene og strategien kommer fra Airflow-variabler, slik at det enkelt kan konfigureres og oppdateres. Den har to gjenforsøk med eksponentiell backoff konfigurert, så hvis oppgavene mislykkes, vil den gjenta hele oppgaven inkludert tilkoblingsvalget. Kanskje det beste valget nå er en annen tilkobling, eller subnettet som tidligere ble valgt tilfeldig, er i en tilgjengelighetssone som for øyeblikket lider av en strømbrudd, og ved å velge en annen kan den komme seg.
    with DAG( dag_id='glue_job_dag', schedule_interval=None, # Run on demand only start_date=datetime(2000, 1, 1), # A start date is required max_active_runs=1, catchup=False
    ) as glue_dag: @task( task_id="glue_task", retries=2, retry_delay=duration(seconds = 30), retry_exponential_backoff=True ) def run_job_task(**ctx): glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',') glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip() strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity print(f"Connections available: {glue_connections}") print(f"Glue job name: {glue_jobname}") print(f"Strategy to use: {strategy}") job_props = glue_client.get_job(JobName=glue_jobname)['Job'] num_workers = job_props['NumberOfWorkers'] glue_connections = prioritize_connections(glue_connections, num_workers, strategy) print(f"Running Glue job with the connection order: {glue_connections}") existing_connections = job_props.get('Connections',{}).get('Connections', []) # Preserve other connections that we don't manage other_connections = [con for con in existing_connections if con not in glue_connections] job_props['Connections'] = {"Connections": glue_connections + other_connections} # Clean up properties so we can reuse the dict for the update request for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']: del job_props[prop_name] GlueJobOperator( task_id='submit_job', job_name=glue_jobname, iam_role_name=job_props['Role'].split('/')[-1], update_config=True, create_job_kwargs=job_props, wait_for_completion=True ).execute(ctx) run_job_task()
    

Opprett Airflow-arbeidsflyten

Nå oppretter du en arbeidsflyt som påkaller AWS Glue-jobben du nettopp opprettet:

  1. På Amazon S3-konsollen, finn bøtten opprettet av CloudFormation-malen, som vil ha et navn som starter med navnet på stabelen og deretter -environmentbucket- (for eksempel, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Opprett en mappe som heter i den bøtta dags, og i den mappen laster du opp DAG-filen glue_job_dag.py som du opprettet i forrige del.
  3. På Amazon MWAA-konsollen, naviger til miljøet du distribuerte med CloudFormation-stakken.

Hvis statusen ikke er ennå Tilgjengelig, vent til den når den tilstanden. Det bør ikke ta mer enn 30 minutter siden du implementerte CloudFormation-stakken.

  1. Velg miljøkoblingen i tabellen for å se miljødetaljene.

Den er konfigurert til å plukke opp DAG-er fra bøtta og mappen du brukte i de forrige trinnene. Airflow vil overvåke den mappen for endringer.

  1. Velg Åpne Airflow UI for å åpne en ny fane med tilgang til Airflow UI, ved å bruke den integrerte IAM-sikkerheten for å logge deg på.

Hvis det er noe problem med DAG-filen du opprettet, vil den vise en feil på toppen av siden som indikerer linjene som er berørt. I så fall, se gjennom trinnene og last opp på nytt. Etter noen sekunder vil den analysere den og oppdatere eller fjerne feilbanneret.

  1. admin meny, velg Variabler.
  2. Legg til tre variabler med følgende nøkler og verdier:
    1. nøkkel glue_job_dag.glue_connections med verdi MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. nøkkel glue_job_dag.glue_job_name med verdi AirflowBlogJob.
    3. nøkkel glue_job_dag.strategy med verdi capacity.

Kjør jobben med en dynamisk subnett-tilordning

Nå er du klar til å kjøre arbeidsflyten og se at strategien dynamisk omorganiserer tilkoblingene.

  1. På Airflow UI velger du DAGs, og på raden glue_job_dag, velg avspillingsikonet.
  2. Søk meny, velg Oppgaveforekomster.
  3. Rull til høyre i forekomsttabellen for å vise Log Url og velg ikonet på den for å åpne loggen.

Loggen vil oppdateres etter hvert som oppgaven kjører; du kan finne linjen som starter med "Kjøre limjobb med tilkoblingsrekkefølgen:" og de forrige linjene som viser detaljer om tilkoblings-IP-ene og kategorien som er tildelt. Hvis det oppstår en feil, vil du se detaljene i denne loggen.

  1. Velg på AWS Lim-konsollen ETL jobb i navigasjonsruten, og velg deretter jobben AirflowBlogJob.
  2. Kjører fanen, velg kjøreforekomsten og deretter Utdatalogger lenke, som åpner en ny fane.
  3. På den nye fanen bruker du loggstrømkoblingen for å åpne den.

Den vil vise IP-en som driveren ble tildelt og hvilket subnett den tilhører, som skal samsvare med tilkoblingen angitt av Airflow (hvis loggen ikke vises, velg Fortsett så den blir oppdatert så snart den er tilgjengelig).

  1. Rediger Airflow-variabelen på Airflow-grensesnittet glue_job_dag.strategy å sette den til random.
  2. Kjør DAG flere ganger og se hvordan bestillingen endres.

Rydd opp

Hvis du ikke lenger trenger distribusjonen, slett ressursene for å unngå ytterligere kostnader:

  1. Slett Python-skriptet du lastet opp, slik at S3-bøtten kan slettes automatisk i neste trinn.
  2. Slett CloudFormation-stakken.
  3. Slett AWS Glue-jobben.
  4. Slett skriptet som jobben lagret i Amazon S3.
  5. Slett forbindelsene du opprettet som en del av dette innlegget.

konklusjonen

I dette innlegget viste vi hvordan AWS Glue og Amazon MWAA kan jobbe sammen for å bygge mer avanserte tilpassede arbeidsflyter, samtidig som drifts- og administrasjonsoverhead minimeres. Denne løsningen gir deg mer kontroll over hvordan AWS Glue-jobben din kjører for å møte spesielle drifts-, nettverks- eller sikkerhetskrav.

Du kan distribuere ditt eget Amazon MWAA-miljø på flere måter, for eksempel med mal brukt i dette innlegget, på Amazon MWAA-konsollen, eller ved å bruke AWS CLI. Du kan også implementere dine egne strategier for å orkestrere AWS Glue-jobber, basert på nettverksarkitekturen og kravene (for eksempel for å kjøre jobben nærmere dataene når det er mulig).


Om forfatterne

Michael Greenshtein er en analytisk spesialistløsningsarkitekt for offentlig sektor.

Gonzalo Herreros er senior Big Data Architect på AWS Glue-teamet.

Tidstempel:

Mer fra AWS Big Data