探讨Dolphinscheduler之Master高可用
Master高可用
Master作为DolphinScheduler中处理工作流的核心组件,其可用性直接关系到整个系统的稳定性。
由于Master并不像API-Server一样只是被动的接收外界的请求,Master会主动的消费数据库中的工作流,而一个工作流在某一时刻只能被一个Master处理,因此Master在横向扩容的时候需要考虑的问题更多。
一种比较简单的方案是采用active-standby的方式,即部署多台Master服务,但是只有一台处于active状态,对外工作,其他Master服务都处于standby状态,只有等active的Master宕机,standby状态的Master会重新选举出一台新的active Master对外工作。

这种方案实现起来简单,同时可以很好的解决Master单点问题,但是这种active-standby的架构同一时刻只能有一台Master进行工作,对于DolphinScheduler来说,由于Master需要处理工作流的调度,因此这会导致整个集群的工作流处理吞吐量上不去。
在DolphinScheduler中采用分片的方式对工作流元数据进行了预划分,具体来说对工作流产生的command根据id进行分片,将command均匀的分散到所有的Master,这样来达到所有Master都可以同时工作,并且不会互相影响。
Master通过注册中心来感知集中其他Master的节点信息,由于当节点上下线的时候,Master的元数据变更通知到所有Master服务时间会不一致,因此通过数据库事务做了进一步的保障,保证同一个Command只会被处理一次。

缓存策略
MasterServer 调度过程中,有大量的数据库读操作,例如t_ds_user、t_ds_tenant、t_ds_process_definition、t_ds_task_definition表等,考虑到这部分业务数据是读多写少的场景,开发者引入缓存机制,一方面减少DB读压力,另一方面加快核心调度流程;
- 缓存管理:采用 caffeine,可调整缓存相关配置,例如缓存大小、过期时间等;
- 缓存读取:采用 spring-cache 机制,可直接在Spring配置文件中决定是否开启(默认关闭),配置在相关的 Java Mapper 层;
- 缓存刷新:通过 AOP 切面 @CacheEvict 监听 ApiServer 接口的业务数据更新,当有数据更新时会通过 Netty 发送 CacheExpireCommand 请求通知 MasterServer 进行缓存驱逐。
4.4任务分发
4.4.1、分片机制
分片策略是为了保证密集调度的高效性,以及解决任务重复分发执行的问题。调度密集或者耗时任务可能会导致任务阻塞,在分布式集群场景下,调度组件会小概率重复分发,针对这种情况,通常结合 “单机路由策略(如:一致性哈希)” + “阻塞策略(如:丢弃后续调度)” 来规避,最终避免任务重复执行;
无论是用户手动触发,还是定时调度器触发的工作流任务,都会先封装成命令并持久化至元数据DB中,随后等待MasterServer分发调度,MasterServer中的MasterSchedulerBootstrap线程会每隔一段时间扫描Command表,取出命令、封装后投放至任务队列,等待线程消费;
由于采用去中心化的设计思想,DolphinScheduler集群会有一定数量的MasterServer节点在同时工作,意味着同一时刻可能会有多个MasterServer节点在扫描Command表,如果多个MasterServer都取到同一条Command则会导致工作流任务被执行若干次,这显然是不合理的,为了保证单条命令只能由一个MasterServer接管,开发者设计了分片机制,原理比较简单,MasterServer从Command表分页获取满足 Id % MasterCount = MasterSlotId 的记录行,其中:
- Id:Command表中的记录ID;
- MasterCount:分片总数,成功注册在ZooKeeper的MasterServer总数;
- MasterSlotId:分片序号,当前MasterServer在ZooKeeper的位置索引。
例如集群有3个MasterServer,按照分片策略,Command表记录会公平分配到每个MasterServer。值得说明的是,分片是以 MasterServer 为维度,动态扩容 MasterServer 以增加分片数量,在进行大数据量业务操作时可有效提升任务处理能力和速度:

