Spout数据源

时间:2023-03-09 03:00:37
Spout数据源

Spout 数据源

  消息源 Spout 是 Storm 的 Topology 中的消息生产者(即 Tuple 的创造者)。

Spout 介绍

  1. Spout 的结构
  Spout 是 Storm 的核心组件之一,最源头的接口是 IComponent,如图 3-2 所示,几个Spout 接口都继承自 IComponent。
Spout数据源

                图 3-2 Spout 类图

  Spout 发出的消息

  Spout 从外部获取数据后,向 Topology 中发出的 Tuple 可以是可靠的,也可以是不可靠的。

  注意:一个可靠的消息源可以重新发射一个 Tuple(如果该 Tuple 没有被 Storm 成功处理),但是一个不可靠的消息源 Spout 一旦发出,一个 Tuple 就把它彻底“遗忘”,也就不可能再发了。

  Spout 发射的流
  Spout 可以发射多个流。要达到这样的效果,使用 OutputFieldsDeclarer.declareStream 来定义多个流(即定义多个 Stream),然后使用 SpoutOutputCollector 来发射指定的流。

  Spout 的重要方法

  Spout 的重要方法是 nextTuple。 nextTuple 方法发射一个新的元组到 Topology,如果没有新元组发射,则直接返回。注意任务 Spout 的 nextTuple 方法都不要实现成阻塞的,因为Storm 是在相同的线程中调用 Spout 的方法。 Spout 的另外两个重要方法是 ack 和 fail 方法。当 Spout 发射的元组被拓扑成功处理时,调用 ack 方法;当处理失败时,调用 fail 方法。 ack和 fail 方法仅被可靠的 Spout 调用。

  Spout 的组件
  Spout 的 最 顶 层 抽 象 是 ISpout 接 口。 在 通 常 情 况 下(Shell 和 事 务 型 的 除 外), 实 现一 个 Spout, 可 以 直 接 实 现 接 口 IRichSpout, 如 果 不 想 写 多 余 的 代 码, 可 以 直 接 继 承BaseRichSpout。

Spout 实例

  下 面 通 过 创 建 一 个 实 例 RandomSpout 来 介 绍 Spout, 图 3-3 为 RandomSpout 继 承 自BasicRichSpout 及其实现的原理图。
  图 3-4 列出了实例 RandomSpout 继承自 BaseRichSpout 中的几个重要方法。

  下面对图 3-4 中的方法进行详细介绍。
  (1) open 方法
  当一个 Task 被初始化时会调用此 open 方法。一般都会在此方法中初始化发送 Tuple 的对象 SpoutOutputCollector 和配置对象 TopologyContext。
  代码示例如下:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
random = new Random();
}

Spout数据源

  图 3-3 RandomSpout 类图

Spout数据源

  图 3-4 RandomSpout 类的主要方法

  (2) declareOutputFields 方法
  此方法用于声明当前 Spout 的 Tuple 发送流。流的定义是通过 OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域 Fields。
  示例代码如下:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("randomInt"));
}

  (3) nextTuple 方法
  这是 Spout 类中最重要的一个方法。发射一个 Tuple 到 Topology 都是通过该方法来实现的。

示例代码如下:
public void nextTuple() {
while(true){
Values val = new Values(random.nextInt(100));
collector.emit(val);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

  以上代码从 100 以内的整数中随机生成一个数作为 Tuple 的值,然后通过_collector 发送到 Topology。
  另外,除了上述几个方法之外,还有 getComponentConf iguration、ack、fail 和 close 方法等。 getComponentConfiguration 方法用于配置当前组件的参数, Storm 监测到一个 Tuple 被成功处理时调用 ack 方法,处理失败时调用 fail 方法,这两个方法在 BaseRichSpout 类中已经被隐式实现了。