1、安装+配置(集群)
192.168.0.10、192.168.0.11、192.168.0.12(每台服务器kafka+zookeeper)
# kafka依赖java环境,需要提前安装好jdk。下面使用自己安装的zookeeper(也可以使用kafka自带的zk)
cd /data/server
tar -zxv -f kafka_2.-0.9.0.0.tgz
ln -s kafka_2.-0.9.0.0 kafka
server.propeties
broker.id= # broker唯一标识,依次累加
delete.topic.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://10.3.33.162:9092
num.network.threads=
num.io.threads=
socket.send.buffer.bytes=
socket.receive.buffer.bytes=
socket.request.max.bytes=
log.dirs=/data/logs/kafka-logs # 存放日志和消息的目录
num.partitions= # 主题默认的分区数
num.recovery.threads.per.data.dir=
log.retention.hours= # 日志的过期时间,超过会被删除
log.segment.bytes= # 日志文件的最大体积,超过会新建
log.retention.check.interval.ms=
zookeeper.connect=192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181 # zk的连接配置
zookeeper.connection.timeout.ms=
consumer.properties
zookeeper.connect=192.168.0.10:,192.168.0.11:,192.168.0.12:
zookeeper.connection.timeout.ms=
producer.properties
bootstrap.servers=192.168.0.10:,192.168.0.11:,192.168.0.12:
compression.type=none
启动(需要先启动zk)
/data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties
停止
/data/server/kafka/bin/kafka-server-stop.sh
ps: 如果使用kafka自带的zk,zk的配置文件为zookeeper.properties,启动脚本为./bin/zookeeper-server-start.sh
2、查看主题以及主题信息
查看所有的主题
./kafka-topics.sh --zookeeper zk_host:port --list
查看主题描述
./kafka-topics.sh --zookeeper zk_host:port --describe --topic my_topic_name
3、查看消费者
consumers that use the Java consumer API
./kafka-consumer-groups.sh --new-consumer --bootstrap-server kf_host:port --list
consumers that use ZooKeeper
./kafka-consumer-groups.sh --zookeeper zk_host:port --list
4、查看消费者信息和offset lag
consumers that use the Java consumer API
./kafka-consumer-groups.sh --new-consumer --bootstrap-server kf_host:port --group test_group --describe
consumers that use ZooKeeper
./kafka-consumer-groups.sh --zookeeper zk_host:port --group test-group --describe
5、重新分配主题分区个数
每个主题默认一个分区
num.partitions=
example: 主题my_topic_name分区扩展到3个
kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --partitions