Apache Kafka + Spark Streaming Integration

时间:2023-03-09 14:55:25
Apache Kafka + Spark Streaming Integration

1.目标

为了构建实时应用程序,Apache Kafka  - Spark Streaming Integration是最佳组合。因此,在本文中,我们将详细了解Kafka中Spark Streaming Integration的整个概念。此外,我们将看看Spark Streaming-Kafka示例。在此之后,我们将讨论基于接收器的方法和Kafka Spark Streaming Integration的直接方法。此外,我们将在Kafka Spark Streaming Integration中看到直接接近基于接收器的方法的优势。
那么,让我们开始Kafka Spark Streaming Integration

Apache Kafka + Spark Streaming Integration

Apache Kafka Spark Streaming Integration

你在卡夫卡有多好

2.什么是Kafka Spark Streaming Integration?

在Apache Kafka Spark Streaming Integration中,有两种方法可以配置Spark Streaming以从Kafka接收数据,即Kafka Spark Streaming Integration。首先是使用Receivers和Kafka的高级API,第二种以及新方法是不使用Receiver。这两种方法都有不同的编程模型,例如性能特征和语义保证。

Apache Kafka + Spark Streaming Integration

什么是Kafka-Spark Streaming Integration

让我们详细研究这两种方法。

一个。基于接收者的方法

在这里,我们使用Receiver接收数据。因此,通过使用Kafka高级消费者API,我们实现了Receiver。此外,接收的数据存储在Spark执行程序中。然后由Kafka发起的作业 - Spark Streaming处理数据。
尽管如此,这种方法可能会在默认配置下丢失故障数据。因此,我们必须在Kafka Spark Streaming中另外启用预写日志,以确保零数据丢失。这会将所有收到的Kafka数据同步保存到分布式文件系统上的预写日志中。通过这种方式,可以在故障时恢复所有数据。
Apache Kafka工作流程| Kafka Pub-Sub Messaging
此外,我们将讨论如何在我们的Kafka Spark Streaming应用程序中使用这种基于Receiver的方法。

一世。 链接

现在,使用SBT / Maven项目定义为Scala / Java应用程序链接您的Kafka流应用程序与以下工件。

  1. groupId = org.apache.spark
    artifactId = spark-streaming-kafka--8_2.
    version = 2.2.

但是,在为Python应用程序部署应用程序时,我们必须添加上面的库及其依赖项

II。 程序设计

然后,通过在流应用程序代码中导入KafkaUtils来创建输入DStream:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  1. import org.apache.spark.streaming.kafka._
  2. val kafkaStream = KafkaUtils。createStream (streamingContext,
  3. [ ZK quorum ] ,[ 消费者群组ID ] ,[ 消费的Kafka分区的每个主题数量] )

此外,使用createStream的变体,我们可以指定键和值类及其相应的解码器类。

III。 部署

与任何Spark应用程序一样,spark-submit用于启动您的应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。
详细了解Spark用例
此外,对于缺乏SBT / Maven项目管理的Python应用程序,使用-packages spark-streaming-Kafka-0-8_2.11及其依赖项可以直接添加到spark-submit。

  1. ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka--8_2.:2.2. ...

此外,我们还可以从Maven存储库下载Maven工件spark-streaming-Kafka-0-8-assembly的JAR。然后使用-jars将其添加到spark-submit。

湾 直接方法(无接收器)

在基于接收器的方法之后,引入了新的无接收器“直接”方法。它确保了更强大的端到端保证。此方法定期向Kafka查询每个主题+分区中的最新偏移量,而不是使用接收器来接收数据。此外,相应地定义要在每个批次中处理的偏移范围。此外,为了从Kafka读取定义的偏移范围,使用简单的消费者API,尤其是在启动处理数据的作业时。但是,它类似于从文件系统读取文件。
注意:此功能是在Spark 1.3中为Scala和Java API引入的,在Spark 1.4中为Python API引入。
现在,让我们讨论如何在流应用程序中使用此方法。
要了解有关Consumer API的更多信息,请访问以下链接:
Apache Kafka Consumer | 卡夫卡消费者的例子

