Storm Startup Process: Source Code Analysis

Date: 2023-03-09 08:11:38

TopologyMaster:

Handles basic bookkeeping for a topology, such as updating heartbeat information and topology metrics.
NimbusServer:
/**
 * NimbusServer work flow:
 * 1. cleanup interrupted topologies: delete /storm-local-dir/nimbus/topologyid/stormdist, delete /storm-zk-root/storms/topologyid
 * 2. set /storm-zk-root/storms/topology status as run
 * 3. start one thread: every nimbus.monitor.freq.secs, set all topologies under /storm-zk-root/storms/ to monitor; when a topology's status is monitor, nimbus will reassign its workers
 * 4. start one thread: every nimbus.cleanup.inbox.freq.secs, clean up useless jars
 *
 * @author version 1: Nathan Marz; version 2: Lixin/Chenjun; version 3: Longda
 */
When Nimbus starts, it first reads the configuration file and then initializes the follower thread. Each follower registers itself in ZooKeeper by creating an ephemeral znode that records how long it has been up; while initializing the thread, it calls the following two methods in turn (FollowerRunnable.java):
data.getStormClusterState().update_nimbus_slave(hostPort, data.uptime());
data.getStormClusterState().update_nimbus_detail(hostPort, null);
Nimbus then checks whether the cluster already has a leader. If not, an election takes place; if a leader exists, Nimbus checks the leader's configuration and synchronizes with it (see the separate note on the storm Nimbus cluster for the details of that process).
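To make the follower registration concrete, here is a minimal sketch of how such a thread could periodically refresh its record in ZooKeeper using the two calls shown above. NimbusData, StormClusterState and the refresh interval are assumptions taken from the snippet and the surrounding text, not the exact FollowerRunnable implementation:
public class FollowerHeartbeatSketch implements Runnable {
    private final NimbusData data;   // assumed holder of cluster state and uptime
    private final String hostPort;   // this nimbus instance's host:port
    private final int sleepSecs;     // assumed refresh interval

    public FollowerHeartbeatSketch(NimbusData data, String hostPort, int sleepSecs) {
        this.data = data;
        this.hostPort = hostPort;
        this.sleepSecs = sleepSecs;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // refresh the node so the leader keeps seeing this follower as alive
                data.getStormClusterState().update_nimbus_slave(hostPort, data.uptime());
                data.getStormClusterState().update_nimbus_detail(hostPort, null);
                Thread.sleep(sleepSecs * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // log and retry on the next cycle
            }
        }
    }
}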
Before Nimbus starts serving, NimbusServer.init() performs the initialization:
1. Clean up interrupted (corrupted) topologies, i.e. those whose ZK state under storms/topologyid and code under nimbus's local /storm.local.dir/stormdist no longer match (typically state left in ZK with no local code): the related ZK data is removed (the three paths tasks/topologyid, storms/topologyid and assignments/topologyid). See the first code sketch after this list.
2. Initialize topology assignment, which starts a background thread, TopologyAssign. Here is how that background thread assigns tasks (doTopologyAssignment):
   1. Create a task scheduler (DefaultTopologyScheduler).
   2. Build the assignment: initialize a TopologyAssignEvent with the configuration and the topology's information, and let the default scheduler assign the tasks (DefaultTopologyScheduler.assignTasks).
   3. Initialize the assignment context from the prepared TopologyAssignEvent (TopologyAssignContext context = prepareTopologyAssign(event)).
   4. Inside prepareTopologyAssign, collect all currently running supervisors. (A very important method, worth studying in depth.)
   5. When assigning tasks, first decide which tasks need to be handled according to the assignment type (rebalance/new/monitor) (TopologyAssign); see the second code sketch after this list:
      - new: add every task cached in the context to the task list;
      - rebalance: add every task that is not currently running;
      - monitor: add every dead task.
   6. Once the task list is built, read the topology's configuration and create nodes in ZK that store the supervisor-to-host mapping, worker information, and so on.
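For step 1, a simplified sketch of the corruption check is shown below. The real logic lives in NimbusUtils.cleanupCorruptTopologies; the directory layout and the StormClusterState methods used here (active_storms, try_remove_storm) are assumptions for illustration:
import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CleanupSketch {
    public static void cleanupCorruptTopologies(StormClusterState zkState,
                                                String stormdistDir) throws Exception {
        // topology ids that still have code under /storm.local.dir/stormdist on nimbus
        Set<String> localIds = new HashSet<>();
        File[] dirs = new File(stormdistDir).listFiles();
        if (dirs != null) {
            for (File dir : dirs) {
                localIds.add(dir.getName());
            }
        }

        // topology ids that ZK still lists under storms/
        List<String> activeIds = zkState.active_storms();
        if (activeIds == null) {
            return;
        }

        // a topology with ZK state but no local code is treated as corrupted:
        // its tasks/, storms/ and assignments/ entries are removed from ZK
        for (String topologyId : activeIds) {
            if (!localIds.contains(topologyId)) {
                zkState.try_remove_storm(topologyId);
            }
        }
    }
}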
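For the branching in step 5, here is a hedged sketch of how the tasks that need (re)assignment could be selected; the ASSIGN_TYPE_* constants and the context accessors (getAllTaskIds, getUnstoppedTaskIds, getDeadTaskIds) are illustrative assumptions rather than the exact TopologyAssign API:
import java.util.HashSet;
import java.util.Set;

public class TaskSelectionSketch {
    public static Set<Integer> chooseTasksToAssign(TopologyAssignContext context, int assignType) {
        Set<Integer> needAssign = new HashSet<>();
        if (assignType == TopologyAssignContext.ASSIGN_TYPE_NEW) {
            // a brand-new topology: every task cached in the context must be assigned
            needAssign.addAll(context.getAllTaskIds());
        } else if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
            // rebalance: re-assign every task that is not currently running
            needAssign.addAll(context.getAllTaskIds());
            needAssign.removeAll(context.getUnstoppedTaskIds());
        } else if (assignType == TopologyAssignContext.ASSIGN_TYPE_MONITOR) {
            // monitor: only dead tasks need a new slot
            needAssign.addAll(context.getDeadTaskIds());
        } else {
            throw new IllegalArgumentException("Unknown assign type: " + assignType);
        }
        return needAssign;
    }
}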
The TopologyAssign thread is started as a daemon:
thread.setName("TopologyAssign");
thread.setDaemon(true);
thread.start();
All client commands arrive through Thrift callbacks handled by ServiceHandler; when submitTopology is called, the real work happens in submitTopologyWithOpts. The code is built on the Thrift service framework.
Let's walk through the execution flow of submitTopologyWithOpts:
If a topology with the same name is already in the pendingSubmitTopologys queue, an exception is thrown; otherwise the submission proceeds:
// create /local-dir/nimbus/topologyId/xxxx files
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology);
// generate TaskInfo for every bolt or spout in ZK: /ZK/tasks/topologyId/xxx
setupZkTaskInfo(conf, topologyId, stormClusterState);
// make the assignment for the topology
LOG.info("Submit for " + topologyName + " with conf " + serializedConf);
makeAssignment(topologyName, topologyId, options.get_initial_status());
// once the assignment is made, remove the topologyId from pendingSubmitTopologys
data.getPendingSubmitTopologys().remove(topologyId);
// push a start event after startup
StartTopologyEvent startEvent = new StartTopologyEvent();
this.data.getMetricRunnable().pushEvent(startEvent);
notifyTopologyActionListener(topologyName, "submitTopology");
After the startEvent is pushed onto the metric runnable's queue, the submitTopology action is signalled through notifyTopologyActionListener.
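As a rough illustration of the same-name guard mentioned at the start of this flow, the check might look like the sketch below; the container behind getPendingSubmitTopologys and the exception type differ in the real ServiceHandler, so both are assumptions here:
private void checkNotPending(String topologyName) {
    // assumed map view of the pending-submission queue
    Map<String, Object> pending = data.getPendingSubmitTopologys();
    synchronized (pending) {
        if (pending.containsKey(topologyName)) {
            // a submission with the same name is already in flight: reject this one
            throw new RuntimeException("Topology " + topologyName + " is already being submitted");
        }
        // mark as pending; it is removed again after makeAssignment succeeds (see above)
        pending.put(topologyName, null);
    }
}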
The corresponding NimbusServer.init() method:
private void init(Map conf) throws Exception {
    // step 1 of the workflow: remove topologies whose ZK state and local code are inconsistent
    NimbusUtils.cleanupCorruptTopologies(data);
    // start the TopologyAssign background thread
    initTopologyAssign();
    // step 2: set the status of existing topologies
    initTopologyStatus();
    // step 4: start the cleaner that periodically removes useless jars
    initCleaner(conf);
    // Thrift request handler
    serviceHandler = new ServiceHandler(data);
    if (!data.isLocalMode()) {
        //data.startMetricThreads();
        // step 3: the monitor thread that marks topologies for reassignment
        initMonitor(conf);
        // start the Thrift server
        initThrift(conf);
    }
}
Supervisor:
/**
 * Supervisor workflow:
 * 1. write SupervisorInfo to ZK
 * 2. every 10 seconds run SynchronizeSupervisor:
 *    2.1 download new topologies
 *    2.2 release useless workers
 *    2.3 assign new tasks to /local-dir/supervisor/localstate
 *    2.4 add one syncProcesses event
 * 3. every supervisor.monitor.frequency.secs run SyncProcesses:
 *    3.1 kill useless workers
 *    3.2 start new workers
 * 4. create a heartbeat thread: every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZK
 */
1. The supervisor first clears temporary directories and files, then creates a heartbeat object, fills in the supervisor_id, IP, port and so on, and calls update to create an ephemeral node in ZooKeeper that stores the supervisor's information. Once registered in ZK, it creates the supervisor synchronization thread, which in turn creates the workers.
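Step 4 of the workflow above, the heartbeat thread, can be pictured with the following sketch. SupervisorInfo, StormClusterState and the supervisor_heartbeat call are modelled on the text rather than copied from the source, so treat the names as assumptions:
public class SupervisorHeartbeatSketch implements Runnable {
    private final StormClusterState zkState; // ZK access object, as used elsewhere in this article
    private final String supervisorId;
    private final SupervisorInfo info;       // hostname, ports, uptime, ...
    private final int frequencySecs;         // supervisor.heartbeat.frequency.secs

    public SupervisorHeartbeatSketch(StormClusterState zkState, String supervisorId,
                                     SupervisorInfo info, int frequencySecs) {
        this.zkState = zkState;
        this.supervisorId = supervisorId;
        this.info = info;
        this.frequencySecs = frequencySecs;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // refresh SupervisorInfo in ZK so nimbus keeps treating this node as alive
                zkState.supervisor_heartbeat(supervisorId, info);
                Thread.sleep(frequencySecs * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // log and retry on the next beat
            }
        }
    }
}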
How the supervisor starts workers:
Main class: SyncProcessEvent
public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
    LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
    lastTime = TimeUtils.current_time_secs();
    try {
        /**
         * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
         */
        if (localAssignments == null) {
            localAssignments = new HashMap<>();
        }
        LOG.debug("Assigned tasks: " + localAssignments);

        /**
         * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
         */
        Map<String, StateHeartbeat> localWorkerStats;
        try {
            localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
        } catch (Exception e) {
            LOG.error("Failed to get Local worker stats");
            throw e;
        }
        LOG.debug("Allocated: " + localWorkerStats);

        /**
         * Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
         */
        Map<String, Integer> taskCleaupTimeoutMap;
        Set<Integer> keepPorts = null;
        try {
            taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
            keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
            localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
        } catch (IOException e) {
            LOG.error("Failed to kill workers", e);
        }

        // check new workers
        checkNewWorkers(conf);
        // check which topology need update
        checkNeedUpdateTopologys(localWorkerStats, localAssignments);
        // start new workers
        startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
    } catch (Exception e) {
        LOG.error("Failed Sync Process", e);
        // throw e
    }
}
TopologySubmitter: