sparkStreaming消费kafka-1.0.1方式:direct方式(存储offset到zookeeper)

时间:2023-03-09 02:53:46
sparkStreaming消费kafka-1.0.1方式:direct方式(存储offset到zookeeper)

版本声明:

kafka:1.0.1

spark:2.1.0

注意:在使用过程中可能会出现servlet版本不兼容的问题,因此在导入maven的pom文件的时候,需要做适当的排除操作

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>kafkaDirect</groupId>
<artifactId>kafkaDirect</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories> <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-streaming-kafka_2.11</artifactId>-->
<!--<version>2.1.0</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>javax.servlet</groupId>-->
<!--<artifactId>servlet-api</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency> </dependencies>
</project>

代码:

因为使用了zookeeper作为offset的存储,因此任何能够监控zookeeper的框架,都可以监控当前kafka消费状况

例如:kafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor/releases

其中注意的小点:

1:在zookeeper中offset存储路径:/consumers/[groupId]/offsets/topic/[partitionId]

2:读取offset操作,其实就是去zookeeper的路径下拿offset值,代码:

 def readOffsets(
topics: Seq[String],
groupId:String ,
zkUtils: ZkUtils
): Map[TopicPartition, Long] = { val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long] val partitionMap = zkUtils.getPartitionsForTopics(topics) // /consumers/<groupId>/offsets/<topic>/ partitionMap.foreach(topicPartitions => { val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
//遍历每一个分区下的数据
topicPartitions._2.foreach(partition => { val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
try { val offsetStatTuple = zkUtils.readData(offsetPath)
if (offsetStatTuple != null) {
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong) } } catch { case e: Exception => // println("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
println("message: {} , topic: {}, partition: {}, node path: {}" , e.getMessage , topics , topicPartitions , offsetPath)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L) } }) }) topicPartOffsetMap.toMap }

3:提交offset代码,实际就是将offset存储到zookeeper中

