Automatize a replicação de fontes relacionais em um data lake transacional com Apache Iceberg e AWS Glue

Automatize a replicação de fontes relacionais em um data lake transacional com Apache Iceberg e AWS Glue

Nó Fonte: 1958466

As organizações optaram por construir data lakes em cima de Serviço de armazenamento simples da Amazon (Amazon S3) por muitos anos. Um data lake é a escolha mais popular para as organizações armazenarem todos os seus dados organizacionais gerados por diferentes equipes, entre domínios de negócios, de todos os formatos diferentes e até mesmo ao longo do histórico. De acordo com um estudo, a empresa média está vendo o volume de seus dados crescer a uma taxa superior a 50% ao ano, geralmente gerenciando uma média de 33 fontes de dados exclusivas para análise.

As equipes geralmente tentam replicar milhares de trabalhos de bancos de dados relacionais com o mesmo padrão de extração, transformação e carregamento (ETL). Há muito esforço em manter os estados de trabalho e agendar esses trabalhos individuais. Essa abordagem ajuda as equipes a adicionar tabelas com poucas alterações e também mantém o status do trabalho com o mínimo de esforço. Isso pode levar a uma grande melhoria no cronograma de desenvolvimento e no rastreamento dos trabalhos com facilidade.

Nesta postagem, mostramos como replicar facilmente todos os seus armazenamentos de dados relacionais em um data lake transacional de maneira automatizada com um único trabalho ETL usando Apache Iceberg e Cola AWS.

Arquitetura da solução

Os data lakes são geralmente organizado usando baldes S3 separados para três camadas de dados: a camada bruta contendo dados em sua forma original, a camada de estágio contendo dados processados ​​intermediários otimizados para consumo e a camada analítica contendo dados agregados para casos de uso específicos. Na camada bruta, as tabelas geralmente são organizadas com base em suas fontes de dados, enquanto as tabelas na camada de estágio são organizadas com base nos domínios de negócios aos quais pertencem.

Esta postagem fornece um Formação da Nuvem AWS modelo que implanta um trabalho do AWS Glue que lê um caminho do Amazon S3 para uma fonte de dados da camada bruta do data lake e ingere os dados em tabelas Apache Iceberg na camada de palco usando Suporte do AWS Glue para estruturas de data lake. O trabalho espera que as tabelas na camada bruta sejam estruturadas da maneira Serviço de migração de banco de dados AWS (AWS DMS) os ingere: esquema, tabela e arquivos de dados.

Esta solução usa Armazenamento de parâmetros do AWS Systems Manager para configuração de mesa. Você deve modificar este parâmetro especificando as tabelas que deseja processar e como, incluindo informações como chave primária, partições e o domínio de negócios associado. O trabalho usa essas informações para criar automaticamente um banco de dados (se ainda não existir) para cada domínio de negócios, criar as tabelas Iceberg e executar o carregamento de dados.

Finalmente, podemos usar Amazona atena para consultar os dados nas tabelas Iceberg.

O diagrama a seguir ilustra essa arquitetura.

Arquitetura da solução

Essa implementação tem as seguintes considerações:

  • Todas as tabelas da fonte de dados devem ter uma chave primária para serem replicadas usando esta solução. A chave primária pode ser uma única coluna ou uma chave composta com mais de uma coluna.
  • Se o data lake contiver tabelas que não precisem de upserts ou não tenham uma chave primária, você poderá excluí-las da configuração do parâmetro e implementar processos ETL tradicionais para ingeri-las no data lake. Isso está fora do escopo deste post.
  • Se houver fontes de dados adicionais que precisam ser ingeridas, você pode implantar várias pilhas do CloudFormation, uma para lidar com cada fonte de dados.
  • O trabalho do AWS Glue foi projetado para processar dados em duas fases: o carregamento inicial executado após o AWS DMS concluir a tarefa de carregamento completo e o carregamento incremental executado em uma programação que aplica arquivos de captura de dados alterados (CDC) capturados pelo AWS DMS. O processamento incremental é executado usando um Marcador de trabalho do AWS Glue.

