深入理解Kafka Producer内部机制

时间:2022-12-11 08:00:59

总的来说,Kafka Producer是将数据发送到kafka集群的客户端。其组成部分如下图所示:

深入理解Kafka Producer内部机制

基本组件:

  • Producer Metadata——管理生产者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • Partitioner——计算给定记录的分区。
  • Serializers——记录键和值序列化器。 序列化程序将对象转换为字节数组。
  • Producer Interceptors——可能改变记录的拦截器。
  • Record Accumulator——累积记录并按主题分区将它们分组为批次。
  • Transaction manager——管理事务并维护必要的状态以确保幂等生产。
  • Sender——向 Kafka 集群发送数据的后台线程。

配置Kafka Producer

kafka producer有三个必须指定的参数:

  • bootstrap.servers — 主机/端口对列表,用于建立与 Kafka 集群的初始连接。 格式:“host1:port1,host2:port2,…”
  • key.serializer — 代表实现 org.apache.kafka.common.serialization.Serializer 接口的key序列化器的完全限定类名。
  • value.serializer — 代表实现 org.apache.kafka.common.serialization.Serializer 接口的value序列化器的完全限定类名。

数据发送到kafka流程

Kafka Producer异步发送消息,返回一个Future,代表发送结果。 此外,用户可以选择提供在 Kafka broker确认记录时调用的回调。 虽然它看起来很简单,但在背后却发生了一些事情。

深入理解Kafka Producer内部机制

  1. 生产者将消息传递给已配置的拦截器列表。 例如,拦截器可能会改变消息并返回更新版本。
  2. 序列化程序将记录键和值转换为字节数组
  3. 如果没有特别指定,则使用默认或配置的分区程序计算主题分区。
  4. 记录累加器使用配置的压缩算法将消息附加到生产者批次。

此时,消息还在内存中,并没有发送给Kafka broker。 Record Accumulator 按主题和分区对内存中的消息进行分组。

深入理解Kafka Producer内部机制

Sender线程将具有相同broker的多个批次作为领导者分组到请求中并发送它们。 此时,消息被发送到Kafka。

发送时间

深入理解Kafka Producer内部机制

Kafka Producer 提供配置参数来控制在各个阶段花费的时间:

  • max.block.ms — 等待元数据获取和缓冲区分配的时间
  • linger.ms — 等待发送其他记录的时间
  • retry.backoff.ms——重试失败请求前等待的时间
  • request.timeout.ms — 等待 Kafka broker 响应的时间
  • delivery.timeout.ms — 后来引入,是 KIP-91 的一部分,用于为用户提供有保证的超时上限,而无需调整生产者组件内部结构

数据持久性

用户可以通过 acks 配置参数控制写入 Kafka 的消息的持久性。 允许的值为:

  • 0, producer 不会等待 broker 确认
  • 1,一个生产者只会等待分区领导者写消息,而不需要等待所有的追随者
  • all,生产者将等待所有同步副本确认消息。 这是以延迟成本为代价的,代表了最强大的可用保证。

使用 acks=all 时,对于同步副本有一些细微差别需要澄清。 在 Kafka 方面,两个设置和当前状态会影响行为:

  • Topic replication factor
  • min.insync.replicas 设置
  • 当前同步副本的数量,包括领导者本身。

min.insync.replicas 指定 acks=all 请求的同步副本的最小阈值。 如果不能满足这个要求,Broker 会拒绝 producer 的请求,甚至不尝试写等待确认。 下表说明了可能的情况。

深入理解Kafka Producer内部机制

在瞬时故障期间,同步副本数可能低于副本总数,但只要它大于或等于 min.insync.replicas——带有 acks=all 的请求就会成功。

用户可以通过重新发送失败的请求来减轻暂时性故障并增加持久性。 这可以通过重试(默认 MAX_INT)和 delivery.timeout.ms(默认 120000)设置来实现。 重试可能会导致消息重复并改变消息的顺序。 这些副作用可以通过设置 enable.idempotence=true 来减轻,但它是以降低吞吐量为代价的。

分区

主题中的消息被组织成分区。 用户可以通过消息键或可插入的 ProducerPartitioner 实现来控制分区分配。 Partitioner 可以使用 partitioner.class 配置来设置,它应该是一个实现 org.apache.kafka.clients.producer.Partitioner 接口的完全限定类名。

Kafka 提供了三种开箱即用的实现:DefaultPartitioner、RoundRobinPartitioner 和 UniformStickyPartitioner。

DefaultPartitioner——如果消息键为空——使用当前分区,并在下一批中更改。 对于非空键,它使用以下公式计算:murmur2hash(key) % total nr of topic partitions。

RoundRobinPartitioner — 忽略消息键,以循环方式在所有活动分区之间平均分配消息。 如果分区有一个指定的代理作为领导者,则分区被认为是活跃的。

UniformStickyPartitioner——忽略消息键,使用当前分区,并在下一批更改分区。