一、KafKa所涉及到的名词概念:
1. Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上。
2. Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。
3. Offset:消息在Partition中的编号,编号顺序不跨Partition。
4. Consumer:用于从Broker中取出/消费Message。
5. Producer:用于往Broker中发送/生产Message。
6. Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。
7. Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。
8. Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。
9. ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。
二、KafKa模型和Partition Leader选举说明
kafka消费者模型
1、分区消费模型: 每个分区对应每个消费实例
2、组消费模型
kafka两种消费模式服务器端源码对比:
1、分区消费模式有以下特点
a、指定消费topic、partition和offset通过向服务器发送RPC请求进行消费
b、需要自己提交offset
c、需要自己处理各种错误,如:leader切换错误
d、需要自己处理消费者负载均衡策略
2、组消费模式具有以下特点
a、最终也是通过向服务器发送RPC请求完成的(和分区消费模式一样)
b、组消费模式由kafka服务器端处理各种错误,然后将消息放入队列再分装为迭代器(队列为FetchedDataChunk对象),客户端只需要在迭代器上迭代取出消息
c、 由kafka服务器端周期性的通过scheduler提交当前消费的offset,无需客户端负责
d、kafka服务端处理消费者负载均衡
(监控工具kafka offset monitor 和 kafka manager 均是基于组消费模式)
kakfa Partition选主机制
kafka的leader election 方案解决了上述问题,它在所有broker中选出一个controller,所有的partition的leader选举都有controller决定,controller会将leader的改变直接通过RPC的方式(比zookeeper queue的方式更高效)通知需为此作为响应的Broker
三、相关命令用法
1、启动kafka
2、手动创建topic
[root@t_cluster03_109_26 kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.109.26:2181 --replication-factor 2 --partitions 1 --topic test
Created topic "test".
3、查看topic列表(列出集群当前所有可用的topic)
bin/kafka-topics.sh --list -zookeeper zookeeper_address
4、手动启动consumer:
[
root@t_cluster03_109_26 kafka]# bin/kafka-console-consumer.sh --zookeeper 192.168.109.25:2181 --topic test1 --from-beginning
5、手动启动producer
6、查看每个指定topic的分区和副本信息
Topic:testtopic PartitionCount:2 ReplicationFactor:2 Configs:
Topic: testtopic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: testtopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
7、增加partition的数量(4是增加后的数):
bin/kafka-topics.sh --zookeeper zookeeper_address --alter --topic topic_name --partitions 4
Topic:test PartitionCount:1 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[
root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --alter --zookeeper 192.168.109.25:2181 --topic test --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
[
root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --alter --zookeeper 192.168.109.25:2181 --topic test --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
8、集群leader平衡:
bin/kafka-preferred-replica-election.sh --zookeeper zookeeper_address auto.leader.rebalance.enable=true
注:附带KafKa集群配置文件,相关说明可以查看官方文档
[root@t_cluster01_109_24 config]# grep -v '^#' server.properties|grep -v '^$'
broker.id=1
port=9092
host.name=192.168.109.24
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafkalog
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
message.max.bytes=502400
default.replication.factor=2
replica.fetch.max.bytes=502400
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.109.24:2181,192.168.109.25:2181,192.168.109.26:2181
zookeeper.connection.timeout.ms=6000