导读

我们集群上的一些作业处理数据时经常需要进行Shuffle和临时数据落盘操作,在处理较大数据时,本地磁盘很容易就被写满导致task失败,严重时导致整个作业失败。还有一些作业是用Spark编写的,这些作业内部使用了RDD缓存,因为executor 进程缓存了过多的用户数据,导致程序空间变小需要进行频繁的GC操作,后来调整相关参数后虽然有所改善,但仍然不太理想。为了解决这些问题我们考虑很多方案与技术,后来将目标锁定在了Alluxio上 ,希望能够通过分布式缓存方式解决这些问题。

Alluxio原名Techyon是一个开源的基于内存的分布式存储系统,他于2012年诞生于UC Berkeley AMPLab,主要特性是数据存储与计算分离,使这两部分引擎可以进行独立的扩展。使数据引擎可以访问不同数据源中的数据。

本文主要从Alluxio的应用场景、通讯原理、存储方式、读写流程等几个角度分析阐述,使用的Alluxio版本为1.2.0. 因为水平有限同时也刚刚接触Alluxio,文章中不免有一些错误,希望大家批评指正。

Alluxio 架构

Alluxio (Techyon) ,可以说是世界上第一个以内存为中心的分布式存储系统(官方说的),它被设计成计算框架和底层存储的桥梁,所有的应用程序只需要连接上Alluxio 之后,便能方便的访问底层任意的存储系统上的数据。 同时为了更快速的提供数据访问服务,它的整个架构都是基于内存的。在大数据生态系统中,Alluxio 已经支持了很多计算框架和存储系统。同时Alluxio与Hadoop是兼容的,意味着已经有的Spark和MapRedcue程序可以不修改代码,直接在Alluxio上运行。

1-1

针对Alluxio的特性,我们可以用Alluxio同时管理多个底层文件系统,对外只提供统一的名称空间达到对上层屏蔽数据存储位置的目的。如果要处理的数据存储在不同的机房,可使用Alluxio的缓存功能在每天的计算开始前,预先将昨天或其他机房存储的数据缓存到起来提升计算与处理效率。

Alluxio已经能够较好的支持Hadoop, 假如我们使用Alluxio on YARN 的方式部署时就无形中就具备了一键部署的能力, 目前1.2.0版本的Alluxo已经能够较好的支持这种方式。在这种部署方式下提供了与集群部署方式相同的能力。

Alluxio 采用了经典型的单master 多worker架构。当使用集群独立模式部署时,为了解决master的单点问题,Alluxio使用多master实现,通过Zookeeper来保证全局唯一性。类似Hadoop的HA功能。 工作方式如下图

2-1

Master:  主要负责处理与存储系统元数据,元数据通过树形结构来管理,树形结构的每一个叶子节点代表文件或者空目录。对外(客户端或其他程序)提供元数据信息的访问与修改。对内(所有的Worker)负责统计收集每个Worker 周期心跳发送来的数据块信息与每个Worker的运行状态。Master不会主动与其他组件通讯,只以请求应答的方式与其他组件通讯。

Worker: 负责管理本地资源,如本地内存、SSD、磁盘等。提供Client读写数据请求服务,客户端或其他程序发来的数据将会以block的方式进行管理(一个文件的多个block会分散存储在多个Worker节点上)。每个Worker缓存数据时使用的是Ramfs,它是Linux下的一种基于RAM做存储的文件系统,上层可以象使用普通硬盘一样操作它。同时提供了象访问内存一样的速度。

Client: 使用数据的程序,可以是Spark executor 也可以是 MapReduce的task 或是其他程序。 Alluxio 允许Client通过与Master通讯来执行元数据操作,与Worker通讯来读写数据。同时客户端还可以直接绕过Alluxio访问底层存储系统。

Zookeeper: 实现容错和Leader选举,保证对外提供服务的leader(master)的唯一性。

Under File System: 真实的存储系统,可以是 S3、 HDFS、 GlusterFS等存储系统。

Alluxio RPC流程与原理

Alluxio 的通讯是基于Apache Thrift 实现的,Thrift是Facebook实现的一种高效的、支持多种编程语言的远程服务调用框架。Thrift可以使用代码生成器将一个定义好数据类型与接口描述的文件,生成RPC客户端与服务端的代码。目前已经支持当下流行的多种编程语言。

Alluxio 1.2.0的RPC Thrift描述文件在$ALLUXIO_HOME/core/common/src/thrift 文件夹下,它们共同组成了Alluxio的基础RPC组件,如下图:

