Apache Spark 1.3.0 introduced the Direct API, which uses Kafka's low-level consumer API to read data from a Kafka cluster while Spark Streaming itself keeps track of the consumed offsets. This approach achieves zero data loss and is more efficient than the receiver-based approach. However, because Spark Streaming maintains the read offsets on its own and never writes them to Zookeeper, offset-based Kafka monitoring tools (such as Kafka Web Console or KafkaOffsetMonitor) stop working. This article addresses exactly that problem, so that our Spark Streaming program automatically updates the Kafka offsets in Zookeeper every time a batch of data is received.
From the official Spark documentation we know that the Kafka offset information Spark maintains internally is stored in the offsetRanges field of the HasOffsetRanges trait, and we can retrieve it inside a Spark Streaming program:
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
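Each element of offsetsList is an OffsetRange recording the topic, the partition, and the half-open interval [fromOffset, untilOffset) consumed in the current batch. A quick way to inspect them, as a sketch building on the rdd above:

// print what this batch consumed from every partition
offsetsList.foreach { o =>
  println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
}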
This gives us the consumption information for every partition; we only need to iterate over offsetsList and send this information to Zookeeper to update the Kafka consumer offsets. The complete code snippet is as follows:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val kc = new KafkaCluster(kafkaParams)
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
    val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
})
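The snippet assumes that ssc, kafkaParams, topicsSet and args(0) (the consumer group id) are defined elsewhere in the program. A minimal sketch of that setup, in which the application name, broker address and topic are placeholder values:

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val sparkConf = new SparkConf().setAppName("IteblogDirectStream")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// "broker1:9092" is a placeholder; point it at your own Kafka brokers
val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092")
val topicsSet = Set("iteblog")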
The KafkaCluster class is a utility class for operations that interact with the Kafka cluster. We can set the offset of each partition of a Kafka topic with Map((topicAndPartition, offsets.untilOffset)) and then call the setConsumerOffsets method of the KafkaCluster class to update the information in Zookeeper. Once the Kafka offsets are updated this way, tools such as KafkaOffsetMonitor can track the consumption of the corresponding topics. The figure below shows what KafkaOffsetMonitor reports:

[Figure: KafkaOffsetMonitor showing the per-partition consumption of the topic]
As the figure shows, KafkaOffsetMonitor can now track the consumption of the relevant Kafka partitions, which matters a great deal for monitoring the whole Spark Streaming program, because it lets us see at any moment how fast Spark is reading. The complete code of the KafkaCluster utility class is as follows:
package org.apache.spark.streaming.kafka
// declaring the class in Spark's own package gives it access to the
// private[spark] SimpleConsumerConfig helper imported below

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * From the iteblog blog, a technical blog devoted to hadoop, hive, spark,
 * shark and flume; WeChat public account: iteblog_hadoop
 */
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  // lazily build and cache the consumer configuration from kafkaParams
  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String,
      offsets: Map[TopicAndPartition, Long]
    ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  def setConsumerOffsetMetadata(groupId: String,
      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
    ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  // try each broker in turn, handing a connected SimpleConsumer to fn and
  // collecting any failures into errs
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
      (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
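Since the offsets now live in Zookeeper, they can also be read back, for example so that a restarted job can resume where the previous run stopped. The following is only a sketch, not part of the class above: it relies on Kafka 0.8.x's OffsetFetchRequest and SimpleConsumer.fetchOffsets, and the broker address ("broker1", 9092), group id ("iteblog-group") and topic ("iteblog") are placeholder values:

import kafka.api.OffsetFetchRequest
import kafka.common.TopicAndPartition

val kc = new KafkaCluster(kafkaParams)
// connect to one broker and ask for the committed offset of partition 0
val consumer = kc.connect("broker1", 9092)
try {
  val tp = TopicAndPartition("iteblog", 0)
  val resp = consumer.fetchOffsets(OffsetFetchRequest("iteblog-group", Seq(tp)))
  resp.requestInfo.get(tp).foreach { ome =>
    println(s"committed offset for $tp: ${ome.offset}")
  }
} finally {
  consumer.close()
}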