O Amazon EMR Serverless oferece suporte a tamanhos de trabalho maiores para executar mais cargas de trabalho com uso intenso de computação e memória

O Amazon EMR Serverless oferece suporte a tamanhos de trabalho maiores para executar mais cargas de trabalho com uso intenso de computação e memória

Nó Fonte: 1960092

Amazon EMR sem servidor permite executar estruturas de big data de código aberto, como Apache Spark e Apache Hive, sem gerenciar clusters e servidores. Com o EMR Serverless, você pode executar cargas de trabalho analíticas em qualquer escala com dimensionamento automático que redimensiona os recursos em segundos para atender aos volumes de dados em constante mudança e aos requisitos de processamento. O EMR Serverless dimensiona automaticamente os recursos para cima e para baixo para fornecer a quantidade certa de capacidade para seu aplicativo.

Temos o prazer de anunciar que o EMR Serverless agora oferece configurações de trabalho de 8 vCPUs com até 60 GB de memória e 16 vCPUs com até 120 GB de memória, permitindo que você execute mais cargas de trabalho com uso intensivo de memória e computação no EMR Serverless. Um aplicativo EMR Serverless usa internamente trabalhadores para executar cargas de trabalho. e você pode definir diferentes configurações de trabalho com base em seus requisitos de carga de trabalho. Anteriormente, a maior configuração de trabalho disponível no EMR Serverless era de 4 vCPUs com até 30 GB de memória. Esse recurso é especialmente benéfico para os seguintes cenários comuns:

  • Cargas de trabalho pesadas e aleatórias
  • Cargas de trabalho com uso intensivo de memória

Vejamos cada um desses casos de uso e os benefícios de ter tamanhos de trabalhador maiores.

Benefícios do uso de workers grandes para cargas de trabalho intensivas em embaralhamento

No Spark e no Hive, o embaralhamento ocorre quando os dados precisam ser redistribuídos pelo cluster durante uma computação. Quando seu aplicativo realiza grandes transformações ou reduz operações como join, groupBy, sortByou repartition, Spark e Hive acionam um shuffle. Além disso, cada estágio Spark e vértice Tez são limitados por uma operação aleatória. Tomando o Spark como exemplo, por padrão, existem 200 partições para cada trabalho do Spark definido por spark.sql.shuffle.partitions. No entanto, o Spark calculará o número de tarefas em tempo real com base no tamanho dos dados e na operação que está sendo executada. Quando uma ampla transformação é executada em um grande conjunto de dados, pode haver GBs ou mesmo TBs de dados que precisam ser buscados por todas as tarefas.

Os embaralhamentos geralmente são caros em termos de tempo e recursos e podem levar a gargalos de desempenho. Portanto, otimizar embaralhamentos pode ter um impacto significativo no desempenho e no custo de um trabalho do Spark. Com trabalhadores grandes, mais dados podem ser alocados para a memória de cada executor, o que minimiza os dados embaralhados entre os executores. Isso, por sua vez, leva a um aumento do desempenho de leitura aleatória porque mais dados serão buscados localmente do mesmo trabalhador e menos dados serão buscados remotamente de outros trabalhadores.

Experimentos

Para demonstrar os benefícios do uso de workers grandes para consultas intensivas em embaralhamento, vamos usar q78 de TPC-DS, que é uma consulta Spark pesada e aleatória que embaralha 167 GB de dados em 12 estágios do Spark. Vamos realizar duas iterações da mesma consulta com configurações diferentes.

As configurações para o Teste 1 são as seguintes:

  • Tamanho do executor solicitado durante a criação do aplicativo EMR Serverless = 4 vCPUs, 8 GB de memória, 200 GB de disco
  • Configuração do trabalho do Spark:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Paralelismo = 192 (spark.executor.instances * spark.executor.cores)

As configurações para o Teste 2 são as seguintes:

  • Tamanho do executor solicitado durante a criação do aplicativo EMR Serverless = 8 vCPUs, 16 GB de memória, 200 GB de disco
  • Configuração do trabalho do Spark:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Paralelismo = 192 (spark.executor.instances * spark.executor.cores)

