Amazon EMR Serverless поддерживает рабочие процессы большего размера для выполнения рабочих нагрузок, требующих больше вычислительных ресурсов и памяти.

Amazon EMR Serverless поддерживает рабочие процессы большего размера для выполнения рабочих нагрузок, требующих больше вычислительных ресурсов и памяти.

Исходный узел: 1960092

Amazon EMR без сервера позволяет запускать платформы больших данных с открытым исходным кодом, такие как Apache Spark и Apache Hive, без управления кластерами и серверами. С помощью EMR Serverless вы можете запускать аналитические рабочие нагрузки в любом масштабе с автоматическим масштабированием, которое изменяет размер ресурсов за считанные секунды в соответствии с изменяющимися объемами данных и требованиями к обработке. EMR Serverless автоматически масштабирует ресурсы вверх и вниз, чтобы обеспечить необходимый объем емкости для вашего приложения.

Мы рады сообщить, что EMR Serverless теперь предлагает рабочие конфигурации с 8 виртуальными ЦП с объемом памяти до 60 ГБ и с 16 виртуальными ЦП с объемом памяти до 120 ГБ, что позволяет выполнять более сложные вычислительные и ресурсоемкие рабочие нагрузки на EMR Serverless. Приложение EMR Serverless внутренне использует рабочих процессов для выполнения рабочих нагрузок. и вы можете настроить различные рабочие конфигурации в зависимости от ваших требований к рабочей нагрузке. Ранее самой большой рабочей конфигурацией, доступной в EMR Serverless, было 4 виртуальных ЦП с памятью до 30 ГБ. Эта возможность особенно полезна для следующих распространенных сценариев:

  • Перемешивание тяжелых рабочих нагрузок
  • Рабочие нагрузки с интенсивным использованием памяти

Давайте рассмотрим каждый из этих вариантов использования и преимущества больших размеров рабочих процессов.

Преимущества использования больших рабочих процессов для рабочих нагрузок с интенсивным перемешиванием

В Spark и Hive перемешивание происходит, когда данные необходимо перераспределить по кластеру во время вычисления. Когда ваше приложение выполняет широкие преобразования или сокращает операции, такие как join, groupBy, sortByили repartition, Spark и Hive вызывают перетасовку. Кроме того, каждый этап Spark и вершина Tez ограничены операцией перемешивания. Взяв Spark в качестве примера, по умолчанию для каждого задания Spark, определяемого spark.sql.shuffle.partitions. Однако Spark будет вычислять количество задач на лету в зависимости от размера данных и выполняемой операции. Когда широкое преобразование выполняется поверх большого набора данных, могут быть ГБ или даже ТБ данных, которые должны быть получены всеми задачами.

Перетасовка обычно требует больших затрат времени и ресурсов и может привести к снижению производительности. Таким образом, оптимизация тасования может существенно повлиять на производительность и стоимость задания Spark. С большими рабочими процессами в памяти каждого исполнителя может быть выделено больше данных, что сводит к минимуму перетасовку данных между исполнителями. Это, в свою очередь, приводит к увеличению производительности случайного чтения, поскольку больше данных будет извлекаться локально от одного и того же рабочего процесса, а меньше данных будет извлекаться удаленно от других рабочих процессов.

Эксперименты

Чтобы продемонстрировать преимущества использования больших рабочих операций для запросов с интенсивным перемешиванием, давайте воспользуемся q78 от TPC-DS, который представляет собой Spark-запрос с большим количеством перетасовок, который перемешивает 167 ГБ данных по 12 этапам Spark. Выполним две итерации одного и того же запроса с разными конфигурациями.

Конфигурации для теста 1 следующие:

  • Размер исполнителя, запрошенный при создании бессерверного приложения EMR = 4 виртуальных ЦП, 8 ГБ памяти, 200 ГБ на диске.
  • Конфигурация задания Spark:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Параллельность = 192 (spark.executor.instances * spark.executor.cores)

Конфигурации для теста 2 следующие:

  • Размер исполнителя, запрошенный при создании бессерверного приложения EMR = 8 виртуальных ЦП, 16 ГБ памяти, 200 ГБ на диске.
  • Конфигурация задания Spark:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Параллельность = 192 (spark.executor.instances * spark.executor.cores)

