Cómo Amazon Transportation Service habilitó el análisis de eventos casi en tiempo real a escala de petabytes usando AWS Glue con Apache Hudi

Nodo de origen: 1189861

Esta publicación está coescrita con Madhavan Sriram y Diego Menin de Amazon Transportation Services (ATS).

La industria del transporte y la logística cubre una amplia gama de servicios, como el transporte multimodal, el almacenamiento, el cumplimiento, el transporte de carga y la entrega. En Amazon Transportation Service (ATS), el ciclo de vida del envío se rastrea digitalmente y se adjunta a decenas de actualizaciones de rastreo en promedio. Esas actualizaciones de seguimiento son vitales para iniciar eventos a lo largo del ciclo de vida operativo y de facturación del envío, incluida la identificación de retrasos y la optimización de rutas. También son la base para la experiencia de seguimiento de clientes y consumidores a través de los diferentes puntos de contacto.

En esta publicación, discutimos cómo ATS permitió el análisis de eventos casi en tiempo real a escala de petabytes utilizando tablas de Apache Hudi creadas por Pegamento AWS Chispa de trabajos.

ATS estaba buscando formas de administrar y obtener información analítica de forma segura y rentable sobre conjuntos de datos del tamaño de petabytes, con datos provenientes de diferentes fuentes a diferentes ritmos y almacenados en diferentes soluciones de almacenamiento. Puede obtener conocimientos más profundos y ricos cuando reúne todos sus datos relevantes de todas las estructuras y tipos, de todas las fuentes, para analizar.

Uno de los principales desafíos que enfrentó nuestro equipo de ingeniería de datos en ATS fue reunir todos los datos que llegaban en tiempo real y crear una visión holística para nuestros clientes y socios. ATS opera la mayoría de los pedidos realizados a través de Amazon, uno de los minoristas en línea más grandes del mundo, para el transporte y la logística. ATS proporciona a la empresa una entrega de paquetes precisa y oportuna. Las operaciones de ATS generan datos a escala de petabytes, por lo que tener los datos disponibles al alcance de la mano brinda innumerables oportunidades para mejorar las operaciones a través de la toma de decisiones basada en datos.

apache hudi es un marco de gestión de datos de código abierto que se utiliza para simplificar el procesamiento de datos incrementales y el desarrollo de canalizaciones de datos. Este marco administra de manera más eficiente los requisitos comerciales, como los ciclos de vida de los datos, y mejora la calidad de los datos. Hudi le permite administrar datos a nivel de registro en Servicio de almacenamiento simple de Amazon (Amazon S3) lagos de datos para simplificar la captura de cambios de datos (CDC) y la ingesta de datos de transmisión a escala de petabytes, y ayuda a manejar los casos de uso de privacidad de datos que requieren actualizaciones y eliminaciones de nivel de registro.

Resumen de la solución

Uno de los mayores desafíos que enfrentó ATS fue el manejo de datos a escala de petabytes con la necesidad de inserciones, actualizaciones y eliminaciones constantes con un retraso de tiempo mínimo, lo que refleja escenarios comerciales reales y movimiento de paquetes a consumidores de datos posteriores.

Sus almacenes de datos tradicionales no podían escalar al tamaño de los datos ni a la frecuencia de ingesta de datos. Necesitaban escalar a cientos de GB de datos a través de múltiples fuentes de ingesta de datos para obtener datos casi en tiempo real para que los consumidores intermedios los usaran para análisis de datos que impulsaron informes, tableros y visualizaciones críticos para el negocio. Los datos también se utilizan para entrenar modelos de aprendizaje automático con acuerdos de nivel de servicio (SLA) generales de 15 minutos para la ingesta y el procesamiento de datos.

