Twitter Storm source code walkthrough, part 3: the topology submission process

Date: 2021-09-17 23:37:30

Overview

A storm cluster can be pictured as a factory. nimbus is mainly responsible for taking orders from the outside and for allocating the work; besides accepting external orders, it has to turn them into internal work assignments, at which point it plays the role of the dispatch office. The supervisor is middle management: it is the foreman of a production workshop, and its daily job is to wait for the dispatch office to hand it new work. As foreman, the supervisor does not do the assigned work itself; it has a crew of ordinary workers under it, and it only ever says two things to them: start work and knock off. Note that "knock off" does not mean a worker has finished what it was doing; it merely goes into an idle state.

The following roles take part in submitting a topology.

  • storm client: packages the topology created by the user and submits it to nimbus
  • nimbus: receives the submitted topology through its thrift interface
  • supervisor: downloads the latest assignments announced through zookeeper and is responsible for launching workers
  • worker: runs tasks; each task is either a bolt or a spout
  • executor: a running thread; all tasks inside one executor are of the same type, i.e. a given thread runs either only bolt tasks or only spout tasks

A worker corresponds to a process and an executor to a thread, and a single thread can run one or more tasks. Before 0.8.0 each task had its own thread; 0.8.0 introduced the executor concept, which removed the one-to-one mapping between tasks and threads, and the tasks subtree that used to exist on the zookeeper server disappeared with it. See http://storm-project.net/2012/08/02/storm080-released.html for details of this change.
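To make the worker/executor/task distinction concrete, here is a hedged sketch using the standard Java API available since 0.8.0. The class ParallelismDemo, the EchoBolt and the topology name are made up for illustration; TestWordSpout is the test spout bundled with storm (the one ExclamationTopology uses).

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ParallelismDemo {
    // A trivial pass-through bolt, present only so the parallelism settings have something to act on.
    public static class EchoBolt extends BaseBasicBolt {
        public void execute(Tuple input, BasicOutputCollector collector) {
            collector.emit(new Values(input.getString(0)));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // 2 executors (threads) for the spout; by default one task per executor.
        builder.setSpout("word", new TestWordSpout(), 2);
        // 4 executors sharing 8 tasks, i.e. 2 tasks per thread for this bolt.
        builder.setBolt("echo", new EchoBolt(), 4)
               .setNumTasks(8)
               .shuffleGrouping("word");
        Config conf = new Config();
        // The executors above end up spread across 2 worker processes.
        conf.setNumWorkers(2);
        StormSubmitter.submitTopology("parallelism-demo", conf, builder.createTopology());
    }
}

With these settings the bolt runs 8 tasks on 4 executors (2 tasks per thread), and all executors are spread over 2 worker processes.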

storm client

To submit a topology to the storm cluster, the storm client runs the command below. Assume the jar file is storm-starter-0.0.1-SNAPSHOT-standalone.jar, the main class is storm.starter.ExclamationTopology, and the topology is to be named exclamationTopology.

#./storm jar $HOME/working/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamationTopology

What does this one short command actually mean for the storm client? The source keeps no secrets, so open the storm client script:

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...] Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=["-Dstorm.jar=" + jarfile])

def exec_storm_class(klass, jvmtype="-server", jvmopts=[],
                     extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns

exec_storm_class, in plain terms, runs the main method of the class that was passed in. Take WordCountTopology's main as the example:

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);
    if (args != null && args.length > 0) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}

On the storm client side the key class, StormSubmitter, has now shown itself; submitTopology is what really deserves our attention.

public static void submitTopology(String name, Map stormConf,
                                  StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException {
    if (!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if (topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch (InvalidTopologyException e) {
                LOG.warn("Topology submission exception", e);
                throw e;
            } catch (AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch (TException e) {
        throw new RuntimeException(e);
    }
}

submitTopology does essentially two things: it uploads the jar file to the storm cluster, and it then tells the cluster that the upload is complete and that the named topology can be run.

First look at submitJar, the function behind the jar upload; its call graph is shown below.

[Figure: call graph of submitJar]
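Before moving on, here is a hedged reconstruction of what that upload amounts to: the jar pointed to by -Dstorm.jar is streamed to nimbus in chunks through the Thrift file-upload calls beginFileUpload, uploadChunk and finishFileUpload. The class name, chunk size and error handling below are assumptions made for illustration; this is a sketch, not the actual StormSubmitter code.

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import backtype.storm.utils.NimbusClient;

public class SubmitJarSketch {
    // Streams a local jar to nimbus chunk by chunk and returns the server-side
    // location, which is later passed to submitTopology as uploadedJarLocation.
    public static String submitJar(Map conf, String localJar) throws Exception {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            String uploadLocation = client.getClient().beginFileUpload();
            FileInputStream in = new FileInputStream(localJar);
            byte[] buf = new byte[512 * 1024];
            int read;
            while ((read = in.read(buf)) > 0) {
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(buf, 0, read));
            }
            in.close();
            client.getClient().finishFileUpload(uploadLocation);
            return uploadLocation;
        } finally {
            client.close();
        }
    }
}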

Next, the call graph for the second step (the figures were drawn with tikz/pgf and generated as PDF).

[Figure: client-side call graph of submitTopology]

In both call graphs the functions at the leaves were declared in storm.thrift; if that has slipped your mind, look back at the description of storm.thrift in section 1.3. On the client side these functions are generated automatically by thrift.

For reasons of space and time, the other important piece of client-side submission, the source of TopologyBuilder.java, is not walked through here.
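Still, it is worth a quick look at what TopologyBuilder.createTopology() hands to StormSubmitter: a Thrift StormTopology whose components each carry a ComponentCommon. The sketch below (a made-up InspectTopology class that reuses the hypothetical EchoBolt from the earlier sketch) prints the inputs, parallelism hint and per-component conf that the nimbus-side code examined later will consume.

import java.util.Map;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class InspectTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 2);
        builder.setBolt("echo", new ParallelismDemo.EchoBolt(), 4).shuffleGrouping("word");

        // createTopology() turns the builder state into the Thrift structure
        // that travels over submitTopology/submitTopologyWithOpts.
        StormTopology topology = builder.createTopology();
        for (Map.Entry<String, Bolt> e : topology.get_bolts().entrySet()) {
            ComponentCommon common = e.getValue().get_common();
            // inputs: the (component id, stream id) pairs this bolt subscribes to,
            // together with the grouping used on each.
            System.out.println(e.getKey()
                    + " inputs=" + common.get_inputs()
                    + " parallelism=" + common.get_parallelism_hint()
                    + " conf=" + common.get_json_conf());
        }
    }
}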

nimbus

The storm client has sent the jar over the thrift interface and asked nimbus, through the predefined submitTopologyWithOpts, to process the uploaded topology. How, then, does nimbus receive the file, break the job down, and finally hand the work to the supervisors?

submitTopologyWithOpts

Everything starts from thrift again. service-handler in nimbus.clj is the concrete implementation of the Nimbus interface defined in thrift; the full code is not reproduced here, as it would take too much space. The focus is on how it implements submitTopologyWithOpts.

(^void submitTopologyWithOpts
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
^SubmitOptions submitOptions]
(try
(assert (not-nil? submitOptions))
(validate-topology-name! storm-name)
(check-storm-active! nimbus storm-name false)
(.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
storm-name
(from-json serializedConf)
topology)
(swap! (:submitted-count nimbus) inc)
(let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
storm-conf (normalize-conf
conf
(-> serializedConf
from-json
(assoc STORM-ID storm-id)
(assoc TOPOLOGY-NAME storm-name))
topology)
total-storm-conf (merge conf storm-conf)
topology (normalize-topology total-storm-conf topology)
topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
(optimize-topology topology)
topology)
storm-cluster-state (:storm-cluster-state nimbus)]
(system-topology! total-storm-conf topology) ;; this validates the structure of the topology
(log-message "Received topology submission for " storm-name " with conf " storm-conf)
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
(locking (:submit-lock nimbus)
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
(start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
(mk-assignments nimbus)))
(catch Throwable e
(log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
(throw e))))

The directory structure that the storm cluster creates on the zookeeper server is defined in config.clj.

In plain language, the function above first runs the necessary checks on the uploaded topology: its name, the file contents and their format, rather like the medical check-up you take before joining a company. Once these pass, it enters the critical region, and because it is a critical region, the submit lock is taken first.

normalize-topology

(defn all-components [^StormTopology topology]
  (apply merge {}
         (for [f thrift/STORM-TOPOLOGY-FIELDS]
           (.getFieldValue topology f))))

Once all components have been listed, the configuration of each component can be read from it.
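For readers more at home in Java, a rough analogue of all-components and of reading each component's configuration might look like the sketch below; it walks the same three Thrift maps (spouts, bolts, state spouts) that thrift/STORM-TOPOLOGY-FIELDS enumerates. Class and method names here are invented for illustration.

import java.util.HashMap;
import java.util.Map;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;

public class AllComponents {
    // A rough Java analogue of all-components: collect the common part of every
    // component in the topology, whatever its type, keyed by component id.
    public static Map<String, ComponentCommon> allComponents(StormTopology topology) {
        Map<String, ComponentCommon> all = new HashMap<String, ComponentCommon>();
        for (Map.Entry<String, SpoutSpec> e : topology.get_spouts().entrySet()) {
            all.put(e.getKey(), e.getValue().get_common());
        }
        for (Map.Entry<String, Bolt> e : topology.get_bolts().entrySet()) {
            all.put(e.getKey(), e.getValue().get_common());
        }
        for (Map.Entry<String, StateSpoutSpec> e : topology.get_state_spouts().entrySet()) {
            all.put(e.getKey(), e.getValue().get_common());
        }
        return all;
    }

    // The per-component configuration that normalize-topology works with is the
    // json_conf string stored on each ComponentCommon.
    public static void printComponentConfs(StormTopology topology) {
        for (Map.Entry<String, ComponentCommon> e : allComponents(topology).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().get_json_conf());
        }
    }
}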

mk-assignments

The key work inside the critical region is the function mk-assignments, which has two main jobs: first, work out the tasks, that is, how many spouts and how many bolts there are and where they should run; second, on the basis of that computation, write the assignment through the zookeeper client API so that supervisors can notice there is new work to claim.

Take the second job first, since its logic is simple. In mk-assignments the following code sets the corresponding data in zookeeper so that supervisors can sense that a new assignment has been produced.

(doseq [[topology-id assignment] new-assignments
:let [existing-assignment (get existing-assignments topology-id)
topology-details (.getById topologies topology-id)]]
(if (= existing-assignment assignment)
(log-debug "Assignment for " topology-id " hasn't changed")
(do
(log-message "Setting new assignment for topology id " topology-id ": "
(pr-str assignment))
(.set-assignment! storm-cluster-state topology-id assignment)
)))

The call graph is as follows.
[Figure: call graph of set-assignment!]

The computation behind the first job is considerably more involved and needs to be taken step by step. The central question in it is how work is distributed, in other words scheduling.
Perhaps you have already noticed the directory src/clj/backtype/storm/scheduler, or the scheduler-related entries in storm.yaml. So when does this scheduler actually come into play? mk-assignments indirectly calls a function with the rather odd name compute-new-topology->executor->node+port, and it is inside this oddly named function that the scheduler is invoked:

_ (.schedule (:scheduler nimbus) topologies cluster)
new-scheduler-assignments (.getAssignments cluster)
;; add more information to convert SchedulerAssignment to Assignment
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]

The assignments computed by schedule are stored in Cluster.java, which is why new-scheduler-assignments reads them from there. With an assignment in hand, the corresponding node and port can be derived, in other words which worker on which supervisor a given piece of work should be handed to.
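To make the pluggable scheduler concrete, here is a hypothetical, deliberately naive IScheduler that dumps every unassigned executor of a topology onto the first free slot, i.e. one node+port. It is only a sketch: the calls follow the backtype.storm.scheduler API (IScheduler, Topologies, Cluster, WorkerSlot, ExecutorDetails), but signatures differ somewhat between Storm versions, and this is in no way the stock DefaultScheduler.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;

public class NaiveScheduler implements IScheduler {
    public void prepare(Map conf) {
        // no state needed for this sketch
    }

    public void schedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : topologies.getTopologies()) {
            if (!cluster.needsScheduling(topology)) {
                continue;  // every executor of this topology already has a slot
            }
            List<WorkerSlot> freeSlots = cluster.getAvailableSlots();
            if (freeSlots.isEmpty()) {
                continue;
            }
            // Put every unassigned executor onto one free slot (node + port).
            // nimbus later converts the resulting SchedulerAssignment into the
            // executor->node+port map written to zookeeper.
            List<ExecutorDetails> executors =
                    new ArrayList<ExecutorDetails>(cluster.getUnassignedExecutors(topology));
            cluster.assign(freeSlots.get(0), topology.getId(), executors);
        }
    }
}

A class like this would be wired in through the scheduler entry in storm.yaml mentioned above.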

The directory structure storm creates on the zookeeper server is shown below.

[Figure: directory structure created by storm on the zookeeper server]

Given this directory tree, the questions to answer are: which of these nodes get written when a topology is submitted? A new child is created under assignments for the newly submitted topology; what data is written there, and what does its data structure look like?

supervisor

As soon as a new assignment is written into zookeeper, the supervisor's callback mk-synchronize-supervisor is woken up and runs.

Its main logic is to read the full set of assignments from the zookeeper server, compare it with the assignments already running on this machine, and work out which ones are new. The workers that run the actual tasks are then brought up in the sync-processes function.

To explain what the supervisor has to do during topology submission, the two functions below are the ones whose logic must be understood.

  • mk-synchronize-supervisor: when the contents of the assignments subtree on the zookeeper server change, the supervisor receives a notification, and the callback that handles it is mk-synchronize-supervisor. It reads all assignments, even those not handled by this supervisor, together with their detailed contents, then works out which assignments are meant for this supervisor and, among those, which are new. For the new ones it downloads the jar from the corresponding directory on nimbus; the user's own code is never uploaded to the zookeeper server, it only sits on the disk of the machine nimbus runs on.
  • sync-processes: once mk-synchronize-supervisor has finished the assignment-related preprocessing, the actual launching of workers is handed to an event-manager, which runs in its own thread; the main function executed there is sync-processes. sync-processes kills the workers whose state is no longer valid and then re-launches workers with the new run parameters.
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
;;read assignments from zookeeper
all-assignment (read-assignments
assignments-snapshot
(:assignment-id supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
;; storm-ids that have tasks assigned in the new assignment
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
(log-debug "All assignment: " all-assignment)
(log-debug "New assignment: " new-assignment) ;; download code first
;; This might take awhile
;; - should this be done separately from usual monitoring?
;; should we only download when topology is assigned to this supervisor?
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
(log-message "Downloading code for storm id "
storm-id
" from "
master-code-dir)
(download-storm-code conf storm-id master-code-dir)
(log-message "Finished downloading code for storm id "
storm-id
" from "
master-code-dir)
)) (log-debug "Writing new assignment "
(pr-str new-assignment))
(doseq [p (set/difference (set (keys existing-assignment))
(set (keys new-assignment)))]
(.killedWorker isupervisor (int p)))
(.assigned isupervisor (keys new-assignment))
(.put local-state
LS-LOCAL-ASSIGNMENTS
new-assignment)
(reset! (:curr-assignment supervisor) new-assignment)
;; remove any downloaded code that's no longer assigned or active
;; important that this happens after setting the local assignment so that
;; synchronize-supervisor doesn't try to launch workers for which the
;; resources don't exist
(doseq [storm-id downloaded-storm-ids]
(when-not (assigned-storm-ids storm-id)
(log-message "Removing code for storm id "
storm-id)
(rmr (supervisor-stormdist-root conf storm-id))
))
(.add processes-event-manager sync-processes)
)))

A few points in the listing above deserve attention.
assignments-snapshot reads all topology ids and their data from the assignments subtree on the zookeeper server, using the zk/get-children and zk/get-data primitives. The call chain is:

assignments-snapshot-->assignment-info-->clusterstate/get-data-->zk/get-data

Code download, (download-storm-code conf storm-id master-code-dir): the storm client uploaded the code to nimbus, and nimbus put it under a directory of its own on the local file system of the machine it runs on. What the supervisor now has to do is copy that code from nimbus down to its own machine.
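The download mirrors the upload shown in the storm client section, this time over the Thrift file-download calls beginFileDownload and downloadChunk. The sketch below is a simplified illustration of that round trip rather than the actual supervisor code; in particular, treating an empty chunk as end of file is an assumption.

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import backtype.storm.utils.NimbusClient;

public class DownloadCodeSketch {
    // Pulls a file (e.g. the topology jar) from nimbus chunk by chunk over Thrift.
    public static void downloadFromNimbus(Map conf, String remoteFile, String localFile) throws Exception {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            String id = client.getClient().beginFileDownload(remoteFile);
            FileOutputStream out = new FileOutputStream(localFile);
            while (true) {
                ByteBuffer chunk = client.getClient().downloadChunk(id);
                if (chunk == null || chunk.remaining() == 0) {
                    break;  // assumed: an empty chunk marks the end of the file
                }
                byte[] bytes = new byte[chunk.remaining()];
                chunk.get(bytes);
                out.write(bytes);
            }
            out.close();
        } finally {
            client.close();
        }
    }
}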

(.add processes-event-manager sync-processes) adds an event to the event-manager. The event-manager runs as an independent thread, and the handler for the newly added event is sync-processes, whose main job was described at the beginning of this section.

(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
keepers (filter-val
(fn [[state _]] (= state :valid))
allocated)
keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
new-worker-ids (into
{}
(for [port (keys reassign-executors)]
[port (uuid)]))
]
;; 1. to kill are those in allocated that are dead or disallowed
;; 2. kill the ones that should be dead
;; - read pids, kill -9 and individually remove file
;; - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
;; 3. of the rest, figure out what assignments aren't yet satisfied
;; 4. generate new worker ids, write new "approved workers" to LS
;; 5. create local dir for worker id
;; 5. launch new workers (give worker-id, port, and supervisor-id)
;; 6. wait for workers launch
(log-debug "Syncing processes")
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Allocated: " allocated)
(doseq [[id [state heartbeat]] allocated]
(when (not= :valid state)
(log-message
"Shutting down and clearing state for id " id
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)
))
(doseq [id (vals new-worker-ids)]
(local-mkdirs (worker-pids-root conf id)))
(.put local-state LS-APPROVED-WORKERS
(merge
(select-keys (.get local-state LS-APPROVED-WORKERS)
(keys keepers))
(zipmap (vals new-worker-ids) (keys new-worker-ids))
))
(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
(let [id (new-worker-ids port)]
(log-message "Launching worker with assignment "
(pr-str assignment)
" for this supervisor "
(:supervisor-id supervisor)
" on port "
port
" with id "
id
)
(launch-worker supervisor
(:storm-id assignment)
port
id)
id)))
))

worker

A worker is brought up by the supervisor through launch-worker; no external command explicitly starts or stops a worker, kill excepted, :).

A worker's main duties are:

  • sending heartbeat messages
  • receiving tuples from outside
  • sending tuples out

This work is wired up in mk-worker, which installs the corresponding handlers; the source is not listed here.

executor

An executor is initialized by the worker through mk-executor.

(defn mk-executor [worker executor-id]
(let [executor-data (mk-executor-data worker executor-id)
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
(map (fn [t] [t (task/mk-task executor-data t)]))
(into {})
(HashMap.))
_ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
report-error-and-die (:report-error-and-die executor-data)
component-id (:component-id executor-data)
;; starting the batch-transfer->worker ensures that anything publishing to that queue
;; doesn't block (because it's a single threaded queue and the caching/consumer started
;; trick isn't thread-safe)
system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
handlers (with-error-reaction report-error-and-die
(mk-threads executor-data task-datas))
threads (concat handlers system-threads)]
(setup-ticks! worker executor-data)
(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
;; TODO: add method here to get rendered stats... have worker call that when heartbeating
(reify
RunningExecutor
(render-stats [this]
(stats/render-stats! (:stats executor-data)))
(get-executor-id [this]
executor-id )
Shutdownable
(shutdown
[this]
(log-message "Shutting down executor " component-id ":" (pr-str executor-id))
(disruptor/halt-with-interrupt! (:receive-queue executor-data))
(disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
(doseq [t threads]
(.interrupt t)
(.join t))
(doseq [user-context (map :user-context (vals task-datas))]
(doseq [hook (.getHooks user-context)]
(.cleanup hook)))
(.disconnect (:storm-cluster-state executor-data))
(when @(:open-or-prepare-was-called? executor-data)
(doseq [obj (map :object (vals task-datas))]
(close-component executor-data obj)))
(log-message "Shut down executor " component-id ":" (pr-str executor-id)))
)))

In the code above, mk-threads is what creates the threads for a spout or a bolt.

mk-threads uses Clojure's counterpart of function overloading, to borrow Java/C++ terminology: defmulti declares a multimethod, and separate implementations are dispatched on whether the executor hosts a spout or a bolt.

mk-threads is fairly long and its logic is rather involved, so it pays to get the big picture first and then dig into the details.

  • async-loop: the main function the thread runs, analogous to the start_routine argument of pthread_create
  • tuple-action-fn: spouts and bolts both receive tuples; the handling logic differs, but each has a handler with this same name, tuple-action-fn
  • event-handler: the thread created here again uses the disruptor pattern, and a key concept in disruptor is defining the corresponding event-handler; the tuple-action-fn just mentioned is invoked from inside that event-handler

The call logic is shown below.

[Figure: call relations between async-loop, event-handler and tuple-action-fn]

spout

First look at what mk-threads does when the component is a spout, starting with the async-loop:

[(async-loop
(fn []
;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
(while (not @(:storm-active-atom executor-data))
(Thread/sleep 100))
(log-message "Opening spout " component-id ":" (keys task-datas))
(doseq [[task-id task-data] task-datas
:let [^ISpout spout-obj (:object task-data)
tasks-fn (:tasks-fn task-data)
send-spout-msg (fn [out-stream-id values message-id out-task-id]
(.increment emitted-count)
(let [out-tasks (if out-task-id
(tasks-fn out-task-id out-stream-id values)
(tasks-fn out-stream-id values))
rooted? (and message-id has-ackers?)
root-id (if rooted? (MessageId/generateId rand))
out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
(fast-list-iter [out-task out-tasks id out-ids]
(let [tuple-id (if rooted?
(MessageId/makeRootId root-id id)
(MessageId/makeUnanchored))
out-tuple (TupleImpl. worker-context
values
task-id
out-stream-id
tuple-id)]
(transfer-fn out-task
out-tuple
overflow-buffer)
))
(if rooted?
(do
(.put pending root-id [task-id
message-id
{:stream out-stream-id :values values}
(if (sampler) (System/currentTimeMillis))])
(task/send-unanchored task-data
ACKER-INIT-STREAM-ID
[root-id (bit-xor-vals out-ids) task-id]
overflow-buffer))
(when message-id
(ack-spout-msg executor-data task-data message-id
{:stream out-stream-id :values values}
(if (sampler) 0))))
(or out-tasks [])
))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive receive-queue}
storm-conf (:user-context task-data))
(.open spout-obj
storm-conf
(:user-context task-data)
(SpoutOutputCollector.
(reify ISpoutOutputCollector
(^List emit [this ^String stream-id ^List tuple ^Object message-id]
(send-spout-msg stream-id tuple message-id nil)
)
(^void emitDirect [this ^int out-task-id ^String stream-id
^List tuple ^Object message-id]
(send-spout-msg stream-id tuple message-id out-task-id)
)
(reportError [this error]
(report-error error)
)))))
(reset! open-or-prepare-was-called? true)
(log-message "Opened spout " component-id ":" (keys task-datas))
(setup-metrics! executor-data)
(disruptor/consumer-started! (:receive-queue executor-data))
(fn []
;; This design requires that spouts be non-blocking
(disruptor/consume-batch receive-queue event-handler)
;; try to clear the overflow-buffer
(try-cause
(while (not (.isEmpty overflow-buffer))
(let [[out-task out-tuple] (.peek overflow-buffer)]
(transfer-fn out-task out-tuple false nil)
(.removeFirst overflow-buffer)))
(catch InsufficientCapacityException e
))
(let [active? @(:storm-active-atom executor-data)
curr-count (.get emitted-count)]
(if (and (.isEmpty overflow-buffer)
(or (not max-spout-pending)
(< (.size pending) max-spout-pending)))
(if active?
(do
(when-not @last-active
(reset! last-active true)
(log-message "Activating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
(do
(when @last-active
(reset! last-active false)
(log-message "Deactivating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
;; TODO: log that it's getting throttled
(Time/sleep 100))))
(if (and (= curr-count (.get emitted-count)) active?)
(do (.increment empty-emit-streak)
(.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
(.set empty-emit-streak 0)
))
0))
:kill-fn (:report-error-and-die executor-data)
:factory? true
:thread-name component-id)]))

How does a spout handle the data it receives? Everything hangs off disruptor/consume-batch; note that call in the code above.

Now look at how the event-handler is defined: event-handler (mk-task-receiver executor-data tuple-action-fn). With that, the call graph above can be connected end to end.

The spout's tuple-action-fn is defined below. It matters a great deal: if you remember the previous post in this series on how tuple messages are sent, the receiving side of that logic is all here.

(fn [task-id ^TupleImpl tuple]
  (let [stream-id (.getSourceStreamId tuple)]
    (condp = stream-id
      Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
      Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
      (let [id (.getValue tuple 0)
            [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
        (when spout-id
          (when-not (= stored-task-id task-id)
            (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
          (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
            (condp = stream-id
              ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
                                                 spout-id tuple-finished-info time-delta)
              ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
                                                   spout-id tuple-finished-info time-delta)
              )))
        ;; TODO: on failure, emit tuple to failure stream
        ))))

The creation of bolt threads and their message handling functions are not listed one by one; by now you should have no trouble working through them yourself.
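To tie the executor loop back to user code, here is a minimal, made-up spout annotated with which part of the Clojure code above drives each of its callbacks.

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class CallbackSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private long seq = 0;

    // Called once by the executor thread, inside the doseq over task-datas,
    // after the topology becomes active.
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Driven repeatedly by the async-loop whenever max-spout-pending allows.
    public void nextTuple() {
        Utils.sleep(50);
        // Passing a message id makes the tuple "rooted" so ack/fail can come back.
        collector.emit(new Values("msg-" + seq), seq++);
    }

    // Invoked from tuple-action-fn via ack-spout-msg / fail-spout-msg.
    public void ack(Object msgId) { }
    public void fail(Object msgId) { }

    // Invoked from the loop when the topology is activated / deactivated.
    public void activate() { }
    public void deactivate() { }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}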
