Združite AWS Glue in Amazon MWAA, da ustvarite napredno izbiro VPC in strategije za preklop | Spletne storitve Amazon

Združite AWS Glue in Amazon MWAA, da ustvarite napredno izbiro VPC in strategije za preklop | Spletne storitve Amazon

Izvorno vozlišče: 2490910

AWS lepilo je storitev integracije podatkov brez strežnika, ki omogoča preprosto odkrivanje, pripravo, premikanje in integracijo podatkov iz več virov za analitiko, strojno učenje (ML) in razvoj aplikacij.

Stranke AWS Glue morajo pogosto izpolnjevati stroge varnostne zahteve, ki včasih vključujejo zaklepanje omrežne povezljivosti, dovoljene za opravilo, ali izvajanje znotraj določenega VPC za dostop do druge storitve. Za izvajanje znotraj VPC je treba opravila dodeliti enemu podomrežju, vendar se lahko najprimernejše podomrežje sčasoma spremeni (na primer glede na uporabo in razpoložljivost), zato se boste morda raje odločili med izvajanjem, na podlagi na lastno strategijo.

Delovni tokovi, ki jih upravlja Amazon za Apache Airflow (Amazon MWAA) je storitev AWS za izvajanje upravljanih delovnih tokov Airflow, ki omogočajo pisanje logike po meri za usklajevanje izvajanja nalog, kot so opravila AWS Glue.

V tej objavi prikazujemo, kako zagnati opravilo AWS Glue kot del delovnega toka Airflow z dinamično nastavljivo izbiro podomrežja VPC, ki je dodeljeno opravilu med izvajanjem.

Pregled rešitev

Za delovanje znotraj VPC mora biti opravilu AWS Glue dodeljena vsaj povezava, ki vključuje konfiguracijo omrežja. Vsaka povezava omogoča določanje VPC, podomrežja in varnostne skupine, toda zaradi enostavnosti ta objava uporablja povezave vrste: NETWORK, ki samo definira konfiguracijo omrežja in ne vključuje zunanjih sistemov.

Če ima opravilo fiksno podomrežje, dodeljeno z eno samo povezavo, v primeru izpada storitve na Območja razpoložljivosti ali če podomrežje ni na voljo iz drugih razlogov, se opravilo ne more izvajati. Poleg tega vsako vozlišče (gonilnik ali delavec) v opravilu AWS Glue zahteva naslov IP, dodeljen iz podomrežja. Pri hkratnem izvajanju številnih velikih opravil lahko to privede do pomanjkanja naslova IP in opravila teče z manj vozlišči, kot je predvideno, ali pa se sploh ne izvaja.

Opravila pridobivanja, preoblikovanja in nalaganja (ETL) AWS Glue omogočajo določitev več povezav z več konfiguracijami omrežja. Vendar bo opravilo vedno poskušalo uporabiti omrežno konfiguracijo povezav v navedenem vrstnem redu in izbralo prvo, ki bo prestala zdravstvene preglede in ima vsaj dva naslova IP za začetek dela, kar morda ni najboljša možnost.

S to rešitvijo lahko izboljšate in prilagodite to vedenje tako, da dinamično preuredite povezave in definirate prednost izbire. Če je potreben ponovni poskus, se povezavam znova dodeli prednost glede na strategijo, ker so se pogoji morda spremenili od zadnjega zagona.

Posledično pomaga preprečevati, da se opravilo ne bi izvajalo ali delovalo pod zmogljivostjo zaradi pomanjkanja naslova IP podomrežja ali celo izpada, hkrati pa izpolnjuje zahteve glede varnosti omrežja in povezljivosti.

Naslednji diagram prikazuje arhitekturo rešitev.

Predpogoji

Če želite slediti korakom objave, potrebujete uporabnika, ki se lahko prijavi v Konzola za upravljanje AWS in ima dovoljenje za dostop do Amazon MWAA, Navidezni zasebni oblak Amazon (Amazon VPC) in AWS Glue. Regija AWS, kjer se odločite za namestitev rešitve, potrebuje zmogljivost za ustvarjanje VPC in dva elastična naslova IP. Privzeta regionalna kvota za obe vrsti virov je pet, zato boste morda morali zahtevati povečanje prek konzole.

Potrebujete tudi AWS upravljanje identitete in dostopa (IAM) primerna vloga za izvajanje opravil AWS Glue, če je še nimate. Za navodila glejte Ustvarite vlogo IAM za AWS Glue.