Давайте также отключим динамическое размещение, установив spark.dynamicAllocation.enabled в false для обоих тестов, чтобы избежать любого потенциального шума из-за переменного времени запуска исполнителя и обеспечить согласованное использование ресурсов для обоих тестов. Мы используем Мера искры, который представляет собой инструмент с открытым исходным кодом, упрощающий сбор и анализ показателей производительности Spark. Поскольку мы используем фиксированное количество исполнителей, общее количество запрошенных виртуальных ЦП и памяти одинаково для обоих тестов. В следующей таблице приведены наблюдения по метрикам, собранным с помощью Spark Measure.

. Общее время, затраченное на запрос в миллисекундах в случайном порядкеLocalBlocksFetched в случайном порядкеRemoteBlocksFetched перемешатьLocalBytesЧтение перетасоватьУдаленныйбайтыЧтение перемешатьFetchWaitTime случайное время записи
Тест 1 153244 114175 5291825 3.5 GB 163.1 GB 1.9 час 4.7
Тест 2 108136 225448 5185552 6.9 GB 159.7 GB 3.2 5.2

Как видно из таблицы, есть существенная разница в производительности из-за улучшений в случайном порядке. Тест 2 с вдвое меньшим числом исполнителей, вдвое большим, чем в тесте 1, выполнялся на 29.44 % быстрее, с локальным извлечением в 1.97 раза больше данных в случайном порядке по сравнению с тестом 1 для того же запроса, того же параллелизма и тех же совокупных ресурсов виртуальных ЦП и памяти. . Таким образом, вы можете получить выгоду от повышения производительности без ущерба для стоимости или параллелизма заданий с помощью крупных исполнителей. Мы наблюдали аналогичные преимущества производительности для других запросов TPC-DS с интенсивным перемешиванием, таких как q23a и q23b.

Рекомендации

Чтобы определить, принесут ли большие рабочие процессы пользу вашим приложениям Spark с интенсивным перемешиванием, рассмотрите следующее:

  • Проверить Стажировки вкладку из пользовательского интерфейса Spark History Server вашего приложения EMR Serverless. Например, на следующем снимке экрана Spark History Server мы можем определить, что это задание Spark записало и прочитало 167 ГБ данных в случайном порядке, объединенных на 12 этапах, глядя на Перемешать Читать и Перемешать Записать столбцы. Если ваши задания перетасовывают более 50 ГБ данных, вы можете получить выгоду от использования более крупных рабочих процессов с 8 или 16 виртуальными ЦП или spark.executor.cores.

  • Проверить SQL/DataFrame на вкладке Spark History Server UI вашего приложения EMR Serverless (только для Dataframe и Dataset API). Когда вы выбираете выполняемое действие Spark, например, собирать, брать, показывать строку или сохранять, вы увидите агрегированную группу обеспечения доступности баз данных для всех этапов, разделенных обменами. Каждый обмен в DAG соответствует операции перемешивания, и он будет содержать перетасовку локальных и удаленных байтов и блоков, как показано на следующем снимке экрана. Если количество извлеченных локальных блоков или байтов в случайном порядке намного меньше по сравнению с извлеченными удаленными блоками или байтами, вы можете повторно запустить приложение с более крупными рабочими процессами (с 8 или 16 виртуальными ЦП или spark.executor.cores) и просмотреть эти метрики обмена в группе обеспечения доступности баз данных, чтобы посмотрите, есть ли улучшения.

  • Использовать Мера искры инструмент с вашим запросом Spark, чтобы получить метрики перемешивания в драйвере Spark. stdout журналы, как показано в следующем журнале для задания Spark. Просмотрите время, затраченное на случайное чтение (shuffleFetchWaitTime) и в случайном порядке пишет (shuffleWriteTime) и отношение количества извлеченных локальных байтов к количеству извлеченных удаленных байтов. Если операция перемешивания занимает более 2 минут, перезапустите приложение с более крупными рабочими процессами (с 8 или 16 виртуальными ЦП или spark.executor.cores) с Spark Measure для отслеживания улучшения производительности перемешивания и общего времени выполнения задания.
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

Преимущества использования больших рабочих процессов для рабочих нагрузок, интенсивно использующих память

