If you only have one Kafka cluster, the built-in KafkaAutoConfiguration configuration class is enough; it binds the existing properties class KafkaProperties, whose property prefix is spring.kafka.
This article records how to configure multiple Kafka clusters, i.e. how to define an additional Kafka producer and consumer on top of KafkaAutoConfiguration.
Applicable scenarios: consuming messages that come from different Kafka clusters, or producing messages to different Kafka clusters.
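For reference, a single-cluster setup only needs the standard spring.kafka.* keys, which the auto-configuration binds automatically (the values below are placeholders):

spring.kafka.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = default-consumer
spring.kafka.producer.acks = all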
1. Configure the custom Kafka properties (e.g. in application.properties)
# "custom.kafka" is only an example prefix name; it must match the @ConfigurationProperties
# prefix used for the customKafkaProperties bean in step 2. The serializer/deserializer
# classes below are typical values; adjust them to your key and value types.
custom.kafka.bootstrap-servers = my-server1,my-server2
custom.kafka.consumer.group-id = my-consumer
custom.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer
custom.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
custom.kafka.consumer.enable-auto-commit = false
custom.kafka.consumer.auto-offset-reset = latest
custom.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
custom.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
custom.kafka.listener.ack-mode = manual
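If you use application.yml instead of application.properties, the same settings (still under the example custom.kafka prefix) look like this:

custom:
  kafka:
    bootstrap-servers: my-server1,my-server2
    consumer:
      group-id: my-consumer
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      auto-offset-reset: latest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: manual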
2. Define the producer and consumer in code
You can of course define only the producer or only the consumer, as needed; the example below defines both.
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.util.StringUtils;
/**
 * Custom Kafka configuration
*/
@Configuration
public class CustomKafkaConfig {
    /** Producer */
@Bean("kafkaCustomTemplate")
public KafkaTemplate<String, Object> kafkaCustomTemplate(
@Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
return new KafkaTemplate<>(producerFactory(customKafkaProperties));
}
private ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        // Default acks to "all" when it is not configured explicitly
        String acks = kafkaProperties.getProducer().getAcks();
        kafkaProperties.getProducer().setAcks(StringUtils.hasText(acks) ? acks : "all");
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        // Enable idempotence on the producer
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(properties);
}
    /** Consumer */
@Bean("kafkaCustomContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, String>> kafkaCustomContainerFactory(
@Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(customKafkaProperties));
        // Concurrency defaults to the number of available processors when not configured
        Integer concurrency = customKafkaProperties.getListener().getConcurrency();
        factory.setConcurrency(concurrency == null ? Runtime.getRuntime().availableProcessors() : concurrency);
        // Ack mode defaults to MANUAL, matching the manual acknowledgment in the listener
        ContainerProperties.AckMode ackMode = customKafkaProperties.getListener().getAckMode();
        factory.getContainerProperties().setAckMode(ackMode == null ? ContainerProperties.AckMode.MANUAL : ackMode);
return factory;
}
private ConsumerFactory<Long, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
    /** Configuration properties */
    @ConfigurationProperties(prefix = "custom.kafka")
@Bean("customKafkaProperties")
public KafkaProperties customKafkaProperties() {
return new KafkaProperties();
}
    // @Primary must be specified here; otherwise, with more than one KafkaProperties bean in the
    // context, Kafka's auto-configuration cannot tell which one to use and the application fails to start
@Primary
    @ConfigurationProperties(prefix = "spring.kafka")
@Bean
public KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
@Primary
@Bean
    public KafkaTemplate<?, ?> kafkaTemplate(@Autowired ProducerFactory<Object, Object> kafkaProducerFactory,
            @Autowired KafkaProperties kafkaProperties,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        // Re-declares the default KafkaTemplate (normally created by KafkaAutoConfiguration),
        // bound to the primary spring.kafka properties
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
Tips:
1) Consumer
You need to define your own KafkaListenerContainerFactory bean.
2) Producer
You need to define your own KafkaTemplate bean.
3) @Primary
@Autowired injects by type by default; when several beans of the same type exist, @Primary marks the one that wins at unqualified injection points.
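A minimal illustration of the difference (the field names here are made up): a plain @Autowired field receives the @Primary KafkaProperties bound to spring.kafka, while @Qualifier("customKafkaProperties") explicitly selects the custom one.

// Plain injection resolves to the @Primary bean (bound to spring.kafka)
@Autowired
private KafkaProperties defaultKafkaProperties;

// Qualified injection resolves to the custom bean (bound to custom.kafka)
@Autowired
@Qualifier("customKafkaProperties")
private KafkaProperties customKafkaProperties;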
3. Usage
1) Consumer
@Component
@Slf4j
public class TestKafkaListener {
@KafkaListener(
topics = {"myTestTopic"},
containerFactory = "kafkaCustomContainerFactory")
public void testReceive(ConsumerRecord<Long, String> record, Acknowledgment ack) {
        // business logic start
        // ...
        // business logic end
        // commit the offset manually (ack-mode = manual)
        ack.acknowledge();
}
}
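By contrast, a listener that should read from the default (spring.kafka) cluster simply omits the containerFactory attribute and falls back to the auto-configured kafkaListenerContainerFactory bean. A sketch, with example topic and group names:

@Component
@Slf4j
public class DefaultKafkaListener {
    @KafkaListener(topics = {"someDefaultTopic"}, groupId = "default-consumer")
    public void receive(ConsumerRecord<String, String> record) {
        log.info("received: {}", record.value());
    }
}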
2) Producer
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@RefreshScope
@Service
@Transactional
@Slf4j
public class TestKafkaService implements TestKafkaServiceI {
@Qualifier("kafkaCustomTemplate")
@Autowired
private KafkaTemplate<String, Object> kafkaCustomTemplate;
@Override
public void testSend(String jsonParam) {
        // send the Kafka message
TestKafkaEvent<String> event = new TestKafkaEvent<>(jsonParam);
try {
            kafkaCustomTemplate.send(event.getProducerRecord());
}
catch (Exception e) {
            throw new RuntimeException("Failed to send Kafka message", e);
}
}
}
import java.io.Serializable;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
@Slf4j
public class TestKafkaEvent<T extends Serializable> {
private T source;
    public ProducerRecord<String, Object> getProducerRecord() {
        log.info("Sending message: {}", getSource());
return new ProducerRecord<>("my-tes-topic", getSource());
}
private TestKafkaEvent(){}
public TestKafkaEvent(T source) {
        this.source = source;
}
public T getSource() {
        return this.source;
}
public void setSource(T source) {
        this.source = source;
}
}
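One thing to keep in mind about the producer above: KafkaTemplate.send() is asynchronous, so the try/catch in testSend() only catches errors thrown before the record is handed to the Kafka producer. A minimal sketch of checking the actual send result, assuming spring-kafka 2.x where send() returns a ListenableFuture (spring-kafka 3.x returns a CompletableFuture instead):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// inside testSend(), replacing the bare send() call
ListenableFuture<SendResult<String, Object>> future = kafkaCustomTemplate.send(event.getProducerRecord());
future.addCallback(
        result -> log.info("Message sent, offset={}", result.getRecordMetadata().offset()),
        ex -> log.error("Failed to send Kafka message", ex));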
Reference: 聊聊在springboot项目中如何配置多个kafka消费者 (on configuring multiple Kafka consumers in a Spring Boot project)