Namestite okolje Airflow in VPC

Najprej boste uvedli novo okolje Airflow, vključno z ustvarjanjem novega VPC z dvema javnima podomrežjema in dvema zasebnima. To je zato, ker Amazon MWAA zahteva toleranco napak območij razpoložljivosti, zato mora delovati v dveh podomrežjih na dveh različnih območjih razpoložljivosti v regiji. Javna podomrežja se uporabljajo tako, da lahko NAT Gateway zagotovi internetni dostop za zasebna podomrežja.

Izvedite naslednje korake:

  1. Ustvari Oblikovanje oblaka AWS predlogo v računalniku, tako da kopirate predlogo iz naslednjega hitri vodnik za začetek v lokalno besedilno datoteko.
  2. Na konzoli AWS CloudFormation izberite Skladovnice v podoknu za krmarjenje.
  3. Izberite Ustvari sklad z možnostjo Z novimi viri (standardno).
  4. Izberite Naložite datoteko predloge in izberite datoteko lokalne predloge.
  5. Izberite Naslednji.
  6. Dokončajte nastavitvene korake, vnesite ime za okolje in pustite ostale parametre privzete.
  7. Na zadnjem koraku potrdite, da bodo viri ustvarjeni, in izberite Prijave se.

Ustvarjanje lahko traja 20–30 minut, dokler se stanje sklada ne spremeni v CREATE_COMPLETE.

Vir, ki bo vzel največ časa, je okolje Airflow. Medtem ko se ustvarja, lahko nadaljujete z naslednjimi koraki, dokler ne boste morali odpreti uporabniškega vmesnika Airflow.

  1. Na skladovnici viri zavihku zabeležite ID-je za VPC in dve zasebni podomrežji (PrivateSubnet1 in PrivateSubnet2), za uporabo v naslednjem koraku.

Ustvarite povezave AWS Glue

Predloga CloudFormation razmesti dve zasebni podomrežji. V tem koraku z vsakim ustvarite povezavo AWS Glue, tako da se lahko v njih izvajajo opravila AWS Glue. Amazon MWAA je nedavno dodal zmogljivost za izvajanje gruče Airflow na skupnih VPC-jih, kar zmanjša stroške in poenostavi upravljanje omrežja. Za več informacij glejte Predstavljamo skupno podporo VPC na Amazon MWAA.

Izvedite naslednje korake, da ustvarite povezave:

  1. Na konzoli AWS Glue izberite Podatkovne povezave v podoknu za krmarjenje.
  2. Izberite Ustvarite povezavo.
  3. Izberite mreža kot vir podatkov.
  4. Izberite VPC in zasebno podomrežje (PrivateSubnet1), ki jih je ustvaril sklad CloudFormation.
  5. Uporabite privzeto varnostno skupino.
  6. Izberite Naslednji.
  7. Za ime povezave vnesite MWAA-Glue-Blog-Subnet1.
  8. Preglejte podrobnosti in dokončajte ustvarjanje.
  9. Ponovite te korake z uporabo PrivateSubnet2 in poimenujte povezavo MWAA-Glue-Blog-Subnet2.

Ustvarite opravilo AWS Glue

Zdaj ustvarite opravilo AWS Glue, ki ga bo pozneje sprožil potek dela Airflow. Opravilo uporablja povezave, ustvarjene v prejšnjem razdelku, vendar namesto, da bi jih dodelili neposredno opravilu, kot bi običajno storili, v tem scenariju pustite seznam povezav opravila prazen in prepustite delovnemu toku, da odloči, katero bo uporabil med izvajanjem.

Skript opravila v tem primeru ni pomemben in je namenjen samo prikazu izvajanja opravila v enem od podomrežij, odvisno od povezave.

  1. Na konzoli AWS Glue izberite ETL delovna mesta v podoknu za krmarjenje in nato izberite Urejevalnik skriptov.
  2. Pustite privzete možnosti (motor Spark in Začnite sveže) in izberite Ustvari skript.
  3. Zamenjajte nadomestni skript z naslednjo kodo Python:
    import ipaddress
    import socket subnets = { "PrivateSubnet1": "10.192.20.0/24", "PrivateSubnet2": "10.192.21.0/24"
    } ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items(): if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr): subnet_name = subnet print(f"The driver node has been assigned the ip: {ip}" + f" which belongs to the subnet: {subnet_name}")
    

  4. Preimenujte opravilo v AirflowBlogJob.
  5. o Podrobnosti o delovnem mestu zavihek, za Vloga IAM, izberite poljubno vlogo in vnesite 2 za število delavcev (zgolj zaradi varčnosti).
  6. Shranite te spremembe, da bo delo ustvarjeno.

