Nuove funzionalità di Apache Hudi 0.7.0 e 0.8.0 disponibili su Amazon EMR

Nodo di origine: 1882414

Apache Hudi è un framework di data lake transazionale open source che semplifica notevolmente l'elaborazione dati incrementale e lo sviluppo di pipeline di dati fornendo funzionalità di inserimento, aggiornamento ed eliminazione a livello di record. Questa funzionalità a livello di record è utile se stai costruendo i tuoi data lake Servizio di archiviazione semplice Amazon (Amazon S3) o Hadoop File System distribuito (HDFS). Puoi usarlo per rispettare le normative sulla privacy dei dati e semplificare le pipeline di importazione dei dati che gestiscono record in arrivo in ritardo o aggiornati da origini dati in streaming o per acquisire dati utilizzando Change Data Capture (CDC) dai sistemi transazionali. Apache Hudi è integrato con framework di analisi dei big data open source come Apache Spark, Apache Hive, Presto e Trino. Ti consente di mantenere i dati in Amazon S3 o HDFS in formati aperti come Apache Parquet e Apache Avro.

A partire dalla versione 5.28.0, Amazon EMR installa il componente Hudi per impostazione predefinita quando si installa Spark, Hive, Presto o Trino. Dall'inclusione di Apache Hudi in Amazon EMR, sono stati aggiunti numerosi miglioramenti e correzioni di bug ad Apache Hudi. Apache Hudi si è laureato come progetto Apache di primo livello nel giugno 2020.

In questo post, forniamo un riepilogo di alcune delle nuove funzionalità e funzionalità chiave incluse dalle versioni di rilascio di Apache Hudi 0.7.0 e 0.8.0. Queste nuove funzionalità e capacità di Hudi sono disponibili a partire dalle versioni 5.33.0 e 6.3.0 di Amazon EMR:

  • il clustering
  • Elenco di file basato sui metadati
  • Integrazione con Amazon CloudWatch
  • Controllo ottimistico della concorrenza
  • Supporto e miglioramenti alla configurazione di Amazon EMR
  • Integrazione con Apache Flink
  • Kafka esegue il commit dei callback
  • Altri miglioramenti

il clustering

Vediamo più casi d'uso che richiedono l'importazione di un throughput elevato nei data lake. Tuttavia, un'acquisizione più rapida dei dati spesso porta a dimensioni dei file di dati inferiori che spesso influiscono negativamente sulle prestazioni delle query, poiché un numero elevato di file di piccole dimensioni aumenta le costose operazioni di I/O necessarie per restituire i risultati. Un'altra preoccupazione che vediamo è che l'organizzazione dei dati durante l'acquisizione è diversa dall'organizzazione che sarebbe più efficiente quando si interrogano i dati. Ad esempio, è conveniente importare gli ordini di e-commerce in base a OrderDate non appena arrivano, ma quando richiesto, è meglio se gli ordini per un singolo cliente vengono archiviati insieme.

Apache Hudi versione 0.7.0 introduce una nuova funzionalità che consente di raggruppare le tabelle Hudi. Il clustering in Hudi è un framework che fornisce una strategia collegabile per modificare e riorganizzare il layout dei dati ottimizzando al contempo le dimensioni dei file. Con il clustering, ora puoi ottimizzare le prestazioni delle query senza dover rinunciare alla velocità effettiva di acquisizione dei dati.

È possibile utilizzare il clustering per riscrivere i dati utilizzando metodi diversi in base ai diversi requisiti dei casi d'uso:

  • Migliora le prestazioni delle query con la località dei dati – Questo cambia il layout dei dati su disco ordinando i dati su una o più colonne specificate dall'utente. Con questo approccio, possiamo migliorare le prestazioni delle query utilizzando la capacità del formato di file Parquet di eseguire il push-down dei predicati e ignorare i file indesiderati e i gruppi di righe Parquet. Questa strategia può anche controllare la dimensione del file per evitare file di piccole dimensioni.
  • Migliora la freschezza dei dati – Questo requisito presuppone che la località dei dati non sia importante o non sia stata curata già al momento dell'importazione. È ideale per i casi d'uso in cui i dati aggiornati sono importanti, in cui i dati vengono acquisiti utilizzando diversi file di piccole dimensioni e uniti o uniti in seguito utilizzando il framework di clustering.