Há nove etapas para concluir este tutorial:

  1. Configure um endpoint de origem para o AWS DMS.
  2. Implante a solução usando o AWS CloudFormation.
  3. Revise a tarefa de replicação do AWS DMS.
  4. Opcionalmente, adicione permissões para criptografia e descriptografia ou Formação AWS Lake.
  5. Revise a configuração da tabela no Parameter Store.
  6. Execute o carregamento de dados inicial.
  7. Execute o carregamento de dados incremental.
  8. Monitore a ingestão da tabela.
  9. Programe o carregamento incremental de dados em lote.

Pré-requisitos

Antes de iniciar este tutorial, você já deve estar familiarizado com o Iceberg. Caso contrário, você pode começar replicando uma única tabela seguindo as instruções em Implemente um UPSERT baseado em CDC em um data lake usando Apache Iceberg e AWS Glue. Além disso, configure o seguinte:

Configurar um endpoint de origem para o AWS DMS

Antes de criarmos nossa tarefa do AWS DMS, precisamos configurar um endpoint de origem para se conectar ao banco de dados de origem:

  1. No console do AWS DMS, escolha Pontos finais no painel de navegação.
  2. Escolha Criar ponto final.
  3. Se seu banco de dados estiver em execução no Amazon RDS, escolha Selecione a instância de banco de dados RDS, em seguida, escolha a instância na lista. Caso contrário, escolha o mecanismo de origem e forneça as informações de conexão por meio de Gerenciador de segredos da AWS ou manualmente.
  4. Escolha Identificador de terminal, insira um nome para o terminal; por exemplo, source-postgresql.
  5. Escolha Criar ponto final.

Implante a solução usando AWS CloudFormation

Crie uma pilha do CloudFormation usando o modelo fornecido. Conclua as seguintes etapas:

  1. Escolha Pilha de lançamento:
  2. Escolha Próximo.
  3. Forneça um nome de pilha, como transactionaldl-postgresql.
  4. Digite os parâmetros necessários:
    1. DMSS3EndpointIAMRoleARN – O ARN da função IAM para AWS DMS para gravar dados no Amazon S3.
    2. ReplicaçãoInstanceArn – O ARN da instância de replicação do AWS DMS.
    3. S3BucketStage – O nome do bucket existente usado para a camada de estágio do data lake.
    4. S3BaldeCola – O nome do depósito S3 existente para armazenar scripts do AWS Glue.
    5. S3BucketRaw – O nome do depósito existente usado para a camada bruta do data lake.
    6. FonteEndpointArn – O ARN do endpoint do AWS DMS que você criou anteriormente.
    7. Sourcename – O identificador arbitrário da fonte de dados a replicar (por exemplo, postgres). Isso é usado para definir o caminho S3 do data lake (camada bruta) onde os dados serão armazenados.
  5. Não modifique os seguintes parâmetros:
    1. FonteS3BucketBlog – O nome do depósito no qual o script AWS Glue fornecido está armazenado.
    2. FonteS3BucketPrefix – O nome do prefixo do depósito no qual o script do AWS Glue fornecido está armazenado.
  6. Escolha Próximo duas vezes.
  7. Selecionar Eu reconheço que o AWS CloudFormation pode criar recursos IAM com nomes personalizados.
  8. Escolha Criar pilha.

Após aproximadamente 5 minutos, a pilha do CloudFormation é implantada.

Revise a tarefa de replicação do AWS DMS

A implantação do AWS CloudFormation criou um endpoint de destino do AWS DMS para você. Devido a duas configurações de endpoint específicas, os dados serão ingeridos conforme necessário no Amazon S3.

  1. No console do AWS DMS, escolha Pontos finais no painel de navegação.
  2. Pesquise e escolha o endpoint que começa com dmsIcebergs3endpoint.
  3. Revise as configurações do endpoint:
    1. DataFormat é especificado como parquet.
    2. TimestampColumnName irá adicionar a coluna last_update_time com a data de criação dos registros no Amazon S3.

