累加器

累加器是只支持被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和计算求和。Spark原生只支持数值类型的累加器,我们可以添加新类型的支持。当我们在创建累加器时指定了名字,那么我们在Spark ui 上就能看到这个变量。

我们可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()分别创建Long类型或Double类型的累加器。(这是2.02 的API , 旧版本需要使用sc.accumulator() 创建) 然后,在运行时可以使用add 方法累加值。另外累加器变量只能在Driver端被访问。

原生累加器使用方法如下:

我们可以通过继承AccumulatorV2 方法来实现自己的累加器(对于2.0 之前需要继承AccumulatorParam)

自定义累加器使用方法

注意:当程序员定义自己的AccumulatorV2类型时,结果类型可能与添加的元素不同。

对于仅在action内执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,即重新启动的任务不会更新该值。 在tranformation中,如果重新执行任务或作业阶段,每个任务的更新可能会被应用多次。

累加器不会改变Spark的延迟计算模型。 如果它们在需要在RDD内被更新,则仅当RDD作为动作的一部分计算时才更新它们的值。 因此,不能保证在像map()这样的延迟变换中执行累加器更新。 以下代码片段演示此属性:

 

参考:http://spark.apache.org/docs/2.0.2/programming-guide.html#accumulators