Dissecting a Storm Example - fastWordCount

Date: 2022-05-21 18:29:57

This article walks through the code of a Storm project, covering the spout and bolt interfaces involved. Developing a Storm application mainly consists of three parts: developing the spout, developing the bolts, and creating the topology.

Code Structure
[Figure: package layout of the fastWordCount example (spout, bolt, and topology packages)]

Spout
The spout below randomly selects one sentence from a fixed set of strings and emits it.

//FastRandomSentenceSpout.java
package spout.fastWordCount;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class FastRandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;

    private static final String[] CHOICES = {
        "marry had a little lamb whos fleese was white as snow",
        "and every where that marry went the lamb was sure to go",
        "one two three four five six seven eight nine ten",
        "this is a test of the emergency broadcast system this is only a test",
        "peter piper picked a peck of pickeled peppers"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = ThreadLocalRandom.current();
    }

    @Override
    public void nextTuple() {
        String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
        _collector.emit(new Values(sentence), sentence);
        //_collector.emit(new Values(...), tupleid);
        // To make the spout reliable, attach a message id (here the sentence itself) when emitting
    }

    @Override
    public void ack(Object id) {
        //Ignored
    }

    @Override
    public void fail(Object id) {
        _collector.emit(new Values(id), id); // on failure, re-emit the message, keyed by its id
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

FastRandomSentenceSpout extends BaseRichSpout and overrides its methods.
/*
 * open() is called when the task is initialized inside a worker process
 * conf: the Storm configuration for this spout
 * context: gives access to this task's position in the topology, including the task id, component id, and input/output information
 * collector: the collector used to emit tuples; it is thread-safe
 */

open(Map conf, TopologyContext context, SpoutOutputCollector collector);
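For example, a minimal sketch of reading task placement information inside open(); the variables are for illustration only:

// Sketch: inspecting this task's position in the topology from open()
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    int taskId = context.getThisTaskId();              // id of this task instance
    String componentId = context.getThisComponentId(); // e.g. "spout"
    _collector = collector;
}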
/*
 * Emits tuples; the Storm framework calls this method in a tight loop. If there is nothing to emit, simply return.
 * In practice, when nothing is available, sleep for a short time to reduce CPU load, as sketched below.
 */

nextTuple();
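A minimal sketch of that idle pattern, assuming a hypothetical in-memory queue as the source:

// Sketch: backing off in nextTuple() when there is nothing to emit
@Override
public void nextTuple() {
    String sentence = queue.poll(); // hypothetical source; returns null when empty
    if (sentence == null) {
        Utils.sleep(50); // org.apache.storm.utils.Utils: sleep briefly to reduce CPU load
        return;
    }
    _collector.emit(new Values(sentence), sentence);
}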
/*
 * Called when Storm determines that the message with the given id has been fully processed; the message can then be removed from the pending queue
 */

ack(Object id);
/*
 * Called when Storm determines that the message with the given id was not fully processed (it failed or timed out); the spout can then put the message back on the send queue, as fail() above does by re-emitting it
 */

fail(Object id);
/*
 * Declares the output fields of the tuples this spout will emit
 */

declareOutputFields(OutputFieldsDeclarer declarer)
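declare() registers fields on the default stream. If a component emits several streams, each can be declared by name; a minimal sketch, with a hypothetical "errors" side stream:

// Sketch: declaring a named stream in addition to the default one
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));            // default stream
    declarer.declareStream("errors", new Fields("msg")); // hypothetical side stream
}
// Emitting to the named stream: _collector.emit("errors", new Values(msg), msgId);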
BaseRichSpout also provides the methods close(), activate(), and deactivate() for overriding:
close(): called when the spout is being shut down, but not guaranteed to run, since in practice the worker process can be killed at any time
activate(): called when the spout is reactivated after being deactivated
deactivate(): called when the spout is deactivated

Bolt
SplitSentence: splits a sentence into individual words and emits them to the next bolt for processing.

//SplitSentence.java
package bolt.fastWordCount;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentence extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split("\\s+")) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

WordCount: takes each word from the stream, keeps a running count, and writes the counts to a dedicated log file.

//WordCount.java
Logging output is added here; the following appender and logger must be added to the worker logging configuration, e.g. /home/storm/log4j2/worker.xml. Note that the logger name "stat" must match the name passed to LoggerFactory.getLogger() in the code below.

<RollingFile name="stat"
             fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.stat"
             filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.stat.%i.gz">
    <PatternLayout>
        <pattern>${pattern}</pattern>
    </PatternLayout>
    <Policies>
        <SizeBasedTriggeringPolicy size="100 MB"/>
    </Policies>
    <DefaultRolloverStrategy max="3"/>
</RollingFile>

<logger name="stat" level="INFO">
    <appender-ref ref="stat"/>
</logger>


package bolt.fastWordCount;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    Logger log = LoggerFactory.getLogger("stat");

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
        log.info("word:{}, count:{}", word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

BaseBasicBolt encapsulates a reliable message-processing scheme: after execute() returns, it automatically calls ack() on the input tuple, or fail() if processing throws a FailedException (see the sketch after the signature below).
/*
 * Storm calls execute() repeatedly, once per input tuple
 * tuple: the input tuple
 * collector: the collector used to emit tuples
 */

execute(Tuple tuple, BasicOutputCollector collector);
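To make a BaseBasicBolt fail a tuple explicitly, execute() can throw FailedException; a minimal sketch with a hypothetical validation rule:

// Sketch: signaling failure from a BaseBasicBolt
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    if (word == null || word.isEmpty()) {
        // org.apache.storm.topology.FailedException: the wrapper fails the input tuple
        throw new FailedException("empty word");
    }
    collector.emit(new Values(word, 1));
}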
If WordCount instead extended a lower-level class such as BaseRichBolt (or implemented IRichBolt), it would also have to override prepare(), whose parameters mirror those of the spout's open() method:
public void prepare(Map conf, TopologyContext context, OutputCollector collector);
Its execute() method also differs: to get reliable processing (which is optional), you must call ack() or fail() on the collector yourself:
public void execute(Tuple tuple);
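As a sketch, here is SplitSentence rewritten on BaseRichBolt with manual anchoring and acking (the class name SplitSentenceRich is made up for illustration):

// Sketch: the BaseRichBolt version of SplitSentence
package bolt.fastWordCount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceRich extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector; // save the collector, analogous to the spout's open()
    }

    @Override
    public void execute(Tuple tuple) {
        for (String word : tuple.getString(0).split("\\s+")) {
            _collector.emit(tuple, new Values(word, 1)); // anchor emits to the input tuple
        }
        _collector.ack(tuple); // reliability is manual here: ack (or fail) the input yourself
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}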

Topology
Creating the topology:

package topology.fastWordCount;

import bolt.fastWordCount.*;
import spout.fastWordCount.*;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

import java.util.Map;

public class FastWordCountTopology {

    public static void kill(Nimbus.Client client, String name) throws Exception {
        KillOptions opts = new KillOptions();
        opts.set_wait_secs(0);
        client.killTopologyWithOpts(name, opts);
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
        builder.setBolt("split", new SplitSentence(), 4)
               .shuffleGrouping("spout"); // shuffle grouping: sentences from the spout are distributed randomly among the split tasks, so each task receives roughly the same number of tuples
        builder.setBolt("count", new WordCount(), 4)
               .fieldsGrouping("split", new Fields("word")); // fields grouping: tuples with the same "word" value always go to the same count task

        Config conf = new Config();

        String name = "wordcount";

        if (args != null && args.length > 0) {
            // production cluster
            name = args[0];
            conf.setNumWorkers(2); // run the topology in 2 worker processes
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            // the code below shows how to kill a topology on a production cluster
            Map clusterConf = Utils.readStormConfig();
            clusterConf.putAll(Utils.readCommandLineOpts());
            Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
            Thread.sleep(5 * 60 * 1000);
            kill(client, name);
        } else {
            // local development environment
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

The StormSubmitter class provides methods for submitting a topology; the most commonly used are:
/*
 * name: the name of the topology to submit; must be unique on the cluster
 * stormConf: the configuration for the submitted topology
 * topology: the topology object
 * opts: additional options for the submission
 */

import org.apache.storm.StormSubmitter;

submitTopology(String name, Map stormConf, StormTopology topology);
submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts);
submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology);
submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts);
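A minimal usage sketch; setMaxSpoutPending is shown only as an example of a common per-topology setting:

// Sketch: submitting with a per-topology configuration
Config conf = new Config();
conf.setNumWorkers(2);         // worker processes for this topology
conf.setMaxSpoutPending(1000); // cap on un-acked tuples per spout task
StormSubmitter.submitTopology("wordcount", conf, builder.createTopology());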

The LocalCluster class runs the topology on a local, in-process cluster for development:
import org.apache.storm.LocalCluster;

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
//cluster.submitTopologyWithOpts(String arg0, Map arg1, StormTopology arg2, SubmitOptions arg3);
cluster.killTopology("test"); // kill the topology
cluster.shutdown(); // shut down the local cluster

Compile the code above, package it into a jar, and submit it to Storm to run, as shown below.
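Assuming the jar is named fastWordCount.jar (the jar name is hypothetical; the main class matches the code above), submission with the storm CLI looks like:

# the final argument becomes args[0], the topology name
storm jar fastWordCount.jar topology.fastWordCount.FastWordCountTopology wordcount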