Apache Kafka 源码剖析

时间:2021-12-05 07:21:30

Getting Start

下载

  • http://kafka.apache.org/

优点和应用场景

  • Kafka消息驱动,符合发布-订阅模式,优点和应用范围都共通
  • 发布-订阅模式优点
    1. 解耦合 : 两个应用不需要相互调用
    2. 可扩展性 : 消费者的个数可实时扩展
    3. 实时性 : 消费者能实时的获取生产者发布的事件
    4. 高效 :减少由于多个消费者请求数据造成的数据计算带来的资源消耗
    5. 异步通讯 :发布-订阅模式是天生的异步通讯
  • Kafka其他优点
    1. 持久化 : 消息丢失的可控性极高
    2. 高性能 : 磁盘顺序读写性能比内存随机读写还高,每秒10万条消息
    3. 高吞吐量 :每秒上百MB的吞吐量
    4. 顺序性
  • 发布-订阅模式应用范围
    1. 适合数据一被生产,就需要被处理的情况
    2. 适合数据具有潜在消费者的情况
    3. 适合无论有没有消费者,数据都在生产的情况
    4. 不适合对数据的处理时间有特殊限定的情况
  • 应用场景
    1. 最为消息中间件,实现消息队列和消息的发布-订阅,消息驱动的服务
    2. 数据总线,一对多的模式
    3. 日志收集,消息中间件的一种应用
    4. 数据库主从同步

核心概念

  • Broker
    1. 一个Kafka server就是一个Broker
    2. 一般情况下,一个Broker独占一台服务器,发挥微服务的优势
    3. 服务器资源有限的情况下,需要设计出Broker/Topic/Partition/Replica的最优分配策略,从而充分利用服务器资源
    4. 一个broker可以有多个topic
  • Topic
    1. 存储消息的逻辑上的消息集合
    2. 每个Topic有多个分区
  • 分区 Partition
    1. 同一个Topic的不同分区分配在不同Broker上,也就是一个分区一个服务器
    2. 不同Topic的分区可以共享一个服务器
    3. 同一个分区的消息是有序的,通过维护offset实现
    4. 相同key的消息会被发布到同一个分区
    5. 同一个分区的消息会被一个消费组里固定的一个消费者独占消费
    6. 通过增加分区来增加并行处理能力
    7. 每个分区可以有多个副本
  • 消费组 Consumer Group
    1. 实现一个消息只被同组的一个消费者独占消费
    2. 消费组里的消费者有变化的时候会触发Rebalance操作重新分配分区与消费者的对应关系
    3. Rebalance操作实现了分区消费的故障转移
    4. 通过增加分区和消费组里的消费者数量来水平扩展,理想情况一对一,也可以一对多,最好不要多对一,造成浪费
  • 副本 Replica
    1. 同一个分区的不同副本分配在不同Broker上,但是这些Broker可以是在同一台服务器上,也可以不是
    2. 副本是一个热备份设计,会选举一个作为Leader,提供对外服务
    3. Fllower副本批量的从Leader副本同步消息
  • HW & LEO
    1. HW是所有ISR副本都有的最新offset,HW之前的消息在所有副本中都存在,HW由Leader副本维护
    2. 所有消费者只能获取HW之前的消息,这样保证了Leader副本不可用的情况下,所有消费者的状态是一致的
    3. LEO是每个副本各自的最新offset
  • ISR集合
    1. 满足两个条件的副本会被选入ISR可用副本集合
      1. 副本与Zookeeper连接
      2. 副本的LEO与Leader副本的LEO差值不超过阈值
    2. ISR集合保证了Kafka不会被故障副本拖累,也保证了Leader的HW与LEO的差值在阈值内
  • 生产者
    1. 异步提交
      1. acks=0 : 生产者只管提交,不会等待Leader副本返回,不保证数据不丢失
    2. 同步提交
      1. acks=1 : 默认设置,生产者等待Leader副本返回成功,保证数据在Leader中部丢失,但是不保证重新选举后数据不丢失
    3. 同步复制
      1. acks=all : 生产者等待所有副本同步消息后才算提交成功,保证数据不丢失,性能低
  • Log
    1. 一个副本对应一个Log,用于持久化数据,Kafka采用顺序读写的方式,性能高
    2. 一个Log里有多个Segment,每个Segment有一个日志文件和一个索引文件
    3. 日志文件的大小有限制,超出后会生成新的Segment
    4. 日志消息保留策略有两种
      1. 消息的保留时间超过指定时间,可以被删除
      2. Topic的存储满,可以开始删除最旧的消息
      3. 保留策略可以全局配置,也可以按Topic配置
    5. 日志压缩
      1. 开启日志压缩后,相同的key会被定期合并,只保留最新的value

