1.Spark 编程模型

我们先看一段代码,这段代码会创建一个本地模式运行的Spark程序 ,使用4个线程创建SparkContext对象,Application 的名字是“Test Spark”. 之后在Driver 端会打印出file中的每一行内容。

上面的代码中我们可以发现用scala 写 Spark程序非常简洁。这个程序实现了读取HDFS上的文件,并将文件打印到控制台上。(上面的代码有一个缺陷,当testFile读取的文件太大时,会给Driver 端造成很大的压力)

任何Spark程序都会有一个SparkContext对象(Java编写时是JavaSparkContext对象)它是Spark程序的主要入口,无论我们是从本地还是HDFS读取文件,都要首先创建一个SparkContext对象,然后基于这个sparkcontext对象,进行的RDD对象创建、转换等操作。SparkContext 初始化时依赖一个SparkConf 对象,它描述了配置环境相关的信息。扩展阅读【2】

2.Spark RDD

RDD (Resilient Distributed Dataset , 弹性分布式数据集)是Spark的核心,在Spark中一切转换和计算操作都是以RDD为单位进行处理。一个RDD代表一组数据,这些数据会分布在集群的节点上。RDD 具备高容错性,当某个RDD中有数据丢失时可以通过重新计算的方式再次生成相关数据。可以通过两种方式创建RDD对象,第一种是通过将驱动程序中的集合数据并行化得到一个RDD。 第二种是使用外部存储中的文件,如HBase、HDFS、共享文件系统等。

2.1  创建

2.1.1 通过本地数据集合创建RDD

我们可以在Driver端,使用SparkContext的parallelize方法来将一个集合并行化,创建一个RDD对象(注意这时集合对象已经被并行化,复制到计算节点上了)。 例如,下面是创建一个包含数字1到5的并行化集合:

对于并行集合的一个重要参数是将数据集切分的个数。 Spark将为集群的每个分区运行一个任务。 通常,需要为集群中的每个CPU分配2-4个分区(官方文档建议)。 Spark会尝试根据集群大小自动设置分区数。 当然我们也可以手动设置它(例如sc.parallelize(data,10))。

2.1.2 通过外部数据创建

