[Kafka] Kafka Environment Setup and Usage

Time: 2022-12-13 09:26:30

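Kafka is a distributed, partitioned, replicated messaging system. It provides the functionality of an ordinary messaging system, but with its own distinctive design.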

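  • Kafka organizes messages by topic.
  • Programs that publish messages to a Kafka topic are called producers.
  • Programs that subscribe to topics and consume messages are called consumers.
  • Kafka runs as a cluster of one or more servers, each of which is called a broker.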
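
To make the four terms above concrete, here is a toy in-memory model in Python. It is only a sketch: the class name ToyBroker and its methods are made up for illustration, and it ignores partitions, replication, and networking entirely.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for one broker: topic name -> ordered list of messages."""

    def __init__(self):
        self.topics = defaultdict(list)

    def publish(self, topic, message):
        """Producer side: append a message to the topic and return its offset."""
        log = self.topics[topic]
        log.append(message)
        return len(log) - 1

    def fetch(self, topic, offset):
        """Consumer side: return every message stored at or after `offset`."""
        return self.topics[topic][offset:]


broker = ToyBroker()
broker.publish("CMCC", "Hwll")
broker.publish("CMCC", "Hello Kafka !")
print(broker.fetch("CMCC", 0))
```

The point of the sketch is that a consumer reads by offset and does not remove anything from the topic, so several consumers can read the same messages independently.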

Here is a simple walkthrough of how to use it:

First, download the Kafka package from the official site: http://kafka.apache.org/downloads.html

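Once the download finishes, extract it with tar -zxvf *.tar.gz. The extracted directory contains, among others, the bin, config, and libs directories discussed below.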

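Start with the bin directory. The scripts there include ZooKeeper scripts alongside the Kafka ones, which shows that Kafka is designed to be used together with ZooKeeper.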

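ZooKeeper coordination:
1. Manages brokers and consumers joining and leaving dynamically.
2. Triggers load balancing: when a broker or consumer joins or leaves, a rebalancing algorithm runs so that the subscription load is balanced across the consumers within a consumer group.
3. Maintains the consumption relationships and the consumption state of each partition.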
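
The rebalancing idea in point 2 can be sketched in a few lines of Python. This is a simple round-robin scheme chosen for illustration, not necessarily the algorithm Kafka itself runs; the function name assign_partitions and the consumer ids are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Spread the partitions of a topic over the consumers of one group.

    Round-robin over sorted ids, so every consumer computes the same result.
    """
    assignment = {consumer: [] for consumer in sorted(consumers)}
    if not assignment:
        return assignment
    members = list(assignment)
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


# Two consumers share four partitions; then a third consumer joins the group.
before = assign_partitions(range(4), ["c1", "c2"])
after = assign_partitions(range(4), ["c1", "c2", "c3"])
print(before)
print(after)
```

Re-running the function whenever the member list changes is what "triggering a rebalance" amounts to in this toy version: each partition still has exactly one owner within the group.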

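Details of what is stored in ZooKeeper:
1. After starting, each broker registers an ephemeral broker registry in ZooKeeper, containing the broker's IP address and port and the topics and partitions it stores.
2. After starting, each consumer registers an ephemeral consumer registry in ZooKeeper, containing the consumer group it belongs to and the topics it subscribes to.
3. Each consumer group is associated with an ephemeral owner registry and a persistent offset registry. Every subscribed partition has an owner registry, whose content is the id of the consumer reading that partition, and an offset registry, whose content is the last consumed offset.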
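
The difference between ephemeral and persistent registries can be illustrated with a small Python model. Everything here is an assumption made for the sketch: ToyRegistry is not a ZooKeeper client, and the paths are only illustrative of the kind of layout described above.

```python
class ToyRegistry:
    """Minimal model of ephemeral versus persistent nodes."""

    def __init__(self):
        self.nodes = {}  # path -> (value, owning session or None)

    def create(self, path, value, session=None):
        """session=None makes the node persistent; otherwise it is ephemeral."""
        self.nodes[path] = (value, session)

    def close_session(self, session):
        """Drop every ephemeral node owned by the session that went away."""
        if session is None:
            return
        self.nodes = {
            path: node for path, node in self.nodes.items() if node[1] != session
        }

    def paths(self):
        return sorted(self.nodes)


registry = ToyRegistry()
registry.create("/brokers/ids/0", "localhost:9092", session="broker-0")
registry.create("/consumers/g1/ids/c1", ["chiwei"], session="c1")
registry.create("/consumers/g1/owners/chiwei/0", "c1", session="c1")
registry.create("/consumers/g1/offsets/chiwei/0", 1)  # persistent offset

registry.close_session("c1")  # consumer c1 dies or disconnects
print(registry.paths())
```

After the consumer's session closes, its consumer and owner entries disappear while the offset entry stays, which is what lets another consumer in the group resume from the last recorded offset.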

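To get started, we simply use the ZooKeeper that ships with the Kafka package; its jars live in the libs directory. That directory contains the zkclient and zookeeper dependency jars, so when using this Kafka distribution, ZooKeeper has to be started first.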

Next, the config directory. It holds the concrete configuration files, such as server.properties and producer.properties.

Take producer.properties as an example:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none , gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=
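
This file configures the broker address, the send mode (synchronous or asynchronous), whether to compress, the message encoder, and so on.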
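
To see which settings are actually active in a file like this (commented-out lines are ignored), a few lines of Python are enough. This is a minimal sketch: parse_properties is a hypothetical helper that handles only the plain key=value lines and # comments seen above, not the full Java properties syntax.

```python
def parse_properties(text):
    """Return the active key=value settings, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


sample = """\
# list of brokers used for bootstrapping knowledge about the rest of the cluster
metadata.broker.list=localhost:9092
#partitioner.class=
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
"""

active = parse_properties(sample)
for key, value in active.items():
    print(key, "=", value)
```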

Now let's try it out:

1. Start ZooKeeper, using the zookeeper-server-start.sh script in bin together with config/zookeeper.properties.

2. Start Kafka, using the kafka-server-start.sh script in bin together with config/server.properties.

3. With both started successfully, we can produce and consume messages.

Create a topic:

[root@com23 bin]# sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic CMCC
Created topic "CMCC".

Produce messages:

[root@com23 bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic CMCC
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hwl^H^H
Hwll
Hello Kafka !

Consume messages:

[root@com23 bin]# sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic CMCC --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hwl
Hwll
Hello Kafka !
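
The effect of --from-beginning in the consumer command can be pictured with a small Python sketch. The function consume is hypothetical and models only the starting position: with the flag a new consumer starts at the first stored message, without it the consumer sees only messages that arrive after it starts.

```python
def consume(log, from_beginning, arrivals=()):
    """Return what a newly started consumer would print.

    log: messages already stored in the topic when the consumer starts.
    arrivals: messages produced after the consumer has started.
    """
    start = 0 if from_beginning else len(log)
    full_log = list(log) + list(arrivals)
    return full_log[start:]


stored = ["Hwll", "Hello Kafka !"]
print(consume(stored, from_beginning=True))
print(consume(stored, from_beginning=False, arrivals=["later message"]))
```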


That completes a simple demo. Next, let's see how to set up a simple cluster.

First, we need to configure two more brokers.

The first (config/server-1.properties):

broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1

The second (config/server-2.properties):

broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2
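
These two files have exactly the same structure as the original server.properties; only some property values differ. Kafka uses broker.id to uniquely identify each broker in the cluster, so the changes come down to three properties: broker.id, port, and log.dirs.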
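
Since only three values change per broker, the overrides can be generated rather than typed by hand. The sketch below assumes the numbering pattern used in this walkthrough (port 9092 plus the broker id, and a log directory suffixed with the id); broker_overrides and render are hypothetical helper names.

```python
def broker_overrides(broker_id, base_port=9092, log_root="/tmp/kafka-logs"):
    """Return the three properties that must differ between brokers on one host."""
    return {
        "broker.id": str(broker_id),
        "port": str(base_port + broker_id),
        "log.dirs": f"{log_root}-{broker_id}",
    }


def render(props):
    """Format a dict as key=value lines, as found in a .properties file."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


for broker_id in (1, 2):
    print(render(broker_overrides(broker_id)))
    print()
```

Each broker on the same machine needs its own port and its own log directory, otherwise the second process would fail to bind or would write into the first broker's data.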

Now start these two brokers as well:

sh bin/kafka-server-start.sh config/server-1.properties &
sh bin/kafka-server-start.sh config/server-2.properties &

Next, create a topic with a replication factor of 3:

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic chiwei

Look at the topic description:

[root@com23 kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei
Topic:chiwei PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chiwei Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
[root@com23 kafka_2.10-0.8.1.1]#

The official documentation explains the fields:

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

Remember the first topic we created? Here is its description:

[root@com23 kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic CMCC
Topic:CMCC PartitionCount:1 ReplicationFactor:1 Configs:
Topic: CMCC Partition: 0 Leader: 0 Replicas: 0 Isr: 0
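
The per-partition lines of the describe output are regular enough to parse. The following Python sketch does so with a regular expression; parse_partition_line is a hypothetical helper, and it assumes the exact field names shown in the output above with a non-empty Isr list.

```python
import re


def parse_partition_line(line):
    """Turn one 'Topic: ... Partition: ... Leader: ...' line into a dict."""
    fields = dict(re.findall(r"(\w+):\s*(\S+)", line))
    return {
        "topic": fields["Topic"],
        "partition": int(fields["Partition"]),
        "leader": int(fields["Leader"]),
        "replicas": [int(b) for b in fields["Replicas"].split(",")],
        "isr": [int(b) for b in fields["Isr"].split(",")],
    }


chiwei = parse_partition_line(
    "Topic: chiwei Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0"
)
cmcc = parse_partition_line("Topic: CMCC Partition: 0 Leader: 0 Replicas: 0 Isr: 0")

# Replicas that are assigned but not currently in sync.
out_of_sync = sorted(set(chiwei["replicas"]) - set(chiwei["isr"]))
print(chiwei)
print(cmcc)
print(out_of_sync)
```

Comparing the replicas list with the isr list is a quick way to spot partitions that have lost a replica.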

Now produce a message to the new topic:

[root@com23 kafka_2.10-0.8.1.1]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !

Consume messages:

[root@com23 kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !

It works!


Next, let's test how fault tolerant this cluster is.

The topic description above showed that the broker with broker.id=2 is the leader. Let's kill the leader process and see what happens:

ps -ef | grep server-2
kill -9 9663
[root@com23 kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei
Topic:chiwei    PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: chiwei   Partition: 0    Leader: 1       Replicas: 2,1,0 Isr: 1,0
[root@com23 kafka_2.10-0.8.1.1]#

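The leader has changed. Notice that Replicas still lists 2 while Isr no longer does: Isr shows only the ids of the "in-sync" brokers, and broker 2 is no longer in sync. So why does Replicas still show 2?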

  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
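This sentence gives the reason: the replicas list is reported "regardless of whether they are the leader or even if they are currently alive", that is, whether or not the node is currently alive.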
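
The state change seen in the two describe outputs can be reproduced with a tiny Python simulation. It rests on one loudly stated assumption: the new leader is the first broker in the replica list that is still in the ISR. That matches the output in this walkthrough but is a simplification of what Kafka's controller really does; fail_broker is a hypothetical name.

```python
def fail_broker(state, broker_id):
    """Return the partition state after `broker_id` dies.

    The replicas list is an assignment, so it is left untouched;
    only the ISR and, if necessary, the leader change.
    """
    isr = [b for b in state["isr"] if b != broker_id]
    leader = state["leader"]
    if leader == broker_id:
        # Assumption: promote the first assigned replica that is still in sync.
        survivors = [b for b in state["replicas"] if b in isr]
        leader = survivors[0] if survivors else -1  # -1 here means "no leader"
    return {"leader": leader, "replicas": list(state["replicas"]), "isr": isr}


before = {"leader": 2, "replicas": [2, 1, 0], "isr": [2, 1, 0]}
after = fail_broker(before, 2)
print(after)
```

Killing a follower instead of the leader would shrink the ISR in the same way but leave the leader unchanged.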

Now consume the messages of that topic again:

[root@com23 kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !

The message is still there, which shows that it had been replicated to every broker in the cluster.

Appendix: some common operations

1. List all topics

./kafka-topics.sh --list --zookeeper ip:port

2. Create a topic

sh kafka-topics.sh --create --zookeeper 192.168.11.176:2181 --replication-factor 2 --partitions 4 --topic cmcc

The replication-factor here cannot be greater than the number of brokers.
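
A wrapper script could check this constraint before calling kafka-topics.sh. The Python sketch below is illustrative only: check_replication_factor is a hypothetical helper, and the list of live broker ids is assumed to be known to the caller.

```python
def check_replication_factor(replication_factor, live_brokers):
    """Raise ValueError if the topic cannot be created with this factor."""
    if replication_factor < 1:
        raise ValueError("replication factor must be at least 1")
    if replication_factor > len(live_brokers):
        raise ValueError(
            f"replication factor {replication_factor} is larger than "
            f"the number of available brokers ({len(live_brokers)})"
        )
    return True


print(check_replication_factor(3, [0, 1, 2]))
try:
    check_replication_factor(3, [0, 1])
except ValueError as error:
    print("rejected:", error)
```

The reason for the limit is that each replica of a partition must live on a different broker, so there cannot be more replicas than brokers.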

3. Show the details of a specific topic

sh kafka-topics.sh --describe --zookeeper 192.168.11.176:2181 --topic cmcc

4. Show the unavailable partitions of a specific topic

sh kafka-topics.sh --describe --unavailable-partitions --zookeeper 192.168.11.176:2181 --topic chiwei

5. Delete a topic

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic chiwei --zookeeper 192.168.11.176:2181

Command must include exactly one action: --list, --describe, --create or --alter
Option                                  Description
------                                  -----------
--alter                                 Alter the configuration for the topic.
--config <name=value>                   A topic configuration override for the
                                          topic being created or altered.
--create                                Create a new topic.
--deleteConfig <name>                   A topic configuration override to be
                                          removed for an existing topic
--describe                              List details for the given topics.
--help                                  Print usage information.
--list                                  List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
                                          being created or altered (WARNING:
                                          If partitions are increased for a
                                          topic that has a key, the partition
                                          logic or ordering of the messages
                                          will be affected
--replica-assignment                    A list of manual partition-to-broker
  <broker_id_for_part1_replica1 :         assignments for the topic being
  broker_id_for_part1_replica2 ,          created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:          The replication factor for each
  replication factor>                     partition in the topic being created.
--topic <topic>                         The topic to be create, alter or
                                          describe. Can also accept a regular
                                          expression except for --create option
--topics-with-overrides                 if set when describing topics, only
                                          show topics that have overridden
                                          configs
--unavailable-partitions                if set when describing topics, only
                                          show partitions whose leader is not
                                          available
--under-replicated-partitions           if set when describing topics, only
                                          show under replicated partitions
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.