下面思考一个问题,如何保证同一个命令只被一个MasterServer执行?在任务分片路由的过程中,假如 MasterServer 正在做水平扩缩,由于 MasterServer 的分片总数和分片索引发生变化,可能会导致同一个命令被分发至不同的 MasterServer 中,如下图例子,扩容了1台 MasterServer,id=6的命令根据哈希计算又分配给了MasterServer 3,为了避免同一个命令被重复执行,MasterServer 在领取到命令后,会通过数据库事务完成命令和工作流实例的转换、删除命令等操作,如果删除操作失败便回滚事务,意味着命令已经被其它MasterServer认领,则丢弃调度,这样即可保证同一个命令只能被一个MasterServer执行。
4.4.2、负载均衡策略
MasterServer将DAG任务下发至WorkerServer前,会根据负载均衡策略选出合适的WorkerServer节点,而负载均衡策略有如下三种:
- 加权随机(Random):随机选择一个节点;算法缺点是所有节点被访问到的概率是相同的,具有不可预测性,在一次完整的轮询中,有可能负载低的完全没被选中,而负载高的频繁被选中;
- 加权轮询(LowerWeight):默认策略。WorkerServer节点每隔一段时间向ZooKeeper上报心跳信息(包含cpuload、可用物理内存、启动时间、线程数量等信息),MasterServer分发任务时根据WorkerServer节点的CPU Load平均值、可用物理内存、系统平均负载、服务启动耗时计算节点权重值,值越大意味着节点负载越低,选中的优先级越高;算法缺点是在某些特殊的权重下,会生成不均匀的序列,这种不平滑的负载可能会导致节点出现瞬间高负载的现象,导致节点存在宕机风险;
- 平滑加权轮询(RoundRobin):节点宕机时降低有效权重值,节点正常时提高有效权重值;降权起到缓慢剔除宕机节点的效果,提权起到缓冲恢复宕机节点的效果。
所有的负载均衡算法都是基于WorkerServer节点的权重进行加权计算的,权重影响分发结果,考虑到JIT优化,Worker在启动后会低功率地运行一段时间(默认十分钟),随后逐渐达到最佳性能,此过程称为“JVM 预热”,预热期间WorkerServer节点的权重会缓慢动态调整,实现代码可参见 HostWeight 类。
MasterServer 的核心服务如下:
- Scheduler:分布式调度组件,主要负责Quartz定时任务的启动,当Quartz调度任务后,MasterServer内部任务线程池负责处理任务的后续操作;
- MasterRegistryClient:ZooKeeper客户端,封装了MasterServer与ZooKeeper相关的操作,例如注册、监听、删除、注销等;
- MasterConnectionStateListener:监听MasterServer和ZooKeeper连接状态,一旦断连则触发MasterServer的自杀逻辑;
- MasterRegistryDataListener:监听ZooKeeper的MasterServer临时节点事件,一旦发生节点移除事件,则先移除ZooKeeper上的临时节点,再触发MasterServer的故障转移(过程和FailoverExecuteThread一致);
- MasterSchedulerBootstrap:调度线程,每隔一段时间扫描DB,按照分片策略批量取出Command,封装成工作流任务执行线程(WorkflowExecuteThread),投放至缓冲队列中,等待下一个线程消费;
- FailoverExecuteThread:故障转移线程,每隔一段时间扫描DB,筛选出分配到故障节点的工作流实例,向WorkerServer发送TaskKillRequestCommand请求杀死运行中的任务;向Command表写入RECOVER_TOLERANCE_FAULT_PROCESS记录,等待MasterServer消费;
- EventExecuteService:工作流的执行线程,包含两部分:
- ProcessInstanceExecCacheManager:工作流实例的缓冲队列。MasterSchedulerBootstrap按照分片策略取出Command,封装成工作流实例执行线程(WorkflowExecuteThread)后投放;
- WorkflowExecuteThreadPool:从缓冲队列中取出WorkflowExecuteThread,并监听线程的执行情况(执行前先检查是否已经被其它线程启动);
- TaskPriorityQueueConsumer:任务队列消费线程,根据负载均衡算法将任务分发至Worker;
- TaskPluginManager:任务插件管理器,启动时会将TaskChannelFactory的所有实现类持久化到t_ds_plugin表中;因此,如果开发者需要自定义任务插件,只需集成实现TaskChannelFactory即可;
- MasterRPCServer:MasterServer RPC服务端,封装了Netty服务端创建等通用逻辑,并注册了各种消息处理器:
- CacheProcessor:接收来自ApiServer的CacheExpireCommand请求,强制刷新缓存;
- LoggerRequestProcessor:接收来自ApiServer的GetLogBytesRequestCommand、ViewLogRequestCommand、RollViewLogRequestCommand、RemoveTaskLogRequestCommand请求,操作日志;
- StateEventProcessor:接收StateEventChangeCommand请求,处理工作流实例/任务实例的状态变更,包括工作流实例/任务实例的提交成功、运行中、成功、失败、超时、杀死、准备暂停、暂停、准备停止、停止、准备阻塞、阻塞、故障转移等;
- TaskEventProcessor:接收TaskEventChangeCommand请求,处理任务实例的状态变更,包括:强制启动、唤醒;
- TaskKillResponseProcessor:接收来自WorkerServer的TaskKillResponseCommand请求,请求内容是杀死任务实例请求的响应结果;
- TaskExecuteRunningProcessor:接收来自WorkerServer的TaskExecuteRunningCommand请求,请求内容是任务实例的运行信息(工作流实例ID、任务实例ID、运行状态、执行机器信息、开始时间、程序运行目录、日志目录等)
- TaskExecuteResponseProcessor:接收来自WorkerServer的TaskExecuteResultCommand请求,请求内容是任务实例的运行结果信息(工作流实例ID、任务实例ID、开始时间、结束时间、运行状态、执行机器信息、程序运行目录、日志目录等);
- WorkflowExecutingDataRequestProcessor:接收来自ApiServer的WorkflowExecutingDataRequestCommand请求,向指定的WorkerServer查询执行中的工作流实例信息。