Некоторые типы рабочих нагрузок интенсивно используют память и могут выиграть от большего объема памяти, настроенного для каждого рабочего. В этом разделе мы обсудим распространенные сценарии, в которых большие рабочие процессы могут быть полезны для выполнения рабочих нагрузок, интенсивно использующих память.

Перекос данных

Перекосы данных обычно возникают в нескольких типах наборов данных. Некоторыми распространенными примерами являются обнаружение мошенничества, анализ населения и распределение доходов. Например, если вы хотите обнаружить аномалии в своих данных, ожидается, что только менее 1% данных являются аномальными. Если вы хотите выполнить некоторую агрегацию поверх нормальных и ненормальных записей, 99% данных будут обрабатываться одним рабочим процессом, что может привести к тому, что этому рабочему процессу не хватит памяти. Перекосы данных могут наблюдаться для преобразований с интенсивным использованием памяти, таких как groupBy, orderBy, join, оконные функции, collect_list, collect_set, и так далее. Типы соединений, такие как BroadcastNestedLoopJoin и продукт Cartesan также по своей природе интенсивно используют память и подвержены перекосам данных. Точно так же, если ваши входные данные сжаты Gzip, один файл Gzip не может быть прочитан более чем одной задачей, поскольку тип сжатия Gzip неразделим. Когда на входе есть несколько очень больших файлов Gzip, вашему заданию может не хватить памяти, потому что одной задаче может потребоваться прочитать огромный файл Gzip, который не помещается в памяти исполнителя.

Сбои из-за перекоса данных можно смягчить, применяя такие стратегии, как добавление солей. Однако для этого часто требуются значительные изменения в коде, что может оказаться невыполнимым для производственной рабочей нагрузки, которая завершилась сбоем из-за беспрецедентного перекоса данных, вызванного внезапным ростом объема входящих данных. Для более простого обходного пути вы можете просто увеличить рабочую память. Использование более крупных рабочих с большим spark.executor.memory позволяет обрабатывать перекосы данных без внесения каких-либо изменений в код приложения.

Кэширование

Для повышения производительности Spark позволяет кэшировать кадры данных, наборы данных и RDD в памяти. Это позволяет повторно использовать фрейм данных в приложении несколько раз без необходимости его повторного вычисления. По умолчанию до 50% JVM вашего исполнителя используется для кэширования фреймов данных на основе property spark.memory.storageFraction. Например, если ваш spark.executor.memory установлен на 30 ГБ, то 15 ГБ используются для хранения кэша, который не подлежит вытеснению.

Уровень хранения по умолчанию для операции кэширования: DISK_AND_MEMORY. Если размер кадра данных, который вы пытаетесь кэшировать, не помещается в памяти исполнителя, часть кэша выливается на диск. Если на диске недостаточно места для записи кэшированных данных, блоки удаляются, и вы не получаете преимуществ кэширования. Использование более крупных рабочих процессов позволяет кэшировать больше данных в памяти, повышая производительность заданий за счет извлечения кэшированных блоков из памяти, а не из базового хранилища.

Эксперименты

Например, следующие Работа PySpark приводит к перекосу, когда один исполнитель обрабатывает 99.95% данных с помощью агрегатов с интенсивным использованием памяти, таких как collect_list. Задание также кэширует очень большой фрейм данных (2.2 ТБ). Давайте запустим две итерации одного и того же задания на EMR Serverless со следующими конфигурациями виртуальных ЦП и памяти.

Давайте запустим тест 3 с максимально возможными ранее конфигурациями воркеров:

  • Размер набора исполнителей при создании бессерверного приложения EMR = 4 виртуальных ЦП, 30 ГБ памяти, 200 ГБ на диске.
  • Конфигурация задания Spark:
    • spark.executor.cores = 4
    • spark.executor.memory = 27 г

Давайте запустим тест 4 с недавно выпущенными большими рабочими конфигурациями:

  • Размер исполнителя, заданный при создании бессерверного приложения EMR = 8 виртуальных ЦП, 60 ГБ памяти, 200 ГБ на диске.
  • Конфигурация задания Spark:
    • spark.executor.cores = 8
    • spark.executor.memory = 54 г

Тест 3 не прошел с FetchFailedException, что произошло из-за того, что памяти исполнителя не хватило для задания.

Кроме того, из пользовательского интерфейса Spark теста 3 мы видим, что зарезервированная память исполнителей была полностью использована для кэширования кадров данных.