Configurações de endpoint do AWS DMS

A implantação também cria uma tarefa de replicação do AWS DMS que começa com dmsicebergtask.

  1. Escolha Tarefas de replicação no painel de navegação e procure a tarefa.

Você verá que o Tipo de Tarefa está marcado como Carga total, replicação contínua. O AWS DMS executará um carregamento completo inicial dos dados existentes e, em seguida, criará arquivos incrementais com alterações realizadas no banco de dados de origem.

No Regras de Mapeamento guia, existem dois tipos de regras:

  • Uma regra de seleção com o nome do esquema de origem e as tabelas que serão ingeridas do banco de dados de origem. Por padrão, ele usa o banco de dados de amostra fornecido nos pré-requisitos, dms_sample, e todas as tabelas com a palavra-chave %.
  • Duas regras de transformação que incluem nos arquivos de destino no Amazon S3 o nome do esquema e o nome da tabela como colunas. Isso é usado por nosso trabalho do AWS Glue para saber a quais tabelas correspondem os arquivos no data lake.

Para saber mais sobre como personalizar isso para suas próprias fontes de dados, consulte Regras e ações de seleção.

Regras de mapeamento da AWS

Vamos alterar algumas configurações para finalizar a preparação da nossa tarefa.

  1. No Opções menu, escolha modificar.
  2. No Configurações da Tarefa seção, sob Parar a tarefa após a conclusão do carregamento completo, escolha Parar depois de aplicar as alterações em cache.

Dessa forma, podemos controlar a carga inicial e a geração de arquivos incrementais como duas etapas diferentes. Usamos essa abordagem de duas etapas para executar o trabalho do AWS Glue uma vez a cada etapa.

  1. Debaixo Registros de tarefas, escolha Ativar logs do CloudWatch.
  2. Escolha Salvar.
  3. Aguarde cerca de 1 minuto para que o status da tarefa de migração do banco de dados seja exibido como Pronto.

Adicione permissões para criptografia e descriptografia ou Lake Formation

Opcionalmente, você pode adicionar permissões para criptografia e descriptografia ou Lake Formation.

Adicionar permissões de criptografia e descriptografia

Se seus buckets S3 usados ​​para as camadas brutas e de estágio forem criptografados usando Serviço de gerenciamento de chaves AWS (AWS KMS), você precisa adicionar permissões para permitir que o trabalho do AWS Glue acesse os dados:

Adicionar permissões de formação do lago

Se você estiver gerenciando permissões usando o Lake Formation, precisará permitir que seu trabalho do AWS Glue crie os bancos de dados e tabelas de seu domínio por meio da função IAM GlueJobRole.

  1. Conceda permissões para criar bancos de dados (para obter instruções, consulte Criando um Banco de Dados).
  2. Conceda permissões SUPER ao default base de dados.
  3. Conceder permissões de localização de dados.
  4. Se você criar bancos de dados manualmente, conceda permissões em todos os bancos de dados para criar tabelas. Referir-se Concedendo permissões de tabela usando o console do Lake Formation e o método de recurso nomeado or Concedendo permissões do Data Catalog usando o método LF-TBAC de acordo com o seu caso de uso.

Depois de concluir a etapa posterior de execução do carregamento de dados inicial, certifique-se de incluir também permissões para que os consumidores consultem as tabelas. A função de trabalho se tornará a proprietária de todas as tabelas criadas e o administrador do data lake poderá realizar concessões para usuários adicionais.

Revise a configuração da tabela no Parameter Store

O trabalho do AWS Glue que executa a ingestão de dados em tabelas Iceberg usa a especificação de tabela fornecida no Parameter Store. Conclua as etapas a seguir para revisar o armazenamento de parâmetros que foi configurado automaticamente para você. Se necessário, modifique de acordo com suas próprias necessidades.

  1. No console Parameter Store, escolha Meus parâmetros no painel de navegação.

A pilha do CloudFormation criou dois parâmetros:

  • iceberg-config para configurações de trabalho
  • iceberg-tables para configuração de mesa
  1. Escolha o parâmetro mesas-iceberg.