Dodelite dovoljenja AWS Glue vlogi okolja Airflow

Vloga, ustvarjena za Airflow s predlogo CloudFormation, zagotavlja osnovna dovoljenja za izvajanje delovnih tokov, ne pa tudi za interakcijo z drugimi storitvami, kot je AWS Glue. V produkcijskem projektu bi definirali lastne predloge s temi dodatnimi dovoljenji, vendar v tej objavi zaradi poenostavitve dodate dodatna dovoljenja kot vgrajeni pravilnik. Izvedite naslednje korake:

  1. Na konzoli IAM izberite vloge v podoknu za krmarjenje.
  2. Poiščite vlogo, ki jo je ustvarila predloga; začel se bo z imenom, ki ste ga dodelili skladu CloudFormation in nato -MwaaExecutionRole-.
  3. Na strani s podrobnostmi o vlogi, na Dodajte dovoljenja izberite meni Ustvari vgrajeno politiko.
  4. Preklopite iz vizualnega v način JSON in v besedilno polje vnesite naslednji JSON. Predpostavlja, da vaša vloga AWS Glue sledi konvenciji za začetek AWSGlueServiceRole. Za večjo varnost lahko zamenjate vir z nadomestnimi znaki na ec2:DescribeSubnets dovoljenje z ARN-ji dveh zasebnih podomrežij iz sklada CloudFormation.
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetConnection" ], "Resource": [ "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*", "arn:aws:glue:*:*:catalog" ] }, { "Effect": "Allow", "Action": [ "glue:UpdateJob", "glue:GetJob", "glue:StartJobRun", "glue:GetJobRun" ], "Resource": [ "arn:aws:glue:*:*:job/AirflowBlogJob", "arn:aws:glue:*:*:job/BlogAirflow" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeSubnets" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*" } ]
    }
    

  5. Izberite Naslednji.
  6. Vnesite GlueRelatedPermissions kot ime pravilnika in dokončajte ustvarjanje.

V tem primeru uporabljamo opravilo skripta ETL; za vizualno opravilo, ker samodejno ustvari skript ob shranjevanju, bi vloga Airflow potrebovala dovoljenje za pisanje na konfigurirano pot skripta na Preprosta storitev shranjevanja Amazon (Amazon S3).

Ustvarite DAG za pretok zraka

Potek dela Airflow temelji na usmerjenem acikličnem grafu (DAG), ki je definiran z datoteko Python, ki programsko določa različne vključene naloge in njihove soodvisnosti. Izpolnite naslednje skripte, da ustvarite DAG:

  1. Ustvari lokalno datoteko z imenom glue_job_dag.py z uporabo urejevalnika besedil.

