Kafka笔记--分布式环境搭建

时间:2022-12-13 09:17:09

部署:
  http://www.cnblogs.com/likehua/p/3999538.html
  http://blog.csdn.net/kimmking/article/details/8263057
  http://my.oschina.net/ielts0909/blog/93190
Demo:
  http://blog.csdn.net/honglei915/article/details/37563647

 

1)下载
kafka0.8(http://kafka.apache.org/downloads.html),保存到服务器/home/wwb目录下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)

2)解压 tar -zxvf kafka-0.8.0-beta1-src.tgz,产生文件夹kafka-0.8.0-beta1-src更改为kafka01 

3)配置

 修改kafka01/config/server.properties,其中broker.id, log.dirs, zookeeper.connect必须根据实际情况进行修改,其他项根据需要自行斟酌。大致如下:
  broker.id=1  
  port=9091  
  num.network.threads=2  
  num.io.threads=2  
  socket.send.buffer.bytes=1048576  
  socket.receive.buffer.bytes=1048576  
  socket.request.max.bytes=104857600  
  log.dir=./logs  
  num.partitions=2  
  log.flush.interval.messages=10000  
  log.flush.interval.ms=1000  
  log.retention.hours=168  
  #log.retention.bytes=1073741824  
  log.segment.bytes=536870912  
  num.replica.fetchers=2  
  log.cleanup.interval.mins=10  
  zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183  
  zookeeper.connection.timeout.ms=1000000  
  kafka.metrics.polling.interval.secs=5  
  kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
  kafka.csv.metrics.dir=/tmp/kafka_metrics  
  kafka.csv.metrics.reporter.enabled=false
 
4) 启动kafka01
  >bin/kafka-server-start.sh config/server.properties &  
 a)kafka02操作步骤与kafka01雷同,不同的地方如下
  修改kafka02/config/server.properties
  broker.id=2
  port=9092
  ##其他配置和kafka-0保持一致
  启动kafka02
  >bin/kafka-server-start.shconfig/server.properties &  
 b)kafka03操作步骤与kafka01雷同,不同的地方如下
  修改kafka03/config/server.properties
  broker.id=3
  port=9093
  ##其他配置和kafka-0保持一致
  启动kafka02
  >bin/kafka-server-start.shconfig/server.properties &
 
5)创建Topic(包含一个分区,三个副本)
    >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic
 
6)查看topic情况
    >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181
    topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0
 
7)创建发送者
   >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic
    my test message1
    my test message2
    ^C
 
8)创建消费者
    >bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message1
    my test message2
    ^C
 
9)杀掉server1上的broker
  >pkill -9 -f config/server.properties
 
10)查看topic
  >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181
  topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0
  发现topic还正常的存在
 
11)创建消费者,看是否能查询到消息
    >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
说明一切都是正常的。
 
注意:
Kafka测试
Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Kafka也提供了一个消费消息的命令行工具。
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message