Spark 可以从Hadoop支持的任何数据源读取数据,包括本地文件系统。对于文本类型的RDD可以用sparkcontext的textFile方法进行创建。同时我们也可以明确指定存储URI (hdfs:///file.txt,  file:///file.txt)

  • textFile方法支持通配符方式获得一个目录下的所有文件,如 sc.textFile(“hdfs:///dir/*.txt”)
  • textFile方法还可使用第二个参数来控制文件的分区数。默认,Spark为文件的每个块创建一个分区(HDFS默认为64MB),但是你也可以通过传递一个更大的值来请求更多的分区。

除了文本文件,Spark的Scala API还支持其他几种数据格式:

  • SparkContext.wholeTextFiles可以读取包含多个小文本文件的目录,并将其作为(文件名,内容)对返回。与textFile相反,textFile将打所有文件,返回一个结果集。
  • 读取SequenceFiles,使用SparkContext的sequenceFile [K,V]方法,其中K和V是键和文件中的值的类型。可以是Hadoop的Writable接口的子类,如IntWritable和Text。另外,Spark允许您为几个常见的Writables指定本机类型;例如,sequenceFile [Int,String]将自动读取IntWritables和Texts。
  • 对于其他Hadoop InputFormats,可以使用SparkContext.hadoopRDD方法,该方法接受一个JobConf和一个输入格式类,键类和值类。还可以使用SparkContext.newAPIHadoopRDD为InputFormats和基于“新的”MapReduce API(org.apache.hadoop.mapreduce)。

RDD.saveAsObjectFile和SparkContext.objectFile支持以Java对象或序列化方式将RDD保存起来。

2.2 RDD操作

创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数操作,从而使记录发生改变;  而action是运行某些计算或聚合操作,并将结果返回SparkContext驱动程序。

Spark 中的所有转换都是延迟的,当有转换操作时不会马上开始计算。仅当需要将结果返回到驱动程序或阶段依赖时,才会开始转换计算。这种设计使Spark能够更有效地运行。

2.2.1  转换(transformation)

转换函数 含义
map(func) Map 转换会对RDD中的每一个记录都执行func 函数,并返回一个新的RDD
filter(func) 根据传入的func函数对RDD进行筛选,并返回一个新的RDD.其中func是一个返回true或false的函数。
flatMap(func) 接收一个函数,应用到RDD中的每个元素,返回一个包含可迭代的类型(如list等)的RDD,可以理解为先Map(),后flat().
mapPartitions(func) 与Map函数类似,mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
mapPartitionsWithIndex(func) 与mapPartition基本相同,只是在处理函数的参数是一个二元元组,元组的第一个元素是当前处理的分区的index,元组的第二个元素是当前处理的分区元素组成的Iterator。 (Int, Iterator<T>) => Iterator<U>
sample(withReplacement, fraction, seed) 根据 fraction 指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed 用于指定随机数生成器种子
union(otherDataset) 返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成
intersection(otherDataset) 取交集,对源数据和传入参数取交集然后返回
distinct([numTasks])) 返回一个包含源数据集中所有不重复元素的新数据集
groupByKey([numTasks]) 在一个键值对的数据集上调用,返回一个 (K,Seq[V])对的数据集 。
注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的 numTasks 参数来改变它
reduceByKey(func, [numTasks]) 在一个键值对的数据集上调用时,返回一个键值对的数据集,使用指定的 reduce 函数,将相同 key 的值聚合到一起。类似 groupByKey,reduce 任务个数是可以通过第二个可选参数来配置的
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 这个函数可用于完成对groupByKey,reduceByKey的相同的功能,用于对rdd中相同的key的值的聚合操作,主要用于返回一个指定的类型U的RDD的transform,
sortByKey([ascending], [numTasks]) 在一个键值对的数据集上调用,K 必须实现 Ordered 接口,返回一个按照 Key 进行排序的键值对数据集。升序或降序由 ascending 布尔参数决定
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W) 类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的 (K, (V, W)) 数据集
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W) 的数据集上调用,返回一个 (K, Seq[V], Seq[W]) 元组的数据集。这个操作也可以称之为 groupwith
cartesian(otherDataset) 笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U) 对数据集(两两的元素对)
pipe(command, [envVars]) 对 RDD 进行管道操作
coalesce(numPartitions) 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
repartition(numPartitions) 随机重新刷新RDD中的数据以创建更多或更少的分区,并在它们之间进行平衡。
repartitionAndSortWithinPartitions(partitioner) 根据给定的分区器重新分区RDD,并在每个生成的分区按key排序。 这比调用重新分区然后在每个分区内排序更有效,因为它可以将排序推入shuffle机器。

2.2.2 Action

Action 含义
reduce(func) 使用函数func(它接受两个参数并返回一个)来聚合数据。 为保证正确,实现的func函数的两个输入参数应该是可交换和关联的.
collect() 在Driver 端以数组的形式获得数据集的内容, 通常应用于数据量较小的数据集
count() 统计dataset 中元素的个数
first() 获得数据集的第一个元素 (类似于 take(1)).
take(n) 获得数据集中前N个元素
takeSample(withReplacement, num, [seed]) 从数据集中随机获得num元素,并以数组形式返回。可以选择是否用随机数替换不足的部分,seed 用于指定的随机数生成器种子
takeOrdered(n, [ordering]) 使用自然顺序或自定义比较器返回RDD的前n个元素。
saveAsTextFile(path) 将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。 Spark将对每个元素调用toString将其转换为文件中的一行文本。
saveAsSequenceFile(path) 将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定路径中。也可以是任何实现Hadoop的Writable接口的文件系统。
saveAsObjectFile(path) 以Java 对象的形式,存储到制定的路径上。可以使用SparkContext.objectFile() 函数加载。
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
只作用于类型为(K,V)对的RDD。 返回一个hashmap (K,int) 及每个Key的计数
foreach(func) 在数据集的每一个元素上,运行函数 func 进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase

2.2.3 Shuffle 操作

Spark中的某些操作会触发shuffle事件。 shuffle是Spark重新分发数据的机制,由于分区中的分组方式不同。 经常会跨executor和机器复制数据,使得shuffe操作既复杂又耗时。shuffle 就是将具有相同KEY的数据放在同一个分区内,为了分配正确需要将所有数据读取并放在合适的executor或节点上。能够导致shuffle的操作包括重新分区操作,如repartition和join,“ByKey操作(除了计数),如groupByKey和reduceByKey,以及连接操作,如cogroup和join。

