Router 组件在Federation 模式下可以部署多个,通过部署多个Router可以分担单点压力。目前Router组件官方提供了多种策略,灵活保证资源请求的路由负载包括应用程序被调度到哪个集群,请求哪个子集群的资源。根据这个特性我们可以部署多个Router 配置不同的策略,针对不同业务客户端配置到不同的Router 上。

如上图3个SubCluster, ,3个Router, 若干Client。Client会连接到对应Router上,而不同的Router会根据不同的策略将App分配到对应的SubCluster上。

在Hadoop的源码中,Router的核心代码分别在hadoop-yarn-server-router和hadoop-yarn-server-common组件下, 在common组件主要包含了Router的各种Policy。Router 组件下是Router主要服务代码所在。

1. Router启动流程分析

Router作为一个单独的服务,他有一个自己的入口类Router, 它和其他服务一样利用组合模式(Composite Pattern) 实现。 其类图如下:

Router 主要有三个组件,为了保证性能和可扩展性,Router将客户端请求服务与管理请求服务分别放在两个服务中,另外将WebUI作为一个独立的服务。

RouterClientRMService: 负责接受客户端请求,并根据配置的Policy将客户端的请求转发到具体的HomeSubCluster。它本身实现了ApplicationClientProtocol 接口,兼容基于ApplicationClientProtocol协议实现的并行框架,如:MR、Spark.

RouterRMAdminService: 负责接受管理命令请求,保证管理命令可以快速的响应与 作业提交请求互不影响。

WebApp: Router的WebUI服务类,提供界面显示。

Router初始化代码片段如下:

Router在设计时充分考虑了可扩展性增加了自定义拦截器,允许用户开发拦截器加入到Pipeline中处理。这样我们可以根据自己的需要开发一些拦截器,比如可以开发一个拦截器,将所有某个客户端的请求做一个转发或回调。

2. RouterClientRMService组件

RouterClientRMService在Federation方式下代替非Federation模式下的ClientRMService接受用户请求。并根据设置的Interceptor处理请求。(配置项参考第一篇)

Router默认使用0.0.0.0:8050提供服务,用户可以使用yarn.router.clientrm.address配置修改端口和地址.

拦截器的默认使用的是org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor 可以使用yarn.router.clientrm.interceptor-class.pipeline修改

拦截器会在Router第一次被请求时初始化,初始化完成后会放在一个LRU的缓存中。

拦截器初始化代码如下:

2.1 拦截器

官方默认提供了两个拦截器一个是DefaultClientRequestInterceptor另一个是FederationClientInterceptor。

DefaultClientRequestInterceptor

DefaultClientRequestInterceptor没有考虑跨SubCluster集群提交作业,启动时根据Router节点配置的RM地址直接初始化,可以理解为Router与特定的SubCluster绑定的方式。初始化代码如下:

FederationClientInterceptor

FederationClientInterceptor设计时考虑到了多种情况,满足大部分场景。支持配置Policy决定拦截器的处理方式。

拦截器每次处理请求时都会从FederationState存储中获取SubCluster的信息,然后根据Policy决定处理转发到哪个子集群。其中FederationState存储有三种实现,分别是Zookeeper、关系数据库(MySQL或其他)、内存方式。

FederationClientInterceptor主要有三个组件:FederationStateStoreFacade、RouterPolicyFacade、RouterMetrics其类图如下:

FederationStateStoreFacade: 提供SubClusterInfo状态获取接口,用户可以配置yarn.federation.state-store.class修改存储方式,默认是MemoryFederationStateStore

RouterPolicyFacade:提供Policy获取接口,默认使用的是org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager 管理器,用户可以通过yarn.federation.policy-manager配置修改。

RouterMetrics:提供当前Router服务的Metrics统计

FederationClientInterceptor的初始化代码如下:

Policy分析

Policy策略只会在提交作业接口(submitApplication)起作用,即控制作业提交到哪个子集群。代码如下:

Policy类图如下:


Policy初始化代码如下:

 

总结:

Federation的Router组件具有很大的灵活性,可以根据自己的需要配置相应的策略使用。不过也存在一定问题,如果Router出了问题那么对应的客户端将无法提交作业。所以后续这块需要做一些扩展。另外目前的版本Policy对应的功能还不完善,如何配置策略配置文件目前官方还没有相应文档。后续我将会增加配置Router策略相关的文章。