En esta publicación, mostramos cómo ingerimos datos en tiempo real del orden de cientos de GB por hora y ejecutamos inserciones, actualizaciones y eliminaciones en un lago de datos a escala de petabytes usando Tablas Apache Hudi cargados con trabajos de AWS Glue Spark y otros servicios sin servidor de AWS, incluidos AWS Lambda, Manguera de bomberos de datos de Amazon Kinesisy Amazon DynamoDB. AWS ProServe, en estrecha colaboración con ATS, creó un lago de datos compuesto por tablas de Apache Hudi en Amazon S3 creadas y pobladas con AWS Glue. Se creó una canalización de datos que admite inserciones, actualizaciones y eliminaciones a escala de petabytes en las tablas de Apache Hudi mediante AWS Glue. Para admitir la ingesta en tiempo real, ATS también implementó una canalización de ingesta de datos en tiempo real basada en Kinesis Data Firehose, DynamoDB y Secuencias de Amazon DynamoDB.

Para abordar los desafíos que discutimos, decidimos seguir el enfoque "Divide et Impera" y definir dos flujos de trabajo separados:

  • basado en flujo – Ingerimos datos de cuatro fuentes de datos diferentes y 11 conjuntos de datos, y realizamos algunos pasos iniciales de transformación y unión de datos, respetando una ventana de tiempo que puede variar de 3 horas a 2 semanas en todas las cargas de trabajo. La tasa de eventos puede llegar a miles de eventos por segundo y los eventos pueden tener duplicados, llegar tarde o no estar en el orden correcto. Nuestro objetivo era comprender en tiempo real el estado de tránsito de un paquete o camión determinado, capturar el estado actual de las operaciones ATS en tiempo real y ampliar la solución actual basada en flujo para descargar y complementar la extracción, transformación y carga actuales ( solución ETL), basada en Desplazamiento al rojo de Amazon.
  • Lago de datos – Queríamos la capacidad de almacenar petabytes de datos y permitir fusiones entre datos históricos (petabytes) con datos recién ingeridos. La política de retención de datos se extiende hasta 5 años, lo que genera un aumento de los costos y reduce significativamente el rendimiento. Nuestro equipo requiere acceso a datos casi en tiempo real (menos de 15 minutos) de la ingestión basada en transmisión, con pleno cumplimiento de GDPR. Nuestro objetivo era fusionar archivos de datos ingeridos basados ​​en secuencias para obtener una vista holística del conjunto de datos en un momento determinado, con un SLA de menos de 15 minutos. También sería bueno tener capacidades de linaje de datos.

Solución basada en flujo

El siguiente diagrama ilustra la arquitectura de nuestra solución basada en secuencias.

El flujo de la solución es el siguiente:

  1. Los datos se incorporan de varias fuentes en flujos de datos separados de Firehose, se recopilan durante un máximo de 15 minutos y se almacenan en depósitos S3.
  2. Con la llegada de cada archivo nuevo a Amazon S3, se activa una función de Lambda para insertar datos en una tabla de DynamoDB asociada con una fuente de datos o conjuntos de datos específicos.
  3. Con DynamoDB Streams, activamos una segunda función de Lambda que agrega datos en tiempo real en las diferentes tablas de DynamoDB mediante la realización de búsquedas de tablas de DynamoDB en tiempo real. La ventana ETL se aplica mediante el elemento TTL de DynamoDB, por lo que los datos se eliminan automáticamente de la tabla una vez que expira el período TTL.
  4. Una vez transformados, los datos se recopilan en Amazon S3 y pasan por un flujo de entrega de Firehose y están listos para incorporarse a nuestro lago de datos.

La solución nos permite hacer lo siguiente:

  • Ingiera datos en paralelo, en tiempo real y a la escala deseada de todas las fuentes de datos
  • Escale bajo demanda y con una sobrecarga operativa humana mínima; esto se logra utilizando una pila de tecnología sin servidor de AWS
  • Implemente nuestra ventana de tiempo deseada por artículo, reduciendo los costos y la cantidad total de datos almacenados
  • Implemente ETL usando funciones de Lambda en Python, proporcionando así una mejor comprensión sobre la expresión de la lógica empresarial.
  • Acceda a los datos en Amazon S3 antes de que se ingresen en nuestro lago de datos y permita que los clientes y socios consuman datos en formato sin formato si es necesario

