15|内存视角(一):如何最大化内存的使用效率?
文章目录
你好,我是吴磊。
上一讲我们说,想要提升 CPU 利用率,最重要的就是合理分配执行内存,但是,执行内存只是 Spark 内存分区的一部分。因此,想要合理分配执行内存,我们必须先从整体上合理划分好 Spark 所有的内存区域。
可在实际开发应用的时候,身边有不少同学向我抱怨:“Spark 划分不同内存区域的原理我都知道,但我还是不知道不同内存区域的大小该怎么设置,纠结来、纠结去。最后,所有跟内存有关的配置项,我还是保留了默认值。”
这种不能把原理和实践结合起来的情况很常见,所以今天这一讲,我就从熟悉的 Label Encoding 实例出发,**一步步带你去分析不同情况下,不同内存区域的调整办法,**帮你归纳出最大化内存利用率的常规步骤。这样,你在调整内存的时候,就能结合应用的需要,做到有章可循、有的放矢。
从一个实例开始
我们先来回顾一下第 5 讲中讲过的 Label Encoding。在 Label Encoding 的业务场景中,我们需要对用户兴趣特征做 Encoding。依据模板中兴趣字符串及其索引位置,我们的任务是把千亿条样本中的用户兴趣转换为对应的索引值。模板文件的内容示例如下所示。
//模板文件
//用户兴趣
体育 - 篮球-NBA-湖人
军事 - 武器 - 步枪-AK47
实现的代码如下所示,注意啦,这里的代码是第 5 讲中优化后的版本。
/**
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, “UTF-8”)
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
//Dataset 中的函数调用
partFunc(“体育 - 篮球-NBA-湖人”)
下面,咱们先一起回顾一下代码实现思路,再来分析它目前存在的性能隐患,最后去探讨优化它的方法。
首先,findIndex 函数的主体逻辑比较简单,就是读取模板文件和构建 Map 映射,以及查找用户兴趣并返回索引。不过,findIndex 函数被定义成了高阶函数。这样一来,当以模板文件为实参调用这个高阶函数的时候,我们会得到一个内置了 Map 查找字典的标量函数 partFunc,最后在千亿样本上调用 partFunc 完成数据转换。利用高阶函数,我们就避免了让 Executor 中的每一个 Task 去读取模板文件,以及从头构建 Map 字典这种执行低效的做法。
在运行时,这个函数在 Driver 端会被封装到一个又一个的 Task 中去,随后 Driver 把这些 Task 分发到 Executor,Executor 接收到任务之后,交由线程池去执行(调度系统的内容可以回顾第 5 讲)。这个时候,每个 Task 就像是一架架小飞机,携带着代码“乘客”和数据“行李”,从 Driver 飞往 Executor。Task 小飞机在 Executor 机场着陆之后,代码“乘客”乘坐出租车或是机场大巴,去往 JVM stack;数据“行李”则由专人堆放在 JVM Heap,也就是我们常说的堆内内存。
回顾 Label encoding 中的 findIndex 函数不难发现,其中大部分都是代码“乘客”,唯一的数据“行李”是名为 searchMap 的 Map 字典。像这样用户自定义的数据结构,消耗的内存区域就是堆内内存的 User Memory(Spark 对内存区域的划分内容可以回顾一下第 7 讲)。
User Memory 性能隐患
回顾到这里,你觉得 findIndex 函数有没有性能隐患呢?你可以先自己思考一下,有了答案之后再来看我下面的分析。
答案当然是“有”。首先,每架小飞机都携带这么一份数据“大件行李”,自然需要消耗更多的“燃油”,这里的“燃油”指的是 Task 分发过程中带来的网络开销。其次,因为每架小飞机着陆之后,都会在 Executor 的“旅客行李专区”User Memory 寄存上这份同样的数据“行李”,所以,User Memory 需要确保有足够的空间可以寄存所有旅客的行李,也就是大量的重复数据。
那么,User Memory 到底需要准备出多大的内存空间才行呢?我们不妨来算一算。这样的计算并不难,只需要用飞机架次乘以行李大小就可以了。
用户自定义的数据结构往往是用于辅助函数完成计算任务的,所以函数执行完毕之后,它携带的数据结构的生命周期也就告一段落。因此,在 Task 的数量统计上,我们不必在意一个 Executor 总共需要处理多少个 Task,只需要关注它在同一时间可以并行处理的 Task 数量,也就是 Executor 的线程池大小即可。
我们说过,Executor 线程池大小由 spark.executor.cores 和 spark.task.cpus 这两个参数的商(spark.executor.cores/spark.task.cpus)决定,我们暂且把这个商记作 #threads。
接下来是估算数据“行李”大小,由于 searchMap 并不是分布式数据集,因此我们不必采用先 Cache,再提取 Spark 执行计划统计信息的方式。对于这样的 Java 数据结构,我们完全可以在 REPL 中,通过 Java 的常规方法估算数据存储大小,估算得到的 searchMap 大小记为 #size。
好啦!现在,我们可以算出,User Memory 至少需要提供 #threads * #size 这么大的内存空间,才能支持分布式任务完成计算。但是,对于 User Memory 内存区域来说,使用 #threads * #size 的空间去重复存储同样的数据,本身就是降低了内存的利用率。那么,我们该怎么省掉 #threads * #size 的内存消耗呢?
性能调优
学习过广播变量之后,想必你头脑中已经有了思路。没错,咱们可以尝试使用广播变量,来对示例中的代码进行优化。
仔细观察 findIndex 函数,我们不难发现,函数的核心计算逻辑有两点。一是读取模板文件、创建 Map 映射字典;二是以给定字符串对字典进行查找,并返回查找结果。显然,千亿样本转换的核心需求是其中的第二个环节。既然如此,我们完全可以把创建好的 Map 字典封装成广播变量,然后分发到各个 Executors 中去。
有了广播变量的帮忙,凡是发往同一个 Executor 的 Task 小飞机,都无需亲自携带数据“行李”,这些大件行李会由“联邦广播快递公司”派货机专门发往各个 Executors,Driver 和每个 Executors 之间,都有一班这样的货运专线。思路说完了,优化后的代码如下所示。
/**
广播变量实现方式
*/
//定义广播变量
val source = Source.fromFile(filePath, “UTF-8”)
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
val bcSearchMap = sparkSession.sparkContext.broadcast(searchMap)
//在 Dataset 中访问广播变量
bcSearchMap.value.getOrElse(“体育 - 篮球-NBA-湖人”, -1)
上面代码的实现思路很简单:第一步还是读取模板文件、创建 Map 字典;第二步,把 Map 字典封装为广播变量。这样一来,在对千亿样本进行转换时,我们直接通过 bcSearchMap.value 读取广播变量内容,然后,通过调用 Map 字典的 getOrElse 方法来获取用户兴趣对应的索引值。
相比最开始的第一种实现方式,第二种实现方式的代码改动还是比较小的,那这一版代码对内存的消耗情况有什么改进呢?
我们发现,Task 小飞机的代码“乘客”换人了!小飞机之前需要携带函数 findIndex,现在则换成了一位“匿名的乘客”:一个读取广播变量并调用其 getOrElse 方法的匿名函数。由于这位“匿名的乘客”将大件行李托运给了“联邦广播快递公司”的专用货机,因此,Task 小飞机着陆后,没有任何“行李”需要寄存到 User Memory。换句话说,优化后的版本不会对 User Memory 内存区域进行占用,所以第一种实现方式中 #threads * #size 的内存消耗就可以省掉了。
Storage Memory 规划
这样一来,原来的内存消耗转嫁到了广播变量身上。但是,广播变量也会消耗内存,这会不会带来新的性能隐患呢?那我们就来看看,广播变量消耗的具体是哪块内存区域。
回顾存储系统那一讲,我们说过,Spark 存储系统主要有 3 个服务对象,分别是 Shuffle 中间文件、RDD 缓存和广播变量。它们都由 Executor 上的 BlockManager 进行管理,对于数据在内存和磁盘中的存储,BlockManager 利用 MemoryStore 和 DiskStore 进行抽象和封装。
那么,广播变量所携带的数据内容会物化到 MemoryStore 中去,以 Executor 为粒度为所有 Task 提供唯一的一份数据拷贝。MemoryStore 产生的内存占用会被记入到 Storage Memory 的账上。因此,广播变量消耗的就是 Storage Memory 内存区域。
接下来,我们再来盘算一下,第二种实现方式究竟需要耗费多少内存空间。由于广播变量的分发和存储以 Executor 为粒度,因此每个 Executor 消耗的内存空间,就是 searchMap 一份数据拷贝的大小。searchMap 的大小我们刚刚计算过就是 #size。
明确了 Storage Memory 内存区域的具体消耗之后,我们自然可以根据公式:(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction 去有针对性地调节相关的内存配置项。
内存规划两步走
现在,咱们在两份不同的代码实现下,分别定量分析了不同内存区域的消耗与占用。对于这些消耗做到心中有数,我们自然就能够相应地去调整相关的配置项参数。基于这样的思路,想要最大化内存利用率,我们需要遵循两个步骤:
- 预估内存占用
- 调整内存配置项
我们以堆内内存为例,来讲一讲内存规划的两步走具体该如何操作。我们都知道,堆内内存划分为 Reserved Memory、User Memory、Storage Memory 和 Execution Memory 这 4 个区域。预留内存固定为 300MB,不用理会,其他 3 个区域需要你去规划。
预估内存占用
首先,我们来说内存占用的预估,主要分为三步。
第一步,计算 User Memory 的内存消耗。我们先汇总应用中包含的自定义数据结构,并估算这些对象的总大小 #size,然后用 #size 乘以 Executor 的线程池大小,即可得到 User Memory 区域的内存消耗 #User。
第二步,计算 Storage Memory 的内存消耗。我们先汇总应用中涉及的广播变量和分布式数据集缓存,分别估算这两类对象的总大小,分别记为 #bc、#cache。另外,我们把集群中的 Executors 总数记作 #E。这样,每个 Executor 中 Storage Memory 区域的内存消耗的公式就是:#Storage = #bc + #cache / #E。
第三步,计算执行内存的消耗。学习上一讲,我们知道执行内存的消耗与多个因素有关。第一个因素是 Executor 线程池大小 #threads,第二个因素是数据分片大小,而数据分片大小取决于数据集尺寸 #dataset 和并行度 #N。因此,每个 Executor 中执行内存的消耗的计算公式为:#Execution = #threads * #dataset / #N。
调整内存配置项
得到这 3 个内存区域的预估大小 #User、#Storage、#Execution 之后,调整相关的内存配置项就是一件水到渠成的事情(由公式(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction)可知),这里我们也可以分为 3 步。
首先,根据定义,spark.memory.fraction 可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到。
同理,spark.memory.storageFraction 的数值应该参考(#Storage)/(#Storage + #Execution)。
最后,对于 Executor 堆内内存总大小 spark.executor.memory 的设置,我们自然要参考 4 个内存区域的总消耗,也就是 300MB + #User + #Storage + #Execution。不过,我们要注意,利用这个公式计算的前提是,不同内存区域的占比与不同类型的数据消耗一致。
总的来说,在内存规划的两步走中,第一步预估不同区域的内存占比尤为关键,因为第二步中参数的调整完全取决于第一步的预估结果。如果你按照这两个步骤去设置相关的内存配置项,相信你的应用在运行时就能够充分利用不同的内存区域,避免出现因参数设置不当而导致的内存浪费现象,从而在整体上提升内存利用率。
小结
合理划分 Spark 所有的内存区域,是同时提升 CPU 与内存利用率的基础。因此,掌握内存规划很重要,在今天这一讲,我们把内存规划归纳为两步走。
第一步是预估内存占用。
- 求出 User Memory 区域的内存消耗,公式为:#User=#size 乘以 Executor 线程池的大小。
- 求出每个 Executor 中 Storage Memory 区域的内存消耗,公式为:#Storage = #bc + #cache / #E。
- 求出执行内存区域的内存消耗,公式为:#Execution = #threads * #dataset / #N。
第二步是调整内存配置项:根据公式得到的 3 个内存区域的预估大小 #User、#Storage、#Execution,去调整(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction 公式中涉及的所有配置项。
- spark.memory.fraction 可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到。
- spark.memory.storageFraction 的数值应该参考(#Storage)/(#Storage + #Execution)。
- spark.executor.memory 的设置,可以通过公式 300MB + #User + #Storage + #Execution 得到。
这里,我还想多说几句,内存规划两步走终归只是手段,它最终要达到的效果和目的,是确保不同内存区域的占比与不同类型的数据消耗保持一致,从而实现内存利用率的最大化。
每日一练
- 你知道估算 Java 对象存储大小的方法有哪些吗?不同的方法又有哪些优、劣势呢?
- 对于内存规划的第一步来说,要精确地预估运行时每一个区域的内存消耗,很费时、费力,调优的成本很高。如果我们想省略掉第一步的精确计算,你知道有哪些方法能够粗略、快速地预估不同内存区域的消耗占比吗?
期待在留言区看到你的思考和答案,我们下一讲见!
文章作者 anonymous
上次更新 2024-01-26