A estrutura JSON contém informações que o AWS Glue usa para ler dados e gravar as tabelas Iceberg no domínio de destino:

  • Um objeto por tabela – O nome do objeto é criado usando o nome do esquema, um ponto e o nome da tabela; por exemplo, schema.table.
  • chave primária – Isso deve ser especificado para cada tabela de origem. Você pode fornecer uma única coluna ou uma lista de colunas separadas por vírgulas (sem espaços).
  • partiçãoCols – Isso opcionalmente particiona colunas para tabelas de destino. Se você não deseja criar tabelas particionadas, forneça uma string vazia. Caso contrário, forneça uma única coluna ou uma lista separada por vírgulas de colunas a serem usadas (sem espaços).
  1. Se você quiser usar sua própria fonte de dados, use o seguinte código JSON e substitua o texto em CAPS do modelo fornecido. Se você estiver usando a fonte de dados de amostra fornecida, mantenha as configurações padrão:
{ "SCHEMA_NAME.TABLE_NAME_1": { "primaryKey": "ONLY_PRIMARY_KEY", "domain": "TARGET_DOMAIN", "partitionCols": "" }, "SCHEMA_NAME.TABLE_NAME_2": { "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY", "domain": "TARGET_DOMAIN", "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO" }
}
  1. Escolha Salvar as alterações .

Execute o carregamento inicial de dados

Agora que a configuração necessária foi concluída, ingerimos os dados iniciais. Esta etapa inclui três partes: ingerir os dados do banco de dados relacional de origem na camada bruta do data lake, criar as tabelas Iceberg na camada de estágio do data lake e verificar os resultados usando o Athena.

Ingerir dados na camada bruta do data lake

Para ingerir dados da fonte de dados relacional (PostgreSQL se você estiver usando o exemplo fornecido) para nosso data lake transacional usando o Iceberg, conclua as seguintes etapas:

  1. No console do AWS DMS, escolha Tarefas de migração de banco de dados no painel de navegação.
  2. Selecione a tarefa de replicação que você criou e no Opções menu, escolha Reiniciar/Retomar.
  3. Aguarde cerca de 5 minutos para que a tarefa de replicação seja concluída. Você pode monitorar as tabelas ingeridas no Estatísticas guia da tarefa de replicação.

Estatísticas de carga completa do AWS DMS

Após alguns minutos, a tarefa termina com a mensagem Carga completa concluída.

  1. No console do Amazon S3, escolha o bucket que você definiu como a camada bruta.

Sob o prefixo S3 definido no AWS DMS (por exemplo, postgres), você deverá ver uma hierarquia de pastas com a seguinte estrutura:

  • Esquema
    • Nome da mesa
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

Objetos de carregamento completo do AWS DMS criados no S3

Se seu bucket S3 estiver vazio, revise Solução de problemas de tarefas de migração no AWS Database Migration Service antes de executar o trabalho do AWS Glue.

Criar e ingerir dados em tabelas Iceberg

Antes de executar o trabalho, vamos navegar pelo script do trabalho do AWS Glue fornecido como parte da pilha do CloudFormation para entender seu comportamento.

  1. No console do AWS Glue Studio, escolha Empregos no painel de navegação.
  2. Pesquise a vaga que começa com IcebergJob- e um sufixo do nome da pilha do CloudFormation (por exemplo, IcebergJob-transactionaldl-postgresql).
  3. Escolha o trabalho.

Avaliação do trabalho ETL do AWS Glue

O script de trabalho obtém a configuração necessária do Parameter Store. A função getConfigFromSSM() retorna configurações relacionadas ao trabalho, como buckets de origem e destino, de onde os dados precisam ser lidos e gravados. a variável ssmparam_table_values contêm informações relacionadas à tabela, como domínio de dados, nome da tabela, colunas de partição e chave primária das tabelas que precisam ser ingeridas. Veja o seguinte código Python:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables" # Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