3-1

  • common.thrift

通用信息定义文件,里面定义了基本的通讯类型、块基本信息、通用的RPC基类。最基础的AlluxioService 类就定义在这个文件里,它为其他RPC服务提供一些基础功能。 如getServiceVersion()

  • file_system_master.thrift

定义描述了Master端的服务,主要类如下:

FileSystemMasterClientService客户端Client至主节点Master通信的重要方法。如createFile(), createDirectory(), completeFile(), getStatus(), getUfsAddress(), loadMetadata(), mount()等。

FileSystemMasterWorkerService: Worker至主节点Master的通讯重要方法。如getFileInfo(), heartbeat()等。

  • file_system_worker.thrift

定义了Worker端服务FileSystemWorkerClientService, 客户端Client到Worker节点的通讯方法。如:createUfsFile(),openUfsFile()

  • block_master.thrift

定义了Master端 block管理服务BlockMasterWorkerService,每个Worker节点通过这个协议注册自己,并提交自己保存的块信息。

  • block_worker.thrift

定义了Worker端 block管理服务BlockWorkerClientServcie,允许向当前Worker发起相关请求,如:cancelBlock(), lockBlock(), promoteBlock()等相关方法

  • exception.thrift

定义基础异常类

Thrift 根据上述文件生成的RPC框架代码输出在alluxio.thrift 包下,有兴趣的可以自己去查看。

Alluxio Master 启动流程

Alluxio Master 启动服务类为AlluxioMaster ,它在alluxio.master包下。这个类中定义了AlluxioMaster类和一个main函数,这个main函数就是AllxuioMaster启动的入口。AlluxioMaster中会依赖如下几个组件:数据块元数据管理服务(BlockManager) , 文件系统元数据信息管理服务( FileSystemManager),   Lineage 元数据管理服务(LineageManager) ,WebUI 服务。AlluxioMaster 启动时会以RPC Server的身份打开监听,等待客户端的连接。其类图如下(AlluxioMaster.java):

4-1

 

AlluxioMaster 的 main 函数,在这里会通过get()函数以单例方式初始化AlluxioMaster对象,并调用start()函数启动Master服务。

 

AllxuioMaster 在构造函数中会初始化相关服务对象和创建存储目录,代码片段如下

 

上面这部分代码分别初始化了Thrift 框架的transportProvider服务对象,并从配置文件中初始化端口、IP信息,创建存储目录。初始化BlockManager、 FileSystemMaster 、LineageMaster等对象。

Alluxio Master 的启动接口是start(), 它内部会调用startMasters() 和 startServing() 两个函数,startMaster 函数会负责启动BlockManager、FileSystemMaster、LineageMaster 等服务。startServing函数负责启动Master的RPC和Web服务。

 

真正的RPC服务启动代码在startServingRPCServer() 成员函数中,在这里会创建MasterServer 并打开监听服务,代码片段如下:

 

至此Alluxio Master已经启动成功,可以接受来自客户端和Worker的请求了。如果对这个流程不太理解,可以尝试看一下Thrift编程框架,通过一个简单的RPC服务器的编写理解这部分代码。

Alluxio Worker 启动流程

Alluxio的Worker进程启动类定义在AlluxioWorker.java 文件中。在这个文件中定义了AlluxioWorker类和main函数,这个main函数就是AlluxioWorker的入口函数。

AlluxioWorker内包含了本地块管理服务(BlockWorker)、文件系统管理(FileSystemWorker)、数据请求服务(DataServer)、Worker 节点RPC服务等。

5-1

AlluxioWorker 服务的入口函数,在这里会以单例的方式初始化AlluxioWorker服务,并调用start()方法启动。

 

和AlluxioMaster 一样,它的初始化代码也是在无参构造函数中,在这里会读取配置文件,初始化Thrift 通讯框架。代码片段如下:

 

Worker节点的启动入口是start()函数,在这个函数中会依次启动metric system 、Web服务、块管理服务和文件系统管理服务。

代码片段如下:

 

Alluxio 读写数据流程分析

Alluxio 提供了访问数据的接口,根据官方说明,Alluxio 目前只提供数据一次写入,文件写完之后就不能再次修改。当文件还没有写完之前不能读取。同时提供了两种API供我们使用,一种是本地API,另一种是兼容Hadoop的API。 前者有更好的性能,后者可以灵活的使用Alluxio。