È possibile eseguire il servizio tabelle di clustering in modo asincrono o sincrono. Introduce inoltre il nuovo tipo di azione REPLACE, che identifica l'azione di clustering nella sequenza temporale dei metadati Hudi.

Nell'esempio seguente, creiamo due tabelle Hudi Copy on Write (CoW): amazon_reviews ed amazon_reviews_clustered utilizzando Amazon EMR versione 6.3.0.

Usiamo spark-shell per creare le tabelle Hudi. Avvia la shell Spark eseguendo quanto segue sul nodo primario Amazon EMR:

spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar

Creiamo quindi la tabella Hudi amazon_reviews utilizzando l'operazione BULK_INSERT e senza il clustering abilitato:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.spark.sql.SaveMode val srcPath = "s3://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews"
val tablePath = "s3://emr-hudi-test-data/hudi/hudi_080/" + tableName val inputDF = spark.read.format("parquet").load(srcPath) inputDF.write.format("hudi") .option(HoodieWriteConfig.TABLE_NAME, tableName) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date") .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName) .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "product_category")
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor") .mode(SaveMode.Overwrite) .save(tablePath)

Creiamo quindi la tabella Hudi amazon_reviews_clustered utilizzando l'operazione BULK_INSERT e il clustering inline abilitato e ordinato per colonne star_rating ed total_votes:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.spark.sql.SaveMode val srcPath = "s3://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_clustered"
val tablePath = "s3://emr-hudi-test-data/hudi/hudi_080/" + tableName val inputDF = spark.read.format("parquet").load(srcPath) inputDF.write .format("hudi") .option(HoodieWriteConfig.TABLE_NAME, tableName) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date") .option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true")
.option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0") .option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43") .option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100")
.option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, "star_rating,total_votes") .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName) .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "product_category")
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor") .mode(SaveMode.Overwrite) .save(tablePath)

Esaminiamo queste due tabelle e convalidiamo la differenza di prestazioni. Per convalidare le prestazioni, utilizzeremo Spark SQL CLI, un comodo strumento per eseguire il servizio di metastore Hive in modalità locale ed eseguire query di input dalla riga di comando. Per avviare Spark SQL CLI, eseguiamo il seguente comando:

spark-sql --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" —conf "spark.hadoop.mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter" —jars /usr/lib/hudi/hudi-spark-bundle.jar

Riavviamo la CLI Spark SQL (spark-sql) sessione tra ogni esecuzione per evitare la memorizzazione nella cache o gli executor a caldo, che potrebbero influire sulle prestazioni della query.

Eseguiamo la query sulla tabella Hudi non in cluster eseguendo quanto segue nel file spark-sql interfaccia:

spark-sql> USE hudi_test;
spark-sql> select review_id from amazon_reviews where star_rating > 3 and total_votes > 10;

Eseguiamo anche la stessa query sulla nostra tabella cluster da spark-sql interfaccia:

spark-sql> USE hudi_test;
spark-sql> select review_id from amazon_reviews_clustered where star_rating > 3 and total_votes > 10;

Confrontiamo le prestazioni di scansione dei file sottostanti per le due diverse tabelle Hudi. La schermata seguente è l'output dell'interfaccia utente di Spark, che mostra le modifiche nei file scansionati per lo stesso numero di righe di output. Per prima cosa vediamo i file scansionati per la tabella Hudi non raggruppata.

Successivamente, vediamo i file scansionati per la tabella Hudi in cluster.

Il numero di file scansionati da Spark è sceso da 1,542 file per il set di dati Hudi non in cluster a 85 file per il set di dati Hudi in cluster per gli stessi identici dati. Inoltre, il numero di record scansionati è stato ridotto da 160,796,570 a 78,845,795.

Abbiamo confrontato le prestazioni della query precedente per il amazon_reviews (non cluster) e amazon_reviews_clustered Set di dati Hudi (in cluster), su Spark SQL, Hive e PrestoDB. La configurazione del cluster utilizzata era 1 leader (m5.4xlarge) e 2 core (m5.4xlarge).

