最近笔者在用Spark进行离线处理时,观察到很多Spark On YARN 的APP 虽然请求了很多个executor 但是实际上每个executor的利用率不高。最后导致整个集群的CPU利用率一直处于低点。而由于这些APP占用了资源,导致YARN无法运行其他APP,造成排队现象。

通过分析主要有以几个特点:

  1. 不同stage之前的task数不一样,无法在每个阶段都全部打满所有的executor
  2. 部分executor处于满负荷运行,一个app里总有几个executor 是一直在运行的
  3. 读取orc文件进行切分task时会有一段时间消耗,这段时间内所有的executor空闲状态

基于这个问题,我们期望提升CPU利用率和队列利用率,理想情况是让所有类型的APP(MR/SPARK)能够及时将资源释放回YARN资源池(释放后使这些资源能够服务其他APP),同时当APP需要资源时能够快速再从资源池取回继续计算。通过一放一取的方式增加CPU利用率和处理效率。

优化前

过去Spark 作业与MR作业会跑在一个队列里,在这种模式下不同类型的作业在一个环境中使用同一个资源队列运行。经常导致多个Spark作业将资源全部占用并长时间不释放,MR作业排队等到死的问题(即使你的队列并行APP个数很高,也一样死,因为没有资源)。以前的办法可能是暴力的kill掉一些executor 更甚至直接kill掉spark app.让其他作业跑起来。这对于Spark来说是不公平的。

优化1.0版本

经过不断的打磨与试验我们将Spark和MR作业分别跑在自己的专有队列里,也就是每个业务可能有两个独立队列,一个跑Spark,一个跑MR。运行一段时间之后,情况有所好转,Spark与MR之间不再互相影响。MR作业变的运行可预期,满足优化后目标。但Spark作业仍然存在另一个问题,那就是Spark队列的资源占用率很高但CPU使用率很低,其实还是Spark在每个阶段无法将executor利用起来。

优化2.0版本

后来我们通过Spark运行原理的角度分析了导致这个问题的原因,其实很好解释。在Spark On YARN 模式时,Spark 会优先将所有资源都请求好(driver, executor),然后DAGScheduler 和 TaskScheduler 会根据代码逻辑生成stage和task set ,然后将这些task 分配到相应的executor 上执行。在这种模式下必然会存在资源无法完全利用的情况。为了提升资源利用率,我们需要将Spark On YARN的资源分配模型从原来的一次分配模型改成弹性分配模型。及不用就释放需要就请求。Spark本身提供了这个功能叫动态资源分配。不过我们基于这个功能做了一些扩展,主要增加了一些申请与释放的策略。减少YARN的压力。

使用优化2.0版本之后,集群的CPU利用率大提升,并且减少了10%左右的作业排队时间,变向的提升了作业效率。

Spark动态资源参数

 

参数说明:

作业启动后第一批启动的executor数 等于 max (spark.dynamicAllocation.initialExecutors , spark.dynamicAllocation.minExecutors, spark.executor.instances, –num-executors )

同时 spark.dynamicAllocation.maxExecutors 和 –num-executors 可以不一样。 必须是 minExecutors < –num-executors < maxExecutors 的一个数。

总结:

以上调优化还需要配合YARN 本身的调优与队列配置才能达到最优效果。笔者在3000节点的集群上亲测有效。个人觉得小集群(小于500台节点)无需打开Spark动态资源这个功能,打开反而更慢。