利用空的queue名字("")让rabbitMQ生成一个唯一的队列名称,同时指定队列是:临时的(auto-delete)、私有的(exclusive)。
在发送的RPC调用消息里设置消息的属性(com.rabbitmq.client.AMQP.BasicProperties)的reply_to字段来传递上面那个随机的队列名称。
基本流程如下:
代码如下:
服务端RpcServer.java
package com.yzl.test4; import java.io.IOException; import org.apache.log4j.Logger; import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; /**
* 基于rabbitMQ的RPC服务的服务提供者
* 消息消费者角色,被动接收消息,然后回复消息
* @author: yzl
* @date: 2016-10-23
*/
public class RpcServer {
//交换器名称
private static final String EXCHANGE_NAME = "rpcDirectExchange";
//client调用的消息存储queue
private static final String QUEUE_NAME = "rpcQueue";
//服务提供方的路由key,实际使用场景会是应用的appid或者appName
private static final String SERVER_ROUTING_KEY = "rpc_server1";
private static final Logger logger = Logger.getLogger(RpcServer.class); /**
* @param args
* @throws
* @throws IOException
*/
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest"); Connection connection = factory.newConnection();
final Channel channel = connection.createChannel(); //定义RPC的direct的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明接收client调动请求的队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, SERVER_ROUTING_KEY); logger.info("服务启动完成,等待接收请求....."); //接收client端的请求
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//回复ack响应
channel.basicAck(envelope.getDeliveryTag(), false); logger.info("收到client调用请求-----------");
String msg = new String(body);
logger.info("收到的消息如下:" + msg);
String resp = null; //模拟解析需要调用的方法
if(msg.indexOf("add") != -1){
String parameter = msg.substring(msg.indexOf("(")+1, msg.indexOf(")"));
String[] args = parameter.split(",");
resp = String.valueOf(add(Integer.valueOf(args[0]), Integer.valueOf(args[1])));
}else{
resp = "method is not found!!";
}
logger.info("需要回调的client的queueName:" + properties.getReplyTo());
try {
Integer time = new java.util.Random().nextInt(5000);
logger.info("休眠" + time + "毫秒");
//随即休眠,模拟服务调用方法正常业务方法耗时
Thread.sleep(time);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//发送给默认的direct交换器,路由键为发送方的queueName,默认交换机会把消息转发给此路由键名称的queue
channel.basicPublish("", properties.getReplyTo(), properties, (msg + ", resp is:" + resp).getBytes());
logger.info("回复clent完成");
}
});
} //模拟的server中的方法
private static int add(int num1, int num2){
logger.info("call add method, para num1 is :" + num1 + ",num2 is :" + num2);
return num1 + num2;
}
}
客户端
package com.yzl.test4; import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.log4j.Logger; import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery; /**
* 基于rabbitMQ的RPC服务的服务调用者
* 消费生产者角色,发送调用请求,然后读取回复
* 发送的交换器 和 接收回应的交换器是不同的
* @author: yzl
* @date: 2016-10-23
*/
public class RpcClient {
//交换器名称
private static final String EXCHANGE_NAME = "rpcDirectExchange";
//服务提供方的路由key
private static final String SERVER_ROUTING_KEY = "rpc_server1"; private static final Logger logger = Logger.getLogger(RpcClient.class);
/**
* @param args
*/
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest"); Connection connection = factory.newConnection();
final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); int threadCount = 1; //使用CountDownLatch控制10个线程一起运行
final CountDownLatch cdl = new CountDownLatch(threadCount); //生成10个线程同时访问服务
ExecutorService pool = Executors.newFixedThreadPool(threadCount); for(int i=0; i<threadCount; i++){
final int index = i;
pool.submit(new Runnable() {
@Override
public void run() {
try {
cdl.await(); //Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
//默认direct exchange 匿名的、 私有、自动删除
Queue.DeclareOk queue = channel.queueDeclare("", false, true, true, null);
//获取rabbit帮我们生成的随即的队列名称,来接收服务端返回的数据
String queueName = queue.getQueue(); final String messageId = UUID.randomUUID().toString();
//定义replyTo属性 和 消息ID
BasicProperties props = new BasicProperties.Builder().replyTo(queueName).correlationId(messageId).build(); logger.info("发送rpc调用请求,消息index:" + index); //发送RPC方法调用
channel.basicPublish(EXCHANGE_NAME, SERVER_ROUTING_KEY, props, ("add(" + index + "," + (index+1) + ")").getBytes()); logger.info("等待服务器响应"); //定义队列式的消费者处理器,之前是用的DefaultConsumer
QueueingConsumer consumer = new QueueingConsumer(channel); //把消费者处理器和队列对照起来
channel.basicConsume(queueName, consumer); //这里会堵塞,直到取到值或者超时
logger.info("尝试从consumer里取返回的值");
Delivery delivery = consumer.nextDelivery();
//Delivery delivery = consumer.nextDelivery(5000);
logger.info("成功取到消息,开始处理");
if(delivery.getProperties().getCorrelationId().equals(messageId)){
logger.info("收到服务器回复-----------");
String msg = new String(delivery.getBody());
logger.info("回复消息id:" + delivery.getProperties().getCorrelationId() + "内容:" + msg);
} /*
//该取消息的方式是异步的,不会堵塞,会导致后面的logger.info先于handleDelivery执行
channel.basicConsume(queueName, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
if(properties.getCorrelationId().equals(messageId)){
logger.info("收到服务器回复-----------");
String msg = new String(body);
logger.info("回复消息id:" + properties.getCorrelationId() + "内容:" + msg);
}
}
});
*/ logger.info("消息处理完成");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
for(int i=0; i<threadCount; i++){
cdl.countDown();
}
}
}
先运行server后运行client,输入如下:
2016-10-23 21:22:38,961 [com.yzl.test4.RpcServer]-[INFO] 服务启动完成,等待接收请求.....
2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 收到client调用请求-----------
2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 收到的消息如下:add(0,1)
2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] call add method, para num1 is :0,num2 is :1
2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 需要回调的client的queueName:amq.gen-iek4oYqWiqQ-HU7-i2g6mA
2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 休眠3999毫秒
2016-10-23 21:22:46,415 [com.yzl.test4.RpcServer]-[INFO] 回复clent完成
2016-10-23 21:22:42,398 [com.yzl.test4.RpcClient]-[INFO] 发送rpc调用请求,消息index:0
2016-10-23 21:22:42,414 [com.yzl.test4.RpcClient]-[INFO] 等待服务器响应
2016-10-23 21:22:42,414 [com.yzl.test4.RpcClient]-[INFO] 尝试从consumer里取返回的值
2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 成功取到消息,开始处理
2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 收到服务器回复-----------
2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 回复消息id:1e4b7fb3-1728-41e2-8cac-b13fe88d5a20内容:add(0,1), resp is:1
2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 消息处理完成