Vamos também desabilitar a alocação dinâmica definindo spark.dynamicAllocation.enabled para false para ambos os testes para evitar qualquer ruído potencial devido a tempos variáveis ​​de inicialização do executor e manter a utilização de recursos consistente para ambos os testes. Nós usamos Medida da Faísca, que é uma ferramenta de código aberto que simplifica a coleta e a análise das métricas de desempenho do Spark. Como estamos usando um número fixo de executores, o número total de vCPUs e a memória solicitada são os mesmos para ambos os testes. A tabela a seguir resume as observações das métricas coletadas com o Spark Measure.

. Tempo total gasto para consulta em milissegundos shuffleLocalBlocksFetched shuffleRemoteBlocksFetched shuffleLocalBytesRead shuffleRemoteBytesRead shuffleFetchWaitTime shuffleWriteTime
Teste 1 153244 114175 5291825 3.5 GB 163.1 GB 1.9 hr 4.7 minutos
Teste 2 108136 225448 5185552 6.9 GB 159.7 GB 3.2 minutos 5.2 minutos

Como pode ser visto na tabela, há uma diferença significativa no desempenho devido a melhorias no shuffle. O Teste 2, com metade do número de executores duas vezes maior que o Teste 1, foi executado 29.44% mais rápido, com 1.97 vezes mais dados aleatórios buscados localmente em comparação com o Teste 1 para a mesma consulta, mesmo paralelismo e os mesmos recursos agregados de vCPU e memória . Portanto, você pode se beneficiar de um desempenho aprimorado sem comprometer o custo ou o paralelismo do trabalho com a ajuda de grandes executores. Observamos benefícios de desempenho semelhantes para outras consultas TPC-DS com uso intensivo de embaralhamento, como q23a e q23b.

Recomendações

Para determinar se os trabalhadores grandes beneficiarão seus aplicativos Spark intensivos em embaralhamento, considere o seguinte:

  • Verifique o estágios guia na interface do usuário do Spark History Server do seu aplicativo EMR Serverless. Por exemplo, na captura de tela a seguir do Spark History Server, podemos determinar que este trabalho do Spark escreveu e leu 167 GB de dados aleatórios agregados em 12 estágios, observando o Leitura aleatória e Gravação aleatória colunas. Se seus trabalhos embaralharem mais de 50 GB de dados, você poderá se beneficiar do uso de trabalhadores maiores com 8 ou 16 vCPUs ou spark.executor.cores.

  • Verifique o SQL/DataFrame guia da interface do usuário do Spark History Server do seu aplicativo EMR Serverless (somente para APIs Dataframe e Dataset). Ao escolher a ação realizada pelo Spark, como coletar, pegar, mostrarString ou salvar, você verá um DAG agregado para todos os estágios separados pelas trocas. Cada troca no DAG corresponde a uma operação de embaralhamento, e conterá os bytes e blocos locais e remotos embaralhados, conforme pode ser visto na captura de tela a seguir. Se os blocos ou bytes aleatórios locais buscados forem muito menores em comparação com os blocos ou bytes remotos buscados, você pode executar novamente seu aplicativo com trabalhadores maiores (com 8 ou 16 vCPUs ou spark.executor.cores) e revisar essas métricas de troca em um DAG para veja se há alguma melhora.

  • Use o Medida da Faísca ferramenta com sua consulta do Spark para obter as métricas de embaralhamento no driver do Spark stdout logs, conforme mostrado no log a seguir para um trabalho do Spark. Revise o tempo gasto para leituras aleatórias (shuffleFetchWaitTime) e gravações aleatórias (shuffleWriteTime) e a proporção dos bytes locais buscados para os bytes remotos buscados. Se a operação aleatória demorar mais de 2 minutos, execute novamente seu aplicativo com workers maiores (com 8 ou 16 vCPUs ou spark.executor.cores) com o Spark Measure para rastrear a melhoria no desempenho do shuffle e no tempo de execução geral do trabalho.
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

Benefícios de usar trabalhadores grandes para cargas de trabalho com uso intensivo de memória

Certos tipos de cargas de trabalho consomem muita memória e podem se beneficiar de mais memória configurada por trabalhador. Nesta seção, discutimos cenários comuns em que grandes workers podem ser benéficos para executar cargas de trabalho com uso intensivo de memória.

