storm启动nimbus源码分析-nimbus.clj

时间:2023-03-09 16:41:53
storm启动nimbus源码分析-nimbus.clj

nimbus是storm集群的"控制器",是storm集群的重要组成部分。我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus。bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数:

nimbus函数
def nimbus(klass="backtype.storm.daemon.nimbus"):
   """Syntax: [storm nimbus]

Launches the nimbus daemon. This command should be run under
   supervision with a tool like daemontools or monit.

See Setting up a Storm cluster for more information.
   (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
   """
   cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
   jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
       "-Dlogfile.name=nimbus.log",
       "-Dlog4j.configuration=storm.log.properties",
   ]
   exec_storm_class(
       klass,
       jvmtype="-server",
       extrajars=cppaths,
       jvmopts=jvmopts)

klass参数的默认值为backtype.storm.daemon.nimbus,backtype.storm.daemon.nimbus标识一个java类。STORM_DIR标识storm的安装目录,cppaths集合存放了log4j配置文件路径和storm配置文件storm.yaml路径,jvmopts存放传递给jvm的参数,包括log4j配文件路径、storm.yaml路径、log4j日志名称和log4j配置文件名称。

exec_storm_class函数的逻辑比较简单,具体实现如下:

exec_storm_class函数
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):  
   global CONFFILE  
   all_args = [  
       "java", jvmtype, get_config_opts(),  
       "-Dstorm.home=" + STORM_DIR,  
       "-Djava.library.path=" + confvalue("java.library.path", extrajars),  
       "-Dstorm.conf.file=" + CONFFILE,  
       "-cp", get_classpath(extrajars),  
   ] + jvmopts + [klass] + list(args)  
   print "Running: " + " ".join(all_args)  
   if fork:  
       os.spawnvp(os.P_WAIT, "java", all_args)  
   else:  
       os.execvp("java", all_args) # replaces the current process and never returns

get_config_opts()获取jvm的默认配置信息,confvalue("java.library.path", extrajars)获取storm使用的本地库JZMQ加载路径,get_classpath(extrajars)获取所有依赖jar包的完整路径,然后拼接一个java -cp命令运行klass的main方法。
klass默认值为backtype.storm.daemon.nimbus,所以exec_storm_class函数最终调用backtype.storm.daemon.nimbus类的main方法。

backtype.storm.daemon.nimbus类定义在nimbus.clj文件中,定义如下:

