Flume整合Spark Streaming

时间:2023-03-09 17:23:40
Flume整合Spark Streaming

Spark版本1.5.2,Flume版本:1.6

Flume agent配置文件:spool-8.51.conf

agent.sources = source1
agent.channels = memoryChannel
agent.sinks = sink1 agent.sources.source1.type = spooldir
agent.sources.source1.spoolDir=/data/apache-flume-1.6.0-bin/spooldir
agent.sources.source1.fileHeader = true
#agent.sources.source1.deletePolicy =immediate agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.keep-alive = 1000 agent.sinks.sink1.type = avro
agent.sinks.sink1.hostname = 192.168.1.11 # 这是spark集群中任意executor 的ip
agent.sinks.sink1.port = 23004
agent.sinks.sink1.channel = memoryChannel
agent.sources.source1.channels = memoryChannel

  

  maven文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>cn.test</groupId>
<artifactId>sparkTest</artifactId>
<version>0.0.1</version>
<packaging>jar</packaging> <name>pconliners</name>
<url>http://maven.apache.org</url> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.5.1</version>
</dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.2</version>
</dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.2</version>
</dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>1.5.2</version>
</dependency> <dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.5.2</version>
</dependency> </dependencies> <build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

  

  Java测试代码

public final class FlumeEventCount {

    public static void main(String[] args) {

        String host = args[0];
int port = Integer.parseInt(args[1]); Duration batchInterval = new Duration(Integer.parseInt(args[2]));
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,batchInterval);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); System.out.println("flumeStream.count():"+flumeStream.count());; flumeStream.count().map(new Function<Long, String>() {
private static final long serialVersionUID = -572435064083746235L; public String call(Long in) {
System.out.println("Flume test ....."+in);
return "Received " + in + " flume events....";
}
}).print(); ssc.start();
ssc.awaitTermination();
}
}

  

打成jar包,启动Spark streaming程序

spark-submit --class cn.test.FlumeEventCount --master spark://192.168.1.10:7077 sparkTest-0.0.1-jar-with-dependencies.jar 192.168.1.11 23004 5000

   

运行agent:

cd到flume安装目录,执行。

 bin/flume-ng agent -n agent -c conf -f conf/spool-8.51.conf -Dflume.root.logger=DEBUG,console

复制文件到监控目录:

cp spool-test.txt  /data/apache-flume-1.6.0-bin/spooldir/

  查看提交Spark 任务输出:

-------------------------------------------
Time: 1472202305000 ms
-------------------------------------------
Received 120 flume events....

  

基于拉模式
Java代码:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent; public class SparkStreamingFlume2 { public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Streaming...");
StreamingContext streamingContext = new StreamingContext(conf,Durations.seconds(30));
JavaStreamingContext ssc = new JavaStreamingContext(streamingContext);
String host = args[0];
int port = Integer.parseInt(args[1]);
JavaReceiverInputDStream<SparkFlumeEvent> pollingStream = FlumeUtils.createPollingStream(ssc, host, port);
pollingStream.count().map(new Function<Long, String>() { public String call(Long v1) throws Exception {
return "Received " + v1 + " flume events.";
}
}).print();
ssc.start();
ssc.awaitTermination();
}
}

  flume配置文件,在flume的conf目录创建一个flume-pull.conf文件,

数据来源是netcat:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.11.16
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1 a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.11.16
a1.sinks.k1.port = 11111
a1.sinks.k1.channel = c1

 数据来源是文件夹:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/data/apache-flume-1.6.0-bin/spooldir
a1.sources.r1.fileHeader = true
a1.sources.r1.deletePolicy =immediate
a1.sources.r1.channels = c1 a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.11.16
a1.sinks.k1.port = 11111
a1.sinks.k1.channel = c1

  这里使用的是文件夹作为数据来源。

  由于用到了agent的sink是 org.apache.spark.streaming.flume.sink.SparkSink类型,需要把spark-streaming-flume-sink_2.10-1.5.2.jar复制到flume的lib目录,否则,会报找不到org.apache.spark.streaming.flume.sink.SparkSink类的错误。

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:71)
at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:410)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.flume.sink.SparkSink
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
... 11 more

  