V vsakem od naslednjih korakov nudimo delček kode za vnos v datoteko in razlago, kaj počne.

  1. Naslednji delček doda zahtevane uvoze modulov Python. Moduli so že nameščeni na Airflow; če ne bi bilo tako, bi morali uporabiti a requirements.txt datoteko, ki Airflowu nakazuje, katere module naj namesti. Definira tudi odjemalce Boto3, ki jih bo koda uporabila pozneje. Privzeto bodo uporabljali isto vlogo in regijo kot Airflow, zato ste pred vlogo nastavili zahtevana dodatna dovoljenja.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. Naslednji delček doda tri funkcije za izvajanje strategije vrstnega reda povezav, ki določa, kako preurediti dane povezave, da se določi njihova prioriteta. To je samo primer; lahko sestavite kodo po meri za implementacijo lastne logike glede na vaše potrebe. Koda najprej preveri IP-je, ki so na voljo v vsakem povezovalnem podomrežju, in loči tiste, ki imajo na voljo dovolj IP-jev za izvajanje opravila s polno zmogljivostjo, in tiste, ki bi jih lahko uporabili, ker imajo na voljo vsaj dva IP-ja, kar je najmanj, kar mora opravilo začetek. Če je strategija nastavljena na random, bo naključno določil vrstni red znotraj vsake od prej opisanih povezovalnih skupin in dodal vse druge povezave. Če je strategija capacity, jih bo razvrstil od večine brezplačnih IP-jev do najmanjšega.
    def get_available_ips_from_connection(glue_connection_name): conn_response = glue_client.get_connection(Name=glue_connection_name) connection_properties = conn_response['Connection']['PhysicalConnectionRequirements'] subnet_id = connection_properties['SubnetId'] subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id]) return subnet_response['Subnets'][0]['AvailableIpAddressCount'] def get_connections_free_ips(glue_connection_names, num_workers): good_connections = [] usable_connections = [] for connection_name in glue_connection_names: try: available_ips = get_available_ips_from_connection(connection_name) # Priority to connections that can hold the full cluster and we haven't just tried if available_ips >= num_workers: good_connections.append((connection_name, available_ips)) elif available_ips >= 2: # The bare minimum to start a Glue job usable_connections.append((connection_name, available_ips)) except Exception as e: print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}") return good_connections, usable_connections def prioritize_connections(connection_list, num_workers, strategy): (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers) print(f"Good connections: {good_connections}") print(f"Usable connections: {usable_connections}") all_conn = [] if strategy=="random": shuffle(good_connections) shuffle(usable_connections) # Good connections have priority all_conn = good_connections + usable_connections elif strategy=="capacity": # We can sort both at the same time all_conn = good_connections + usable_connections all_conn.sort(key=lambda x: -x[1]) else: raise ValueError(f"Unknown strategy specified: {strategy}") result = [c[0] for c in all_conn] # Just need the name # Keep at the end any other connections that could not be checked for ips result += [c for c in connection_list if c not in result] return result
    

  3. Naslednja koda ustvari sam DAG z nalogo izvajanja opravila, ki posodobi opravilo z vrstnim redom povezave, ki ga določa strategija, ga zažene in čaka na rezultate. Ime opravila, povezave in strategija izvirajo iz spremenljivk Airflow, zato jih je mogoče enostavno konfigurirati in posodobiti. Ima dva konfigurirana ponovna poskusa z eksponentnim odmikom, tako da bo, če nalogi ne uspeta, ponovil celotno nalogo, vključno z izbiro povezave. Morda je zdaj najboljša izbira druga povezava ali pa je podomrežje, ki je bilo prej naključno izbrano, v območju razpoložljivosti, ki trenutno trpi zaradi izpada, in če izberete drugo, se lahko obnovi.
    with DAG( dag_id='glue_job_dag', schedule_interval=None, # Run on demand only start_date=datetime(2000, 1, 1), # A start date is required max_active_runs=1, catchup=False
    ) as glue_dag: @task( task_id="glue_task", retries=2, retry_delay=duration(seconds = 30), retry_exponential_backoff=True ) def run_job_task(**ctx): glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',') glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip() strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity print(f"Connections available: {glue_connections}") print(f"Glue job name: {glue_jobname}") print(f"Strategy to use: {strategy}") job_props = glue_client.get_job(JobName=glue_jobname)['Job'] num_workers = job_props['NumberOfWorkers'] glue_connections = prioritize_connections(glue_connections, num_workers, strategy) print(f"Running Glue job with the connection order: {glue_connections}") existing_connections = job_props.get('Connections',{}).get('Connections', []) # Preserve other connections that we don't manage other_connections = [con for con in existing_connections if con not in glue_connections] job_props['Connections'] = {"Connections": glue_connections + other_connections} # Clean up properties so we can reuse the dict for the update request for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']: del job_props[prop_name] GlueJobOperator( task_id='submit_job', job_name=glue_jobname, iam_role_name=job_props['Role'].split('/')[-1], update_config=True, create_job_kwargs=job_props, wait_for_completion=True ).execute(ctx) run_job_task()
    

Ustvarite potek dela Airflow