有些shuffle 操作并不会对分区内的数据进行排序。如果希望shuffle之后可数据有序,可以使用:

  • mapPartitions以使用例如.sorted对每个分区进行排序
  • repartitionAndSortWithinPartitions有效地对分区进行排序,同时重新分区
  • sortBy创建一个全局排序的RDD

某些shuffle操作可能会消耗大量的堆内存,因为它们使用内存数据结构来在传输和组织数据。象reduceByKey和aggregateByKey在map端创建这些结构,而ByKey操作在reduce端生成这些结构。当内存放不下数据时,Spark会将这些数据溢写到磁盘,这会增加磁盘I / O的额外和垃圾回收的开销。

2.6 RDD 持久化

Spark 最重要的特性之一就是可以将内存中的RDD 持久化到磁盘上。当我们持久化RDD时,每个节点存储自己内存中的分区,支持后续我们重用这部分数据。可以使用persist()或cache()方法将RDD标记为持久化。持久化动作会在第一次的action被触发时开始。Spark的如果发现有分区丢失了,它将使用最初创建的RDD 重新计算这部分数据。

另外,在持久化RDD时,可以指定不同的存储级别。Spark 支持将数据持久化到内存中也支持存储在磁盘上。Persist()方法支持用户指定存储级别。而cache ()方法则只能使用默认的StorageLevel.MEMORY_ONLY存储级别。存储级别如下:

存储级别 含义
MEMORY_ONLY 默认的级别, 将 RDD 作为反序列化的的对象存储在 JVM 中。如果不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算
MEMORY_AND_DISK 将 RDD 作为反序列化的的对象存储在 JVM 中。如果不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取
MEMORY_ONLY_SER 将 RDD 作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU
MEMORY_AND_DISK_SER 与 MEMORY_ONLY_SER 相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
DISK_ONLY 只将 RDD 分区存储在硬盘上
MEMORY_ONLY_2、MEMORY_AND_DISK_2等 与上述的存储级别一样,但是将每一个分区都复制到两个集群结点上
OFF_HEAP(实验阶段) 类似于MEMORY_ONLY_SER,但将数据存储在堆外存储器中。 需要启用非堆内存。

3. 广播变量(Broadcast Variables)

广播变量允许程序员将一个只读变量缓存在每台机器上,而不需要再在任务之间传递。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。

Spark动作通过一组阶段执行,由分布式“shuffle”操作分隔。 Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以串行形式缓存,并在运行每个任务之前反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同的数据或以反序列化形式缓存数据很重要时才有用。

通过调用SparkContext.broadcast(v)从变量v创建广播变量。广播变量是v的一个包装,它的值可以通过调用value方法访问。例子如下:

4. 累加器(Accumulator)

累加器是只支持被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和计算求和。Spark原生只支持数值类型的累加器,我们可以添加新类型的支持。当我们在创建累加器时指定了名字,那么我们在Spark ui 上就能看到这个变量。

我们可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()分别创建Long类型或Double类型的累加器。(这是2.02 的API , 旧版本需要使用sc.accumulator() 创建) 然后,在运行时可以使用add 方法累加值。另外累加器变量只能在Driver端被访问。

原生累加器使用方法如下:

我们可以通过继承AccumulatorV2 方法来实现自己的累加器(对于2.0 之前需要继承AccumulatorParam)

自定义累加器使用方法

注意:当程序员定义自己的AccumulatorV2类型时,结果类型可能与添加的元素不同。

对于仅在action内执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,即重新启动的任务不会更新该值。 在tranformation中,如果重新执行任务或作业阶段,每个任务的更新可能会被应用多次。

累加器不会改变Spark的延迟计算模型。 如果它们在需要在RDD内被更新,则仅当RDD作为动作的一部分计算时才更新它们的值。 因此,不能保证在像map()这样的延迟变换中执行累加器更新。 以下代码片段演示此属性:

 

 

小结

本文只是简单介绍了一个Spark最基础的一部分功能和API,对于Spark 的其他特性还有很多内容没有包含。介绍的这些函数和API 将会是我们写Spark程序的基础。万事开头难,所以更需要我们花一些时间和精力去理解和实践。

 

参考:

【1】Spark官方文档 http://spark.apache.org/docs/latest/programming-guide.html#overview

【2】SparkContext分析 http://www.cnblogs.com/softlin/p/5792126.html

【3】Spark 编程实战