storm sum aggregate 原语 聚合 本地测试

时间:2023-03-08 18:33:53
storm sum aggregate 原语 聚合  本地测试

编写storm程序,对数据进行聚合并且写入到mysql。

本文主要说明当数据中有多个字段需要进行sum或其他聚合操作时的程序写法。

1.主程序main方法,storm 拓扑运行入口

/**
 * Local-mode entry point for the PV/UV Trident topology.
 *
 * <p>The topology reads pipe-delimited strings from an in-memory test spout,
 * derives the {@code mapid}, {@code pv} and {@code uv} fields from each
 * string, sums {@code pv} and {@code uv} per {@code mapid} with a chained
 * aggregation, and hands each {@code (mapid, sumpv, sumuv)} result to
 * {@link StoreFilter}.
 */
public class CopyOfPvTopo {

    /**
     * Builds the topology and submits it to an in-process {@code LocalCluster}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("--------------------------------------------------------------------------start");

        // Kafka wiring: ZooKeeper host list plus the topic to consume.
        BrokerHosts brokerHosts = new ZkHosts(Const.ZK_STR);
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, Const.PVUV_TOPIC, "20160607p");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // In-memory spout that simulates batched input for local testing.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("str"), 3,
                new Values("111|2|1|1|1|1|1|1|1|1|1|1|1|1"),
                new Values("111|2|1|1|1|1|1|1|1|1|1|1|1|1"),
                new Values("111|2|1|1|1|1|1|1|1|1|1|1|1|1"),
                new Values("111|2|1|1|1|1|1|1|1|1|1|1|1|1"));
        // spout.setCycle(true); // whether to keep re-emitting the test data in a loop

        // Not used by the local test; pass it to newStream(...) instead of
        // `spout` to consume from Kafka.
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        TridentTopology topology = new TridentTopology();
        topology.newStream("20160607p", spout) // read the simulated data
                .parallelismHint(20)
                .shuffle()
                // Process the raw string into the mapid / pv / uv fields.
                .each(new Fields("str"), new Senquece(), new Fields("mapid", "pv", "uv"))
                .parallelismHint(80)
                // Group by the key only. Grouping by "uv" and "pv" as well would
                // put tuples with the same mapid but different pv/uv values into
                // separate groups, so the sums would never be per-mapid totals.
                .groupBy(new Fields("mapid"))
                .chainedAgg() // start the aggregation chain
                .aggregate(new Fields("uv"), new SumAgg(), new Fields("sumuv")) // sum of uv
                .aggregate(new Fields("pv"), new SumAgg(), new Fields("sumpv")) // sum of pv
                .chainEnd() // end the aggregation chain
                .parallelismHint(35)
                // Hand the aggregated result to the next step
                // (presumably the MySQL write; StoreFilter is not shown here).
                .each(new Fields("mapid", "sumpv", "sumuv"), new StoreFilter())
                .parallelismHint(20);

        Config conf = new Config();
        conf.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(Const.PVUV_TOPO, conf, topology.build());

        // To stop the local run after a fixed time, uncomment:
        // Utils.sleep(100000);
        // cluster.killTopology(Const.PVUV_TOPO);
        // cluster.shutdown();
    }
}

2.sum类:做sum时需要用到的类,在上面的主程序中可以找到对它的调用

/**
 * Trident combiner aggregator that adds up integer values.
 *
 * <p>Each tuple contributes the integer found in its first field; partial
 * results are merged by addition, and an empty input yields {@code 0}.
 */
public class SumAgg implements CombinerAggregator<Integer> {

    private static final long serialVersionUID = -6764153182395797633L;

    /**
     * Extracts the value a single tuple contributes to the sum.
     *
     * @param tuple the input tuple; its field at index 0 is read as an integer
     * @return the integer stored at index 0 of {@code tuple}
     */
    @Override
    public Integer init(TridentTuple tuple) {
        final Integer contribution = tuple.getInteger(0);
        return contribution;
    }

    /**
     * Merges two partial sums.
     *
     * @param val1 first partial sum
     * @param val2 second partial sum
     * @return {@code val1 + val2}
     */
    @Override
    public Integer combine(Integer val1, Integer val2) {
        final int total = val1.intValue() + val2.intValue();
        return Integer.valueOf(total);
    }

    /**
     * Supplies the identity value for the sum.
     *
     * @return {@code 0}
     */
    @Override
    public Integer zero() {
        return Integer.valueOf(0);
    }
}