Kafka/zookeeper 命令

  1. 启动Zookeeper
    1. ./zookeeper-server-start.sh ../config/zookeeper.properties
  2. 启动Kafka
    1. ./kafka-server-start.sh ../config/server.properties
  3. 查看Topic
    1. ./kafka-topics.sh --list --zookeeper localhost:9860
  4. 删除Topic
    1. ./kafka-topics.sh --delete --zookeeper localhost:9860 /kafka --topic test
    2. 不会立马删除topic
  5. 查看Topic的详细信息
    1. ./kafka-topics.sh --zookeeper localhost:9860 --topic test--describe
  6. 查看zk信息
    1. ./zookeeper-shell.sh 127.0.0.1:9860
  7. 生产数据
    1. ./kafka-console-producer.sh --broker-list cvatap3d.nam.nsroot.net:9801 --topic midcurve-ds
  8. 消费数据
    1. ./kafka-console-consumer.sh --zookeeper localhost:9860 --topic midcurve-ds-subscribe --from-beginning

Kafka集群

zookeeper集群配置 : zookeeper.properties

  • clientPort=2180
    1. 端口号
  • dataDir=/tmp/zookeeper
    1. 集群信息记录目录,清空目录可以重置zookeeper
    2. 如果需要在同一台server上启动多个node,这个路径必须不同
  • tickTime=2000
    1. zookeeper副本与leader之间维护心跳的频率
  • initLimit=5
    1. zookeeper的leader初始化连接follower时等待多少个tickTime时间的心跳,超时副本连接失败
  • syncLimit=2
    1. leader与follower之间发送消息,请求和应答超时是多少个tickTime
  • server.0=cvatap3d.nam.nsroot.net:2888:3888
  • server.1=cvatap3d.nam.nsroot.net:2889:3889
  • server.2=cvatap3d.nam.nsroot.net:2890:3890
    1. 第一个启动的为leader
    2. zookeeper集群数量必须是基数3,5,7...
    3. 0,1,2是服务id,需要在对应的dataDir=/tmp/zookeeper下面创建myid文件,内容就是服务id,比如0
    4. ip或者host都可以
    5. 后面两个端口是zookeeper内部通讯使用
      1. 第一个端口是用于副本与Leader建立TCP连接
      2. 第二个端口是用于Leader选举的TCP端口

Kafka配置 : server.properties

  • broker.id=0
    1. 同一个zookeeper集群下的broker的id必须唯一
  • log.dirs=/tmp/kafka-logs
    1. 启动kafka会从zookeeper下载配置到log目录
    2. 如果修改了server.properties可能因为配置与存储的配置不匹配导致启动失败,这时候可以删除这个目录
    3. 如果需要在同一台server上启动多个broker,这个路径必须不同
  • zookeeper.connect=localhost:2181
    1. zookeeper集群,以逗号隔开
  • listeners=PLAINTEXT://cvatap3d.nam.nsroot.net:9093
    1. broker的host:port

zookeeper与kafka

  • Kafka将Broker信息注册到zookeeper
    1. zookeeper会维护topic与broker的关系,选举Leader
    2. 监控partition leader存活性,发现Leader异常会重新选举Leader
    3. 当异常Broker恢复后,会在一段时间后重新分配Leader
    4. Broker从zookeeper获取集群中其它Broker信息
  • Consumer端将自己注册到zookeeper
    1. 用来获取broker列表
    2. 并和partition leader建立socket连接
    3. 在Consumer Group发生变化时进行rebalance
    4. Zookeeper管理consumer的offset跟踪当前消费的offset。
  • Producer端将自己注册到zookeeper
    1. 用来获取broker列表和分区状态,从而将消息发布到正确的Broker
    2. Zookeeper不管理producer

Kafka使用经验总结

Consumer

  • Consumer默认自动提交Offset,并且是一获取便提交,默认间隔5秒
    1. 当发生错误重启,如果你当消费能力强,可能造成重复消费5秒内当Offset
    2. 如果消费能力比较弱,也可能提交的Offset没有消费完,造成Offset丢失
  • 消费者使用pull的方式去拿消息
    1. 简化kafka实现,消费者自己控制消费进度,不会有消息积压的压力
    2. kafka通过长轮询/长连接来提高pull的实时性
    3. 可以设置消费端缓存消息大小:queue.buffering.max.meesages ,在自动提交模式下缓存大小需要适当控制

Kafka强一致性保证

  • Producer同步复制,性能下降
  • Kafka幂等设置
    1. Kafka使用Produce Id和sequence number实现幂等,判断一次提交的所有消息的seq num一样,Produce Id由zookeeper随机生成,每次不一样
    2. 单分区幂等,不支持分区幂等,也就是当重新分配key与分区关系当时候不支持幂等
    3. 单会话幂等,不提供重启Prodicer后单幂等
    4. Kafka当幂等大部分情况下有效,单不能完全信任
    5. enabled.idempotence=true, 此时就会默认把acks设置为all,所以不需要再设置acks属性了。也就是说幂等自动开启同步复制?
  • 消费者手动提交offset
  • 分区事物管理
  • 安全关闭
    1. producer.close():优先把消息处理完毕,优雅退出。
    2. producer.close(timeout): 超时时,强制关闭。
  • 可重试/不可重试异常区别对待