backtype.storm.daemon.nimbus类
(ns backtype.storm.daemon.nimbus
 (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
 (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
 (:import [org.apache.thrift.exception])
 (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
 (:import [java.nio ByteBuffer])
 (:import [java.io FileNotFoundException])
 (:import [java.nio.channels Channels WritableByteChannel])
 (:use [backtype.storm.scheduler.DefaultScheduler])
 (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
           Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
 (:use [backtype.storm bootstrap util])
 (:use [backtype.storm.config :only [validate-configs-with-schemas]])
 (:use [backtype.storm.daemon common])
 (:gen-class
   :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))
   ...
   ;; 其他方法
   ...
   (defn -main []
 (-launch (standalone-nimbus)))

:gen-class指示Clojure生成Java类backtype.storm.daemon.nimbus,并且声明一个静态方法launch,launch方法接收一个实现backtype.storm.scheduler.INimbus接口的实例作为参数。launch函数的参数是由standalone-nimbus函数生成的。standalone-nimbus函数定义如下:返回一个实现INimbus接口的实例。

standalone-nimbus函数
(defn standalone-nimbus []
 ;; 实现INimbus接口
 (reify INimbus
   ;; prepare函数为空实现
   (prepare [this conf local-dir]
     )
   ;; allSlotsAvailableForScheduling获取所有可用的slot集合
   (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
     ;; supervisors标识集群所有supervisor的详细信息对象SupervisorDetails的集合
     (->> supervisors
          ;; 遍历supervisors,为supervisor的每个port生成对应的WorkerSlot对象,WorkerSlot包含两个属性节点id和port
          (mapcat (fn [^SupervisorDetails s]
                    (for [p (.getMeta s)]
                      (WorkerSlot. (.getId s) p))))
          set ))
   (assignSlots [this topology slots]
     )
   (getForcedScheduler [this]
     nil )
   ;; 获取supervisor主机名
   (getHostName [this supervisors node-id]
     (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
       (.getHost supervisor)))
   ))

launch函数定义如下:

launch函数
()
                   (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                   (.processor (Nimbus$Processor. service-handler))
                   )
      server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
   (add-shutdown-hook-with-force-kill-in-1-sec (fn []
                                                 (.shutdown service-handler)
                                                 (.stop server)))
   (log-message "Starting Nimbus server...")
   (.serve server)))

read-storm-config定义如下:

read-storm-config函数
(defn read-storm-config  
 []  
 ;; conf绑定storm集群配置信息
 (let [conf (clojurify-structure (Utils/readStormConfig))]  
   ;; validate-configs-with-schemas函数验证配置信息的正确性并删除不正确的配置信息
   (validate-configs-with-schemas conf)  
   conf))

read-storm-config函数调用了backtype.storm.utils.Utils类的静态方法readStormConfig,如下:

readStormConfig方法
public static Map readStormConfig() {  
       // 调用readDefaultConfig从defaults.yaml配置文件读取默认配置信息存入ret
       Map ret = readDefaultConfig();
       // 获取用户自定义配置文件路径
       String confFile = System.getProperty("storm.conf.file");  
       Map storm;  
       if (confFile==null || confFile.equals("")) {  
           storm = findAndReadConfigFile("storm.yaml", false);  
       } else {
           // 读取用户自定义配置信息
           storm = findAndReadConfigFile(confFile, true);  
       }  
       // 将用户自定义的配置信息覆盖更新到ret中
       ret.putAll(storm);
       // 将命令行方式提供的配置信息覆盖更新到ret中
       ret.putAll(readCommandLineOpts());  
       // 返回覆盖更新后的配置信息ret
       return ret;  
   }

service-handler函数定义如下:

defserverfn是一个宏,(defserverfn service-handler [conf inimbus] ... )返回一个名字为service-handler函数。宏扩展是在编译时进行的

service-handler函数
(
                       (
                       ()
             (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
             ))
         ;; transition-name!函数主要功能就是根据storm-name获取topology id,然后调用transition!函数,topology由当前状态转换到:rebalance状态,:rebalance状态是一个"临时状态",最终修改topology状态为:rebalancing,:rebalancing状态为"持久状态"
         ;; 通过state-transitions函数我们可以知道只允许从:active和:inactive状态转换到:rebalance状态,并调用rebalance-transition函数,rebalance-transition通过调用delay-event向storm定时器添加一个定时任务,这个定时任务的主要功能就是负责topology由:rebalancing状态
         ;; 转换到:do-rebalance状态,并调用do-rebalance函数(重新设置topology的进程数和组件并行度,然后调用mk-assignments函数重新进行任务分配),然后将topology状态修改成:rebalancing的前一个状态
         (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
         ))
       ;; 激活topology,将topology状态修改成:active,处理过程与killTopologyWithOpts、rebalance相似
     (activate [this storm-name]
       (transition-name! nimbus storm-name :activate true)
       )
     ;; 将topology状态修改成:inactive,deactivate处理过程与activate相似
     (deactivate [this storm-name]
       (transition-name! nimbus storm-name :inactivate true))
     ;; beginFileUpload()函数获取nimbus存放jar的目录
     (beginFileUpload [this]
       (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
         (.put (:uploaders nimbus)
               fileloc
               (Channels/newChannel (FileOutputStream. fileloc)))
         (log-message "Uploading file from client to " fileloc)
         fileloc
         ))
     ;; 上传jar包文件
     (^void uploadChunk [this ^String location ^ByteBuffer chunk]
       (let [uploaders (:uploaders nimbus)
             ^WritableByteChannel channel (.get uploaders location)]
         (when-not channel
           (throw (RuntimeException.
                   "File for that location does not exist (or timed out)")))
         (.write channel chunk)
         (.put uploaders location channel)
         ))
     ;; 上传jar包完成,关闭Channel
     (^void finishFileUpload [this ^String location]
       (let [uploaders (:uploaders nimbus)
             ^WritableByteChannel channel (.get uploaders location)]
         (when-not channel
           (throw (RuntimeException.
                   "File for that location does not exist (or timed out)")))
         (.close channel)
         (log-message "Finished uploading file from client: " location)
         (.remove uploaders location)
         ))
     ;; 获取文件输入流
     (^String beginFileDownload [this ^String file]
       (let [is (BufferFileInputStream. file)
             id (uuid)]
         (.put (:downloaders nimbus) id is)
         id
         ))
     ;; 读取文件
     (^ByteBuffer downloadChunk [this ^String id]
       (let [downloaders (:downloaders nimbus)
             ^BufferFileInputStream is (.get downloaders id)]
         (when-not is
           (throw (RuntimeException.
                   "Could not find input stream for that id")))
         (let [ret (.read is)]
           (.put downloaders id is)
           (when (empty? ret)
             (.remove downloaders id))
           (ByteBuffer/wrap ret)
           )))
     ;; 获取storm集群配置信息
     (^String getNimbusConf [this]
       (to-json (:conf nimbus)))
     ;; 获取topology配置信息
     (^String getTopologyConf [this ^String id]
       (to-json (try-read-storm-conf conf id)))
     ;; 获取StormTopology
     (^StormTopology getTopology [this ^String id]
       (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))

(^StormTopology getUserTopology [this ^String id]
       (try-read-storm-topology conf id))
           ;; 获取当前集群的汇总信息包括supervisor汇总信息,nimbus启动时间,所有活跃topology汇总信息
     (^ClusterSummary getClusterInfo [this]
       (let [storm-cluster-state (:storm-cluster-state nimbus)
                   ;; supervisor-infos绑定supervisor id->SupervisorInfo对象键值对的map
             ;; SupervisorInfo定义:(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
             supervisor-infos (all-supervisor-info storm-cluster-state)
             ;; TODO: need to get the port info about supervisors...
             ;; in standalone just look at metadata, otherwise just say N/A?
             ;; 根据SupervisorInfo数据创建SupervisorSummary数据
             supervisor-summaries (dofor [[id info] supervisor-infos]
                                         (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
                                               ]
                                           (SupervisorSummary. (:hostname info)
                                                               (:uptime-secs info)
                                                               (count ports)
                                                               (count (:used-ports info))
                                                               id )
                                           ))
             ;; nimbus-uptime绑定nimbus启动时间                              
             nimbus-uptime ((:uptime nimbus))
             ;; bases绑定集群上所有活跃topology的StormBase数据集合
             bases (topology-bases storm-cluster-state)
             ;; topology-summaries绑定活跃topology的TopologySummary数据
             topology-summaries (dofor [[id base] bases]
                                       (let [assignment (.assignment-info storm-cluster-state id nil)]
                                         (TopologySummary. id
                                                           (:storm-name base)
                                                           (->> (:executor->node+port assignment)
                                                                keys
                                                                (mapcat executor-id->tasks)
                                                                count)
                                                           (->> (:executor->node+port assignment)
                                                                keys
                                                                count)                                                            
                                                           (->> (:executor->node+port assignment)
                                                                vals
                                                                set
                                                                count)
                                                           (time-delta (:launch-time-secs base))
                                                           (extract-status-str base))
                                         ))]
         ;; 创建ClusterSummary数据
         (ClusterSummary. supervisor-summaries
                          nimbus-uptime
                          topology-summaries)
         ))
     ;; 获取指定storm-id的topology的TopologyInfo数据
     (^TopologyInfo getTopologyInfo [this ^String storm-id]
       ;; storm-cluster-state绑定StormClusterState对象
       (let [storm-cluster-state (:storm-cluster-state nimbus)
             ;; task->component绑定任务id->组件名称键值对的map,形如:{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
             task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id))
             ;; bases绑storm-id的StormBase
             base (.storm-base storm-cluster-state storm-id nil)
             ;; assignment绑定该topology的AssignmentInfo信息,(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
             assignment (.assignment-info storm-cluster-state storm-id nil)
             ;; beats绑定该topology所有executor-id->心跳信息的map
             beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
             ;; all-components绑定该topology所有component-id集合
             all-components (-> task->component reverse-map keys)
             ;; errors绑定component-id->组件错误信息的map
             errors (->> all-components
                         (map (fn [c] [c (get-errors storm-cluster-state storm-id c)]))
                         (into {}))
             ;; executor-summaries绑定ExecutorSummary集合
             executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                       (let [host (-> assignment :node->host (get node))
                                             heartbeat (get beats executor)
                                             stats (:stats heartbeat)
                                             stats (if stats
                                                     (stats/thriftify-executor-stats stats))]
                                         (doto
                                             (ExecutorSummary. (thriftify-executor-id executor)
                                                               (-> executor first task->component)
                                                               host
                                                               port
                                                               (nil-to-zero (:uptime heartbeat)))
                                           (.set_stats stats))
                                         ))
             ]
         ;; 创建TopologyInfo对象
         (TopologyInfo. storm-id
                        (:storm-name base)
                        (time-delta (:launch-time-secs base))
                        executor-summaries
                        (extract-status-str base)
                        errors
                        )
         ))
     
     Shutdownable
     (shutdown [this]
       (log-message "Shutting down master")
       (cancel-timer (:timer nimbus))
       (.disconnect (:storm-cluster-state nimbus))
       (.cleanup (:downloaders nimbus))
       (.cleanup (:uploaders nimbus))
       (log-message "Shut down master")
       )
     DaemonCommon
     (waiting? [this]
       (timer-waiting? (:timer nimbus))))))

