Storm应用系列之——最基本的例子

时间:2022-01-20 18:00:30

Storm应用最基本的例子


1. 建立Maven项目

我们用Maven来管理项目,方便lib依赖的引用和版本控制。

建立最基本的pom.xml如下:


[html] view plain copy print?
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3. <modelVersion>4.0.0</modelVersion>  
  4. <groupId>com.edi.storm</groupId>  
  5. <artifactId>storm-samples</artifactId>  
  6. <version>0.0.1-SNAPSHOT</version>  
  7. <packaging>jar</packaging>  
  8.   
  9.   
  10. <properties>  
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  12. </properties>  
  13.   
  14.   
  15. <repositories>  
  16. <repository>  
  17. <id>clojars.org</id>  
  18. <url>http://clojars.org/repo</url>  
  19. </repository>  
  20. </repositories>  
  21.   
  22.   
  23. <build>  
  24. <finalName>storm-samples</finalName>  
  25. <plugins>  
  26. <plugin>  
  27. <groupId>org.apache.maven.plugins</groupId>  
  28. <artifactId>maven-compiler-plugin</artifactId>  
  29. <version>3.1</version>  
  30. <configuration>  
  31. <source>1.7</source>  
  32. <target>1.7</target>  
  33. <encoding>${project.build.sourceEncoding}</encoding>  
  34. </configuration>  
  35. </plugin>  
  36.   
  37.   
  38. <plugin>  
  39. <artifactId>maven-assembly-plugin</artifactId>  
  40. <configuration>  
  41. <descriptorRefs>  
  42. <descriptorRef>jar-with-dependencies</descriptorRef>  
  43. </descriptorRefs>  
  44. </configuration>  
  45. <executions>  
  46. <execution>  
  47. <id>make-assembly</id>  
  48. <phase>package</phase>  
  49. <goals>  
  50. <goal>single</goal>  
  51. </goals>  
  52. </execution>  
  53. </executions>  
  54.   
  55.   
  56. </plugin>  
  57. </plugins>  
  58. </build>  
  59.   
  60.   
  61. <dependencies>  
  62. <dependency>  
  63. <groupId>storm</groupId>  
  64. <artifactId>storm</artifactId>  
  65. <version>0.9.0-rc2</version>  
  66. <scope>provided</scope>  
  67. </dependency>  
  68. </dependencies>  
  69. </project>  
这里我额外添加了两个build 插件:

maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的.

和 

maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。


2. 建立Spout

前文提到过,Storm中的spout负责发射数据。

我们来实现这样一个spout:

它会随机发射一系列的句子,句子的格式是 谁:说的话

代码如下:

[java] view plain copy print?
  1. public class RandomSpout extends BaseRichSpout {  
  2.   
  3.     private SpoutOutputCollector collector;  
  4.   
  5.     private Random rand;  
  6.       
  7.     private static String[] sentences = new String[] {"edi:I'm happy""marry:I'm angry""john:I'm sad""ted:I'm excited""laden:I'm dangerous"};  
  8.       
  9.     @Override  
  10.     public void open(Map conf, TopologyContext context,  
  11.             SpoutOutputCollector collector) {  
  12.         this.collector = collector;  
  13.         this.rand = new Random();  
  14.     }  
  15.   
  16.     @Override  
  17.     public void nextTuple() {  
  18.         String toSay = sentences[rand.nextInt(sentences.length)];  
  19.         this.collector.emit(new Values(toSay));  
  20.     }  
  21.   
  22.     @Override  
  23.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  24.         declarer.declare(new Fields("sentence"));  
  25.     }  
  26.   
  27. }  

这里要先理解Tuple的概念。

Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。


该Spout代码里面最核心的部分有两个:

a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] }

b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。


3. 建立Bolt

既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。

两个功能,两个Bolt。

先看添加"!"的Bolt

[java] view plain copy print?
  1. public class ExclaimBasicBolt extends BaseBasicBolt {  
  2.   
  3.     @Override  
  4.     public void execute(Tuple tuple, BasicOutputCollector collector) {  
  5.         //String sentence = tuple.getString(0);  
  6.         String sentence = (String) tuple.getValue(0);  
  7.         String out = sentence + "!";  
  8.         collector.emit(new Values(out));  
  9.     }  
  10.   
  11.     @Override  
  12.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  13.         declarer.declare(new Fields("excl_sentence"));  
  14.     }  
  15.   
  16. }  

