RocketMQ的使用

时间:2022-06-20 13:19:35

1 在resources目录下创建config目录,并在其中新建rocketmq.properties文件

# 指定namesrv地址
suning.rocketmq.namesrvAddr=localhost:9876
# 生产者group名称
suning.rocketmq.producerGroupName=user_group
# 事务生产者group名称
suning.rocketmq.transactionProducerGroupName=order_transaction
# 消费者group名称
suning.rocketmq.consumerGroupName=user_consumer_group
# 生产者实例名称
suning.rocketmq.producerInstanceName=user_producer_instance
# 消费者实例名称
suning.rocketmq.consumerInstanceName=user_consumer_instance
# 事务生产者实例名称
suning.rocketmq.producerTranInstanceName=user_producer_transacition
# 一次最大消费多少数量消息
suning.rocketmq.consumerBatchMaxSize=1
# 广播消费
suning.rocketmq.consumerBroadcasting=false
# 消费的topic:tag
suning.rocketmq.subscribe[0]=user-topic:white
# 启动的时候是否消费历史记录
suning.rocketmq.enableHistoryConsumer=false
# 启动顺序消费
suning.rocketmq.enableOrderConsumer=false

2 新建properties文件读取类

package com.test.domi.config;

import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

/**
 * Holds the RocketMQ settings read from {@code classpath:config/rocketmq.properties}.
 *
 * <p>Every key under the {@code suning.rocketmq} prefix is bound to the field of the
 * same name through the setters below.
 *
 * <p>Created 2018/7/18 19:31.
 *
 * @author 18011618
 */
@PropertySource("classpath:config/rocketmq.properties")
@ConfigurationProperties(prefix = "suning.rocketmq")
@Configuration
public class RocketMQProperties {

    /** Name server address. */
    private String namesrvAddr;

    /** Producer group name. */
    private String producerGroupName;

    /** Transactional producer group name. */
    private String transactionProducerGroupName;

    /** Consumer group name. */
    private String consumerGroupName;

    /** Producer instance name. */
    private String producerInstanceName;

    /** Consumer instance name. */
    private String consumerInstanceName;

    /** Transactional producer instance name. */
    private String producerTranInstanceName;

    /** Maximum number of messages consumed in one batch. */
    private int consumerBatchMaxSize;

    /** Whether the consumer runs in broadcasting mode. */
    private boolean consumerBroadcasting;

    /** Whether historical messages are consumed at startup. */
    private boolean enableHistoryConsumer;

    /** Whether orderly consumption is enabled. */
    private boolean enableOrderConsumer;

    /** Subscriptions, each written as {@code topic:tag}. */
    private List<String> subscribe = new ArrayList<>();

    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getProducerGroupName() {
        return this.producerGroupName;
    }

    public void setProducerGroupName(String producerGroupName) {
        this.producerGroupName = producerGroupName;
    }

    public String getTransactionProducerGroupName() {
        return this.transactionProducerGroupName;
    }

    public void setTransactionProducerGroupName(String transactionProducerGroupName) {
        this.transactionProducerGroupName = transactionProducerGroupName;
    }

    public String getConsumerGroupName() {
        return this.consumerGroupName;
    }

    public void setConsumerGroupName(String consumerGroupName) {
        this.consumerGroupName = consumerGroupName;
    }

    public String getProducerInstanceName() {
        return this.producerInstanceName;
    }

    public void setProducerInstanceName(String producerInstanceName) {
        this.producerInstanceName = producerInstanceName;
    }

    public String getConsumerInstanceName() {
        return this.consumerInstanceName;
    }

    public void setConsumerInstanceName(String consumerInstanceName) {
        this.consumerInstanceName = consumerInstanceName;
    }

    public String getProducerTranInstanceName() {
        return this.producerTranInstanceName;
    }

    public void setProducerTranInstanceName(String producerTranInstanceName) {
        this.producerTranInstanceName = producerTranInstanceName;
    }

    public int getConsumerBatchMaxSize() {
        return this.consumerBatchMaxSize;
    }

    public void setConsumerBatchMaxSize(int consumerBatchMaxSize) {
        this.consumerBatchMaxSize = consumerBatchMaxSize;
    }

    public boolean isConsumerBroadcasting() {
        return this.consumerBroadcasting;
    }

