SpringBoot整合Kafka

时间:2023-03-08 16:37:13

一、准备工作

提前启动zk,kafka,并且创建一个Topic("Hello-Kafk")

bin/kafka-topics.sh --create --zookeeper 192.168.204.139:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

确保你的kafka能够访问,如果访问不了,需要打开外网访问。
config/server.properties

advertised.listeners=PLAINTEXT://192.168.239.128:

Maven 依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.</artifactId>
<version>0.8.2.1</version>
</dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.</version>
</dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

二、项目结构
为了更加体现实际开发需求,一般生产者都是在调用某些接口的服务处理完逻辑之后然后往kafka里面扔数据,然后有一个消费者不停的监控这个Topic,然后处理数据,所以这里把生产者作为一个接口,消费者放到kafka这个目录下,注意@Component注解,不然扫描不到@KafkaListener

SpringBoot整合Kafka

三、具体实现代码
SpringBoot配置文件 application.properties

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.204.139: #=============== provider =======================
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer =======================
# 指定默认消费者group id
#spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval= # 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者

package cn.kafka.demo;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.Properties; /**
* 测试kafka生产者
*/
@RestController
@RequestMapping("/kafka")
public class TestKafkaProducerController { @Autowired
private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send")
public String send(String msg){
kafkaTemplate.send("Hello-Kafka",msg);
return "success";
}
}

消费者
这里的消费者会监听这个主题,有消息就会执行,不需要进行while(true)

package cn.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; /**
* kafka消费者测试
*/
@Component
public class TestConsumer { @KafkaListener(topics = "Hello-Kafka")
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.err.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

项目启动类

package cn.kafka.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class TestApplication{ public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}

四、测试
运行项目,执行:http://localhost:8080/kafka/send?msg=hello

控制台输出:

topic = test_topic, offset = 19, value = hello 

为了体现消费者不止执行一次就结束,再调用一次接口:
http://localhost:8080/kafka/send?msg=kafka

topic = test_topic, offset = 20, value = kafka