Spring Cloud Stream 使用Kafka

Producer 生产数据

  • 随机发送
    1. spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression不设置
    2. spring.cloud.stream.bindings.[channelName].producer.partitionCount=10
      1. 如果partitionCount > Partition数量, 会报错,但如果autoAddPartitions=true,则不报错而自动添加Partition,与instanceCount无关
      2. 如果如果partitionCount < Partition数量,则会被Partition数量覆盖
  • 定向发送
    1. spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression=payload.currency
    2. 当Partition变多后,重启会重新分配,但是不重启的情况下还是保持不变,也就是说消费者自动添加Partiton后,还需要重启生产者

Consumer Offset管理

  • Offset的起驶位置
    1. spring.cloud.stream.kafka.bindings.channelName.consumer.resetOffsets
      1. true : 每次从startOffset开始
      2. false:从Consumer当前位置开始
    2. spring.cloud.stream.kafka.bindings.channelName.consumer.startOffset
      1. 新消费组都起驶位置,latest=0, earliest=lastOffset+1
  • 不设置组名:groupId=anonymous
    1. resetOffsets 默认true, startOffset 默认latest, 所以默认每次都从最新都消息消费
    2. 无法设置resetOffsets=false,因为无法知道它当前Offset,但是可以改变startOffset
  • 设置组名:
    1. spring.cloud.stream.bindings. .group
    2. resetOffsets 默认false, startOffset 默认earliest, 所以新消费组默认从0开始消费,并且拥有记录Offset能力
  • Offset 提交
    1. spring.cloud.stream.kafka.bindings.channelName.consumer.autoCommitOffset
      1. 默认true, 自动提交Offset
      2. false会在Message对象都header字段里添加一个kafka_acknowledgment对象,可以用来手动提交offset
      3. 但是它的AckMode.MANUAL并不是立刻提交的,而是所有pull到的Offset都处理后批量自动提交,所以只能控制哪些Offset需要提交,不能控制什么时候提交
    2. spring.cloud.stream.kafka.bindings.channelName.consumer.ackEachRecord
      1. 默认false:当所有一次pull到的Offset都消费完里之后(@StreamListener都方法执行完),才会自动提交Offset
      2. true: 每一个消息消费完都提交Offset
    3. Offset管理的是最后提交的Offset,而不是处理好的Offset的list

Consumer Partition分配

  • 自动分配
    1. spring.cloud.stream.kafka.bindings.channelName.consumer.autoRebalanceEnabled=true
    2. 自动分配Partition给消费组成员,并且会在当前pull的消息被处理完后才分配,有效避免消息被重复消费,但是也不能完全信任,比如消息处理缓慢造成心跳失败或者pull轮询使得它直接认为成员丢失而进行rebalance,但是数据其实还在处理
    3. 如果Comsumer数量 > Partition数量, Consumer会闲置, 但如果autoAddPartitions=true,会根据 max(instanceCount*concurrency,minPartitionCount)自动添加,并在一段时间后自动分配
    4. 自动分配与instanceCount/instanceIndex。对concurrency的处理和手动分配一样
  • 手动分配
    1. autoRebalanceEnabled=false
    2. spring.cloud.stream.instanceCount : 根据Partition数量/(instanceCountconcurrency)数量来决定分配到这个instance的Partition。如果instanceCountconcurrency > Partition数量, 会报错, 但如果autoAddPartitions=true,则不报错而自动添加Partition
    3. spring.cloud.stream.instanceIndex : iinstanceIndex必须在indexCount范围内。如果index一样,将消费相同Partition的消息,这样就违反了一个Partition只能被一个Comsumer Member消费的原则,造成消息的重复消费
    4. spring.cloud.stream.bindings.channelName.consumer.partitioned????????
  • Instance并发消费
    1. spring.cloud.stream.bindings.channelName.consumer.concurrency=10
    2. 同一个instance的同一个channel的多个消费者Listener也会消费相同Partition的消息,并且是同步处理,造成低性能的消息的重复消费,与concurrency无关,也就是说spring的一个instance的一个channel就只能有一个consumer的@StreamListener。是否可以定义多个channel监听同一个topic来实现instance级别的concurrency?
    3. concurrency=10代表你想在一个instance中启动10consumer线程去处理10个partition的message

数据强一致性

  • 保证消息至少被发送一次
  • 保证消息只被发送一次
  • 保证消息至少被消费一次
    1. ackEachRecord=false保证消息至少被消费一次,但是可以有一整批消息会被消费多次
    2. ackEachRecord=true保证消息至少被消费一次,而且保证Consumer故障减少后?消息不会被重复消费,但是不保证增加Consumer
  • 保证消息只被消费一次

性能优化

在线水平伸缩