1 - YARN RPC架构设计
YARN RPC Server 处理流程大致可以分为四个阶段:建立连接、接收请求、处理请求和返回结果
。各阶段实现如下图所示:
1.1 - 建立连接
整个 YARN RPC Server 只有一个 Listener 线程,且包含一个 Selector 对象,用于监听 OP_ACCEPT
事件。统一负责监听是否有来自各个客户端的 RPC 连接请求到达,并采用轮询策略选择一个 Reader 线程处理新连接。
1.2 - 接收请求
当 Listener 完成客户端的连接之后,通过轮询方式找到一个 Reader 线程处理,并将新的 RPC 请求封装成固定的格式(Call 类),放到一个共享队列(callQueue)中。可同时存在多个 Reader 线程,且包含一个 Selector 对象,用于监听 OP_READ
事件。
1.3 - 处理请求
Handler 线程(可同时存在多个)并行从共享队列(callQueue)中读取 Call 对象,执行对应的函数调用,并尝试直接将结果返回给对应的客户端。但某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时,Handler 线程就会为对应客户端生成一个 Connection 对象,同时创建一个 responseQueue 队列来储存结果,最后将结果写到 Responder 线程。
1.4 - 返回结果
Server 端只有一个 Responder 线程,且包含一个 Selector 对象,用于监听 OP_WRITE
事件。当 Handler 线程没能将结果一次性发送到对应客户端时,会向该 Selector 对象注册 OP_WRITE
事件,进而由 Responder 线程采用异步方式继续发送未发送完成的结果。
1.5 - RPC 参数调优
Hadoop RPC 主要的配置参数如下:
1、Reader 线程数量
由参数 ipc.server.read.threadpool.size
配置,默认是 1。默认情况下,一个 RPC Server 只包含一个 Reader 线程。
2、每个 Handler 线程对应的最大 Call 数量
由参数 ipc.server.handler.queue.size
配置,默认是 100。默认情况下,每个 Handler 线程对应的 Call 对列长度为 100。例如:如果 Handler 线程数是 10,则整个 Call 队列(即共享队列 callQueue)最大长度为:100 x 10 = 1000
3、Handler 线程数量
在 HDFS 的 NameNode 中对应的 Handler 数量由参数 dfs.datanode.handler.count
配置,默认是 10。
在 YARN 的 ResourceManager 中对应的 Handler 数量由参数 yarn.resourcemanager.resource-tracker.client.thread-count
配置,默认是 50。
4、客户端最大重试次数
由参数 ipc.client.connect.max.retries
配置,默认是 10。也就是会连续重试 10 次。
2 - YARN通信协议
RPC 协议是连接各个组件的 “大动脉”
。在 YARN 中,任何两个需要相互通信的组件之间只有一个 RPC 协议,而对于任何一个 RPC 协议,通信双方有一端是 Client,另一端是 Server,且总是 Client 主动连接 Server 的。因此,YARN 实际上采用的是拉模式(pull-based)通信模型。如下图所示:
YARN 主要由以下几个 RPC 协议组成:
- ApplicationClientProtocol:JobClient(作业提交客户端)与 RM 之间的协议。JobClient 通过该 RPC 协议提交应用程序、 查询应用程序状态等。
- ResourceTrackerProtocol:NM 与 RM 之间的协议。NM 通过该 RPC 协议向 RM 注册,并定时发送心跳信息,汇报当前节点的资源使用情况和 Container 运行情况。
- ApplicationMasterProtocol:AM 与 RM 之间的协议。AM 通过该 RPC 协议向 RM 注册和撤销自己,并为各个任务申请资源。
- ContainerManagementProtocol:AM 与 NM 之间的协议。 AM 通过该 RPC 要求 NM 启动或者停止 Container,获取各个 Container 的使用状态等信息。
- ResourceManagerAdministrationProtocol:Admin 与 RM 之间的通信协议。Admin 通过该 RPC 协议更新系统配置文件。例如:节点黑白名单、用户队列权限等。
- HAServiceProtocol:Active RM 和 Standby RM 之间的通信协议。提供状态监控和 Failover 的 HA 服务。
- TaskUmbilicalProtocol:YarnChild 和 MRAppMaster 之间的通信协议。用于 MRAppMaster 监控跟踪 YarnChild 的运行状态,YarnChild 向 MRAppMaster 拉取 - Task 任务信息。
- MRClientProtocol:JobClient 和 AM 之间的通信协议。用于客户端拉取应用程序的执行状态,以及应用程序返回执行结果给 JobClient。
- ApplicationHistoryProtocol:JobClient 和 JobHistory Server 之间的通信协议。用于获取已完成应用程序的信息等。
3 - YARN Service工作机制
对于生命周期较长的对象,使用服务的对象管理模型
进行管理。该模型主要特点如下:
- 将每个被服务化的对象分为 4 个状态:NOTINITED(被创建)、INITED(已初始化)、STARTED(已启动)、STOPPED(已停止)。
- 任何服务状态变化都可以触发另外一些动作。
- 可通过组合的方式对任意服务进行组合,以便进行统一管理。也就是说,一个父 Service 可能会有多个子 Service。
3.1 - YARN 服务模型的类图
YARN 中关于服务模型的类图位于包 org.apache.hadoop.service
中,如下图所示:
在 YARN 中,会有非常多的服务对象,且都实现了接口 Service,定义了服务初始化、启动、停止等操作。YARN 中所有对象,如果是组合服务,直接继承 CompositeService
类,否则继承 AbstractService
类。如下图所示:
ResourceManager 是一个组合服务,包括 ClientRMService、ApplicationMasterLauncher、ApplicationMasterService 等服务对象。
NodeManager 也属于组合服务,它们内部包含多个单一服务和组合服务,以实现对内部多种服务的统一管理。
3.2 - Service 的定义
public interface Service extends Closeable { public enum STATE { NOTINITED(0, "NOTINITED"), INITED(1, "INITED"), STARTED(2, "STARTED"), STOPPED(3, "STOPPED"); } // 服务初始化 void init(Configuration config); // 服务启动 void start(); // 服务停止 void stop(); // 服务关闭 void close() throws IOException;}
4 - YARN AsyncDispatcher事件模型
4.1 - 事件处理模型
YARN 采用了事件驱动的并发模型
,其核心服务是一个中央异步调度器(AsyncDispatcher)。包括 ResourceManager、NodeManager、MRAppMaster 等,它们共同维护了一个事件(Event)与事件处理器(EventHandler)的映射表,用来处理各个事件。其事件处理模型如下图所示:
并发处理流程包括 5 个步骤:
1、各业务类型的处理请求以 Event 的形式提交到事件队列(Event Queue)中;
2、AsyncDispatcher 创建 HandlerThread 线程消费事件队列,并将 Event 传递给对应的 EventHandler;
3、该 EventHandler 可能将 Event 转发给另外一个 EventHandler,也有可能转发给带有有限状态机(StateMachine)的 EventHandler;
4、将 StateMachine 的处理结果以 Event 的形式输出到 AsyncDispatcher;
5、如果有新的 Event 会再次被 AsyncDispatcher 转发给下一个 EventHandler,直至处理完成(达到终止条件)。
例如: MRAppMaster 内部包含一个中央异步调度器(AsyncDispatcher),并注册了 TaskAttemptEvent/TaskAttemptImpl
、TaskEvent/TaskImpl
、JobEvent/JobImpl
等一系列事件/事件处理器
,由中央异步调度器统一管理和调度。
4.2 - 事件与事件处理器
通过引入服务化和事件驱动的设计思想
,使得 YARN 具有低耦合、高内聚的特点,各个模块只需要完成各自的功能,而模块之间则采用事件相互关联。事件与事件处理器的的类图位于包 org.apache.hadoop.yarn.event
中,如下图所示:
ResourceManager 内部事件与事件处理器交互图如下:
5 - YARN StateMachine 状态机
状态机(StateMachine)是由一组状态组成:
- 初始状态
- 中间状态
- 最终状态
当状态机从初始状态
开始运行,经过一系列中间状态
后,到达最终状态
时退出。也就是说,在一个状态机中,每个状态都可以接收一组特定事件,并根据具体的事件类型转换到另一个状态。当状态机转换到最终状态时,则退出。
5.1 - 状态机转换方式
在 YARN 中,每种状态转换(doTransition()
方法执行状态转换,addTransition()
方法注册状态转换)由一个四元组表示,分别是:
- 转换前状态(preState)
- 转换后状态(postState)
- 事件(event)
- 回调函数(hook)
YARN 定义了三种状态转换方式,具体如下:
1、一个初始状态、一个最终状态、一种事件
该方式表示经过处理之后,无论如何,进入到一个唯一状态。
初始状态:最终状态:事件 = 1:1:1
2、 一个初始状态、多个最终状态、一种事件
该方式表示不同的逻辑处理结果,可能导致进入不同的状态。
初始状态:最终状态:事件 = 1:N:1
3、一个初始状态、一个最终状态、多种事件
该方式表示多个不同的事件,可能触发到多个不同状态的转换。
初始状态:最终状态:事件 = 1:1:N
5.2 - 状态机类
YARN 实现了一个非常简单的状态机库,在 org.apache.hadoop.yarn.state
包中。
YARN 对外提供了一个状态机工厂 StatemachineFactory
,它提供多种 addTransition()
方法供用户添加各种状态转移,一旦状态机添加完毕后,可通过调用 installTopology()
完成一个状态机的构建。如下图所示:
5.3 - 状态机可视化
YARN 中实现了多个状态机对象,包括:
- ResourceManager 中的 RMAppImpl、RMAppAttemptImpl、RMContainerImpl 和 RMNodeImpl 等。
- NodeManager 中的 ApplicationImpl、ContainerImpl 和 LocalizedResource 等。
- MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等。
为了便于查看这些状态机的状态变化以及相关事件,YARN 提供了一个状态机可视化工具,具体操作步骤如下:
1、将状态机转化为 graphviz(.gv)
格式的文件,在源代码根目录下进行编译
[root@hadoop-01 hadoop-2.10.1-src]# mvn compile -Pvisualize
生成 3 个 *.gv
文件:
[root@hadoop-01 hadoop-2.10.1-src]# ls -l *.gv-rw-r--r-- 1 root root 16698 Sep 10 09:37 MapReduce.gv-rw-r--r-- 1 root root 12075 Sep 10 09:35 NodeManager.gv-rw-r--r-- 1 root root 14641 Sep 10 09:35 ResourceManager.gv
2、使用可视化包 graphviz
中的相关命令生成状态机图
[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng ResourceManager.gv > ResourceManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng NodeManager.gv > NodeManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng MapReduce.gv > MapReduce.png
如果尚未安装 graphviz
包,操作该步骤之前先要安装该包,Centos-7.x 安装命令如下:
[root@hadoop-01 hadoop-2.10.1-src]# yum install graphviz
ResourceManager 状态机如下图所示:
NodeManager 状态机如下图所示:
MapReduce 状态机如下图所示:
每一个状态机,其实本身也是一个事件处理器(EventHandler)。
::: hljs-center
扫一扫,我们的故事就开始了。
:::