文件系统访问API类图如下:

6-1

为了使这个类图简洁,只画了读文件部分的接口与类。类中FileSystem为接口类,BaseFileSystem 实现了其所有的接口,LineageFileSystem 继承BaseFileSystem在它的基础上重写了相关方法,并增加了世袭相关接口。

  • 读数据流程

Alluxio客户端首先会从Alluxio文件系统中读取数据,如果读取不到则会从Under File System上读取数据,在读数据时允许用户根据自己的需要设置操作类型,目前支持的操作类型有如下几种:

类型 行为
CACHE_PROMOTE 如果读取的数据在Worker上时,该数据被移动到Worker的最高层。如果该数据不在本地Worker的Alluxio存储中,那么就将一个副本添加到本地Alluxio Worker中,用于每次完整地读取数据快。这是默认的读类型。
CACHE 如果该数据不在本地Worker的Alluxio存储中,那么就将一个副本添加到本地Alluxio Worker中,用于每次完整地读取数据块。
NO_CACHE 不会创建副本。

 

官方给的从Alluxio上读文件示例如下:

FileSystem fs = FileSystem.Factory.get();

AlluxioURI path = new AlluxioURI(“/myFile”);

// Generate options to set a custom read type OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.CACHE);

// Create a file and get its output stream

FileInStream in = fs.openFile(path,options);

// read data

in.read(…);

// Close and complete file

in.close();

在上面这个示例中,可以看出读数据代码还是比较简单的,只需要打开文件时指定文件名与打开选项,便可以读数据了。FileInSystem读数据的流程如下:

  1. 调用openFile接口打开文件时首先会根据传入的path,调用GetStatus()接口从master端拿到URIStatus 对象,这个对象中包含文件的block信息、文件属性信息、状态信息。然后判断要打开的文件如果是一个目录则抛出异常,如果不是则创建一个FileInStream对象并返回。

 

  1. 以读模式打开文件,如果文件长度无效将报错,否则返回FileInStream 对象。

 

  1. FileInStream 的read接口,在每次读取时都会更新当前流状态和块ID信息。这些计算与更新操作在updateStreams()函数中进行。updateStreams()函数会根据当前读取的偏移位置计算出block ID 在list中的顺序号,然后从block list中获取真正的BlockID。注意在创建FileInStream 对象时,如果被读文件还没有写完,那么客户端将会读失败。

 

  1. FileInStream有了相应的block ID后, 它首先会通过shouldUpdateStreams()函数检查当前blockStream是否已经读完。如果读完就执行更新下数据ID和切换数据流(更新mCurrentBlockBlockInStream指向下一个block作为输入 )操作和同步更新CACHE。
  1. 在updateBlockInStream函数更新mCurrentBlockBlockInStream对象时,首先会尝试从Alluxio文件系统上读取数据。如果读取块数据失败,FileInStream对象会再次尝试获得块数据信息,然后连接到底层存储服务器(UnderStoreBlockInStram),拉取相应的数据。在这时如果用户指定的是CACHE读类型,读取过程中还会向本地再缓存一份数据。

 

  1. 如果读远程数据也失败了,会尝试读取对应的Checkpoint文件(仅在打开世系功能后有效)。如果文件存在,则创建输入流和设定偏移位置。如果不存在或读数据失败,抛出异常。标记读操作失败。
  • 写数据流程

Alluxio 采用了多种策略来存储和管理底层文件,当我们进行写数据时Alluxio提供了多种策略来存储数据,官方给出的策略如下:

类型 行为
CACHE_THROUGH 数据被同步地写入到Alluxio的Worker和底层存储系统。
MUST_CACHE 数据被同步地写入到Alluxio的Worker。但不会被写入到底层存储系统。这是默认写类型。
THROUGH 数据被同步地写入到底层存储系统。但不会被写入到Alluxio的Worker。
ASYNC_THROUGH 数据被同步地写入到Alluxio的Worker,并异步地写入到底层存储系统。处于实验阶段。

 

下面的代码是官方给出的写数据相关的代码,写数据和读数据一样代码行数不多,可以在创建文件时指定写模式。

FileSystem fs = FileSystem.Factory.get();

AlluxioURI path = new AlluxioURI(“/myFile”);// Create a file and get its output

streamCreateFileOptions options = CreateFileOptions.defaults().setWriteType(WriteType.CACHE_THROUGH);

FileOutStream out fs.createFile(path, options);// Write data