O script usa um nome de catálogo arbitrário para Iceberg que é definido como my_catalog. Isso é implementado no Catálogo de dados do AWS Glue usando configurações do Spark, portanto, uma operação SQL apontando para my_catalog será aplicada no Catálogo de dados. Veja o seguinte código:

catalog_name = 'my_catalog'
errored_table_list = [] # Iceberg configuration
spark = SparkSession.builder .config('spark.sql.warehouse.dir', warehouse_path) .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') .getOrCreate()

O script itera sobre as tabelas definidas no Parameter Store e executa a lógica para detectar se a tabela existe e se os dados recebidos são um carregamento inicial ou um upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values: # Get table data isTableExists = False schemaName, tableName = key.split('.') logger.info(f'Processing table : {tableName}')

A initialLoadRecordsSparkSQL() A função carrega os dados iniciais quando nenhuma coluna de operação está presente nos arquivos S3. O AWS DMS adiciona esta coluna apenas aos arquivos de dados Parquet produzidos pela replicação contínua (CDC). O carregamento de dados é realizado usando o comando INSERT INTO com SparkSQL. Veja o seguinte código:

sqltemp = Template(""" INSERT INTO $catalog_name.$dbName.$tableName ($insertTableColumnList) SELECT $insertTableColumnList FROM insertTable $partitionStrSQL """)
SQLQUERY = sqltemp.substitute( catalog_name = catalog_name, dbName = dbName, tableName = tableName, insertTableColumnList = insertTableColumnList[ : -1], partitionStrSQL = partitionStrSQL) logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)

Agora executamos o trabalho do AWS Glue para inserir os dados iniciais nas tabelas Iceberg. A pilha do CloudFormation adiciona o --datalake-formats parâmetro, adicionando as bibliotecas Iceberg necessárias ao trabalho.

  1. Escolha Executar trabalho.
  2. Escolha Execuções de trabalho para monitorar o estado. Aguarde até que o status seja Executado com sucesso.

Verifique os dados carregados

Para confirmar se a tarefa processou os dados conforme o esperado, conclua as seguintes etapas:

  1. No console Athena, escolha Editor de consulta no painel de navegação.
  2. verificar AwsDataCatalog é selecionado como fonte de dados.
  3. Debaixo banco de dados, escolha o domínio de dados que deseja explorar, com base na configuração definida no armazenamento de parâmetros. Se estiver usando o banco de dados de amostra fornecido, use sports.

Debaixo Tabelas e visualizações, podemos ver a lista de tabelas que foram criadas pelo trabalho do AWS Glue.

  1. Escolha o menu de opções (três pontos) ao lado do primeiro nome da tabela e escolha Visualizar dados.

Você pode ver os dados carregados nas tabelas do Iceberg. Dados iniciais de revisão do Amazon Athena carregados

Executar carregamento de dados incremental

Agora começamos a capturar as alterações de nosso banco de dados relacional e aplicá-las ao data lake transacional. Essa etapa também é dividida em três partes: capturar as alterações, aplicá-las nas tabelas Iceberg e verificar os resultados.

Capturar alterações do banco de dados relacional

Devido à configuração que especificamos, a tarefa de replicação parou após a execução da fase de carga total. Agora reiniciamos a tarefa para adicionar arquivos incrementais com alterações na camada bruta do data lake.

  1. No console do AWS DMS, selecione a tarefa que criamos e executamos anteriormente.
  2. No Opções menu, escolha CV .
  3. Escolha Iniciar tarefa para começar a capturar as alterações.
  4. Para acionar a criação de um novo arquivo no data lake, execute inserções, atualizações ou exclusões nas tabelas de seu banco de dados de origem usando sua ferramenta de administração de banco de dados preferida. Se estiver usando o banco de dados de amostra fornecido, você pode executar os seguintes comandos SQL:
UPDATE dms_sample.nfl_stadium_data_upd
SET seatin_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31'; update dms_sample.mlb_data set bats = 'R'
where mlb_id=506560 and bats='L'; update dms_sample.sporting_event set start_date = current_date where id=11 and sold_out=0;
  1. Na página de detalhes da tarefa do AWS DMS, escolha o Estatísticas da tabela guia para ver as alterações capturadas.
    Estatísticas do CDC do AWS DMS
  2. Abra a camada bruta do data lake para encontrar um novo arquivo contendo as alterações incrementais dentro do prefixo de cada tabela, por exemplo, sob o sporting_event prefixo.