Distorção de dados

As distorções de dados geralmente ocorrem em vários tipos de conjuntos de dados. Alguns exemplos comuns são detecção de fraude, análise populacional e distribuição de renda. Por exemplo, quando você deseja detectar anomalias em seus dados, espera-se que apenas menos de 1% dos dados sejam anormais. Se você quiser realizar alguma agregação sobre registros normais versus anormais, 99% dos dados serão processados ​​por um único trabalhador, o que pode fazer com que esse trabalhador fique sem memória. Distorções de dados podem ser observadas para transformações com uso intensivo de memória, como groupBy, orderBy, join, funções de janela, collect_list, collect_set, e assim por diante. Tipos de junção como BroadcastNestedLoopJoin e produto cartesiano também são inerentemente intensivos em memória e suscetíveis a distorções de dados. Da mesma forma, se os dados de entrada forem compactados em Gzip, um único arquivo Gzip não poderá ser lido por mais de uma tarefa porque o tipo de compactação Gzip não pode ser dividido. Quando há alguns arquivos Gzip muito grandes na entrada, seu trabalho pode ficar sem memória porque uma única tarefa pode ter que ler um arquivo Gzip enorme que não cabe na memória do executor.

As falhas devido à distorção de dados podem ser mitigadas pela aplicação de estratégias como salting. No entanto, isso geralmente requer grandes alterações no código, o que pode não ser viável para uma carga de trabalho de produção que falhou devido a uma distorção de dados sem precedentes causada por um aumento repentino no volume de dados de entrada. Para uma solução alternativa mais simples, você pode apenas querer aumentar a memória do trabalhador. Usando trabalhadores maiores com mais spark.executor.memory permite lidar com a distorção de dados sem fazer alterações no código do aplicativo.

Cache

Para melhorar o desempenho, o Spark permite armazenar em cache os quadros de dados, conjuntos de dados e RDDs na memória. Isso permite que você reutilize um quadro de dados várias vezes em seu aplicativo sem precisar recalculá-lo. Por padrão, até 50% da JVM do seu executor é usado para armazenar em cache os quadros de dados com base no property spark.memory.storageFraction. Por exemplo, se o seu spark.executor.memory for definido como 30 GB, 15 GB serão usados ​​para armazenamento em cache que é imune a remoção.

O nível de armazenamento padrão da operação de cache é DISK_AND_MEMORY. Se o tamanho do quadro de dados que você está tentando armazenar em cache não couber na memória do executor, uma parte do cache será despejada no disco. Se não houver espaço suficiente para gravar os dados armazenados em cache no disco, os blocos serão removidos e você não obterá os benefícios do armazenamento em cache. O uso de workers maiores permite que você armazene em cache mais dados na memória, aumentando o desempenho do trabalho recuperando blocos em cache da memória em vez do armazenamento subjacente.

Experimentos

Por exemplo, o seguinte Trabalho PySpark leva a uma distorção, com um executor processando 99.95% dos dados com agregados com uso intensivo de memória, como collect_list. A tarefa também armazena em cache um quadro de dados muito grande (2.2 TB). Vamos executar duas iterações do mesmo trabalho no EMR Serverless com as seguintes configurações de vCPU e memória.

Vamos executar o Teste 3 com as maiores configurações de worker possíveis anteriormente:

  • Tamanho do executor definido durante a criação do aplicativo EMR Serverless = 4 vCPUs, 30 GB de memória, 200 GB de disco
  • Configuração do trabalho do Spark:
    • spark.executor.cores = 4
    • spark.executor.memory = 27G

Vamos executar o Teste 4 com as configurações de trabalho grandes recém-lançadas:

  • Tamanho do executor definido durante a criação do aplicativo EMR Serverless = 8 vCPUs, 60 GB de memória, 200 GB de disco
  • Configuração do trabalho do Spark:
    • spark.executor.cores = 8
    • spark.executor.memory = 54G

O teste 3 falhou com FetchFailedException, o que resultou devido à memória do executor não ser suficiente para o trabalho.

Além disso, na IU do Spark do Teste 3, vemos que a memória de armazenamento reservada dos executores foi totalmente utilizada para armazenar em cache os quadros de dados.