nimbus-data函数定义如下:

nimbus-data函数
()
    "Error when processing an event")
                                ))
    ;; 由mk-scheduler函数创建scheduler调度器,通过分析mk-scheduler函数,可以发现在没有配置用户自定义的scheduler情况下,mk-scheduler函数默认返回DefaultScheduler,mk-scheduler函数参见其定义部分
    :scheduler (mk-scheduler conf inimbus)
    }))

mk-scheduler函数定义如下:

mk-scheduler函数
(defn mk-scheduler [conf inimbus]
 ;; 当前版本getForcedScheduler函数返回nil
 (let [forced-scheduler (.getForcedScheduler inimbus)
       ;; scheduler绑定IScheduler接口的实现
   ;; cond等价于java中的switch,我们可以发现首先检查forced-scheduler,如果forced-scheduler为nil,则检查是否有用户自定义的scheduler,如果没有则
   ;; 使用默认的DefaultScheduler
       scheduler (cond
                   forced-scheduler
                   (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                       forced-scheduler)
   
                   (conf STORM-SCHEDULER)
                   (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                       (-> (conf STORM-SCHEDULER) new-instance))
   
                   :else
                   (do (log-message "Using default scheduler")
                       (DefaultScheduler.)))]
   ;; 先调用prepare函数
   (.prepare scheduler conf)
   ;; 然后返回scheduler
   scheduler
   ))