Il grafico seguente fornisce il confronto delle prestazioni della query utilizzando diversi motori per la tabella Hudi, che non è in cluster, e per la tabella Hudi, che è in cluster.

Abbiamo riscontrato che con il clustering abilitato per la tabella Hudi, le prestazioni della query sono aumentate per tutti e tre i motori di query, dal 28% al 63%. La tabella seguente fornisce i dettagli per le prestazioni della query per la tabella Hudi, sia con il clustering abilitato che disabilitato.

Motore di query Tavolo non a grappolo Tabella raggruppata Miglioramento del runtime delle query
Tempo (in secondi) Tempo (in secondi)
Scintilla SQL 21.6 15.4 28.7%
Alveare 96.3 47 51.3%
PrestoDB 11.7 4.3 63.25%

Elenco di file basato sui metadati

Le operazioni di scrittura Hudi come la compattazione, la pulizia e l'indice globale, nonché le query, eseguono un elenco di file system per ottenere la vista corrente delle partizioni e dei file nel set di dati. Per piccoli set di dati, ciò non dovrebbe influire drasticamente sulle prestazioni. Tuttavia, quando si lavora con dati di grandi dimensioni, questa operazione di elenco può influire negativamente sulle prestazioni durante la lettura dei file. Ad esempio, con HDFS come archivio dati sottostante, l'operazione di elenco per un numero elevato di file o partizioni può sovraccaricare HDFS NameNode e influire sulla stabilità del lavoro. Nei casi in cui Amazon S3 viene utilizzato come datastore sottostante, le chiamate O(N) per N partizioni con un numero elevato di file richiedono molto tempo e possono anche causare errori di limitazione.

Con Apache Hudi versione 0.7.0, puoi modificare questo comportamento abilitando l'elenco basato sui metadati per le tabelle Hudi. Questo elenco di partizioni e file è archiviato in una tabella di metadati interna, implementata utilizzando una tabella Hudi Merge on Read (MoR). Questa tabella di metadati può sfruttare tutti i vantaggi della tabella Hudi MoR, che include la capacità di aggiornamenti a bassa latenza e la possibilità di eseguire il commit atomico degli aggiornamenti dei metadati e di eseguire facilmente il rollback se la scrittura non riesce. Inoltre, semplifica la sincronizzazione dei metadati con la tabella Hudi perché entrambi utilizzano una sequenza temporale per la tracciabilità. Questo indice dell'elenco di file viene archiviato utilizzando HFiles per il formato di file di base e di registro per gli aggiornamenti delta. Il formato HFile consente la ricerca puntuale di record specifici in base alla chiave del record. L'obiettivo è ridurre O(N) list call per N partizioni a O(1) get call per leggere i metadati.

Abbiamo confrontato le prestazioni delle query per un set di dati Hudi con l'elenco dei metadati abilitato e non abilitato. Per questo esempio, abbiamo utilizzato un set di dati più grande di 3 TB con Amazon EMR versione 6.3.0. Abbiamo usato il seguente frammento di codice per creare il set di dati abilitato e non abilitato per i metadati impostando il HoodieMetadataConfig.METADATA_ENABLE_PROP (hoodie.metadata.enable) config:

val srcPath = "s3://gbrahmi-demo/3-tb-data_store_sales-parquet/"
val tableName = "tpcds_store_sales_3TB_hudi_080"
val tablePath = "s3://emr-hudi-test-data/hudi/hudi_080/" + tableName val inputDF = spark.read.format("parquet").load(srcPath) inputDF.write .format("hudi") .option(HoodieWriteConfig.TABLE_NAME, tableName) .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "ss_item_sk,ss_ticket_number") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "ss_sold_date_sk") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ss_ticket_number") .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator") .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName) .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "ss_sold_date_sk")
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor") .mode(SaveMode.Overwrite) .save(tablePath)

Sul lato motore di query, possiamo abilitarlo tramite i seguenti metodi:

  • Fonte dati Spark:
    spark.read.format("hudi") .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true") .load(tablePath + "/*")

  • CLI Spark SQL:
    spark-sql --conf "spark.hadoop.hoodie.metadata.enable=true" --jars /usr/lib/hudi/hudi-spark-bundle.jar

  • Alveare:
    hive> SET hoodie.metadata.enable = true;

  • PrestoDB:
    presto:default> set session hive.prefer_metadata_to_list_hudi_files=true;

Abbiamo utilizzato la query seguente per confrontare le prestazioni delle query tramite Hive e PrestoDB:

select count(*) from tpcds_store_sales_3TB_hudi_080 where ss_quantity > 50;

Il grafico seguente fornisce il confronto delle prestazioni della query.

Abbiamo riscontrato che con l'elenco dei metadati, il tempo di esecuzione delle query è diminuito di circa il 25% per il motore Hive e di circa il 32% per PrestoDB. La tabella seguente fornisce i dettagli del runtime di esecuzione delle query con e senza l'elenco dei metadati.

Motore di query Metadati disabilitati Metadati abilitati Miglioramento del runtime delle query
Tempo (in secondi) Tempo (in secondi)
Alveare 415.28533 310.02367 25.35%
Presto 72 48.6 32.50%

Considerazioni sull'elenco dei metadati

Con Hudi 0.7.0 e 0.8.0, potresti non osservare miglioramenti evidenti per le query tramite Spark SQL (con elenco di metadati), perché Hudi si basa su Spark InMemoryFileIndex per eseguire l'elenco dei file effettivo e non è possibile utilizzare i metadati. Potresti osservare miglioramenti perché Felpa con cappuccioROPathFilter utilizza i metadati per il suo filtraggio. Tuttavia, con la versione 0.9.0 di Hudi, stiamo introducendo una personalizzazione FileIndice implementazione per Hudi per utilizzare i metadati per l'elenco dei file invece di fare affidamento su Spark. Pertanto, dalla versione 0.9.0, osserverai notevoli miglioramenti delle prestazioni per le query Spark SQL.

Integrazione con Amazon CloudWatch

Apache Hudi fornisce MetricheReporter implementazioni come JmxMetricsReporter, MetricsGraphiteReportere DatadogMetricsReporter, che puoi utilizzare per pubblicare le metriche nei sink specificati dall'utente. Amazon EMR, con la sua versione 6.4.0 con Hudi 0.8.0, è stato introdotto CloudWatchMetricsReporter, in cui puoi pubblicare queste metriche Amazon Cloud Watch. Aiuta a pubblicare metriche dello scrittore Hudi come durata del commit, durata del rollback, metriche a livello di file (numero di file aggiunti o eliminati per commit), metriche a livello di record (record inseriti o aggiornati per commit) e metriche a livello di partizione (partizioni inserite o aggiornate per commit). Ciò è utile per eseguire il debug dei lavori Hudi e per prendere decisioni sul ridimensionamento del cluster.

Puoi abilitare il parametro CloudWatch tramite le seguenti configurazioni:

hoodie.metrics.on = true
hoodie.metrics.reporter.type = CLOUDWATCH

La tabella seguente riassume le configurazioni aggiuntive che è possibile modificare se necessario.

Configurazione Descrizione Valore
felpa con cappuccio.metrics.cloudwatch.report.period.seconds Frequenza (in secondi) alla quale segnalare i parametri a CloudWatch Il valore predefinito è 60 secondi, che va bene per la risoluzione predefinita di 1 minuto offerta da CloudWatch
felpa con cappuccio.metrics.cloudwatch.metric.prefix Prefisso da aggiungere a ogni nome di metrica Il valore predefinito è vuoto (nessun prefisso)
felpa con cappuccio.metrics.cloudwatch.namespace Spazio dei nomi CloudWatch in cui vengono pubblicati i parametri Il valore predefinito è Hudi
felpa con cappuccio.metrics.cloudwatch.maxDatumsPerRequest Numero massimo di dati da includere in una richiesta a CloudWatch Il valore predefinito è 20, che corrisponde al valore predefinito di CloudWatch

Lo screenshot seguente mostra alcune delle metriche pubblicate per una particolare tabella Hudi, incluso il tipo di metrica e il relativo nome. Questi sono mago a discesa metrica; gauge rappresenta il valore esatto in un determinato momento, e counter rappresenta un semplice intero incrementale o decrescente.

Il seguente grafico del gauge metrica rappresenta i record totali scritti in una tabella nel tempo.

Il seguente grafico del counter metrica rappresenta il numero di commit in aumento nel tempo.

Controllo ottimistico della concorrenza

Una delle principali funzionalità introdotte con Hudi 0.8.0 e disponibile dalla versione 6.4.0 di Amazon EMR è Optimistic Concurrency Control (OCC) per consentire a più writer di assimilare i dati contemporaneamente nella stessa tabella Hudi. Questo è OCC a livello di file, il che significa che per due commit (o writer) che si verificano contemporaneamente sulla stessa tabella, entrambi possono avere esito positivo se non hanno scritture su file sovrapposti. La funzione richiede l'acquisizione di blocchi, per i quali è possibile utilizzare Zookeeper o HiveMetastore. Per maggiori informazioni sulle garanzie fornite, cfr Controllo della concorrenza.

I cluster Amazon EMR hanno Zookeeper installato, che puoi utilizzare come provider di blocco per eseguire scritture simultanee dallo stesso cluster. Per semplificarne l'utilizzo, Amazon EMR preconfigura il provider di blocco nella nuova introduzione /etc/hudi/conf/hudi-defaults.conf file (vedere la sezione successiva) tramite le seguenti proprietà:

hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<EMR Zookeeper URL>
hoodie.write.lock.zookeeper.port=<EMR Zookeeper Port>
hoodie.write.lock.zookeeper.base_path=/hudi

Sebbene il provider di blocco sia preconfigurato, l'abilitazione di OCC deve comunque essere gestita dagli utenti tramite le opzioni di lavoro Hudi o a livello di cluster tramite l'API di configurazione di Amazon EMR:

hoodie.write.concurrency.mode = optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes = LAZY (Performs cleaning of failed writes lazily instead of inline with every write)
hoodie.write.lock.zookeeper.lock_key = <Key to uniquely identify the Hudi table> (Table Name is a good option)

Supporto e miglioramenti alla configurazione di Amazon EMR

Amazon EMR versione 6.4.0 ha introdotto la possibilità di configurare e riconfigurare Hudi tramite il configurazioni caratteristica. Le configurazioni Hudi necessarie tra lavori e tabelle possono ora essere configurate a livello di cluster tramite il hudi-defaults classificazione o /etc/hudi/conf/hudi-defaults.conf file, simile ad altre applicazioni come Spark e Hive. Il codice seguente è un esempio di hudi-defaults classificazione per abilitare l'elenco basato sui metadati e i parametri CloudWatch:

[{ "Classification": "hudi-defaults", "Properties": { "hoodie.metadata.enable": "true", "hoodie.metadata.insert.parallelism": "3000", "hoodie.metrics.on": "true", "hoodie.metrics.reporter.type": "CLOUDWATCH" }
}]

Amazon EMR configura automaticamente le impostazioni predefinite adatte per alcune configurazioni, per migliorare l'esperienza dell'utente eliminando la necessità per i clienti di passarle:

  • HIVE_URL_OPT_KEY è configurato sull'URL del server Hive del cluster e non deve più essere specificato. Ciò è particolarmente utile quando si esegue un lavoro in modalità cluster Spark, in cui gli utenti in precedenza dovevano determinare e specificare autonomamente l'IP primario Amazon EMR.
  • Configurazioni specifiche di HBase, utili per utilizzare l'indice HBase con Hudi.
  • Configurazione specifica del provider di blocco Zookeeper, come discusso sotto il controllo della concorrenza, che semplifica l'uso di OCC.

Sono state introdotte ulteriori modifiche per ridurre il numero di configurazioni che gli utenti devono superare e per dedurre automaticamente ove possibile:

Integrazione con Apache Flink

Apache Hudi ha iniziato con un'integrazione molto stretta con Apache Spark. Con la versione 0.7.0, ora sono disponibili integrazioni per importare i dati utilizzando Apache Flink. Richiedeva il disaccoppiamento di Spark dal formato tabella interno, dai writer e dal codice dei servizi tabella in un modo che potesse essere utilizzato da altri motori in evoluzione nel settore come Flink.

Hudi 0.7.0 fornisce il supporto iniziale di Flink tramite HooodieFlinkStreamer, che puoi utilizzare per scrivere tabelle CoW eseguendo lo streaming di dati da un argomento Kafka utilizzando Apache Flink. Ad esempio, puoi utilizzare il seguente comando Flink per iniziare a leggere l'argomento ExampleTopic dai broker Kafka broker-1, broker-2e broker-3 in esecuzione sulla porta 9092:

./bin/flink run -c org.apache.hudi.HoodieFlinkStreamer -m yarn-cluster -d -yjm 1024 -ytm 1024 -p 4 -ys 3 -ynm hudi_on_flink_example /usr/lib/hudi/hudi-flink-bundle.jar --kafka-topic ExampleTopic --kafka-group-id <kafka-group-id> --kafka-bootstrap-servers broker-1:9092,broker-2:9092,broker-3:9092 --table-type COPY_ON_WRITE --target-table hudi_flink_table --target-base-path s3://emr-hudi-test-data/hudi/hudi_070/hudi_flink_table --props hdfs:///hudi/flink/config/hudi-jobConf.properties --checkpoint-interval 6000 --flink-checkpoint-path hdfs:///hudi/hudi-flink-checkpoint-dir

Con Hudi 0.8.0, sono stati apportati importanti miglioramenti alle prestazioni e alla scalabilità dell'integrazione di Flink, nonché all'introduzione di nuove funzionalità come il connettore SQL sia per sorgente che per sink, scrittore per MoR, lettore batch per CoW e MoR, lettore di streaming per MoR e indicizzazione supportata dallo stato con supporto bootstrap. Per ulteriori informazioni sulla progettazione dell'integrazione di Flink, vedere Apache Hudi incontra Apache Flink. Per iniziare con Flink SQL, vedere Guida di sfarfallio.

Kafka esegue il commit dei callback

La versione precedente (0.6.0) di Apache Hudi ha introdotto la funzionalità di callback del commit di scrittura. Con questa funzionalità, Hudi può inviare un messaggio di richiamata ogni volta che arriva un commit riuscito al set di dati Hudi. Il metodo HTTP supportato dalla callback del commit di scrittura nella versione precedente. Con la versione 0.7.0 di Apache Hudi, Hudi ora supporta anche il callback del commit di scrittura per Kafka. L'utilizzo di Kafka per l'invio dei messaggi di callback per ogni commit riuscito può ora consentire di creare pipeline di dati asincroni o logiche di elaborazione aziendale ogni volta che il set di dati Hudi rileva un nuovo commit. È ora possibile creare pipeline ETL incrementali per l'elaborazione di nuovi eventi che arrivano nel data lake Hudi.

L'implementazione di Kafka commit callback utilizza HoodieWriteCommitKafkaCallback la hoodie.write.commit.callback.class. Oltre a impostare la classe di callback del commit, puoi anche impostare parametri aggiuntivi per i server bootstrap Kafka e le configurazioni degli argomenti.

Quello che segue è un frammento di codice in cui i messaggi di callback di commit vengono pubblicati nell'argomento Kafka ExampleTopic ospitato sui broker Kafka b-1.demo-hudi.xxxxxx.xxx.kafka.us-east-1.amazonaws.com, b-2.demo-hudi.xxxxxx.xxx.kafka.us-east-1.amazonaws.come b-3.demo-hudi.xxxxxx.xxx.kafka.us-east-1.amazonaws.com quando si scrive su un set di dati Hudi:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._ val tableName = "trips_data_kafka_callback"
val tablePath = "s3://gbrahmi-sample-bucket/hudi-dataset/hudi_kafka_callback/" + tableName val dataGen = new DataGenerator(Array("2021/05/01"))
val updates = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 1)) df.write.format("hudi"). option(TABLE_NAME, tableName). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option("hoodie.write.commit.callback.on", "true"). option("hoodie.write.commit.callback.class", "org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback"). option("hoodie.write.commit.callback.kafka.bootstrap.servers", "b-1.demo-hudi.xxxxxx.xxx.kafka.us-east-1.amazonaws.com:9092,b-2.demo-hudi.xxxxxx.xxx.kafka.us-east-1.amazonaws.com:9092,b-3.demo-hudi.xxxxxx.xxx.kafka.us-east-1.amazonaws.com:9092"). option("hoodie.write.commit.callback.kafka.topic", "ExampleTopic"). option("hoodie.write.commit.callback.kafka.acks", "all"). option("hoodie.write.commit.callback.kafka.retries", 3). mode(Append). save(tablePath)

Di seguito è come appaiono i messaggi nel tuo argomento Kafka:

{"commitTime":"20210508210230","tableName":"trips_data_kafka_callback","basePath":"s3:// gbrahmi-sample-bucket/hudi-dataset/hudi_kafka_callback/trips_data_kafka_callback"}

Una pipeline a valle ora può interrogare facilmente questi eventi da Kafka ed elaborare i dati incrementali in tabelle Hudi derivate.

Altri miglioramenti

Oltre ai miglioramenti sopra menzionati, ci sono state alcune modifiche aggiuntive degne di nota. Dal lato dello scrittore, ci sono i seguenti miglioramenti:

  • Supporto per Spark 3 – Il supporto per la scrittura e l'interrogazione dei dati utilizzando Apache Spark 3 è ora disponibile con Apache Hudi 0.7.0 in poi. Funziona con il bundle Scala 2.12 per hudi-spark-bundle.
  • Inserisci sovrascrittura e inserisci sovrascrittura operazioni di scrittura della tabella – Apache Hudi 0.7.0 introduce due nuove operazioni, insert_overwrite ed insert_overwrite_table, per supportare lavori ETL batch in cui un'intera tabella o partizione viene sovrascritta durante ogni esecuzione. Puoi usare queste operazioni invece di upsert operazione, ed è più economico da eseguire.
  • eliminare le partizioni – La nuova API è ora disponibile dalla 0.7.0 per eliminare un'intera partizione. Ciò consente di evitare l'uso di eliminazioni a livello di record.
  • Supporto per lo scrittore Java – Hudi 0.7.0 ha introdotto il supporto per la scrittura basato su Java tramite il HoodieJavaWriteClient classe.

Allo stesso modo, sul lato dell'integrazione delle query, sono stati apportati i seguenti miglioramenti:

  • Letture in streaming strutturate – Hudi 0.8.0 ha introdotto un'implementazione della sorgente di streaming strutturata Spark tramite il HoodieStreamSource classe. Puoi usarlo per supportare le letture in streaming dalle tabelle Hudi.
  • Interrogazione incrementale su MoR – Da Hudi 0.7.0, ora abbiamo il supporto delle query incrementali per le tabelle MoR, che puoi utilizzare per estrarre dati in modo incrementale dalle applicazioni downstream.

Conclusione

Le nuove funzionalità introdotte in Apachi Hudi ti consentono di creare soluzioni disaccoppiate utilizzando funzionalità come il callback del commit Kafka e l'integrazione di Flink con Apache Hudi con Amazon EMR. Puoi anche migliorare le prestazioni complessive del data lake Hudi usando le funzionalità di clustering e tabelle di metadati.


Informazioni sugli autori

Audit Mehrotra è un ingegnere di sviluppo software presso Amazon Web Services e un membro/committente di Apache Hudi PMC. Lavora sulle funzionalità all'avanguardia di Amazon EMR ed è anche coinvolto in progetti open source come Apache Hudi, Apache Spark, Apache Hadoop e Apache Hive. Nel tempo libero gli piace suonare la chitarra, viaggiare, fare baldoria e uscire con gli amici.

Gagan Brahmi è un architetto specializzato in soluzioni focalizzato su Big Data e analisi presso Amazon Web Services. Gagan ha oltre 16 anni di esperienza nella tecnologia dell'informazione. Aiuta i clienti a progettare e creare soluzioni basate su cloud altamente scalabili, performanti e sicure su AWS.

Source: https://aws.amazon.com/blogs/big-data/new-features-from-apache-hudi-0-7-0-and-0-8-0-available-on-amazon-emr/

Timestamp:

Di più da AWS