官方的YARN Federation Router ,将Router REST API 做成了统一汇总输出方式 ,即: Router REST服务使用了RM的规范,其URL路径相同。这样保证了客户端访问方式不变最大化向前兼容。其返回结果是所有SubCluster信息的聚合,即它会将所有SubCluster的信息合并返回一个统一的结果。
注意,这些REST API 是以拦截器方式注册到RouterWebService 中,目前提供了两个实现DefaultRequestInterceptorREST.calss 和 FederationInterceptorREST.class ,前者是简单实现,当收到请求之后直接转发到RM上(配置文件中的RM地址), 后者会从FederationStateStoreFacade中获取所有Actvie RM地址,通过这些地址获取所有RM信息合并返回给客户端。
DefaultRequestInterceptorREST.calss 其中的一个实现
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Override public ClusterInfo getClusterInfo() { return RouterWebServiceUtil.genericForward(webAppAddress, null, ClusterInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.INFO, null, null); } @Override public ClusterMetricsInfo getClusterMetricsInfo() { return RouterWebServiceUtil.genericForward(webAppAddress, null, ClusterMetricsInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, null, null); } |
FederationInterceptorREST.class 中的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
@Override public AppInfo getApp(HttpServletRequest hsr, String appId, Set<String> unselectedFields) { long startTime = clock.getTime(); ApplicationId applicationId = null; try { applicationId = ApplicationId.fromString(appId); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedRetrieved(); return null; } SubClusterInfo subClusterInfo = null; SubClusterId subClusterId = null; try { subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); if (subClusterId == null) { routerMetrics.incrAppsFailedRetrieved(); return null; } subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedRetrieved(); return null; } DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( subClusterId, subClusterInfo.getRMWebServiceAddress()); AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); return response; } |
目前官方代码中的REST API 实现不全,个人实现了一部分反馈给了社区 (YARN-8401)
总结:
在使用YARN Federation 时,建议优先使用FederationInterceptorREST.class 拦截器。按现有的设计Router会屏蔽Client 到 Resource的可见性。而统一的汇总API则是实现这种方式的基础。
Router角色正常,日志无报错。但是跑pi作业就是跑不通过,和这个相关吗?
可以看下配置