0. 前言
消息队列(MQ)是一般系统中都会用到的中间件。这里我选择 Kafka 作为练手的对象。Kafka 依赖 Zookeeper,Zookeeper 的安装在上一篇博客中已经介绍过了。
1. Kafka Image构建
Dockerfile
# Kafka 2.0.1 (Scala 2.12 build) on top of the Alpine OpenJDK 8 image.
FROM openjdk:8-jdk-alpine

# Install bash for the Kafka launcher scripts (Alpine only ships busybox sh).
# The trailing "/bin/bash" of the original RUN was dropped: a non-interactive
# shell started during the build exits immediately and does nothing.
RUN apk add --no-cache bash && rm -rf /var/cache/apk/*

# Download the release from the Aliyun Apache mirror, unpack it, delete the
# tarball in the same layer (so it does not stay in the image) and move the
# distribution to /var/app.
RUN wget http://mirrors.aliyun.com/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz && \
    tar -zxvf kafka_2.12-2.0.1.tgz && \
    rm -rf kafka_2.12-2.0.1.tgz && \
    mv kafka_2.12-2.0.1 /var/app/

# Make the kafka-*.sh tools callable without their full path.
ENV PATH /var/app/bin:$PATH

# env.sh rewrites server.properties from environment variables and then
# starts the broker. Use the absolute path so chmod does not depend on WORKDIR.
COPY env.sh /env.sh
RUN chmod +x /env.sh

EXPOSE 9092
ENTRYPOINT ["/env.sh"]
env.sh
#!/bin/sh
# Container entrypoint: patch server.properties from environment variables,
# then start the Kafka broker.
#
# Environment variables (all optional):
#   BROKER_ID          value for broker.id          (default: 0)
#   LISTENERS          value for listeners          (default: PLAINTEXT://:9092)
#   ZOOKEEPER_CONNECT  value for zookeeper.connect  (default: localhost:2181)

# Abort instead of starting the broker if any of the edits below fails.
set -e

# Fixed typo: the original read ${BRODER_ID}, so a BROKER_ID passed to the
# container was always overwritten with the default "0".
BROKER_ID=${BROKER_ID:-"0"}
LISTENERS=${LISTENERS:-"PLAINTEXT://:9092"}
ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:-"localhost:2181"}

CONFIG=/var/app/config/server.properties

sed -i "s/^broker.id=.*$/broker.id=$BROKER_ID/" "$CONFIG"
# The listeners line is commented out in the stock file; ';' is used as the
# sed delimiter because the value contains '/'.
sed -i "s;^#listeners=.*$;listeners=$LISTENERS;g" "$CONFIG"
# '|' as delimiter so a chroot suffix such as "zk:2181/kafka" does not break
# the expression.
sed -i "s|^zookeeper.connect=.*$|zookeeper.connect=$ZOOKEEPER_CONNECT|" "$CONFIG"
# Store the log segments under /data so they can be mounted as a volume.
sed -i "s#/tmp/kafka-logs#/data#g" "$CONFIG"

# exec replaces this shell with the broker process, so the signal sent by
# "docker stop" reaches Kafka directly.
exec /var/app/bin/kafka-server-start.sh "$CONFIG"
2. docker-compose配置Zookeeper、zkui、Kafka
docker-compose.yml
# Zookeeper + zkui (web UI for Zookeeper) + a single Kafka broker.
version: '3'
services:
  zookeeper:
    image: registry.cn-shenzhen.aliyuncs.com/wunaozai/zookeeper
    ports:
      # Port mappings are quoted so YAML always reads them as strings.
      - "2181:2181"
  zkui:
    image: registry.cn-shenzhen.aliyuncs.com/wunaozai/zkui
    ports:
      - "9090:9090"
    environment:
      - ZK_SERVER=zookeeper:2181
  kafka:
    image: registry.cn-shenzhen.aliyuncs.com/wunaozai/kafka
    ports:
      - "9092:9092"
    environment:
      # These three variables are consumed by env.sh inside the image.
      - BROKER_ID=1
      # The broker listens on the host name "kafka", so clients must be able
      # to resolve that name (e.g. through an /etc/hosts entry).
      - LISTENERS=PLAINTEXT://kafka:9092
      - ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      # env.sh points the Kafka log directory at /data.
      - /root/workspace/docker/kafka/kafka/data:/data
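启动 docker-compose 后,可以通过 zkui 预览 Zookeeper 中的数据。注意:由于 Kafka 的监听地址使用的是域名(kafka),客户端必须能够解析这个域名。在运行 Docker 的主机上,需要往 /etc/hosts 中写入 127.0.0.1 kafka;如果是在 Windows 系统上做客户端开发,则要往 C:\Windows\System32\drivers\etc\hosts 里写入 172.16.23.203 kafka(172.16.23.203 是本文中运行 Docker 的主机 IP)。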
3. Kafka读写
创建主题(Topic)
# Create the topic "wunaozai" with one partition and a replication factor of 1,
# registering it through the Zookeeper instance at 172.16.23.203:2181.
./bin/kafka-topics.sh --create --zookeeper 172.16.23.203:2181 --replication-factor 1 --partitions 1 --topic wunaozai
查看主题(Topic)
# List the existing topics (the original passed --list twice).
./bin/kafka-topics.sh --list --zookeeper 172.16.23.203:2181
发送消息(生产者)
# Start the console producer for topic "wunaozai" against the broker at 172.16.23.203:9092.
./bin/kafka-console-producer.sh --broker-list 172.16.23.203:9092 --topic wunaozai
消费消息(消费者)
# Start the console consumer for topic "wunaozai"; the broker is addressed by
# the host name "kafka", which must resolve on this machine.
./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic wunaozai
4. Kafka Java Client 测试
POM依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<!-- Kafka Java client; version 2.0.1 matches the broker release installed by the Dockerfile. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
生产者ProducerSample.java
package demo.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal producer demo: sends three string key/value records to the topic
 * {@code wunaozai} and exits.
 */
public class ProducerSample {

    /**
     * Connects to the broker at 172.16.23.203:9092, sends three records and
     * closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Only producer settings are kept. The original also set "zk.connect"
        // (not a producer config; the client talks to the brokers, not to
        // Zookeeper) and key/value deserializers (consumer-only settings).
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "wunaozai";

        // try-with-resources closes the producer even when send() throws;
        // close() waits for the records sent so far to be delivered.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, "key1", "value111"));
            producer.send(new ProducerRecord<>(topic, "key2", "value222"));
            producer.send(new ProducerRecord<>(topic, "key3", "value333"));
        }
    }
}
消费者 ConsumerSample.java
package demo.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Minimal consumer demo: subscribes to the topic {@code wunaozai} as part of
 * the consumer group {@code JLGroup} and prints every record it receives.
 */
public class ConsumerSample {

    /**
     * Polls the broker at 172.16.23.203:9092 in an endless loop and prints
     * partition, offset, key and value of each record to standard output.
     * The loop only ends when polling throws; the consumer is closed in
     * that case.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "wunaozai";

        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("group.id", "JLGroup"); // consumer group name
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        // Interval between automatic offset commits. This client commits to
        // Kafka itself, not to Zookeeper as the original comment stated.
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Maximum time a single poll() blocks waiting for records.
        Duration timeout = Duration.ofSeconds(5);

        // try-with-resources guarantees close() on every exit path, including
        // errors that the catch below does not handle.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(timeout);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            // Demo-level handling kept from the original: report and exit.
            e.printStackTrace();
        }
    }
}
ProducerSample 生产消息,通过./bin/kafka-console-consumer.sh 消费消息
./bin/kafka-console-producer.sh 生产消息,通过ConsumerSample 消费消息
docker-compose 部分日志
5. 小结
一路练习下来,发现这个Kafka配置还是有点复杂的,就先了解到这里,以后再深入了解。一步一步慢慢来。
参考资料:
https://hub.docker.com/r/wurstmeister/kafka/
https://raw.githubusercontent.com/wurstmeister/kafka-docker/master/docker-compose.yml
https://github.com/wurstmeister/kafka-docker
https://github.com/apache/kafka
https://www.cnblogs.com/smail-bao/p/7794636.html
https://www.cnblogs.com/wunaozai/p/8249657.html