Kafka随笔一

时间:2023-03-09 15:45:25
Kafka随笔一

一、KafKa所涉及到的名词概念:

1.    Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上。

2.    Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。

3.    Offset:消息在Partition中的编号,编号顺序不跨Partition。

4.    Consumer:用于从Broker中取出/消费Message。

5.    Producer:用于往Broker中发送/生产Message。

6.    Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。

7.    Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。

8.    Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。

9.    ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。

二、KafKa模型和Partition Leader选举说明

kafka消费者模型
1、分区消费模型: 每个分区对应每个消费实例
2、组消费模型
kafka两种消费模式服务器端源码对比:
1、分区消费模式有以下特点
 a、指定消费topic、partition和offset通过向服务器发送RPC请求进行消费
 b、需要自己提交offset
 c、需要自己处理各种错误,如:leader切换错误
 d、需要自己处理消费者负载均衡策略
2、组消费模式具有以下特点
 a、最终也是通过向服务器发送RPC请求完成的(和分区消费模式一样)
 b、组消费模式由kafka服务器端处理各种错误,然后将消息放入队列再分装为迭代器(队列为FetchedDataChunk对象),客户端只需要在迭代器上迭代取出消息
 c、 由kafka服务器端周期性的通过scheduler提交当前消费的offset,无需客户端负责
d、kafka服务端处理消费者负载均衡
(监控工具kafka offset monitor 和 kafka manager 均是基于组消费模式)
kakfa Partition选主机制
    kafka的leader election 方案解决了上述问题,它在所有broker中选出一个controller,所有的partition的leader选举都有controller决定,controller会将leader的改变直接通过RPC的方式(比zookeeper queue的方式更高效)通知需为此作为响应的Broker

 

三、相关命令用法

1、启动kafka
[root@t_cluster03_109_26 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
2、手动创建topic
 
[root@t_cluster03_109_26 kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.109.26:2181 --replication-factor 2 --partitions 1 --topic test
Created topic "test".
3、查看topic列表(列出集群当前所有可用的topic)
bin/kafka-topics.sh --list -zookeeper zookeeper_address
[root@t_cluster03_109_26 kafka]# bin/kafka-topics.sh --list --zookeeper 192.168.109.26:2181
test
4、手动启动consumer:
[root@t_cluster03_109_26 kafka]#  bin/kafka-console-consumer.sh --zookeeper 192.168.109.25:2181 --topic test1 --from-beginning
5、手动启动producer
 [root@t_cluster02_109_25 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.109.25:9092 --topic test1
6、查看每个指定topic的分区和副本信息
[root@t_cluster03_109_26 kafka]# bin/kafka-topics.sh --describe --zookeeper 192.168.109.26:2181 --topic testtopic
Topic:testtopic    PartitionCount:2    ReplicationFactor:2    Configs:
    Topic: testtopic    Partition: 0    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: testtopic    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
7、增加partition的数量(4是增加后的数)
bin/kafka-topics.sh --zookeeper zookeeper_address  --alter --topic topic_name --partitions 4 
 [root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --describe --zookeeper 192.168.109.25:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: test    Partition: 0    Leader: 2    Replicas: 2,3    Isr: 2,3
[root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --alter --zookeeper 192.168.109.25:2181 --topic test --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --describe --zookeeper 192.168.109.25:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:2    Configs:
    Topic: test    Partition: 0    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: test    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
[root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --alter --zookeeper 192.168.109.25:2181 --topic test --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@t_cluster01_109_24 kafka]# bin/kafka-topics.sh --describe --zookeeper 192.168.109.25:2181 --topic test
Topic:test    PartitionCount:3    ReplicationFactor:2    Configs:
    Topic: test    Partition: 0    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: test    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: test    Partition: 2    Leader: 1    Replicas: 1,3    Isr: 1,3
8、集群leader平衡:
bin/kafka-preferred-replica-election.sh --zookeeper zookeeper_address auto.leader.rebalance.enable=true

注:附带KafKa集群配置文件,相关说明可以查看官方文档

[root@t_cluster01_109_24 config]# grep -v '^#' server.properties|grep -v '^$'
broker.id=1
port=9092
host.name=192.168.109.24
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafkalog
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
message.max.bytes=502400
default.replication.factor=2
replica.fetch.max.bytes=502400
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.109.24:2181,192.168.109.25:2181,192.168.109.26:2181
zookeeper.connection.timeout.ms=6000