O registro com alterações para o sporting_event tabela se parece com a captura de tela a seguir.

Objetos do AWS DMS migrados para S3 com CDC

Observe a Op coluna no início identificada com uma atualização (U). Além disso, o segundo valor de data/hora é a coluna de controle adicionada pelo AWS DMS com a hora em que a alteração foi capturada.

Esquema de arquivo CDC no Amazon S3

Aplicar alterações nas tabelas Iceberg usando o AWS Glue

Agora, executamos o trabalho do AWS Glue novamente e ele processará automaticamente apenas os novos arquivos incrementais, pois o marcador de trabalho está ativado. Vamos revisar como funciona.

A dedupCDCRecords() A função executa a desduplicação de dados porque várias alterações em um único ID de registro podem ser capturadas no mesmo arquivo de dados no Amazon S3. A desduplicação é realizada com base no last_update_time coluna adicionada pelo AWS DMS que indica o carimbo de data/hora de quando a alteração foi capturada. Veja o seguinte código Python:

def dedupCDCRecords(inputDf, keylist): IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize) inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF)) NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'") UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')") finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf) return finalInputDF

Na linha 99, o upsertRecordsSparkSQL() A função executa o upsert de maneira semelhante ao carregamento inicial, mas desta vez com um comando SQL MERGE.

Revise as alterações aplicadas

Abra o console do Athena e execute uma consulta que selecione os registros alterados no banco de dados de origem. Se estiver usando o banco de dados de amostra fornecido, use uma das seguintes consultas SQL:

SELECT * FROM "sports"."nfl_stadiu_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Dados do cdc de revisão do Amazon Athena carregados

Monitorar ingestão de tabelas

O script de trabalho do AWS Glue é codificado com simples Manipulação de exceção do Python para capturar erros durante o processamento de uma tabela específica. O marcador de trabalho é salvo depois que cada tabela termina o processamento com êxito, para evitar o reprocessamento de tabelas se a execução do trabalho for repetida para as tabelas com erros.

A Interface de linha de comando da AWS (AWS CLI) fornece um get-job-bookmark comando para AWS Glue que fornece informações sobre o status do marcador para cada tabela processada.

  1. No console do AWS Glue Studio, escolha o trabalho ETL.
  2. Escolha o Execuções de trabalho guia e copie o ID de execução da tarefa.
  3. Execute o seguinte comando em um terminal autenticado para a AWS CLI, substituindo <GLUE_JOB_RUN_ID> na linha 1 com o valor que você copiou. Se sua pilha do CloudFormation não for nomeada transactionaldl-postgresql, forneça o nome do seu trabalho na linha 2 do script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name jobname --run-id $jobrun

Nesta solução, quando o processamento de uma tabela causa uma exceção, o trabalho do AWS Glue não falhará de acordo com essa lógica. Em vez disso, a tabela será adicionada a uma matriz que será impressa após a conclusão do trabalho. Nesse cenário, o trabalho será marcado como com falha após tentar processar o restante das tabelas detectadas na fonte de dados brutos. Desta forma, tabelas sem erros não precisam esperar até que o usuário identifique e resolva o problema nas tabelas conflitantes. O usuário pode detectar rapidamente execuções de trabalho que tiveram problemas usando o status de execução de trabalho do AWS Glue e identificar quais tabelas específicas estão causando o problema usando os logs do CloudWatch para a execução do trabalho.

  1. O script de trabalho implementa esse recurso com o seguinte código Python:
# Performed for every table try: # Table processing logic except Exception as e: logger.info(f'There is an issue with table: {tableName}') logger.info(f'The exception is : {e}') errored_table_list.append(tableName) continue job.commit()
if (len(errored_table_list)): logger.info('Total number of errored tables are ',len(errored_table_list)) logger.info('Tables that failed during processing are ', *errored_table_list, sep=', ') raise Exception(f'***** Some tables failed to process.')