先启动flume,

bin/flume-ng agent --conf conf --conf-file conf/flume-pull.conf --name a1 -Dflume.root.logger=INFO,console

注意,flume的--name参数项要跟配置项的agent名一致,配置文件不要弄错。

在控制台看到如下如下信息,agent分别启动了channel、sink、source,说明,agent启动成功

2016-08-30 15:20:45,990 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
2016-08-30 15:20:45,991 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
2016-08-30 15:20:45,991 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2016-08-30 15:20:45,992 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:78)] SpoolDirectorySource source starting with directory: /data/apache-flume-1.6.0-bin/spooldir
2016-08-30 15:20:45,992 (lifecycleSupervisor-1-1) [INFO - org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Starting Spark Sink: k1 on port: 11111 and interface: 192.168.11.16 with pool size: 10 and transaction timeout: 60.
2016-08-30 15:20:46,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2016-08-30 15:20:46,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2016-08-30 15:20:46,462 (lifecycleSupervisor-1-1) [INFO - org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Starting Avro server for sink: k1
2016-08-30 15:20:46,464 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Blocking Sink Runner, sink will continue to run..

  再提交Spark Streaming任务,

spark-submit --class cn.test.SparkStreamingFlume2 --master spark://192.168.8.51:7077 sparkTest-0.0.1-jar-with-dependencies.jar 192.168.11.16 11111

  

192.168.11.16就是启动了agent的ip。

看到agent的控制台输出了Spark Streaming任务已经连接了agent的消息:

2016-08-30 15:21:10,896 (New I/O server boss #1 ([id: 0xbad2f716, /192.168.11.16:11111])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4e25ab4e, /192.168.8.55:55293 => /192.168.11.16:11111] OPEN
2016-08-30 15:21:10,898 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4e25ab4e, /192.168.8.55:55293 => /192.168.11.16:11111] BOUND: /192.168.11.16:11111
2016-08-30 15:21:10,898 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4e25ab4e, /192.168.8.55:55293 => /192.168.11.16:11111] CONNECTED: /192.168.8.55:55293

在启动agent的机器,复制一些文件到/data/apache-flume-1.6.0-bin/spooldir,cp data.txt /data/apache-flume-1.6.0-bin/spooldir/

看到Spark 任务输出,说明测试成功。

-------------------------------------------
Time: 1472541990000 ms
-------------------------------------------
Received 54 flume events.

  如果agent出现以下错误

org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1$$anonfun$apply$1.apply$mcV$sp(TransactionProcessor.scala:123)
at scala.util.control.Breaks.breakable(Breaks.scala:37)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:119)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

  那就设置一下channel的内存好了

a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000

  获取Flume数据:示例:

pollingStream.map(new Function<SparkFlumeEvent, String>() {

	public String call(SparkFlumeEvent v1) throws Exception {
return new String(v1.event().getBody().array());
}
}).print();

  测试拉模式

首先,我们将Spark Streaming 任务停掉,然后将文件复制到监控文件夹下,cp ../conf/* .

这时候Flume的日志是:

2016-08-30 16:47:49,727 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4c760232, /192.168.8.56:58196 :> /192.168.11.16:11111] DISCONNECTED
2016-08-30 16:47:49,728 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4c760232, /192.168.8.56:58196 :> /192.168.11.16:11111] UNBOUND
2016-08-30 16:47:49,728 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4c760232, /192.168.8.56:58196 :> /192.168.11.16:11111] CLOSED
2016-08-30 16:47:49,728 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.8.56:58196 disconnected.

再提交Spark Streaming任务,看到Flume控制台如下输出:Flume重新

2016-08-30 16:48:49,726 (Spark Sink Processor Thread - 2) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.

  Spark控制台输出:

-------------------------------------------
Time: 1472546940000 ms
-------------------------------------------
Received 54 flume events.

  这个测试说明,Flume基于拉模式下,数据不会丢失。