这篇文章是作为 数据科学博客马拉松
介绍
Apache Spark 是一种大数据处理框架,长期以来一直是与大数据相关的各种项目中最流行和最常遇到的框架之一。 它成功地结合了工作速度和开发人员表达想法的简单性。
开发人员在足够高的级别上处理数据,似乎没有什么困难,例如,只需编写一行代码即可连接两个数据集:
ordersDF.join(customersDF, ordersDF ["customer_id"] == customersDF["id"], "left_outer")
但是想一想:当两个数据集连接在一起时,集群中会发生什么,这两个数据集可能完全位于也可能不完全位于任何集群节点上? 通常,Apache Spark 可以快速处理所有事情,但有时,尤其是在数据确实很多的情况下,您仍然需要了解下层发生的情况,并利用这些知识帮助 Apache Spark 发挥最大作用。
今天我们将讨论如何使您的应用程序运行得更快并使用您为其请求的所有资源。 本文将主要关注 Spark SQL 模块,通过静态配置在 Yarn 集群上运行 Apache Spark 应用程序。 但是一般的想法也可以应用于其他初始数据。 我们在这里查看 Spark 2.3 / 2.4,以更好地了解 Spark 3 中的所有创新。
数据及其所在
让我们从 Spark 为我们提供的用于处理数据的抽象开始——这就是 RDD( 弹性分布式数据集 )。 就本文而言,我们使用的是 DataFrame 还是 DataSet 并不重要。
图片1
因此,对于开发人员来说,一组数据被呈现为单个对象,并在集群中某个执行器的某个线程中分部分(块)单独处理。 一个块是最小的处理单元,执行器接收一个块和一条指令,告诉他需要用这个数据块做什么。
Apache Spark 应用程序如何在集群中工作
在高层次上,每个 Spark 应用程序在其运行时都包含一个驱动程序——一个执行 main() 函数的程序和运行在集群节点上的执行程序。 Executors 是万能的士兵,他们接收一块数据(块)和一条指令,执行它,并向驱动程序报告完成以接收下一条指令。 每个执行器可以运行多个处理线程,在这种情况下,每个线程独立于其他线程处理自己的数据块。 因此,如果在启动我们的应用程序时,我们从集群管理器订购了五个具有四个内核(线程)的执行器,那么在每个时刻我们都有 5 * 4 = 20 个线程,并且最多我们可以同时处理 20 个数据块。
因此,每个任务都可以执行:
-
num_executors – 将在其中启动数据处理线程的独立 JVM 进程的数量(它们可以位于同一个集群节点或不同的节点上)。 进程将一直运行到应用程序结束;
-
executor_cores 每个执行器中运行的并发线程数。 一个线程一次处理一个数据块。
图片2
Apache Spark 应用程序的工作原理
在 Spark History(一种以方便的形式显示 Spark 应用程序执行日志的 Web 服务器)中,它看起来像这样:
我们在这里看到两个执行器,每个执行器都有四个处理线程。
Shuffle Apache Spark 性能优化
所以,我们发现我们有 N 个数据块和 P 个线程(工作线程)可以并行处理这些数据块。
如果这些块一直存在到应用程序结束,一切都会好起来的,但几乎所有应用程序都会进行处理,需要对我们的块进行完全重组。 例如,按键(JOIN)连接两个表,按键分组(GROUP BY)。 在这种情况下,众所周知的 映射简化 模式适用于每个人,将整个集合的数据按key重新分配到新的数据块中,使具有相同key的行只在一个块中。 这个过程在 Spark 中称为 Shuffle。 为什么我把它大写? 因为这是一个非常复杂和昂贵的过程,它增加了执行者的内存消耗,集群节点上的磁盘内存消耗,以及集群节点之间的网络交换。 这非常让人想起毛毛虫变成蝴蝶的过程——一切都分崩离析,重新组装成新的样子,而且也是能源密集型的。
任务分阶段
在 Spark 中,处理从一个 Shuffle 到另一个 Shuffle 的块称为 Stage。 注意在shuffle之前,所有的block都是并行处理的,shuffle之后也是并行处理的,但是直到上一个stage结束的所有block都通过了这个过程之后,才会开始新的stage。 因此,阶段之间的边界是并行处理块时的等待位置。 还要注意,在一个阶段内,一个块上的所有任务(任务)在一个线程内顺序发生。 也就是说,区块不会通过网络传输到任何地方,而是并行处理所有区块。 事实证明,阶段边界内的块数没有变化。
图片3
任务分为阶段
我们来到了下图:所有的任务都被分成了阶段,每个阶段内的块数是恒定且相等的……。 这就是乐趣的开始。 我们知道 worker 的数量(P = executors * cores),但是每个阶段会有多少块是一个直接影响我们应用程序性能的问题。 毕竟,如果块很多,执行者很少,那么每个执行者会依次处理几个块,反之亦然:如果块少,执行者多,那么一些执行者将空闲而其余的执行者正在不知疲倦地工作。 这里最有趣的是,当应用程序运行缓慢时,他们试图给它更多的执行器,但在这种情况下,性能并没有提高。
让我们首先弄清楚分阶段的工作量。 在下文中,为了简单起见,我们将只考虑一个数据集的块。 在任何给定的时间,表演者都可以处理几个不相关的阶段。 例如,在 JOIN 之前,两个数据集将相互独立处理,从而在它们之间划分执行器。 在这种情况下,处理单元的数量将是它们的总和。 但就我们的目的而言,有必要了解一组数据发生了什么。 在第一步中,一切都将取决于您的数据集的来源。 例如,如果您正在从 HDFS 读取 parquet 文件的目录,那么第一步中的块数通常等于(构成加载目录中所有 .parquet 文件的 HDFS 块数)。 也就是说,在这种情况下,每个 HDFS 块将代表一个单独的数据块进行处理。 但不要忘记,这个区块分布会一直维持到阶段结束。 这是一个很好的例子。
我们正在从 HDFS 读取一个包含 150,000 个条目的小文件。 整个文件适合一个 HDFS 块。 因此,在第一阶段,我们只有一个数据块,因此只有一个执行者可以使用它。 但是根据转换的逻辑,每行包含一个字段持续时间(观看秒数),我们需要将输出中的每一行乘以与该行中观看秒数一样多的行。
viDF = spark.read.parquet("/tst/vi/") viDF.createOrReplaceTempView("ViewingInterval") spark.sql("""select t.*,explode(get_list_of_seconds(duatation)) as secondNumber from ViewingInterval""" )
测试数据的转换不能很快进行。 查看 Spark History,我们看到:
在第一阶段,一个数据块
Tasks = 1 表示这一阶段只有一个任务,因为只有一个数据块。 我们在输入端看到 2 MB 的数据,在输出端,已经有一个扩展的 1 GB 数据集。 所有这些都是由一个线程完成的,其余的都是空闲的,因为在这个阶段没有更多的任务。 我们应该怎么做,毕竟,爆炸 - 一个狭窄的依赖,因此不会中断阶段,而是在读取数据的同一阶段执行。 在阶段的框架内,正如我们已经知道的,块的数量是不变的。 在这种情况下,我们可以很容易地(因为输入数据集很小,并且混洗会很快发生)使用函数 repartition(N) 将这个阶段分成两个阶段,这导致随机顺序混洗,产生 N 个数据块在输出端,大小大致相等。 而既然它洗牌(Shuffle),就意味着一个新的阶段开始了。
viDF = spark.read.parquet("/tst/vi/") viDF.repartition(60).createOrReplaceTempView("ViewingInterval") spark.sql("""select t.*,explode(get_list_of_seconds(duatation)) as secondNumber来自 ViewingInterval""")
让我们看看 Spark 历史:
现在处理并行运行
在第二阶段——我现在重新分区后爆炸了,我们有 60 个任务(数据块),所有的表演者现在都在工作,没有闲着。 转型时间几乎减少了一半。 我们的任务是确保没有停机时间,并且所有执行者都在工作,否则,我们为什么要从以后不使用的集群中获取资源。
我们找到了第一个阶段,甚至学会了如何在 repartition(N) 的帮助下将任何阶段分成两个阶段。 让我们处理两个 shuffle 之间的内部阶段。 这里的一切都由 spark.sql.shuffle.partitions (default 200) 参数决定。 更准确地说,我决定,因为随着 AQE 的引入,Spark 学会了自己调节这个数量。 所以任何内部阶段都将由数据块的 spark.sql.shuffle.partitions 组成。 但在这里,也不是一切都那么顺利:如果你没有很多数据,那么你需要减少这个参数,如果你有很多,增加它。 在 Spark 2.3 的情况下,您需要根据您的数据寻找某种中间立场。
举个例子,当我们数据很少的时候,spark.sql.shuffle.partitions=200,默认情况下。查看Spark History,我们看到我们的数据集只有185行,在shuffle的时候被分成了200块(但是在这里它不足以容纳 200 个块)。 请注意,表演者真正有用的工作在这里被标为绿色。 也就是说,事实证明,在执行者处理一个记录中的一个数据块的总时间中,有用时间 <10%。 剩下的时间在等待和反序列化。
最后阶段会发生什么? 这再次取决于我们在哪里输出我们的 改造 数据。 例如,我们想将所有内容作为镶木地板文件写入目录。 如果我们在shuffle之后这样做,什么都不做,那么我们的程序执行后我们会在这个目录中找到200个文件。 为什么? 因为经过shuffle之后,我们默认得到了spark.sql.shuffle.partitions = 200个block,而且由于一个block被一个线程处理,它会自己写到一个单独的文件中。
通常,这是开发人员想要控制 HDFS 中文件数量并在保存到 DataFrame 合并(N)时调用该方法的地方。 这种方法只是将我们集合中的每个块输入到 N 的新块之一中。 那有coalesced(),实际上,不像repartition(),它不会导致shuffle,因此不会破坏stage,它只是让它在我们的stage会有N个数据块。 但这导致在这个阶段只有 N 个表演者的作品。 如果我们决定将所有内容保存在一个文件中会怎样——只有一个流可以工作。 让我们回忆一下关于第一阶段的推理,如果最后阶段在计算方面相当严重,那么在保存之前,coalesce(N) 做 repartition(N) 将最后阶段分成两部分是有意义的:倒数第二个,将在线程的 spark.sql.shuffle.partitions 并行执行繁重的计算(如果之前有连接,例如)和最后一个,将直接保存到我们需要的文件数(N ) 已经没有资源密集型计算。 在这里,您需要考虑什么会更快——保持一切不变,或者添加 repartition(N) 以执行 shuffle,这也不是免费的,但有可能并行化复杂的计算。
dataDF.repartition(1) .write .format("parquet") .mode("overwrite") .option("compression","snappy") .save("/tst/problem_4/result")
现在我们已经弄清楚了舞台上的块数与表演者数量之间的关系,我将举一个小例子。 在输入阶段,我们有20个数据块,只有10个执行器(5个执行器*2个核)。 我们看到几乎每个执行器在处理完一个块后,都被迫制作另一个块进行处理,因为平均而言,一个执行器有两个数据块需要处理。 但是,记住一个阶段的所有数据块都可以并行处理,我们为我们的任务请求 20 个执行器(5 个执行器 * 4 个内核),我们得到每个执行器现在只处理一个块,整个阶段的时间将最好减半。 这正是增加资源有效并提高速度时的情况,
增加资源 - 工作更快
顺便说一下,应用打破最后阶段的方法的有趣点之一,在上一段中描述:
dataDF.repartition(N).write。 …
如果我们比较最后一个阶段中断前后的指标,那么一切似乎都很好:转换时间减少了几次(因为最后一次计算是所有执行者并行执行的),Shuffle Spill 消失了(这是执行者没有足够的内存,他安排了一种与本地的交换。当然,在这种情况下,所有数据都来自几个大块,执行者很难消化它们)。
停止! 让我们仔细看看保存时收到的文件的大小。 以前是 5.9 GB,现在是 10.3 GB,记录数一样,数据的构成也一样。 为什么? 这真是美中不足!
注意输出的大小
只加了。 我们 repartition() 已经发现它以随机方式分配数据。 也就是说,我们不是在最后一次 shuffle 之后按键对数据进行部分排序(在我们的例子中是 JOIN),而是随机分布数据。 回想一下,parquet 是一种列式文件存储格式,其中的数据被压缩,利用它们可以在列中部分排序的事实。 事实证明,我们在行的分布中引入了随机性,从而使数据可压缩性降低了近两倍。 你能为这个做什么? 可以在每个数据块内返回顺序。
dataDF.repartition(20)。 sortWithinPartitions(asc("id")).write。 …
函数 sortWithinPartitions() 按字段或每个块内的多个字段排序,即没有改组发生,所有操作都在一个表演者的记忆中进行。 将此函数应用于我们的转换以按多个字段排序后,输出文件的总大小甚至比最初的还要小。 现在一切都很快对我们有用,输出文件的大小适合我们。 此外,在这种情况下,我们在 HDFS 中记录了大约相同大小的文件(这是 repartition() 的结果),这可能便于进一步处理。
Apache Spark 性能优化的优化器
既然我们已经接触到了 parquet 格式文件,我们将看到 Spark 优化器如何在诸如谓词下推和投影下推这样的优化器规则的示例上工作。
在投影下推的情况下,柱状拼花尤其胜出。 提醒一下,查询树的实际执行只在执行动作的时候开始,也就是输出数据的一个操作:传输到主程序(驱动程序)(收集,计数,..),存储文件在数据库中传输等等。……在此过程中,Spark 构建了一个查询树并对其进行了优化。 因此,在构建查询时,优化器已经知道需要哪些字段才能获得结果,并且只会从文件中读取这些字段。 由于在列式文件格式中,数据存储在列的上下文中,因此仅从文件中读取这些字段。
考虑优化器的谓词下推规则。 这种优化的原理很简单:我们有很多数据,如果它们最终没有用处,就不需要处理它们,例如,必须在我们的查询树执行结束时对其进行过滤。 优化器尝试将所有条件和过滤器尽可能地降低到更低的级别——更接近数据源,最好是在直接读取文件(或者,例如,对 RDBMS 的查询)之前。
让我们考虑一个例子:
这是生成的查询的物理执行计划:
让我们注意直接读取文件的块(FileScan parquet)和 PushedFilters 块——这些是在物理读取文件时会施加的条件。 我们看到这里得到了三个条件:
-
对于 ValueDatecondition IsNotNulland LessThanOrEqual- 后者很明显,这反映在我们的 SQL 中。 它从哪里来的 IsNotNull? 很明显,我们的请求中有一个条件 ValueDate <= 常量而 NULL 值不满足这个条件,即逻辑上一切都是正确的。 但是为什么镶木地板文件优化器要单独制作这个条件呢? 下一段将详细介绍这一点;
-
对于 SubjectID 条件 IsNotNull。 但是我们在请求中没有这样的条件,一般来说SubjectID是没有条件的。 在这个字段上只有一个 LEFT JOIN,我们的表在这里连接到主表。 是的,完全正确:使用这样的 JOIN,SubjectID 为 NULL 的所有行都不会包含在结果选择中。 我们看到优化器考虑到了这一点,甚至在一开始就不会从文件中读取这些行。
让我们仍然弄清楚条件 IsNotNull 有什么有趣之处,以至于优化器单独添加了它。 为此,让我们看看 parquet 文件的结构。 您可以使用 用于此的镶木地板工具. 问题是,镶木地板文件与架构一起,还存储了行组上下文中字段的一些统计信息。
里面的镶木地板文件
我们看到对于所有的整数类型都有值的个数(Values),这个块中NULL的个数(Null Values),还有这组行中列的Min和Max值。 我们立即回想起我们在字段 IsNotNull 上的条件。 也就是说,如果这组中的字段SubjectID有Values = Null Values,那么我们可以得出结论,这组行中的所有值都是NULL,根本不读取这个块。 这同样适用于更多、更少、相等的条件——这里你可以使用列的 Min 和 Max 值并得出结论——你是否需要读取这组行。
重要的是要了解,只有在开始执行查询之前事先知道条件,才能将条件降低到以下级别。
Apache Spark 性能优化的真实示例
parquet 文件目录按字段分区。 带有此字段的过滤器值的字符串,以逗号分隔,被传递给转换。 开发者做了explode(split(filter)),也就是从这一行取了一张有value的小表,问心无愧,和需要过滤的主表做了一个INNER JOIN。 转型进展缓慢。 让我们看一下查询计划:
所有分区都从 HDFS 读取
奇怪,但是在第一阶段,Spark 减去了所有分区( PartitionCount = 121),尽管我们通过了一个仅包含一个值的过滤器。 这正是在构建查询树时,Spark 根本不知道过滤器的情况,因为它隐藏在 JOIN 后面。
我们不使用过滤器值构建表,而是使用标准的 Spark SQL 函数 find_in_set(). 它在字符串中查找子字符串的位置,这是一个逗号分隔的列表。
也就是说,过滤器现在表示一个简单的表达式: where find_in_set(surveyprogectid, )
如果您查看查询执行计划,因为在构建它时,优化器已经知道过滤器字符串和条件,它将此条件降低到从文件读取的级别。 此外,知道这是一个分区字段,它应用分区修剪规则,即从考虑中丢弃不匹配过滤器的分区。
只从 HDFS 读取一个分区
请注意,我们的条件现在在 PartitionFilters 块中,由于该字段正在分区,因此从 HDFS 中仅减去我们需要的分区( PartitionCount = 1)。
因此,如果你有一个带分区的大表,并且你通过JOIN选择了一些分区,那么最好形成一个单独的动作,从这个过滤后的表中形成一个值列表作为一个字符串,并将它作为一个常量传递给主查询的条件。
Apache Spark 优化器的痛点
优化器的出色工作......但有时它倾向于将条件尽可能低到源可能是有害的。 UDF(用户定义的 功能 ) 进入现场。 用户定义函数是 Spark 优化器的黑匣子。
考虑以下示例:
我们有一个包含数十亿行的大文件。 我们只想选择唯一的 ID 并将我们的 UDF 应用到它们,然后只选择那些将为 Null 的结果。 请求顺序:
T1=> 选择 不同的id 止 T T2=> 选择 UDF(ID)的 as 新ID 止 T1 T3=> 选择 * 止 T2 哪里 新ID is 空
表中只有几千个唯一的 id 值,我们的 UDF 运行不快——它转到 HBase。 也就是说,我们已经构建了这样一个查询树,预计我们的 UDF 将被调用数千次。 我们发起请求并等待很长时间。
我们看一下查询执行计划:
UDF的情况几乎下降到最低水平
… 哦! 优化器尽了最大的努力:它诚实地将我们的条件 isNull(UDF(id)) 降低到直接读取文件后的级别,甚至直到我们只选择唯一 id 的那一刻。 这意味着我们繁重的 UDF 已经尝试了数十亿次而不是数千次。
在这里你能想到什么? 例如,在计算唯一ID(T1)后进行缓存(持久)。 或者使用横向视图,优化器不会通过它进一步传递条件。
选择 udf_res as 新ID 止 T1 侧 查看 爆炸 (数组(UDF(ID))) as udf_res
我们一开始就得到了我们想要的东西——UDF 只为唯一的 id 计算:
结论
在本文的范围之外,还有与 JOIN 优化相关的问题:广播、数据倾斜、合并和重新分区的优缺点。 此处对某些点进行了足够详细的描述,而有些则没有。 所以,访问 分析维迪亚 对于基础知识,使用它可以高度优化性能。 请参阅附加的屏幕截图!
参考:
图 1:https://mallikarjuna_g.gitbooks.io/spark/content/diagrams/spark-rdds.png
Image 2: https://media.springernature.com/lw685/springer-static/image/art%3A10.1007%2Fs11227-019-03093-0/MediaObjects/11227_2019_3093_Fig1_HTML.png
图片3:https://platoaistream.net/wp-content/uploads/2021/09/apache-spark-performance-optimization-for-data-engineers-3.png
- "
- 000
- 9
- 账号管理
- 操作
- 优点
- 所有类型
- 其中
- 分析
- 阿帕奇
- Apache Spark
- 应用
- 应用领域
- 应用领域
- 刊文
- 基础
- 最佳
- 大数据运用
- 亿
- 黑色
- 盒子
- 建筑物
- 呼叫
- 接近
- 码
- 柱
- 消费
- data
- 数据处理
- 数据集
- 数据库
- 处理
- 细节
- 开发商
- 开发
- DID
- 停机
- 司机
- 工程师
- 进入
- 等
- 交换
- 执行
- 高效率
- 字段
- 数字
- 过滤器
- 结束
- 姓氏:
- 专注焦点
- 申请
- 格式
- 骨架
- Free
- 开玩笑
- 功能
- 其他咨询
- 大
- 绿色
- 团队
- 此处
- 高
- 历史
- 创新中心
- How To
- HTTPS
- 增加
- IT
- 工作
- 加入
- 键
- 知识
- 大
- 发射
- 铅
- 知道
- Level
- Line
- 清单
- 本地
- 长
- 匹配
- 媒体
- 中等
- 最受欢迎的产品
- 网络
- 节点
- 秩序
- 其他名称
- 其它
- 模式
- 性能
- 的
- 图片
- 热门
- 曲目
- 项目
- 阅读
- 现实
- 记录
- 报告
- 资源
- REST的
- 成果
- 运行
- 运行
- 保存
- 科学
- 感
- 集
- 简易
- 尺寸
- 小
- So
- 速度
- 分裂
- SQL
- 阶段
- 开始
- 统计
- 存储
- 商店
- 告诉
- test
- 基础知识
- 该座
- 次
- 转型
- 普遍
- us
- 折扣值
- 查看
- 等待
- 卷筒纸
- Web服务器
- 什么是
- 中
- 工作
- 工人
- 合作
- 写作