一世。链接

但是,此方法仅在Scala / Java应用程序中受支持。使用以下工件,链接SBT / Maven项目。

  1. groupId = org.apache.spark
    artifactId = spark-streaming-kafka--8_2.
    version = 2.2.

    II。程序设计

此外,在流应用程序代码中导入KafkaUtils并创建输入DStream:

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
  1. import org.apache.spark.streaming.kafka._
  2. val directKafkaStream = KafkaUtils.createDirectStream [
  3. [ key class ] ,[ value class ] ,[ key decoder class ] ,[ value decoder class ] ] (
  4. streamingContext,[ 卡夫卡参数的地图] ,[ 要消费的主题集] )

我们必须在Kafka参数中指定metadata.broker.list或bootstrap.servers。因此,默认情况下,它将从每个Kafka分区的最新偏移开始消耗。但是,如果将Kafka参数中的配置auto.offset.reset设置为最小,它将从最小偏移开始消耗。
此外,使用KafkaUtils.createDirectStream的其他变体,我们可以从任意偏移开始消耗。然后,执行以下操作以访问每批中消耗的Kafka偏移量。

  1. //保持对当前偏移范围的引用,以便下游可以使用它
  2. // Hold a reference to the current offset ranges, so downstream can use it
    var offsetRanges = Array.empty[OffsetRange]
    directKafkaStream.transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
    }.map {
    ...
    }.foreachRDD { rdd =>
    for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
    ...
    }

如果我们希望基于Zookeeper的Kafka监控工具显示流应用程序的进度,我们可以使用它来更新Zookeeper自己。
阅读前五篇Apache Kafka书籍| 学习卡夫卡的完整指南

III。部署

这里,部署过程类似于部署基于Receiver的方法的过程。

3.直接方法的优点

与Kafka的Spark Streaming集成相比,第二种方法优于第一种方法:

Apache Kafka + Spark Streaming Integration

直接方法在与Kafka的Spark Streaming集成中的优势

一个。简化的并行性

不需要创建多个输入Kafka流并将它们联合起来。但是,Kafka - Spark Streaming将使用直接流创建与要使用的Kafka分区一样多的RDD分区。这将同时从Kafka读取数据。因此,我们可以说,它是Kafka和RDD分区之间的一对一映射,更容易理解和调整。

湾 效率

在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这进一步复制了数据。这实际上是低效的,因为数据有效地被复制两次 - 一次由Kafka复制,第二次由预写日志复制。第二种方法消除了问题,因为没有接收器,因此不需要预写日志。只要我们有足够的Kafka保留,就可以从Kafka恢复消息。

C。完全一次的语义

基本上,我们使用Kafka的高级API在第一种方法中在Zookeeper中存储消耗的偏移量。但是,要消费来自Kafka的数据,这是一种传统方式。即使它可以确保零数据丢失,但在某些故障情况下,某些记录可能会被消耗两次。这是由于Kafka可靠接收的数据之间的不一致 - Spark Streaming和Zookeeper跟踪的偏移。因此,在第二种方法中,我们使用了一个不使用Zookeeper的简单Kafka API。
让我们修改Apache Kafka架构及其基本概念
因此,尽管出现故障,但Spark Streaming有效地接收了每条记录一次。因此,请确保将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移的原子事务。这有助于为我们的结果输出实现完全一次的语义。
虽然,还有一个缺点,即它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具不会显示进度。但是,我们仍然可以在每个批次中访问此方法处理的偏移量,并自行更新Zookeeper。
所以,这就是Apache Kafka Spark Streaming Integration。希望你喜欢我们的解释。
Apache Spark中的Spark Streaming Checkpoint

4。结论

因此,在这个Kafka-Spark Streaming Integration中,我们已经详细了解了Spark Kafka与Spark Kafka集成的整个概念。此外,我们讨论了Kafka Spark Streaming配置的两种不同方法,即接收方法和直接方法