前期博客
Storm编程入门API系列之Storm的Topology默认Workers、默认executors和默认tasks数目
Storm编程入门API系列之Storm的Topology多个Workers数目控制实现
继续编写
StormTopologyMoreTask.java
package zhouls.bigdata.stormDemo; import java.util.Map; import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils; public class StormTopologyMoreTask { public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
} int num = ;
public void nextTuple() {
num++;
System.out.println("spout:"+num);
this.collector.emit(new Values(num));
Utils.sleep();
} public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
} } public static class MyBolt extends BaseRichBolt{ private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
} public void execute(Tuple input) {
Integer num = input.getIntegerByField("num");
System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);
} public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName(); topologyBuilder.setSpout(spout_id, new MySpout());
topologyBuilder.setBolt(bolt_id, new MyBolt()).setNumTasks().shuffleGrouping(spout_id); Config config = new Config();
String topology_name = StormTopologyMoreTask.class.getSimpleName();
if(args.length==){
//在本地运行
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
}else{
//在集群运行
try {
StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
} } }
打jar包
[hadoop@master jar]$ pwd
/home/hadoop/app/apache-storm-1.0./jar
[hadoop@master jar]$ ll
total
-rw-r--r-- hadoop hadoop Jul : StormTopology.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreExecutor.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreWorker.jar
[hadoop@master jar]$ rz [hadoop@master jar]$ ll
total
-rw-r--r-- hadoop hadoop Jul : StormTopology.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreExecutor.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreTask.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreWorker.jar
[hadoop@master jar]$
提交作业之前
为什么,会是如上的数字呢?大家要学,就要深入去学和理解。
因为,我之前运行的StormTopologyMoreExecutor没有停掉
为什么,会是如上的数字呢?大家要学,就要深入去学和理解。