Los datos presentes en Amazon S3 representan el punto de partida para una perfecta integración del lago de datos.

Ingesta de lago de datos

Pasando a nuestro lago de datos, el siguiente diagrama ilustra nuestra arquitectura para la ingestión del lago de datos.

La implementación central en esta arquitectura es el trabajo de ingestión de AWS Glue Spark para la tabla Hudi; representa el punto de entrada para la canalización de procesamiento de datos incremental.

El trabajo de AWS Glue Spark se ejecuta con una simultaneidad de 1 y contiene la lógica para upsert y delete aplicada secuencialmente en la tabla de Hudi. La secuencia de eliminación después de upsert en el trabajo de AWS Glue Spark garantiza que las eliminaciones se apliquen después de upsert y que se mantenga la coherencia de los datos incluso en caso de que se vuelva a ejecutar el trabajo.

Para usar Apache Hudi v0.7 en trabajos de AWS Glue mediante PySpark, importamos las siguientes bibliotecas en los trabajos de AWS Glue, extraídas localmente del nodo principal de EMR de Amazon:

  • hudi-spark-bundle_2.11-0.7.0-amzn-1.jar
  • spark-avro_2.11-2.4.7-amzn-1.jar

Recomendamos usar Glue 3.0 con Conector Hudi 0.9.0 en lugar de importar archivos jar de Hudi v0.7 desde EMR, para una integración perfecta y tener más capacidades y funciones.

Antes de insertar datos en la tabla Hudi, la preparamos para el envío. Para optimizar la combinación incremental, tomamos una ventana de búsqueda fija basada en consideraciones de casos de uso comercial. Comenzamos leyendo datos históricos en una ventana de tiempo determinada. Ver el siguiente código:

# HUDI DATA READ
read_options = { 'hoodie.datasource.query.type': 'snapshot'
} # HUDI DATAFRAME created from target Hudi Table on S3
hudi_df = spark. read. format("hudi"). options(*read_options). load(config['target'] + "////*") # Read Historical data set, load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery # input_df is the INCREMENT DATAFRAME created from incrementally ingested data on S3
input_df = spark.read.format("csv"). options(header='true').load(config['incremental_path']) window_year, window_month, window_day = year_month_day_window()
window_filter_clause = "year >= {} AND month >= {} and day >= {}".format(window_year, window_month, window_day) # We merge it with the incoming newly available data: # Data from Hudi Table on S3, because our use case is global, id is unique else id + partitionPath = unique.
hudi_s3_df = hudi_df.select(col("node_id"),col("container_label"),col(config['sort_key'])).filter(window_filter_clause) # Perform a left outer join between new data (input_df) and data present in S3 Hudi. (hudi_s3_df)
hudi_join_df = input_df.alias("incomingData").join(hudi_s3_df.alias("S3HudiData"), (input_df.node_id == hudi_s3_df.node_id) & (input_df.container_label == hudi_s3_df.container_label), "leftouter") # As it's a Left Outer join, there might bew new records which aren't present on S3 Hudi. hudi_new_df = hudi_join_df.filter(col("S3HudiData.last_update_row_epoch_seconds").isNull()).drop(col("S3HudiData.node_id")).drop(col("S3HudiData.container_label")).drop(col("S3HudiData.last_update_row_epoch_seconds")) # As it's a Left Outer join, Select the records where input_df.last_update_time > hudi_s3_df.last_update_time. hudi_updated_df = hudi_join_df.filter(col("S3HudiData.last_update_row_epoch_seconds").isNotNull() & (col("incomingData.last_update_row_epoch_seconds") > col("S3HudiData.last_update_row_epoch_seconds"))).drop(col("S3HudiData.node_id")).drop(col("S3HudiData.container_label")).drop(col("S3HudiData.last_update_row_epoch_seconds"))
hudi_final_df = hudi_new_df.union(hudi_updated_df) # After we prepare the data to be pushed in the Hudi table, we implement the Hudi table update using the following code: (hudi_final_df.write.format(HUDI_FORMAT)
.option(TABLE_NAME, config['hudi_table_name'])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(PARTITIONPATH_FIELD_OPT_KEY,config["partition_keys"])
.option(KEYGENERATOR_CLASS_OPT_KEY, COMPLEX_KEYGENERATOR_CLASS_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 1500)
.option('hoodie.payload.ordering.field',config["sort_key"])
.option(PAYLOAD_CLASS_OPT_KEY,'org.apache.hudi.common.model.DefaultHoodieRecordPayload')
.option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
.option(HIVE_DATABASE_OPT_KEY,config['hudi_database'])
.option(HIVE_TABLE_OPT_KEY,config['hudi_table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_JDBC_SYNC,"false")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
.option('hoodie.datasource.write.hive_style_partitioning', 'true')
# To switch to Global Bloom index, set the following configuration as GLOBAL_BLOOM.
.option('hoodie.index.type', 'GLOBAL_SIMPLE') .option('hoodie.simple.index.update.partition.path', 'true')
.option('hoodie.global.simple.index.parallelism', '500')
.mode("append")
.save(config['target']))

En el código anterior, config es un diccionario que incluye todos los Configuraciones de Apache Hudi. El catálogo de datos de AWS Glue se sincroniza automáticamente después de la creación de la tabla de Hudi, como parte del trabajo de Glue, lo que refleja la estructura de partición de Amazon S3. Ahora podemos consultar los datos usando Atenea amazónica or Espectro de Redshift de Amazon.

Para cumplir con nuestro estricto SLA de ingestión interna, tuvimos que dedicar especial atención a emplear el derecho Índices Hudi y definiendo el tipo de tabla correcto. Para este último, analizamos el tipo de carga de trabajo. Debido a la naturaleza analítica de los conjuntos de datos y el caso de uso, identificamos que la configuración correcta sería usar un COPY_ON_WRITE tabla, incluso si eso fue un compromiso en el rendimiento de escritura pero mejoró el rendimiento de lectura.

Para lo primero, pasamos por una fase de experimentación. Empezamos con un GLOBAL_BLOOM índice, identificando un patrón no lineal inicial para el rendimiento de escritura de datos.

Dada la aleatoriedad y la ventana de tiempo especificada para los datos de entrada, hemos encontrado una cantidad significativa de falsos positivos, lo que lleva a leer todo el conjunto de datos para comparar. Es más, GLOBAL_BLOOM sigue aumentando linealmente correspondiente al tamaño de los datos, mientras que GLOBAL_SIMPLE no trae esta sobrecarga (con una ventana de búsqueda fija) como se puede observar en el diagrama.

El gráfico representa el tiempo total empleado por el trabajo de Glue Hudi (eje X) a lo largo de los días (eje Y) a medida que los datos entrantes se fusionan con los datos históricos. aprovechando GLOBAL_BLOOM. El gráfico en la mitad superior muestra que cuando los mismos datos se combinaron consecutivamente durante días, se observó un aumento de tiempo no lineal. El gráfico de la mitad inferior indica un aumento lineal con una pendiente pronunciada cuando los nuevos datos entrantes se fusionaron con los datos históricos.

GLOBAL_BLOOM no era apropiado para nuestro caso de uso, ya que los datos históricos se remontaban a 5 años, y el trabajo de Glue no podrá cumplir con las demandas de SLA. En este punto investigamos GLOBAL_SIMPLE índices, alcanzando los patrones de desempeño esperados.

Nuestra solución de lago de datos nos permite hacer lo siguiente:

  • Ingerir archivos de datos en un lago de datos a escala de petabytes, con un SLA de ingestión de 15 minutos desde el momento en que recibimos los datos
  • Lea los datos a escala de Peta Byte aprovechando las particiones de Amazon S3 (creadas por trabajos de Glue y asignadas a la lógica de partición de Hudi) y búsquedas más rápidas mediante el uso de índices de Hudi
  • Utilice las capacidades de linaje de datos de Hudi
  • Reduzca los costos de almacenamiento de datos, mantenimiento de infraestructura y desarrollo
  • Administre el gobierno de datos usando Formación del lago AWS, que permite a los socios y clientes consultar los datos utilizando sus propias herramientas, al tiempo que permite que ATS mantenga el control sobre nuestros datos

Conclusión

En esta publicación, destacamos cómo ATS creó una plataforma de ingestión de datos totalmente sin servidor en tiempo real, escalando hasta miles de eventos por segundo y fusionándose con datos históricos del tamaño de petabytes almacenados en un lago de datos casi en tiempo real.

Creamos una solución de lago de datos a escala de petabytes basada en Apache Hudi y AWS Glue que nos permite compartir nuestros datos dentro de los 15 minutos posteriores a la ingestión con nuestros socios y consumidores, al mismo tiempo que conservamos el control total sobre nuestros datos y descargamos automáticamente los costos por el consumo de datos. Esto proporciona un rendimiento lineal a medida que los datos crecen con el tiempo.

Acerca del servicio de transporte de Amazon

Amazon Transportation Service (ATS) es la milla intermedia de la red de transporte de Amazon, que conecta los centros de distribución en un extremo y las estaciones de entrega y las oficinas de correos en el otro extremo. Permitimos que los paquetes que se piden y empaquetan en los centros logísticos que atraviesan el continente europeo se entreguen en la estación de entrega final que realiza la entrega de casa en casa.


Acerca de los autores

Madhavan Sriram es un gerente de ciencia de datos que cuenta con una amplia experiencia en múltiples organizaciones empresariales en el espacio de las tecnologías Big Data y Machine Learning. Actualmente dirige el equipo de Productos y tecnología de datos dentro de Amazon Transportation Services (ATS) y crea productos intensivos en datos para la red de transporte dentro de Amazon. En su tiempo libre, Madhavan disfruta de la fotografía y la poesía.

Diego Menín es un ingeniero de datos sénior dentro del equipo de productos y tecnología de datos. Viene con una amplia experiencia en nuevas empresas y empresas con una profunda experiencia en AWS para desarrollar productos de análisis y datos escalables basados ​​en la nube. Dentro de Amazon, es el arquitecto del lago de datos de transporte de Amazon y trabaja intensamente en la transmisión de datos y los mecanismos de integración con aplicaciones posteriores a través del lago de datos.

gabriele cacciola es un arquitecto de datos sénior que trabaja para el equipo de servicios profesionales de Amazon Web Services. Viniendo de una sólida experiencia en Startups, actualmente ayuda a los clientes empresariales en EMEA a implementar sus ideas, innovar utilizando la última tecnología y crear soluciones de análisis y datos escalables para tomar decisiones comerciales críticas. En su tiempo libre, Gabriele disfruta del fútbol y de la cocina.

Kunal Gautama es Arquitecto Senior de Big Data en Amazon Web Services. Con experiencia en la creación de su propia Startup y trabajando junto con empresas, aporta una perspectiva única para hacer que las personas, los negocios y la tecnología trabajen en conjunto para los clientes. Le apasiona ayudar a los clientes en su viaje de transformación digital y les permite crear datos escalables y soluciones analíticas avanzadas para obtener información oportuna y tomar decisiones comerciales críticas. En su tiempo libre, Kunal disfruta de maratones, encuentros tecnológicos y retiros de meditación.

Fuente: https://aws.amazon.com/blogs/big-data/how-amazon-transportation-service-enabled-near-real-time-event-analytics-at-petabyte-scale-using-aws-glue- con-apache-hudi/

Sello de tiempo:

Mas de AWS