kafka_2.11-2.0.0_常用操作

时间:2023-03-09 14:57:30
kafka_2.11-2.0.0_常用操作

参考博文:Kafka消费组(consumer group)

参考博文:kafka 1.0 中文文档(九):操作

参考博文:kafka集群管理工具kafka-manager部署安装

以下操作可以在mini01、mini02、mini03任意一台操作即可

1. kafka通过网页管理

参考博文:kafka集群管理工具kafka-manager部署安装

kafka_2.11-2.0.0_常用操作

2. 创建topic

 # 参数说明 --replication-factor  表示有2个副本
# --partitions 表示有4个分区
[yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01: --replication-factor --partitions --topic test
Created topic "test".
[yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01: --replication-factor --partitions --topic zhang
Created topic "zhang".
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01: # 再次查看
zhang
test

2.1. 各主机信息查看

mini01

 [yun@mini01 logs]$ pwd
/app/kafka/logs
[yun@mini01 logs]$ ll
total
………………
drwxrwxr-x yun yun Sep : test-
drwxrwxr-x yun yun Sep : test-
drwxrwxr-x yun yun Sep : test-

mini02

 [yun@mini02 logs]$ pwd
/app/kafka/logs
[yun@mini02 logs]$ ll
total
………………
drwxrwxr-x yun yun Sep : test-
drwxrwxr-x yun yun Sep : test-

mini03

 [yun@mini03 logs]$ pwd
/app/kafka/logs
[yun@mini03 logs]$ ll
total
………………
drwxrwxr-x yun yun Sep : test-
drwxrwxr-x yun yun Sep : test-
drwxrwxr-x yun yun Sep : test-

3. 修改topic

3.1. 增加分区数

注意:分区数不能减少

  Kafka目前不支持减少主题的分区数量。

 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:
__consumer_offsets
test
test01
test02
test03
test04
zhang
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01: --topic test01
Topic:test01 PartitionCount: ReplicationFactor: Configs:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
[yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01: --partitions --topic test01 # 失败,分区数不能减少
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic test01 currently has partitions, would not be an increase.
[-- ::,] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test01 currently has partitions, would not be an increase.
(kafka.admin.TopicCommand$)
[yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01: --partitions --topic test01 # 增加分区数
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01: --topic test01
Topic:test01 PartitionCount: ReplicationFactor: Configs:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:
Topic: test01 Partition: Leader: Replicas: Isr:

4. 删除topic

 # server.properties中设置delete.topic.enable=true 【当前版本默认就是true】否则只是标记删除或者直接重启
[yun@mini01 ~]$ kafka-topics.sh --delete --zookeeper mini01: --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01: # 再次查看 只有 zhang,则表示真的删除了
zhang

5. 查看所有topic

 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:
__consumer_offsets
test
zhang

6. 查看某个Topic的详情

 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01: --topic zhang
Topic:zhang PartitionCount: ReplicationFactor: Configs:
Topic: zhang Partition: Leader: Replicas: ,, Isr: ,,
Topic: zhang Partition: Leader: Replicas: ,, Isr: ,,
Topic: zhang Partition: Leader: Replicas: ,, Isr: ,,
Topic: zhang Partition: Leader: Replicas: ,, Isr: ,,

7. 通过shell命令生产消息

7.1. 输入单条数据

 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01: --topic zhang
>
>
>
>
>
>
>
>
>

7.2. 批量导入数据

 [yun@mini01 zhangliang]$ kafka-console-producer.sh --broker-list mini01: --topic liang < .info

8. 通过shell命令消费消息

 # --from-beginning 从最开始读取
# kafka-console-consumer.sh --zookeeper mini01: --from-beginning --topic zhang # 老版本
[yun@mini01 ~]$ kafka-console-consumer.sh --bootstrap-server mini01: --from-beginning --topic zhang

9. 消费组消费

9.1. 创建topic

 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01: --replication-factor  --partitions  --topic order
Created topic "order".
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01: # 查看所有topic列表
__consumer_offsets
order
test
zhang
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01: --topic order # 查看topic详情
Topic:order PartitionCount: ReplicationFactor: Configs:
Topic: order Partition: Leader: Replicas: Isr:
Topic: order Partition: Leader: Replicas: Isr:
Topic: order Partition: Leader: Replicas: Isr:
Topic: order Partition: Leader: Replicas: Isr:

9.2. 生产消息

 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01: --topic order
>
>
>
>
>

9.3. 消费组消费消息

mini02机器上运行

 # --group  指定组
[yun@mini02 ~]$ kafka-console-consumer.sh --bootstrap-server mini01: --topic order --group order-group

mini03机器上运行

 # --group  指定组
[yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01: --topic order --group order-group # 新开一个窗口执行
[yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01: --topic order --group order-group

  表示order-group消费组有3个消费者,消费topic order的信息。

9.4. 消费组消费位置信息查看

 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group order-group  # 查看消费情况

 TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                     HOST            CLIENT-ID
order consumer--2e9805db-e021--8c62-92f8691fbf20 /172.16.1.13 consumer-
order consumer--2e9805db-e021--8c62-92f8691fbf20 /172.16.1.13 consumer-
order consumer--9e65dcfb-246f--aaf7-3ee83532237f /172.16.1.13 consumer-
order consumer--ee17939d-1ffe-42c7--b19be8acea43 /172.16.1.12 consumer-
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group order-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer--9e65dcfb-246f--aaf7-3ee83532237f /172.16.1.13 consumer- order()
consumer--2e9805db-e021--8c62-92f8691fbf20 /172.16.1.13 consumer- order(,)
consumer--ee17939d-1ffe-42c7--b19be8acea43 /172.16.1.12 consumer- order()

10. 管理消费组

10.1. 查看所有消费组

 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --list
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-

10.2. 查看消费组消费情况【消费位置】

 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group console-consumer-
Consumer group 'console-consumer-26727' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
zhang - - -
zhang - - -
zhang - - -
zhang - - -
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group console-consumer- TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
zhang consumer--17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-
zhang consumer--17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-
zhang consumer--17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-
zhang consumer--17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-

--members

 # --members 此选项提供使用者组中所有活动成员的列表。
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group console-consumer- --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer--17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-

--verbose

 # --verbose 这个选项还提供了分配给每个成员的分区。
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group console-consumer- --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer--17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer- zhang(,,,)

--state

 # --state  这个选项提供了有用的组级信息。
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --describe --group console-consumer- --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
mini01: () range Stable

10.3. 删除一个或多个用户组

 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --list
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-
# 删除一个或多个组
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --delete --group console-consumer- --group console-consumer-
Deletion of requested consumer groups ('console-consumer-31713', 'console-consumer-11661') was successful.
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01: --list
console-consumer-
console-consumer-
console-consumer-
console-consumer-
console-consumer-

11. 增加副本因子

 [yun@mini01 kafka_20180916]$ kafka-topics.sh --create --zookeeper mini01: --replication-factor  --partitions  --topic order
Created topic "order".
[yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01: --topic order
Topic:order PartitionCount: ReplicationFactor: Configs:
Topic: order Partition: Leader: Replicas: Isr:
Topic: order Partition: Leader: Replicas: Isr:
Topic: order Partition: Leader: Replicas: Isr:
Topic: order Partition: Leader: Replicas: Isr:

要求:topic order 的副本数由1变为2, 之前每个分区在哪台机器上在上面已给出。

说明:part 0分布在群集0,1; part 1分布在集群1,2;part 2 分布在集群2,0;part 3分布在集群0,1。

11.1. 创建一个重新调整计划文件

 [yun@mini01 kafka_20180916]$ cat increase-replication-factor.json
{
"version":,
"partitions":[
{"topic": "order","partition": ,"replicas": [,]},
{"topic": "order","partition": ,"replicas": [,]},
{"topic": "order","partition": ,"replicas": [,]},
{"topic": "order","partition": ,"replicas": [,]}
]
}

11.2. 语句执行

 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01: --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment {"version":,"partitions":[{"topic":"order","partition":,"replicas":[],"log_dirs":["any"]},{"topic":"order","partition":,"replicas":[],"log_dirs":["any"]},{"topic":"order","partition":,"replicas":[],"log_dirs":["any"]},{"topic":"order","partition":,"replicas":[],"log_dirs":["any"]}]} Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

11.3. 查看是否执行成功

 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01: --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition order- completed successfully
Reassignment of partition order- completed successfully
Reassignment of partition order- completed successfully
Reassignment of partition order- completed successfully

11.4. 再次查看该topic详情

 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01: --topic order  # 由下可见分配成功
Topic:order PartitionCount: ReplicationFactor: Configs:
Topic: order Partition: Leader: Replicas: , Isr: ,
Topic: order Partition: Leader: Replicas: , Isr: ,
Topic: order Partition: Leader: Replicas: , Isr: ,
Topic: order Partition: Leader: Replicas: , Isr: ,

12. 创建partitions时在broker的分配机制

 kafka-topics.sh --create --zookeeper mini01: --replication-factor  --partitions  --topic test01
kafka-topics.sh --create --zookeeper mini01: --replication-factor --partitions --topic test02

注意在各机器上partitions的分布

 mini01
test01-
test01-
test02-
test02-
test02- mini02
test01-
test01-
test02-
test02-
test02-
test02- mini03
test01-
test02-
test02-
test02-
test02-