out.write(…);// Close and complete fileout.close();

具体的写数据流程如下:

  1. 打开文件的入口是createFile(),它有两个实现,第一个实现只需要一个path。另外一个实现需要输入一个path 和一个写数据策略对象(CreateFileOptions) 。真正打开文件时调用的都是两个参数的createFile接口。接口内部会从客户端连接池中取一个masterClient对象,然后使用这个对象通知master 增加一个新文件。

 

5-2

  1. Master端接收到消息之后会将文件信息写入到日志文件,并向文件树结构中加入一个新节点。创建时会检查权限与挂载信息。如果要创建的文件已经存在将会抛出FileAlreadyExistsException 异常

6-2

  1. 创建文件元数据信息成功后,会根据这个Alluxio URI路径创建一个FileOutStream对象,在FileOutStream对象初始化时会根据用户指定的存储策略初始化相应的输出流。

7-2

  1. FileOutStream 创建成功后,客户端便可以开始顺序写入数据(getNextBlock() 获得下一个块),写数据数据时Write接口会判断当前写入的数据量如果已经满足一个块,便会申请一个新的block 继续写入。写入完成后需要调用close接口关闭文件。

8-2

9-2

一次简单的测试

测试目的:

测试与分析在相同的物理环境下用与不用Alluxio的性能区别

测试环境:

hadoop 集群 (Hadoop2.7.1)
名称 数量
主节点 2 (NM + RM)
计算节点 312
VCore 8938
Memory 31.03TB

 

Alluxio on yarn (Alluxio 1.2.0)
名称 数量
Alluxio master 1
worker 236
VCore 944 (4 * 236)
Memory 4720GB (20GB * 236)

 

Spark shell (Spark 1.5.1)
名称 数量
executor 100
VCore 200 (2* 100)
Memory 141330MB (1413.3MB * 100)

 

备注说明:

  1. Hadoop物理环境312个计算节点, 单节点物理内存128GB,2 * 8核CPU
  2. 运行 Alluxio on YARN (使用development 队列, 236个worker ,每个Worker 20G内存, 总内存4720GB)
  3. 使用spark-shell测试 (指定启动100个executor, 每个2 VCore, 1413.3 MB 内存)

 

测试方法:

  1. 生成350G测试数据,数据内容为行级日志数据(apache 日志, 19个18G数据)
  2. 执行Spark-shell 命令

使用HDFS测试命令:

val rdd = sc.textFile(“/alluxio/tmpdir”);

rdd.repartition(1).saveAsTextFile(“/alluxio/tmpdir3″)

rdd.count

val op=rdd.flatMap(_.split(” “)).map((_,1)).reduceByKey(_ + _)

op.sortByKey().take(10)

 

使用Alluxio测试命令(测试之前已经使用alluxio load 命令将数据放入内存):

val rdd = sc.textFile(“alluxio://alluxioMasterIP:19998/tmpdir”);

rdd.repartition(1).saveAsTextFile(“alluxio:// alluxioMasterIP:19998/tmpdir11″)

rdd.count

val op=rdd.flatMap(_.split(” “)).map((_,1)).reduceByKey(_ + _)

op.sortByKey().take(10)

测试结果:

耗时
执行顺序 Stage HDFS Alluxio
0 repartition 13 min 5.3 min
1 saveAsTextFile 1.2 h  出错
2 count 13 min 4.0 min
3 map 14 min 5.2 min
4 sortByKey 24 s 22 s
5 reduceBykey 16 s 16 s
6 take 0.6 s 0.6 s

 

备注:Alluxio 在执行高并发写入一个文件时会出错(命令:rdd.repartition(1).saveAsTextFile ),目前怀疑是写数据超时。从结果上看所有的读数据性能都有所提升(30%~40%)

  • Spark 使用HDFS读写数据

7

  • Spark 从Alluxio读写数据

8

  • 使用的测试数据状态

9

  • 测试数据内容

10

总结

本篇文章只是浅显的分析了Alluxio其中一小部分代码与执行流程,Alluxio 本身还有一些其他一些比较酷的特性,如数据容错处理,键值存储库、分层存储,Alluxio-FUSE这些功能都很实用能解决某个场景下的问题。因为本人也是刚刚接触Alluxio, 在测试时也遇到了一些问题,相信这些问题在之后的版本中都会一一解决。这篇文章如果有错误之处欢迎批评与指正。

 

敬请期待下一篇,《Alluxio 使用实战》