Storm from Beginner to Master, Lecture 15: Storm Word Count Example Code

Date: 2022-01-07 16:52:19

1. CountWordTopology

package com.john.learn.storm.countword;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.john.learn.storm.countword.bolt.WordCountBolt;
import com.john.learn.storm.countword.bolt.WordReportBolt;
import com.john.learn.storm.countword.bolt.WordSplitBolt;
import com.john.learn.storm.countword.spout.WordSpout;

public class CountWordTopology {

	public static void main(String[] args) throws InterruptedException {

		Config config = new Config();
		config.setNumWorkers(2);
		config.setDebug(false);

		TopologyBuilder topologyBuilder = new TopologyBuilder();

		topologyBuilder.setSpout("WordSpout", new WordSpout());

		topologyBuilder.setBolt("WordSplitBlot", new WordSplitBlot(), 5).shuffleGrouping("WordSpout");

		topologyBuilder.setBolt("WordCountBlot", new WordCountBlot(), 8).fieldsGrouping("WordSplitBlot",
				new Fields("Word"));

		topologyBuilder.setBolt("ReportBlot", new WordReportBlot()).globalGrouping("WordCountBlot");

		// Local mode: the final report is printed when the topology is killed
		// and WordReportBolt.cleanup() runs.
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("CountWordTopology", config, topologyBuilder.createTopology());

		// Let the topology run for 20 seconds, then shut it down.
		Thread.sleep(20000);

		localCluster.killTopology("CountWordTopology");
		localCluster.shutdown();
	}
}
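
The example above runs the topology inside the JVM with LocalCluster, which is only meant for development. Submitting the same topology to a real cluster goes through StormSubmitter instead; a minimal sketch, assuming a Storm 1.x client, a jar deployed with the storm command, and the topology name passed as the first program argument (the class name CountWordClusterSubmit is made up for illustration):

package com.john.learn.storm.countword;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class CountWordClusterSubmit {

	public static void main(String[] args)
			throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

		Config config = new Config();
		config.setNumWorkers(2);

		// Wire up the same spout and bolts as CountWordTopology (omitted here).
		TopologyBuilder topologyBuilder = new TopologyBuilder();

		// Uploads the topology jar to Nimbus and starts the topology on the cluster;
		// unlike LocalCluster, this process exits once submission succeeds.
		StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
	}
}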

2. Spout

Word

package com.john.learn.storm.countword.spout;

import java.io.Serializable;
import java.io.UnsupportedEncodingException;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;

public class Word implements Serializable {

	private String word;

	private static final long serialVersionUID = 1L;

	public Word() {

	}

	public Word(String word) {

		this.word = word.toLowerCase();
	}

	public Word(byte[] bytes) {

		// Normalize to lower case, matching the String constructor.
		this.word = new String(bytes).toLowerCase();
	}

	public byte[] toBytes(String charset) {

		if (this.word == null) {

			return new byte[0];
		}

		try {

			return this.word.getBytes(charset);

		} catch (UnsupportedEncodingException e) {

			return this.word.getBytes();
		}

	}

	public String getWord() {
		return word;
	}

	public void setWord(String word) {
		this.word = word;
	}

	@Override
	public int hashCode() {

		return StringUtils.trimToEmpty(word).toLowerCase().hashCode();
	}

	@Override
	public boolean equals(Object obj) {

		if (!(obj instanceof Word)) {

			return false;
		}

		return StringUtils.trimToEmpty(((Word) obj).word).equalsIgnoreCase(this.word);
	}

	@Override
	public String toString() {

		return this.word;
	}

}
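
Because WordSplitBolt sends words over the wire as byte arrays, the byte[] round trip and the case-insensitive equality above are worth checking in isolation. A small sketch (the class CheckWord is made up for illustration):

package com.john.learn.storm.countword.spout;

public class CheckWord {

	public static void main(String[] args) {

		Word original = new Word("Storm");

		// Serialize with the same charset WordSplitBolt uses, then restore.
		Word restored = new Word(original.toBytes("iso-8859-1"));

		// equals and hashCode ignore case, so both checks print true.
		System.out.println(original.equals(restored));
		System.out.println(original.hashCode() == restored.hashCode());
	}
}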

WordSpout

package com.john.learn.storm.countword.spout;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1L;

	@Override
	public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;
		is = Thread.currentThread().getContextClassLoader()
				.getResourceAsStream("WhyUseStorm.txt");
		bufferedReader = new BufferedReader(new InputStreamReader(is));

	}

	@Override
	public void nextTuple() {

		if (bufferedReader == null) {

			return;
		}

		try {

			String line = null;

			// Emit every line of the file once; later calls to nextTuple return
			// immediately because bufferedReader is set to null in the finally block.
			while ((line = bufferedReader.readLine()) != null) {

				collector.emit(new Values(reviseSentenceOnMeaninglessSymbols(line)));

			}

		} catch (Exception e) {

			e.printStackTrace();

		} finally {

			try {

				this.bufferedReader.close();

				bufferedReader = null;

			} catch (Exception e) {

				e.printStackTrace();
			}
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		fieldsDeclarer.declare(new Fields("Sentence"));

	}

	private static String reviseSentenceOnMeaninglessSymbols(String sentence) {

		System.out.println("line:" + sentence);

		// Strip punctuation and other meaningless symbols before splitting.
		for (char c : MeaninglessSymbols) {
			sentence = StringUtils.remove(sentence, String.valueOf(c));
		}

		return sentence;
	}

	private SpoutOutputCollector collector;

	private static final char[] MeaninglessSymbols = ".,!@#$%^&*()_+|{}[]:;<>?\"'’“”《》~·、。,".toCharArray();

	private InputStream is = null;

	private BufferedReader bufferedReader = null;

}
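
Note that this spout drains the whole file inside a single nextTuple call and then goes quiet. Storm only asks that nextTuple do a small amount of work per invocation, so a variant that emits one line per call is closer to the usual pattern; a sketch of just that method, with the rest of WordSpout unchanged:

	@Override
	public void nextTuple() {

		if (bufferedReader == null) {
			return;
		}

		try {
			String line = bufferedReader.readLine();

			if (line == null) {
				// End of file: release the reader so later calls return immediately.
				bufferedReader.close();
				bufferedReader = null;
				return;
			}

			collector.emit(new Values(reviseSentenceOnMeaninglessSymbols(line)));

		} catch (Exception e) {
			e.printStackTrace();
		}
	}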

3. The Three Bolts

WordSplitBolt

package com.john.learn.storm.countword.bolt;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import com.john.learn.storm.countword.spout.Word;

public class WordSplitBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		String[] words = StringUtils.splitByWholeSeparator(tuple.getStringByField("Sentence"), " ");

		for (String word : words) {

			if (StringUtils.isEmpty(word)) {
				continue;
			}

			// Emit each word as an ISO-8859-1 byte array; WordCountBolt rebuilds the Word from it.
			collector.emit(new Values(new Word(word).toBytes("iso-8859-1")));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		fieldsDeclarer.declare(new Fields("Word"));
	}

	private static final long serialVersionUID = 1L;

}
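
WordSplitBolt extends BaseBasicBolt, which anchors every emitted tuple to the input tuple and acks the input automatically after execute returns. Written against BaseRichBolt, the same bolt has to do that bookkeeping itself; a sketch for comparison (the class name WordSplitRichBolt is made up):

package com.john.learn.storm.countword.bolt;

import java.util.Map;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import com.john.learn.storm.countword.spout.Word;

public class WordSplitRichBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	private OutputCollector collector;

	@Override
	public void prepare(Map config, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {

		for (String word : StringUtils.splitByWholeSeparator(tuple.getStringByField("Sentence"), " ")) {

			if (StringUtils.isEmpty(word)) {
				continue;
			}

			// Anchor the emit to the input tuple so a failure can be replayed.
			collector.emit(tuple, new Values(new Word(word).toBytes("iso-8859-1")));
		}

		// BaseRichBolt does not auto-ack.
		collector.ack(tuple);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
		fieldsDeclarer.declare(new Fields("Word"));
	}
}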

WordCountBolt

package com.john.learn.storm.countword.bolt;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import com.john.learn.storm.countword.spout.Word;

public class WordCountBolt extends BaseRichBolt {

	@Override
	public void prepare(Map config, TopologyContext context, OutputCollector collector) {

		wordCounts = new HashMap<>();
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {

		countWord(new Word(tuple.getBinaryByField("Word")));

		// BaseRichBolt does not auto-ack, so acknowledge the tuple explicitly.
		collector.ack(tuple);
	}

	private void countWord(Word word) {

		AtomicLong count = wordCounts.get(word);

		if (count == null) {

			count = new AtomicLong();
			wordCounts.put(word, count);
		}

		// A bolt task is single-threaded, so this read-modify-write is safe.
		count.incrementAndGet();

		// Emit the running total downstream to the report bolt.
		collector.emit(new Values(word.getWord(), count.get()));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("Word", "WordCount"));
	}

	private static final long serialVersionUID = 1L;

	private Map<Word, AtomicLong> wordCounts;

	private OutputCollector collector;

}
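
Since each bolt task is executed by a single thread, the plain HashMap above is safe without synchronization. Assuming Java 8 or later, the lookup-or-create step in countWord can also be written more compactly with Map.computeIfAbsent; a sketch of the equivalent method:

	private void countWord(Word word) {

		// Create the counter on first sight of the word, then bump it.
		long count = wordCounts.computeIfAbsent(word, w -> new AtomicLong()).incrementAndGet();

		collector.emit(new Values(word.getWord(), count));
	}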

WordReportBolt

package com.john.learn.storm.countword.bolt;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class WordReportBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		// Keep only the latest running total for each word; earlier updates are overwritten.
		reportWords.put(tuple.getStringByField("Word"), tuple.getLongByField("WordCount"));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		// Terminal bolt: nothing is emitted downstream.
	}

	@Override
	public void cleanup() {

		// cleanup is only guaranteed to run in local mode, which is how this
		// example executes; it prints the final tallies once the topology is killed.
		System.out.println("--------------------------------Final Report--------------------------------");
		List<String> keys = new ArrayList<>(reportWords.keySet());

		Collections.sort(keys);

		for (String key : keys) {

			System.out.println(key + ":" + reportWords.get(key));
		}

		System.out.println("--------------------------------------------------------------------------");
	}

	private Map<String, Long> reportWords = new HashMap<>();

}
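
The cleanup above prints the words alphabetically. To rank them by frequency instead, the sort can run over the map entries with a comparator; a sketch assuming Java 8 streams (java.util.Comparator must also be imported):

	@Override
	public void cleanup() {

		System.out.println("--------------------------------Final Report--------------------------------");

		// Highest count first; ties broken alphabetically.
		reportWords.entrySet().stream()
				.sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
						.thenComparing(Map.Entry.comparingByKey()))
				.forEach(entry -> System.out.println(entry.getKey() + ":" + entry.getValue()));

		System.out.println("--------------------------------------------------------------------------");
	}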

Run result:

WhyUseStorm.txt (the input file on the classpath):

Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.

Storm integrates with the queueing and database technologies you already use. A Storm topology consumes streams of data and processes those streams in arbitrarily complex ways, repartitioning the streams between each stage of the computation however needed. Read more in the tutorial.

--------------------------------Final Report--------------------------------
a:5
already:1
analytics:1
and:7
any:1
apache:1
arbitrarily:1
at:1
batch:1
be:2
benchmark:1
between:1
can:1
cases:1
clocked:1
complex:1
computation:3
consumes:1
continuous:1
data:3
database:1
did:1
distributed:2
doing:1
each:1
easy:2
etl:1
fast:1
fault-tolerant:1
for:2
free:1
fun:1
guarantees:1
hadoop:1
has:1
however:1
in:2
integrates:1
is:6
it:3
language:1
learning:1
lot:1
machine:1
makes:1
many:1
million:1
more:2
needed:1
node:1
of:4
online:1
open:1
operate:1
over:1
per:2
process:1
processed:2
processes:1
processing:2
programming:1
queueing:1
read:1
realtime:3
reliably:1
repartitioning:1
rpc:1
scalable:1
second:1
set:1
simple:1
source:1
stage:1
storm:7
streams:4
system:1
technologies:1
the:4
those:1
to:3
topology:1
tuples:1
tutorial:1
unbounded:1
up:1
use:3
used:1
ways:1
what:1
will:1
with:2
you:1
your:1
--------------------------------------------------------------------------