Оставшиеся блоки для кэширования были сброшены на диск, как видно из команды исполнителя. stderr журналы:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

Около 33% сохраненного фрейма данных было кэшировано на диске, как видно из Хранилище вкладка пользовательского интерфейса Spark.

Тест 4 с более крупными исполнителями и виртуальными ядрами был успешно выполнен без каких-либо ошибок, связанных с памятью. Кроме того, только около 2.2% фрейма данных кэшировалось на диск. Таким образом, кэшированные блоки фрейма данных будут извлекаться из памяти, а не с диска, что обеспечивает более высокую производительность.

Рекомендации

Чтобы определить, принесут ли большие рабочие процессы пользу вашим приложениям Spark, интенсивно использующим память, рассмотрите следующее:

  • Определите, есть ли в вашем приложении Spark искажения данных, взглянув на пользовательский интерфейс Spark. На следующем снимке экрана пользовательского интерфейса Spark показан пример сценария перекоса данных, когда одна задача обрабатывает большую часть данных (145.2 ГБ). Перемешать Читать размер. Если одна или несколько задач обрабатывают значительно больше данных, чем другие задачи, перезапустите приложение с более крупными рабочими процессами с 60–120 ГБ памяти (spark.executor.memory установить от 54 до 109 ГБ с учетом 10% spark.executor.memoryOverhead).

  • Проверить Хранилище вкладку Spark History Server, чтобы просмотреть соотношение данных, кэшированных в памяти, к диску из Размер в памяти и Размер на диске столбцы. Если более 10 % ваших данных кэшируются на диске, перезапустите приложение с более крупными рабочими процессами, чтобы увеличить объем данных, кэшируемых в памяти.
  • Еще один способ заранее определить, требуется ли вашей работе больше памяти, — это отслеживать Пиковая память JVM в пользовательском интерфейсе Spark Исполнители вкладка Если пиковая используемая память JVM близка к памяти исполнителя или драйвера, вы можете создать приложение с большей рабочей ролью и настроить более высокое значение для spark.executor.memory or spark.driver.memory. Например, на следующем снимке экрана максимальное значение пикового использования памяти JVM составляет 26 ГБ, а spark.executor.memory установлено значение 27 G. В этом случае может оказаться выгодным использовать более крупные воркеры с 60 ГБ памяти и spark.executor.memory установить на 54 г.

Соображения

Несмотря на то, что большие виртуальные ЦП помогают повысить локальность блоков в случайном порядке, существуют и другие факторы, такие как пропускная способность диска, дисковые IOPS (операций ввода-вывода в секунду) и пропускная способность сети. В некоторых случаях большее количество мелких рабочих процессов с большим количеством дисков может предложить более высокие дисковые операции ввода-вывода в секунду, пропускную способность и пропускную способность сети в целом по сравнению с меньшим количеством крупных рабочих процессов. Мы рекомендуем вам сравнить свои рабочие нагрузки с подходящими конфигурациями виртуальных ЦП, чтобы выбрать наилучшую конфигурацию для вашей рабочей нагрузки.

Для заданий, требующих перетасовки, рекомендуется использовать большие диски. Вы можете подключить до 200 ГБ диска к каждому воркеру при создании приложения. Использование больших виртуальных ЦП (spark.executor.cores) на исполнителя может увеличить использование диска каждым рабочим. Если ваше приложение завершается с ошибкой «Нет свободного места на устройстве» из-за невозможности разместить данные в случайном порядке на диске, используйте более мелкие рабочие процессы с диском емкостью 200 ГБ.

Заключение

В этом посте вы узнали о преимуществах использования крупных исполнителей для бессерверных заданий EMR. Дополнительные сведения о различных конфигурациях рабочих процессов см. Конфигурации рабочих. Конфигурации с большими рабочими процессами доступны во всех регионах, где используется EMR Serverless. доступен.


Об авторе

Вина Васудеван — старший архитектор партнерских решений и специалист по Amazon EMR в AWS, специализирующийся на больших данных и аналитике. Она помогает клиентам и партнерам создавать оптимизированные, масштабируемые и безопасные решения; модернизировать свою архитектуру; и перенесите свои рабочие нагрузки с большими данными на AWS.

Отметка времени:

Больше от AWS Большие данные