A captura de tela a seguir mostra como os logs do CloudWatch procuram tabelas que causam erros no processamento.

Monitoramento de trabalhos do AWS Glue com logs

Alinhado com o Lente de análise de dados do AWS Well-Architected Framework práticas, você pode adaptar mecanismos de controle mais sofisticados para identificar e notificar as partes interessadas quando erros aparecem nos pipelines de dados. Por exemplo, você pode usar um Amazon DynamoDB tabela de controle para armazenar todas as tabelas e execuções de trabalho com erros, ou usando Serviço de notificação simples da Amazon (Amazon SNS) para enviar alertas aos operadores quando determinados critérios são atendidos.

Agendar carregamento incremental de dados em lote

A pilha do CloudFormation implanta um Amazon Event Bridge regra (desativada por padrão) que pode acionar a execução do trabalho do AWS Glue em uma programação. Para fornecer sua própria programação e habilitar a regra, conclua as seguintes etapas:

  1. No console do EventBridge, escolha Regras no painel de navegação.
  2. Pesquise a regra prefixada com o nome de sua pilha do CloudFormation seguida por JobTrigger (por exemplo, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Escolha a regra.
  4. Debaixo Calendário de eventos, escolha Editar.

A programação padrão é configurada para disparar a cada hora.

  1. Forneça a programação em que deseja executar o trabalho.
  2. Além disso, você pode usar um Expressão cron do EventBridge selecionando Um cronograma refinado.
    Tarefa ETL de agendamento do Amazon EventBridge
  3. Ao terminar de configurar a expressão cron, escolha Próximo três vezes e, finalmente, escolha regra de atualização para salvar as alterações.

A regra é criada desativada por padrão para permitir que você execute o carregamento de dados inicial primeiro.

  1. Ative a regra escolhendo permitir.

Você pode usar o do Paciente guia para visualizar invocações de regras ou diretamente no AWS Glue Execução de trabalho Detalhes.

Conclusão

Depois de implantar esta solução, você automatizou a ingestão de suas tabelas em uma única fonte de dados relacional. As organizações que usam um data lake como plataforma de dados central geralmente precisam lidar com várias, às vezes até dezenas de fontes de dados. Além disso, cada vez mais casos de uso exigem que as organizações implementem recursos transacionais no data lake. Você pode usar esta solução para acelerar a adoção de tais recursos em todas as suas fontes de dados relacionais para habilitar novos casos de uso de negócios, automatizando o processo de implementação para obter mais valor de seus dados.


Sobre os autores

Luís Gerardo BaezaLuís Gerardo Baeza é arquiteto de big data no laboratório de dados da Amazon Web Services (AWS). Ele tem 12 anos de experiência ajudando organizações nos setores de saúde, financeiro e educação a adotar programas de arquitetura corporativa, computação em nuvem e recursos de análise de dados. Luis atualmente ajuda organizações em toda a América Latina a acelerar iniciativas estratégicas de dados.

SaiKiran Reddy AenuguSaiKiran Reddy Aenugu é arquiteto de dados no laboratório de dados da Amazon Web Services (AWS). Ele tem 10 anos de experiência na implementação de processos de carregamento, transformação e visualização de dados. Atualmente, a SaiKiran ajuda organizações na América do Norte a adotar arquiteturas de dados modernas, como data lakes e data mesh. Tem experiência nos setores de varejo, aéreo e financeiro.

Narendra MerlaNarendra Merla é arquiteto de dados no laboratório de dados da Amazon Web Services (AWS). Ele tem 12 anos de experiência em projetar e produzir pipelines de dados em tempo real e orientados a lotes e criar data lakes em ambientes de nuvem e locais. Narendra atualmente ajuda organizações na América do Norte a construir e projetar arquiteturas de dados robustas e tem experiência nos setores de telecomunicações e finanças.

Carimbo de hora:

Mais de Grandes dados da AWS