Trident集成Kafka

时间:2023-03-09 14:54:52
Trident集成Kafka

1.Kafka涉及的类

  上一个类是不透明事务

  后一个是完全事务

  Trident集成Kafka

2.启动服务

  Trident集成Kafka

3.驱动类

  重要的地方是修改了两个部分:

  1.数据的来源是kafka

  2.第二个是字段的Fields是str

 package com.jun.tridentWithKafka;

 import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;

/**
 * Driver topology: consumes nginx order logs from Kafka through an
 * opaque-transactional Trident spout and maintains two per-minute
 * aggregates in in-memory map state — the order count and the total
 * order amount — printing new total-amount values as they are emitted.
 */
public class TridentWithKafka {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TridentTopology topology = new TridentTopology();

        // Kafka source: broker metadata is discovered via ZooKeeper.
        BrokerHosts zkHosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
        String topic = "nginxlog";
        TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zkHosts, topic);
        // Decode each Kafka message as a single string field named "str".
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConf.forceFromStart = true; // replay the topic from the earliest offset
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConf);

        // Shared upstream pipeline: filter -> parse -> project -> derive time fields.
        Stream orders = topology.newStream("orderAnalyse", kafkaSpout)
                // drop malformed log lines
                .each(new Fields("str"), new ValidLogFilter())
                // parse the raw line into order fields
                .each(new Fields("str"), new LogParserFunction(),
                        new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // keep only the parsed fields
                .project(new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // split the order timestamp into day / hour / minute buckets
                .each(new Fields("orderTime"), new DateTransFormerFunction(),
                        new Fields("day", "hour", "minter"));

        // Branch 1: per-minute order count, globally aggregated into memory-backed state.
        TridentState orderCountState = orders.groupBy(new Fields("minter"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                        new Fields("orderNumByMinter"));

        // Branch 2: per-minute order amount — first a partition-local (chained) sum...
        Stream localAmtSums = orders
                .each(new Fields("orderAmtStr"), new TransforAmtToDoubleFunction(), new Fields("orderAmt"))
                .groupBy(new Fields("minter"))
                .chainedAgg()
                .partitionAggregate(new Fields("orderAmt"), new LocalSum(), new Fields("orderAmtSumOfLocal"))
                .chainEnd();

        // ...then a global sum of the partial sums, kept in memory-backed state.
        TridentState totalAmtState = localAmtSums.groupBy(new Fields("minter"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("orderAmtSumOfLocal"),
                        new Sum(), new Fields("totalOrderAmt"));
        // Print each newly updated per-minute total.
        totalAmtState.newValuesStream()
                .each(new Fields("minter", "totalOrderAmt"), new PrintFilter());

        // Submission: local cluster when run without arguments, remote cluster otherwise.
        Config config = new Config();
        if (args == null || args.length <= 0) {
            new LocalCluster().submitTopology("tridentDemo", config, topology.build());
        } else {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, topology.build());
        }
    }
}

4.输入数据

  Trident集成Kafka

5.控制台

  Trident集成Kafka