cleanup-corrupt-topologies!函数定义如下:

cleanup-corrupt-topologies!函数
(defn cleanup-corrupt-topologies! [nimbus]
 ;; 获取nimbus这个map中保存的StormCluterState实例
 (let [storm-cluster-state (:storm-cluster-state nimbus)
       ;; code-ids绑定了nimbus服务器上{storm.local.dir}/nimbus/stormdist/目录下所有子目录的名称,即提交给nimbus的所有topology的id
       code-ids (set (code-ids (:conf nimbus)))
   ;; active-topologies绑定zookeeper上/storms/目录中所有文件名称,即当前storm集群上正在运行的topology的id
       active-topologies (set (.active-storms storm-cluster-state))
   ;; corrupt-topologies绑定active-topologies和code-ids的差集,即当前正在运行的,但丢失jar包、topology信息和配置信息的topology的id
       corrupt-topologies (set/difference active-topologies code-ids)]
   ;; 将id包含在corrupt-topologies集合的topology的分配信息从zookeeper的/assignments目录删除,同时将StormBase信息从zookeeper的/storms目录删除
   (doseq [corrupt corrupt-topologies]
     (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
     (.remove-storm! storm-cluster-state corrupt)
     )))

transition!函数定义如下:

transition!函数的作用十分重要,负责topology状态转换,在启动nimbus场景下,event的值为":startup"关键字,error-on-no-transition?的值为false。transition!函数有两个重载版本。

transition!函数
(defn transition!
 ([nimbus storm-id event]
    (transition! nimbus storm-id event false))
 ([nimbus storm-id event error-on-no-transition?]
    ;; 加锁
    (locking (:submit-lock nimbus)
      ;; system-events绑定一个集合#{:startup}
      (let [system-events #{:startup}
            ;; 在启动nimbus场景下,event绑定[:startup],event-args为nil
            [event & event-args] (if (keyword? event) [event] event)
            ;; 从zookeeper上获取topology的状态,一个map对象,绑定到status上
            status (topology-status nimbus storm-id)]
        ;; handles the case where event was scheduled but topology has been removed
        (if-not status
          ;; 如果status为nil则记录日志,transition!函数执行结束
          (log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
          ;; 如果status不为nil,get-event绑定一个函数
          (let [get-event (fn [m e]
                            (if (contains? m e)
                              (m e)
                              (let [msg (str "No transition for event: " event
                                             ", status: " status,
                                             " storm-id: " storm-id)]
                                (if error-on-no-transition?
                                  (throw-runtime msg)
                                  (do (when-not (contains? system-events event)
                                        (log-message msg))
                                      nil))
                                )))
                ;; state-transitions函数返回一个状态转换映射map,这个map中规定了由一种状态可以转换到哪些状态,并且在状态转换后执行哪些处理(即调用哪个函数),参见其定义部分
                ;; 通过分析state-transitions函数,我们可以发现只有当topology的当前状态为":killed"和":rebalancing"时,才允许转换到":startup"状态,如果当前状态是其他状态,transition将为nil
                ;; 我们先讨论其他状态,这时transition为nil,接着transition通过if判断将绑定一个(fn [] nil)函数,这样new-status将为nil。所以在启动nimbus场景下,topology由其他状态转换到":startup"状态时,transition!函数什么都没做
                transition (-> (state-transitions nimbus storm-id status)
                               (get (:type status))
                               (get-event event))
                transition (if (or (nil? transition)
                                   (keyword? transition))
                             (fn [] transition)
                             transition)
                new-status (apply transition event-args)
                new-status (if (keyword? new-status)
                             {:type new-status}
                             new-status)]
            (when new-status
              (set-topology-status! nimbus storm-id new-status)))))
      )))

1、如果topology由":killed"转换到":startup"(kill topology的过程中,nimbus挂掉了,当重启nimbus时就有可能出现这种状态转换)时,transition将绑定

(fn [] (delay-event nimbus
                   storm-id
                   (:kill-time-secs status)
                   :remove)
   nil)

new-status值为transition绑定的函数的返回值nil。transition绑定的函数通过调用delay-event函数将#(transition! nimbus storm-id :remove false)函数添加到storm定时器中,然后由storm定时器执行该函数,该函数再次调用了transition!函数,不过这次是由":killed"转换到":remove", 调用函数

(fn []
   (log-message "Killing topology: " storm-id)
   ;; 删除zookeeper上该topology的StormBase信息和分配信息
   (.remove-storm! (:storm-cluster-state nimbus)
                   storm-id)
   nil)

2、如果topology由":rebalancing"转换到":startup"(rebalance topology的过程中,nimbus挂掉了,当重启nimbus时就有可能出现这种状态转换)时,transition将绑定

(fn [] (delay-event nimbus
               storm-id
               (:delay-secs status)
               :do-rebalance)
   nil)

new-status值为transition绑定的函数的返回值nil。transition绑定的函数通过调用delay-event函数将#(transition! nimbus storm-id :do-rebalance false)函数添加到storm定时器中,然后由storm定时器执行该函数,该函数再次调用了transition!函数,不过这次是由":rebalancing"转换到":do-rebalance",调用函数

(fn []
  (do-rebalance nimbus storm-id status)
  (:old-status status))

由于这个函数返回:rebalancing状态的前一个状态,所以storm定时器所执行的定时任务会将topology的状态由:rebalancing修改成前一个状态。以上就是启动nimbus场景下,topology可能的状态转换处理过程。 delay-event函数定义如下:主要功能就是将#(transition! nimbus storm-id event false)函数作为"定时任务"添加到storm定时器中。

(defn delay-event [nimbus storm-id delay-secs event]
 (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
 (schedule (:timer nimbus)
           delay-secs
           #(transition! nimbus storm-id event false)
           ))

state-transitions函数定义如下:

state-transitions函数
(defn state-transitions [nimbus storm-id status]
 {:active {:inactivate :inactive            
           :activate nil
           :rebalance (rebalance-transition nimbus storm-id status)
           :kill (kill-transition nimbus storm-id)
           }
  :inactive {:activate :active
             :inactivate nil
             :rebalance (rebalance-transition nimbus storm-id status)
             :kill (kill-transition nimbus storm-id)
             }
  :killed {:startup (fn [] (delay-event nimbus
                                        storm-id
                                        (:kill-time-secs status)
                                        :remove)
                            nil)
           :kill (kill-transition nimbus storm-id)
           :remove (fn []
                     (log-message "Killing topology: " storm-id)
                     (.remove-storm! (:storm-cluster-state nimbus)
                                     storm-id)
                     nil)
           }
  :rebalancing {:startup (fn [] (delay-event nimbus
                                             storm-id
                                             (:delay-secs status)
                                             :do-rebalance)
                                nil)
                :kill (kill-transition nimbus storm-id)
                :do-rebalance (fn []
                                (do-rebalance nimbus storm-id status)
                                (:old-status status))
                }})

do-cleanup函数定义如下:

do-cleanup函数
(defn do-cleanup [nimbus]
 (let [storm-cluster-state (:storm-cluster-state nimbus)
       conf (:conf nimbus)
       submit-lock (:submit-lock nimbus)]
   ;; to-cleanup-ids绑定需要清理的topology的id,即不再活跃的topology的id,cleanup-storm-ids函数参见其定义部分
   (let [to-cleanup-ids (locking submit-lock
                          (cleanup-storm-ids conf storm-cluster-state))]
     (when-not (empty? to-cleanup-ids)
       (doseq [id to-cleanup-ids]
         (log-message "Cleaning up " id)
         ;; 从zookeeper上删除/workerbeats/{id}节点(清理其心跳信息)
         (.teardown-heartbeats! storm-cluster-state id)
         ;; 从zookeeper上删除/errors/{id}节点(清理其错误信息)
         (.teardown-topology-errors! storm-cluster-state id)
         ;; 从nimbus服务器上删除{storm.local.dir}/nimbus/stormdist/{id}目录(删除其jar包,topology信息,配置信息)
         (rmr (master-stormdist-root conf id))
         ;; 将该topology的心跳信息从nimbus的心跳缓存中删除
         (swap! (:heartbeats-cache nimbus) dissoc id))
       ))))

cleanup-storm-ids函数定义如下:

cleanup-storm-ids函数
(defn cleanup-storm-ids [conf storm-cluster-state]
 ;; heartbeat-ids绑定有心跳的topology的id集合
 (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
       ;; error-ids绑定有错误信息的topology的id集合
       error-ids (set (.error-topologies storm-cluster-state))
       ;; code-ids绑定在nimbus服务器上有jar包的topology的id集合
       code-ids (code-ids conf)
       ;; assigned-ids绑定当前活跃的topology的id集合
       assigned-ids (set (.active-storms storm-cluster-state))]
   ;; heartbeat-ids、error-ids、code-ids的并集再与assigned-ids做差集就是不活跃的topology的id
   (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
   ))

以上就是nimbus启动源码分析。