    public void setConsumerBroadcasting(boolean consumerBroadcasting) {
        this.consumerBroadcasting = consumerBroadcasting;
    }

    public boolean isEnableHistoryConsumer() {
        return this.enableHistoryConsumer;
    }

    public void setEnableHistoryConsumer(boolean enableHistoryConsumer) {
        this.enableHistoryConsumer = enableHistoryConsumer;
    }

    public boolean isEnableOrderConsumer() {
        return this.enableOrderConsumer;
    }

    public void setEnableOrderConsumer(boolean enableOrderConsumer) {
        this.enableOrderConsumer = enableOrderConsumer;
    }

    public List<String> getSubscribe() {
        return this.subscribe;
    }

    public void setSubscribe(List<String> subscribe) {
        this.subscribe = subscribe;
    }

    /**
     * Renders every property as {@code name=value}; string values are wrapped in
     * single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RocketMQProperties{");
        text.append("namesrvAddr='").append(namesrvAddr).append('\'');
        text.append(", producerGroupName='").append(producerGroupName).append('\'');
        text.append(", transactionProducerGroupName='").append(transactionProducerGroupName).append('\'');
        text.append(", consumerGroupName='").append(consumerGroupName).append('\'');
        text.append(", producerInstanceName='").append(producerInstanceName).append('\'');
        text.append(", consumerInstanceName='").append(consumerInstanceName).append('\'');
        text.append(", producerTranInstanceName='").append(producerTranInstanceName).append('\'');
        text.append(", consumerBatchMaxSize=").append(consumerBatchMaxSize);
        text.append(", consumerBroadcasting=").append(consumerBroadcasting);
        text.append(", enableHistoryConsumer=").append(enableHistoryConsumer);
        text.append(", enableOrderConsumer=").append(enableOrderConsumer);
        text.append(", subscribe=").append(subscribe);
        text.append('}');
        return text.toString();
    }
}

3.加载properties文件

package com.test.domi.config;

import javax.annotation.PostConstruct;

import groovy.util.logging.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import java.util.List;
import java.util.stream.Collectors;
/**
* @Author 18011618
* @Date 19:36 2018/7/18
* @Function 通过使用指定的文件读取类 来加载配置文件到字段中
*/
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Slf4j
public class RocketMQConfiguration { @Autowired
private RocketMQProperties rocketMQProperties; //事件监听
@Autowired
private ApplicationEventPublisher publisher = null; private static boolean isFirstSub = true; private static long startTime = System.currentTimeMillis();
private static Logger log = LoggerFactory.getLogger(RocketMQConfiguration.class); /**
* 容器初始化的时候 打印参数
*/
@PostConstruct
public void init() {
System.err.println(rocketMQProperties.getNamesrvAddr());
System.err.println(rocketMQProperties.getProducerGroupName());
System.err.println(rocketMQProperties.getConsumerBatchMaxSize());
System.err.println(rocketMQProperties.getConsumerGroupName());
System.err.println(rocketMQProperties.getConsumerInstanceName());
System.err.println(rocketMQProperties.getProducerInstanceName());
System.err.println(rocketMQProperties.getProducerTranInstanceName());
System.err.println(rocketMQProperties.getTransactionProducerGroupName());
System.err.println(rocketMQProperties.isConsumerBroadcasting());
System.err.println(rocketMQProperties.isEnableHistoryConsumer());
System.err.println(rocketMQProperties.isEnableOrderConsumer());
System.out.println(rocketMQProperties.getSubscribe().get(0));
} /**
* 创建普通消息发送者实例
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQProducer defaultProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(
rocketMQProperties.getProducerGroupName());
producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
producer.setInstanceName(rocketMQProperties.getProducerInstanceName());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("rocketmq producer server is starting....");
return producer;
} /**
* 创建支持消息事务发送的实例
* @return
* @throws MQClientException
*/
@Bean
public TransactionMQProducer transactionProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(
rocketMQProperties.getTransactionProducerGroupName());
producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
producer.setInstanceName(rocketMQProperties
.getProducerTranInstanceName());
producer.setRetryTimesWhenSendAsyncFailed(10);
// 事务回查最小并发数
producer.setCheckThreadPoolMinSize(2);
// 事务回查最大并发数
producer.setCheckThreadPoolMaxSize(2);
// 队列数
producer.setCheckRequestHoldMax(2000);
producer.start();
log.info("rocketmq transaction producer server is starting....");
return producer;
} /**
* 创建消息消费的实例
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQPushConsumer pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
rocketMQProperties.getConsumerGroupName());
consumer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
consumer.setInstanceName(rocketMQProperties.getConsumerInstanceName()); //判断是否是广播模式
if (rocketMQProperties.isConsumerBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
//设置批量消费
consumer.setConsumeMessageBatchMaxSize(rocketMQProperties
.getConsumerBatchMaxSize() == 0 ? 1 : rocketMQProperties
.getConsumerBatchMaxSize()); //获取topic和tag
List<String> subscribeList = rocketMQProperties.getSubscribe();
for (String sunscribe : subscribeList) {
consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);
} // 顺序消费
if (rocketMQProperties.isEnableOrderConsumer()) {
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
context.setAutoCommit(true);
msgs = filterMessage(msgs);
if (msgs.size() == 0)
return ConsumeOrderlyStatus.SUCCESS;
publisher.publishEvent(new MessageEvent(msgs, consumer));
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
}
// 并发消费
else { consumer.registerMessageListener(new MessageListenerConcurrently() { @Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
//过滤消息
msgs = filterMessage(msgs);
if (msgs.size() == 0)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
publisher.publishEvent(new MessageEvent(msgs, consumer));
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000); try {
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
log.info("rocketmq consumer server is starting....");
} catch (InterruptedException e) {
e.printStackTrace();
}
} }).start(); return consumer;
} /**
* 消息过滤
* @param msgs
* @return
*/
private List<MessageExt> filterMessage(List<MessageExt> msgs) {
if (isFirstSub && !rocketMQProperties.isEnableHistoryConsumer()) {
msgs = msgs.stream()
.filter(item -> startTime - item.getBornTimestamp() < 0)
.collect(Collectors.toList());
}
if (isFirstSub && msgs.size() > 0) {
isFirstSub = false;
}
return msgs;
} }