def persistOffsets(
offsets: Seq[OffsetRange],
groupId: String,
storeEndOffset: Boolean = true,
zkUtils: ZkUtils
): Unit = { offsets.foreach(or => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
println(or.topic.toString , or.partition.toString , offsetVal , offsetPath)
zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "")//, JavaConversions.bufferAsJavaList(acls) }) }

完整代码

 package offsetInZookeeper

 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer /**
* Created by angel
*/
object KafkaOffsetInZookeeper {
def main(args: Array[String]): Unit = {
//5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
if (args.length < 5) {
System.err.println("Usage: KafkaDirectStreamTest " +
"<batch-duration-in-seconds> " +
"<kafka-bootstrap-servers> " +
"<kafka-topics> " +
"<kafka-consumer-group-id> " +
"<kafka-zookeeper-quorum>")
System.exit(1)
} val batchDuration = args(0)
val bootstrapServers = args(1).toString
val topicsSet = args(2).toString.split(",").toSet
val consumerGroupID = args(3)
val zkQuorum = args(4)
val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
.setMaster("local[4]")//Uncomment this line to test while developing on a workstation
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
val topics = topicsSet.toArray
val topic = topics(0)
// /consumers/[groupId]/offsets/topic/[partitionId]
//+"/consumers/"+consumerGroupID+"/offsets/"+topic
val zkKafkaRootDir = zkQuorum + "/consumers/"+consumerGroupID+"/offsets/"+topic
val zkSessionTimeOut = 10000
val zkConnectionTimeOut = 10000
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkKafkaRootDir, zkSessionTimeOut, zkConnectionTimeOut)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false) val kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrapServers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> consumerGroupID,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
) //去zookeeper上拿offset
val fromOffsets: Map[TopicPartition, Long] = readOffsets(topics , consumerGroupID , zkUtils)
//根据offset获取数据
// val inputDStream = KafkaUtils.createDirectStream[String, String](
// ssc,
// PreferConsistent,
// Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets)
// ) //offsets: ju.Map[TopicPartition, jl.Long]
// val inputDStream = KafkaUtils.createDirectStream[String, String](
// ssc,
// PreferConsistent,
// Subscribe[String, String](topics, kafkaParams , fromOffsets)
// )
val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))
//处理数据,处理完事之后将offset写入zookeeper
var storeEndOffset: Boolean = false
inputDStream.foreachRDD((rdd,batchTime) => { val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(
offset =>
println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset)
)
val newRDD = rdd.map(message => processMessage(message))
// newRDD.count()
persistOffsets(offsetRanges,consumerGroupID,storeEndOffset,zkUtils)
}) // println("Number of messages processed " + inputDStream.count())
ssc.start()
ssc.awaitTermination() } /*
Create a dummy process that simply returns the message as is.
*/
def processMessage(message:ConsumerRecord[String,String]):ConsumerRecord[String,String]={
message
} def readOffsets(
topics: Seq[String],
groupId:String ,
zkUtils: ZkUtils
): Map[TopicPartition, Long] = { val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long] val partitionMap = zkUtils.getPartitionsForTopics(topics) // /consumers/<groupId>/offsets/<topic>/ partitionMap.foreach(topicPartitions => { val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
//遍历每一个分区下的数据
topicPartitions._2.foreach(partition => { val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
try { val offsetStatTuple = zkUtils.readData(offsetPath)
if (offsetStatTuple != null) {
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong) } } catch { case e: Exception => // println("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
println("message: {} , topic: {}, partition: {}, node path: {}" , e.getMessage , topics , topicPartitions , offsetPath)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L) } }) }) topicPartOffsetMap.toMap } def persistOffsets(
offsets: Seq[OffsetRange],
groupId: String,
storeEndOffset: Boolean = true,
zkUtils: ZkUtils
): Unit = { offsets.foreach(or => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
println(or.topic.toString , or.partition.toString , offsetVal , offsetPath)
zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "")//, JavaConversions.bufferAsJavaList(acls) }) } }

第二种代码:

package offsetInZookeeper

/**
* Created by angel
*/
import java.lang.Object import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.Try
/**
* Kafka的连接和Offset管理工具类
*
* @param zkHosts Zookeeper地址
* @param kafkaParams Kafka启动参数
*/
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
//Logback日志对象,使用slf4j框架
@transient private lazy val log = LoggerFactory.getLogger(getClass)
//建立ZkUtils对象所需的参数
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
//ZkUtils对象,用于访问Zookeeper
val zkUtils = new ZkUtils(zkClient, zkConnection, false)
/**
* 包装createDirectStream方法,支持Kafka Offset,用于创建Kafka Streaming流
*
* @param ssc Spark Streaming Context
* @param topics Kafka话题
* @tparam K Kafka消息Key类型
* @tparam V Kafka消息Value类型
* @return Kafka Streaming流
*/
def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
val groupId = kafkaParams("group.id").toString
val storedOffsets = readOffsets(topics, groupId)
log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
kafkaStream
}
/**
* 从Zookeeper读取Kafka消息队列的Offset
*
* @param topics Kafka话题
* @param groupId Kafka Group ID
* @return 返回一个Map[TopicPartition, Long],记录每个话题每个Partition上的offset,如果还没消费,则offset为0
*/
def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
val partitionMap = zkUtils.getPartitionsForTopics(topics)
// /consumers/<groupId>/offsets/<topic>/
partitionMap.foreach(topicPartitions => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
topicPartitions._2.foreach(partition => {
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
val tryGetKafkaOffset = Try {
val offsetStatTuple = zkUtils.readData(offsetPath)
if (offsetStatTuple != null) {
log.info("查询Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
}
}
if(tryGetKafkaOffset.isFailure){
//http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
val consumer = new KafkaConsumer[String, Object](kafkaParams)
val partitionList = List(new TopicPartition(topicPartitions._1, partition))
consumer.assign(partitionList)
val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
consumer.close()
log.warn("查询Kafka消息偏移量详情: 没有上一次的ZK节点:{}, 话题:{}, 分区:{}, ZK节点路径:{}, 使用最小可用偏移量:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
}
})
})
topicPartOffsetMap.toMap
}
/**
* 保存Kafka消息队列消费的Offset
*
* @param rdd SparkStreaming的Kafka RDD,RDD[ConsumerRecord[K, V]
* @param storeEndOffset true=保存结束offset, false=保存起始offset
*/
def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
val groupId = kafkaParams("group.id").toString
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetsList.foreach(or => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "" /*, JavaConversions.bufferAsJavaList(acls)*/)
log.debug("保存Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
})
} } object Manager{
def main(args: Array[String]): Unit = {
//5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
if (args.length < 5) {
System.err.println("Usage: KafkaDirectStreamTest " +
"<batch-duration-in-seconds> " +
"<kafka-bootstrap-servers> " +
"<kafka-topics> " +
"<kafka-consumer-group-id> " +
"<kafka-zookeeper-quorum>")
System.exit(1)
} val batchDuration = args(0)
val bootstrapServers = args(1).toString
val topicsSet = args(2).toString.split(",").toSet
val consumerGroupID = args(3)
val zkQuorum = args(4)
val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
.setMaster("local[4]") val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong)) val topics = topicsSet.toArray val kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrapServers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> consumerGroupID,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean) //禁用自动提交Offset,否则可能没正常消费完就提交了,造成数据错误
) lazy val kafkaManager = new KafkaManager(zkQuorum , kafkaParams)
val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc , topics)
inputDStream.foreachRDD(rdd => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(
offset =>
println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset)
)
kafkaManager.persistOffsets(rdd)
})
ssc.start()
ssc.awaitTermination() } }