Os blocos restantes para o cache foram derramados no disco, como visto no arquivo do executor stderr Histórico:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

Cerca de 33% do quadro de dados persistente foi armazenado em cache no disco, como visto no Armazenamento guia da interface do usuário do Spark.

O teste 4 com executores maiores e vCores foi executado com êxito sem lançar nenhum erro relacionado à memória. Além disso, apenas cerca de 2.2% do quadro de dados foi armazenado em cache no disco. Portanto, os blocos armazenados em cache de um quadro de dados serão recuperados da memória e não do disco, oferecendo melhor desempenho.

Recomendações

Para determinar se os trabalhadores grandes beneficiarão seus aplicativos Spark com uso intensivo de memória, considere o seguinte:

  • Determine se seu aplicativo Spark tem alguma distorção de dados observando a interface do usuário do Spark. A captura de tela a seguir da interface do usuário do Spark mostra um exemplo de cenário de distorção de dados em que uma tarefa processa a maioria dos dados (145.2 GB), observando o Leitura aleatória tamanho. Se uma ou menos tarefas processarem significativamente mais dados do que outras tarefas, execute novamente seu aplicativo com trabalhadores maiores com 60–120 G de memória (spark.executor.memory definido em qualquer lugar de 54-109 GB considerando 10% de spark.executor.memoryOverhead).

  • Verifique o Armazenamento guia do Spark History Server para revisar a proporção de dados armazenados em cache na memória para o disco do Tamanho na memória e Tamanho em disco colunas. Se mais de 10% de seus dados estiverem armazenados em cache no disco, execute novamente seu aplicativo com trabalhos maiores para aumentar a quantidade de dados armazenados em cache na memória.
  • Outra maneira de determinar preventivamente se seu trabalho precisa de mais memória é monitorando Pico de memória JVM na IU do Spark Executores aba. Se o pico de memória JVM usado estiver próximo ao executor ou memória do driver, você pode criar um aplicativo com um trabalhador maior e configurar um valor maior para spark.executor.memory or spark.driver.memory. Por exemplo, na captura de tela a seguir, o valor máximo de pico de uso de memória JVM é 26 GB e spark.executor.memory é definido como 27 G. Nesse caso, pode ser benéfico usar workers maiores com 60 GB de memória e spark.executor.memory definido para 54 G.

Considerações

Embora grandes vCPUs ajudem a aumentar a localização dos blocos aleatórios, há outros fatores envolvidos, como taxa de transferência de disco, IOPS de disco (operações de entrada/saída por segundo) e largura de banda da rede. Em alguns casos, mais trabalhadores pequenos com mais discos podem oferecer maior IOPS de disco, taxa de transferência e largura de banda de rede em geral em comparação com menos trabalhadores grandes. Incentivamos você a comparar suas cargas de trabalho com configurações de vCPU adequadas para escolher a melhor configuração para sua carga de trabalho.

Para tarefas pesadas aleatórias, é recomendável usar discos grandes. Você pode anexar até 200 GB de disco a cada trabalhador ao criar seu aplicativo. Usando grandes vCPUs (spark.executor.cores) por executor pode aumentar a utilização do disco em cada trabalhador. Se seu aplicativo falhar com "Sem espaço restante no dispositivo" devido à incapacidade de ajustar os dados aleatórios no disco, use mais workers menores com disco de 200 GB.

Conclusão

Nesta postagem, você aprendeu sobre os benefícios de usar grandes executores para seus trabalhos EMR Serverless. Para obter mais informações sobre diferentes configurações de trabalhador, consulte Configurações do trabalhador. Grandes configurações de worker estão disponíveis em todas as regiões onde o EMR Serverless está disponível.


Sobre o autor

Veena Vasudevan é Arquiteto de Soluções de Parceiro Sênior e especialista em Amazon EMR na AWS com foco em big data e análise. Ela ajuda clientes e parceiros a criar soluções altamente otimizadas, escaláveis ​​e seguras; modernizar suas arquiteturas; e migrar suas cargas de trabalho de big data para a AWS.

Carimbo de hora:

Mais de Grandes dados da AWS