Kafka + Storm + HDFS Integration Example

Date: 2023-03-09 00:34:24

  Messages can enter Kafka in a variety of ways. For example, Flume can collect log data and stage it in Kafka, where it is routed and buffered; a real-time Storm program then analyzes it, and the results are finally saved to HDFS. To do this, a Storm spout reads the messages from Kafka and hands them to the downstream bolt components for processing. Below is a simple WordCount example: it reads the subscribed message lines from Kafka, splits each line into words on spaces, computes word frequencies, and writes the results to HDFS.
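
The core computation is just split-and-count. Before wiring it into Storm, the following standalone Java sketch (the class name and sample sentence are illustrative only, not part of the original project) shows the word-frequency logic that the bolts below implement:

package com.dxss.storm;

import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        String line = "I am a chinese"; // sample message line (illustrative)
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String word : line.split(" ")) { // split on spaces, as the PrinterBolt does below
            Integer old = counts.get(word);
            counts.put(word, old == null ? 1 : old + 1);
        }
        System.out.println(counts); // e.g. {a=1, am=1, chinese=1, I=1}
    }
}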

1. Kafka producer

package com.dxss.storm;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Created by hadoop on 2017/7/29.
 * Simple producer that keeps publishing a test sentence to the "sunfei" topic.
 */
public class KafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "12.13.41.41:2182");
        properties.put("metadata.broker.list", "12.13.41.41:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        KeyedMessage<String, String> keyedMessage =
                new KeyedMessage<String, String>("sunfei", "I am a chinese");
        while (true) {
            producer.send(keyedMessage);
            Thread.sleep(1000); // throttle the test producer
        }
    }
}
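
To verify that the producer is actually publishing to the "sunfei" topic, a throwaway consumer can be run against the same ZooKeeper. This is a minimal sketch using the Kafka 0.8 high-level consumer API that the pom below already depends on; the group id ("sunfei_check") is an assumption chosen just for this check:

package com.dxss.storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "12.13.41.41:2182");
        props.put("group.id", "sunfei_check"); // assumed group id; any unused id works
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // request one stream for the "sunfei" topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("sunfei", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);

        ConsumerIterator<byte[], byte[]> it = streams.get("sunfei").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}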

2. Storm bolt: PrinterBolt (splits lines into words)

package com.dxss.stormkafka;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Splits each message line received from the KafkaSpout into words
 * and emits (word, 1) pairs.
 */
public class PrinterBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // System.out.println("message published by Kafka: " + input.getString(0));
        String value = input.getString(0);
        String[] words = value.split(" ");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

3. Storm bolt: CountBolt (word counting)

package com.dxss.stormkafka;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Accumulates word counts in memory and emits "word:count" strings.
 */
public class CountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Map<String, Integer> map = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        if (map.containsKey(word)) {
            map.put(word, map.get(word) + num);
        } else {
            map.put(word, num);
        }
        String result = word + ":" + map.get(word);
        this.collector.emit(new Values(result));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }
}

4. Storm topology

package com.dxss.stormkafka;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class StormKafka {

    public static StormTopology createTopology() {
        // Kafka spout: read the "sunfei" topic via ZooKeeper
        BrokerHosts brokerHosts = new ZkHosts("12.13.41.41:2182");
        String topic = "sunfei";
        String zkRoot = "/sunfei";
        String spoutId = "sunfei_storm";
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // field delimiter for the output records
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(":");
        // sync to HDFS every 1000 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // rotate files at 100 MB
        FileRotationPolicy policy = new FileSizeRotationPolicy(100.0f, FileSizeRotationPolicy.Units.MB);
        // output directory and file naming
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/test/storm")
                .withPrefix("storm_").withExtension(".txt");
        // HDFS namenode address
        HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://12.13.41.43:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(policy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(spoutConfig));
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
        // fieldsGrouping on "word" so the same word always reaches the same CountBolt task
        builder.setBolt("totalBolt", new CountBolt()).fieldsGrouping("print", new Fields("word"));
        builder.setBolt("hdfsBolt", hdfsBolt).shuffleGrouping("totalBolt");
        return builder.createTopology();
    }

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500); // only relevant to Trident, kept from the original example
        StormTopology topology = StormKafka.createTopology();
        config.setNumWorkers(2);
        config.setMaxTaskParallelism(1);

        // cluster mode
        // try {
        //     StormSubmitter.submitTopology("mywordcount", config, topology);
        // } catch (AlreadyAliveException e) {
        //     e.printStackTrace();
        // } catch (InvalidTopologyException e) {
        //     e.printStackTrace();
        // }

        // local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("kafkastorm", config, topology);
        Thread.sleep(600000); // keep the local cluster running for a while
    }
}
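
After the topology has run for a while, the results can be inspected straight from HDFS. The following is a minimal sketch using the Hadoop FileSystem API that is already on the classpath; the namenode address and output path are the ones configured above, and error handling is omitted for brevity:

package com.dxss.stormkafka;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsResultReader {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://12.13.41.43:8020"), new Configuration());
        // list every rotated file written by the HdfsBolt under /test/storm
        for (FileStatus status : fs.listStatus(new Path("/test/storm"))) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // each line looks like "word:count"
            }
            reader.close();
        }
        fs.close();
    }
}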

5. Project pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kafka</groupId>
<artifactId>StormDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.6</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.10.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<!-- dependencies required by the storm-kafka module -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.5.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<repository>
<id>central</id>
<url>http://repo1.maven.org/maven2/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>clojars</id>
<url>https://clojars.org/repo/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>scala-tools</id>
<url>http://scala-tools.org/repo-releases</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>conjars</id>
<url>http://conjars.org/repo/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>