4 创建生产者

package com.test.domi.controller;

import com.test.domi.dto.User;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * REST endpoints that publish demo messages to the {@code user-topic} topic with
 * the {@code white} tag.
 */
@RestController
public class ProducerController {

    private static final String TOPIC = "user-topic";
    private static final String TAG = "white";

    @Autowired
    private DefaultMQProducer defaultProducer;

    @Autowired
    private TransactionMQProducer transactionProducer;

    /**
     * Sends ten ordinary messages and prints the id and send status of each one.
     * A failed send is reported and does not stop the remaining sends.
     */
    @GetMapping("/sendMessage")
    public void sendMsg() {
        for (int i = 0; i < 10; i++) {
            Message msg = buildUserMessage(i);
            try {
                SendResult result = defaultProducer.send(msg);
                System.out.println("消息id:" + result.getMsgId() + ":" + "," + "发送状态:" + result.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息发送失败");
            }
        }
    }

    /**
     * Sends one transactional message.
     *
     * @return the send result as text, or a failure description when sending threw
     */
    @GetMapping("/sendTransactionMess")
    public String sendTransactionMsg() {
        try {
            // The value decides the local transaction state: "b" commits,
            // "a" and any other non-empty value roll back.
            String ms = "c";
            Message msg = new Message(TOPIC, TAG, ms.getBytes(StandardCharsets.UTF_8));
            // The value itself is handed to the executor as its argument; it only
            // understands String arguments.
            SendResult sendResult = transactionProducer.sendMessageInTransaction(
                    msg,
                    (Message msg1, Object arg) -> resolveLocalTransactionState(arg),
                    ms);
            System.out.println(sendResult);
            return String.valueOf(sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            // Without a result there is nothing to render; report the failure
            // instead of dereferencing null.
            return "事务消息发送失败: " + e.getMessage();
        }
    }

    /**
     * Sends one hundred messages, choosing the target queue for each from its
     * loop index.
     */
    @GetMapping("/sendMessOrder")
    public void sendMsgOrder() {
        MessageQueueSelector selector = ProducerController::selectQueueByIndex;
        for (int i = 0; i < 100; i++) {
            Message msg = buildUserMessage(i);
            try {
                defaultProducer.send(msg, selector, i);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Builds the demo message: a JSON-serialised {@link User} encoded as UTF-8. */
    private static Message buildUserMessage(int index) {
        User user = new User();
        user.setId(String.valueOf(index));
        user.setUsername("jhp" + index);
        String json = JSON.toJSONString(user);
        return new Message(TOPIC, TAG, json.getBytes(StandardCharsets.UTF_8));
    }

    /** Maps the executor argument to a local transaction state. */
    private static LocalTransactionState resolveLocalTransactionState(Object arg) {
        String value = arg instanceof String ? (String) arg : "";
        if (value.isEmpty()) {
            throw new RuntimeException("发送消息不能为空...");
        }
        if ("b".equals(value)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    /** Picks the queue at position {@code arg mod queue count}. */
    private static MessageQueue selectQueueByIndex(List<MessageQueue> mqs, Message msg, Object arg) {
        int index = Math.floorMod((Integer) arg, mqs.size());
        return mqs.get(index);
    }
}

5.创建监听对象

package com.test.domi.config;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.context.ApplicationEvent;
import java.util.List;
/**
 * Application event that carries a batch of RocketMQ messages together with a
 * reference to a {@link DefaultMQPushConsumer}.
 *
 * <p>The message list is also used as the event source.
 *
 * @author 18011618
 */
public class MessageEvent extends ApplicationEvent {

    private static final long serialVersionUID = -4468405250074063206L;

    private DefaultMQPushConsumer consumer;

    private List<MessageExt> msgs;

    /**
     * Creates an event for the given batch.
     *
     * @param msgs the messages carried by this event; also passed on as the event source
     * @param consumer the consumer associated with the batch
     * @throws Exception declared by this constructor's signature
     */
    public MessageEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
        super(msgs);
        this.consumer = consumer;
        setMsgs(msgs);
    }

    /** @return the consumer associated with the batch */
    public DefaultMQPushConsumer getConsumer() {
        return this.consumer;
    }

    /** @param consumer the consumer to associate with the batch */
    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    /** @return the messages carried by this event */
    public List<MessageExt> getMsgs() {
        return this.msgs;
    }

    /** @param msgs the messages carried by this event */
    public void setMsgs(List<MessageExt> msgs) {
        this.msgs = msgs;
    }
}

6.监听消息进行消费

package com.test.domi.config;

import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

/**
 * Listens for {@link MessageEvent}s and consumes the messages they carry.
 */
@Component
public class ConsumerService {

    /**
     * Prints the body of every message in the event.
     *
     * <p>The listener condition only lets an event through when its first message
     * has topic {@code user-topic} and tag {@code white}.
     *
     * @param event the event carrying the received messages
     */
    @EventListener(condition = "#event.msgs[0].topic=='user-topic' && #event.msgs[0].tags=='white'")
    public void rocketmqMsgListener(MessageEvent event) {
        try {
            List<MessageExt> msgs = event.getMsgs();
            for (MessageExt msg : msgs) {
                // Decode with an explicit charset so the text does not depend on
                // the platform default of the consuming JVM.
                String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                System.err.println("消费消息:" + body);
            }
        } catch (Exception e) {
            // Best effort: a failure is reported here and not propagated to the
            // publisher of the event.
            e.printStackTrace();
        }
    }
}

访问/sendMessage的url生产消息,控制台打印如下:

消息id:C0A801652B5C18B4AAC20F5FF9C60032:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFE20033:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFE50034:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFE80035:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFEB0036:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFEF0037:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFF30038:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFF90039:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFFC003A:,发送状态:SEND_OK
消息id:C0A801652B5C18B4AAC20F5FFFFF003B:,发送状态:SEND_OK
消费消息:{"id":"9","username":"jhp9"}
消费消息:{"id":"6","username":"jhp6"}
消费消息:{"id":"0","username":"jhp0"}
消费消息:{"id":"5","username":"jhp5"}
消费消息:{"id":"1","username":"jhp1"}
消费消息:{"id":"2","username":"jhp2"}
消费消息:{"id":"7","username":"jhp7"}
消费消息:{"id":"3","username":"jhp3"}
消费消息:{"id":"4","username":"jhp4"}
消费消息:{"id":"8","username":"jhp8"}