spring boot 整合 RabbitMq (注解)

时间:2023-03-10 07:15:35
spring boot 整合 RabbitMq (注解)

1、增加rabbitmq的依赖包

<!-- AMQP dependency used for the RabbitMQ integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yaml文件中配置

# RabbitMQ connection settings; each key is read by RabbitConfig through
# @Value("${spring.rabbitmq.*}"), so the nesting below must be preserved.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    # Passed to CachingConnectionFactory.setPublisherConfirms(...) in RabbitConfig.
    publisher-confirms: true
    virtual-host: /

3、RabbitMQ的连接工厂和模板创建

/**
 * Creates the RabbitMQ {@link ConnectionFactory} and {@link RabbitTemplate}
 * beans from the {@code spring.rabbitmq.*} properties.
 */
@Configuration
public class RabbitConfig
{
    /** Broker host, from {@code spring.rabbitmq.host}. */
    @Value("${spring.rabbitmq.host}")
    private String host;

    /** Broker port, from {@code spring.rabbitmq.port}. */
    @Value("${spring.rabbitmq.port}")
    private String port;

    /** Login user, from {@code spring.rabbitmq.username}. */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /** Login password, from {@code spring.rabbitmq.password}. */
    @Value("${spring.rabbitmq.password}")
    private String password;

    /** Whether publisher confirms are enabled, from {@code spring.rabbitmq.publisher-confirms}. */
    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;

    /** Virtual host, from {@code spring.rabbitmq.virtual-host}. */
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * Creates the connection factory.
     *
     * @return a {@link CachingConnectionFactory} configured with the injected
     *         address, credentials, virtual host and publisher-confirm flag
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // Use "host:port" so the configured port is actually applied; passing
        // only the host left the injected port unused.
        connectionFactory.setAddresses(this.host + ":" + this.port);
        connectionFactory.setUsername(this.username);
        connectionFactory.setPassword(this.password);
        connectionFactory.setVirtualHost(this.virtualHost);
        // Must be set for the template's ConfirmCallback to be used.
        connectionFactory.setPublisherConfirms(this.publisherConfirms);
        return connectionFactory;
    }

    /**
     * Creates a RabbitMQ template bound to {@link #connectionFactory()}.
     *
     * <p>The bean is prototype-scoped so every injection point receives its own
     * template and can register its own confirm callback; with a shared
     * singleton, callbacks set by different senders would collide.
     *
     * @return a new {@link RabbitTemplate}; no confirm/return callback is set here,
     *         callers register them via {@code setConfirmCallback} / {@code setReturnCallback}
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
        return template;
    }
}

4、创建交换机、创建队列、绑定交换机和队列

/**
 * Declares the exchange, the queue, and the binding between them, using the
 * names defined in {@code RabbitConstant}.
 */
@Configuration
public class RabbitExchangeConfig
{
    /**
     * Declares the direct exchange.
     *
     * @return a {@link DirectExchange} named {@code RabbitConstant.EXCHANGE_NAME}
     */
    @Bean
    public DirectExchange defaultExchange() {
        final String exchangeName = RabbitConstant.EXCHANGE_NAME;
        return new DirectExchange(exchangeName);
    }

    /**
     * Declares the durable queue.
     *
     * @return a durable {@link Queue} named {@code RabbitConstant.QUEUE_NAME}
     */
    @Bean
    public Queue queue() {
        QueueBuilder durableQueue = QueueBuilder.durable(RabbitConstant.QUEUE_NAME);
        return durableQueue.build();
    }

    /**
     * Binds {@link #queue()} to {@link #defaultExchange()}.
     *
     * @return a {@link Binding} using {@code RabbitConstant.ROUTING_KEY} as routing key
     */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange targetExchange = defaultExchange();
        return BindingBuilder
                .bind(boundQueue)
                .to(targetExchange)
                .with(RabbitConstant.ROUTING_KEY);
    }
}

5、生产者

/**
 * Message producer: publishes text messages to {@code RabbitConstant.EXCHANGE_NAME}
 * and receives the broker's publisher confirm for each one.
 */
@Component
public class RabbitSender implements RabbitTemplate.ConfirmCallback
{
    /** Template used for publishing; this class registers itself on it as confirm callback. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code content} with {@code RabbitConstant.ROUTING_KEY}, tagging it
     * with a random UUID as correlation id so the confirm can be matched to it.
     *
     * @param content message payload to send
     */
    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        // Re-registers this same instance on every send. If the template were a
        // shared singleton, the callback would be whichever one was set last.
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_NAME,
                RabbitConstant.ROUTING_KEY, content, correlationId);
    }

    /**
     * Publisher-confirm callback. It reports whether the broker acknowledged the
     * published message, not whether a consumer processed it, so the log text
     * speaks of broker confirmation rather than consumption.
     *
     * @param correlationData correlation data supplied when the message was sent
     * @param ack             {@code true} if the broker acknowledged the message
     * @param cause           failure reason when {@code ack} is {@code false}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息已被broker确认");
        } else {
            System.out.println("消息未被broker确认:" + cause);
        }
    }
}

6、消费者

/**
 * Message consumer: registers a listener container that reads from the queue
 * declared in {@link RabbitExchangeConfig} and acknowledges each message manually.
 */
@Configuration
public class RabbitReceive
{
    /** Supplies the connection factory for the container. */
    @Autowired
    private RabbitConfig rabbitConfig;

    /** Supplies the queue to listen on. */
    @Autowired
    private RabbitExchangeConfig rabbitExchangeConfig;

    /**
     * Builds the listener container.
     *
     * @return a {@link SimpleMessageListenerContainer} with one consumer, manual
     *         acknowledgement, and a listener that prints and acks each message
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(rabbitConfig.connectionFactory());
        container.setQueues(rabbitExchangeConfig.queue()); // queue to listen on
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        // Manual mode: the listener itself must ack each delivery.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                // Decode explicitly as UTF-8 instead of the platform default charset.
                // NOTE(review): assumes the producer encodes text as UTF-8 - confirm.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("receive msg : " + text);
                // Ack only this delivery (multiple = false).
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }
}