在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。

Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。

取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"


打印Bolt

[java] view plain copy print?
  1. public class PrintBolt extends BaseBasicBolt {  
  2.   
  3.     @Override  
  4.     public void execute(Tuple tuple, BasicOutputCollector collector) {  
  5.         String rec = tuple.getString(0);  
  6.         System.err.println("String recieved: " + rec);  
  7.     }  
  8.   
  9.     @Override  
  10.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  11.         // do nothing  
  12.     }  
  13.   
  14. }  

仍然是取第一个,因为我们并没有定义过第二个value


4. 建立Topology

现在我们建立拓扑结构的主要组件都有了,可以创建topology了。

[java] view plain copy print?
  1. public class ExclaimBasicTopo {  
  2.   
  3.     public static void main(String[] args) throws Exception {  
  4.         TopologyBuilder builder = new TopologyBuilder();  
  5.           
  6.         builder.setSpout("spout"new RandomSpout());  
  7.         builder.setBolt("exclaim"new ExclaimBasicBolt()).shuffleGrouping("spout");  
  8.         builder.setBolt("print"new PrintBolt()).shuffleGrouping("exclaim");  
  9.   
  10.         Config conf = new Config();  
  11.         conf.setDebug(false);  
  12.   
  13.         if (args != null && args.length > 0) {  
  14.             conf.setNumWorkers(3);  
  15.   
  16.             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
  17.         } else {  
  18.   
  19.             LocalCluster cluster = new LocalCluster();  
  20.             cluster.submitTopology("test", conf, builder.createTopology());  
  21.             Utils.sleep(100000);  
  22.             cluster.killTopology("test");  
  23.             cluster.shutdown();  
  24.         }  
  25.     }  
  26. }  

很简单,对吧。

其中,

[java] view plain copy print?
  1. builder.setSpout("spout"new RandomSpout());  
定义一个spout,id为"spout"
[java] view plain copy print?
  1. builder.setBolt("exclaim"new ExclaimBasicBolt()).shuffleGrouping("spout");   
定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple [java] view plain copy print?
  1. builder.setBolt("print"new PrintBolt()).shuffleGrouping("exclaim");  
定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple

[java] view plain copy print?
  1. .shuffleGrouping  
是指明Storm按照何种策略将tuple分配到后续的bolt去。

可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。

把项目用M2E插件导入Eclipse直接运行试试

[plain] view plain copy print?
  1. String recieved: marry:I'm angry!  
  2. String recieved: edi:I'm happy!  
  3. String recieved: john:I'm sad!  
  4. String recieved: edi:I'm happy!  
  5. String recieved: ted:I'm excited!  
  6. String recieved: laden:I'm dangerous!  
  7. String recieved: edi:I'm happy!  
  8. String recieved: edi:I'm happy!  

这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。

我们修改下代码,指定并行数

[java] view plain copy print?
  1. builder.setBolt("exclaim"new ExclaimBasicBolt(), 2).shuffleGrouping("spout");  
  2. builder.setBolt("print"new PrintBolt(),3).shuffleGrouping("exclaim");  

由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。

为了方便体现确实是并行,我们修改PrintBolt代码如下:

[java] view plain copy print?
  1. public class PrintBolt extends BaseBasicBolt {  
  2.   
  3.     private int indexId;  
  4.       
  5.     @Override  
  6.     public void prepare(Map stormConf, TopologyContext context) {  
  7.         this.indexId = context.getThisTaskIndex();  
  8.     }  
  9.   
  10.     @Override  
  11.     public void execute(Tuple tuple, BasicOutputCollector collector) {  
  12.         String rec = tuple.getString(0);  
  13.         System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));  
  14.     }  
  15.   
  16.     @Override  
  17.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  18.         // do nothing  
  19.     }  
  20.   
  21. }  
这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。

运行下看看:

[plain] view plain copy print?
  1. Bolt[0] String recieved: marry:I'm angry!  
  2. Bolt[2] String recieved: john:I'm sad!  
  3. Bolt[2] String recieved: ted:I'm excited!  
  4. Bolt[2] String recieved: john:I'm sad!  
  5. Bolt[2] String recieved: john:I'm sad!  

证实确实是并发了。

本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行

[plain] view plain copy print?
  1. ./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test  
在StormUI里面,点进 test

Storm应用系列之——最基本的例子

看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。