(一)理解消息通信
1.消息通信概念---消费者、生产者和代理
生产者(producer)创建消息,然后发送到代理服务器(RaabitMQ)。
其中消息包括两部分内容:有效载荷(payload)和标签(label)。
有效载荷就是你想要传输的数据,它可以是任何内容。
标签描述了有效载荷,并且RabbitMQ用它来决定谁来将获得消息的拷贝。
RaabitMQ会根据标签将消息发送到感兴趣的接收方。----“发后即忘”的单向通信方式。
消费者(consumer)连接到代理服务器(RaabitMQ)并订阅到队列(queue)上。
这里解释一下“连接”的意思。消费者与生产者都必须首先连接到RaabitMQ,才能消费或发布消息。即生产者或消费者与RaabitMQ代理服务器之间创建一条TCP连接。
一旦TCP连接打开(你通过了认证),生产者或消费者就可以创建一条AMQP信道(channel),信道是建立在“真实的”TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去
的。每条信道都会被指派一个唯一ID(AMQP库会帮你记住ID的)。所以,不论发布消息,订阅队列或是接收消息,这些动作都是通过信道完成的。
总之,RaabitMQ可以看做是软件的路由器。
2.AMQP元素---转发器、队列、绑定
生产者将消息发布到转发器上,消息最终到达队列,并被消费者接收;绑定决定了消息如何从路由器路由到特定的队列。
每个rabbitmq-server叫做一个Broker,等着tcp连接进入。
在rabbit-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等)。
Queue是进程内的逻辑队列,有多个,有名字。
Binding联系Exchange与Queue。
Routingkey由生产者指定。Bidingkey由消费者指定。二者联合决定一条消息的来去。
最基本的rabbitmq连接代码:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xxxxxx"); factory.setPort(xxxx); factory.setUsername("xxx");
factory.setPassword("xxx"); Connection connection =factory.newConnection(); final Channel channel =connection.createChannel();
最后这个channel就可以用来收和发消息了。
声明exchange与queue:
channel.exchangeDeclare("logs","fanout");
(对列名称 类型 是否持久化,不使用时是否自动删除,是否是内部的(不能被客户端使用),其他参数)
String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); channel.queueDeclare (对列名称,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除,其他参数)
channel.queueBind (对列名称,交换机名字,此次绑定使用的路由关键字,其他参数)
发出消息:
String message=""; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());