Zdaj ustvarite potek dela, ki prikliče opravilo AWS Glue, ki ste ga pravkar ustvarili:

  1. Na konzoli Amazon S3 poiščite vedro, ustvarjeno s predlogo CloudFormation, ki bo imelo ime, ki se začne z imenom sklada in nato -environmentbucket- (npr. myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Znotraj tega vedra ustvarite mapo z imenom dagsin znotraj te mape naložite datoteko DAG glue_job_dag.py ki ste jih ustvarili v prejšnjem razdelku.
  3. Na konzoli Amazon MWAA se pomaknite do okolja, ki ste ga uvedli s skladom CloudFormation.

Če stanje še ni Na voljo, počakajte, da doseže to stanje. Ne bi smelo trajati dlje kot 30 minut, odkar ste uvedli sklad CloudFormation.

  1. Za ogled podrobnosti okolja izberite povezavo okolja v tabeli.

Konfiguriran je za prevzem datotek DAG iz vedra in mape, ki ste ju uporabili v prejšnjih korakih. Airflow bo spremljal to mapo glede sprememb.

  1. Izberite Odprite uporabniški vmesnik Airflow da odprete nov zavihek z dostopom do uporabniškega vmesnika Airflow z uporabo integrirane varnosti IAM za prijavo.

Če obstaja kakšna težava z datoteko DAG, ki ste jo ustvarili, bo na vrhu strani prikazana napaka, ki označuje prizadete vrstice. V tem primeru preglejte korake in znova naložite. Po nekaj sekundah ga bo razčlenil in posodobil ali odstranil pasico z napako.

  1. o admin izberite meni Spremenljivke.
  2. Dodajte tri spremenljivke z naslednjimi ključi in vrednostmi:
    1. Ključne glue_job_dag.glue_connections z vrednostjo MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. Ključne glue_job_dag.glue_job_name z vrednostjo AirflowBlogJob.
    3. Ključne glue_job_dag.strategy z vrednostjo capacity.

Zaženite opravilo z dinamično dodelitvijo podomrežja

Zdaj ste pripravljeni zagnati potek dela in videti strategijo, ki dinamično preureja povezave.

  1. V uporabniškem vmesniku Airflow izberite DAG, in na vrsti glue_job_dag, izberite ikono za predvajanje.
  2. o Brskanje izberite meni Primeri nalog.
  3. V tabeli primerkov se pomaknite desno, da prikažete Log Url in izberite ikono na njem, da odprete dnevnik.

Dnevnik se bo posodabljal med izvajanjem naloge; poiščete lahko vrstico, ki se začne z »Running Glue job with the connection order:« in prejšnje vrstice, ki prikazujejo podrobnosti IP-jev povezave in dodeljene kategorije. Če pride do napake, boste videli podrobnosti v tem dnevniku.

  1. Na konzoli AWS Glue izberite ETL delovna mesta v podoknu za krmarjenje, nato izberite opravilo AirflowBlogJob.
  2. o Teče izberite zavihek zagon in nato Izhodni dnevniki povezavo, ki bo odprla nov zavihek.
  3. Na novem zavihku uporabite povezavo toka dnevnika, da ga odprete.

Prikazal bo IP, ki je bil dodeljen gonilniku, in kateremu podomrežju pripada, kar se mora ujemati s povezavo, ki jo označuje Airflow (če dnevnik ni prikazan, izberite Resume tako da se posodobi takoj, ko je na voljo).

  1. V uporabniškem vmesniku Airflow uredite spremenljivko Airflow glue_job_dag.strategy nastaviti na random.
  2. Večkrat zaženite DAG in si oglejte, kako se spreminja vrstni red.

Čiščenje

Če uvajanja ne potrebujete več, izbrišite vire, da se izognete nadaljnjim stroškom:

  1. Izbrišite skript Python, ki ste ga naložili, tako da bo lahko vedro S3 samodejno izbrisano v naslednjem koraku.
  2. Izbrišite sklad CloudFormation.
  3. Izbrišite opravilo AWS Glue.
  4. Izbrišite skript, ki ga je opravilo shranilo v Amazon S3.
  5. Izbrišite povezave, ki ste jih ustvarili kot del te objave.

zaključek

V tej objavi smo pokazali, kako lahko AWS Glue in Amazon MWAA sodelujeta pri izdelavi naprednejših delovnih tokov po meri, hkrati pa zmanjšata stroške delovanja in upravljanja. Ta rešitev vam daje večji nadzor nad tem, kako vaše opravilo AWS Glue poteka, da izpolnite posebne operativne, omrežne ali varnostne zahteve.

Svoje okolje Amazon MWAA lahko uvedete na več načinov, na primer z Predloga uporabljen v tej objavi, na konzoli Amazon MWAA ali z uporabo AWS CLI. Prav tako lahko implementirate lastne strategije za orkestriranje opravil AWS Glue, ki temeljijo na vaši omrežni arhitekturi in zahtevah (na primer, da zaženete opravilo bližje podatkom, ko je to mogoče).


O avtorjih

Michael Greenshtein je specialist za analitične rešitve za javni sektor.

Gonzalo Herreros je višji arhitekt za velike podatke v skupini AWS Glue.

Časovni žig:

Več od Veliki podatki AWS