ActiveMQ使用记录

时间:2023-03-09 15:27:05
ActiveMQ使用记录

1、在Linux中安装ActiveMQ

官方文档地址:http://activemq.apache.org/getting-started.html#GettingStarted-StartingActiveMQStartingActiveMQ

(1)、(Ubuntu中)先更新,执行 sudo apt-get update

(2)、需要Java环境,使用java –version命令显示版本信息,没有JAVA自己安装配置去 - -

(3)、 下载activemq,执行: wget http://activemq.apache.org/path/tofile/apache-activemq-x.x.x-bin.tar.gz

(4)、解压,执行: cd [activemq_install_dir]      tar zxvf activemq-x.x.x-bin.tar.gz

(5)、运行 :cd [activemq_install_dir]/bin   ./activemq start(我这步没有成功,然后看文件目录下有linux-x86-32/linux-x86-64,我是64位系统,然后进入linux-x86-64下,执行./activemq start,运行成功!

(6)、输入HTTP://IP:8161  admin  admin进入管理!  一定要输入HTTP://   一定要输入HTTP://    一定要输入HTTP://  才能访问!!

 

2、使用延时消息特性 

(该部分转自:http://www.voidcn.com/blog/doffs/article/p-2407882.html) 

关于MSMQ和ActiveMQ的基础功能就不再描述。这里说说ActiveMQ的比较实用的一个功能:ScheduledMessage

ScheduledMessage可以理解为一个调度功能,可以延时、重复发送消息。我就是采用这种方案来实现Web中的定时功能和系统中不同模块之间的消息传递。

这里主要说一下延时消息的新增、删除操作(坑啊官方API里面貌似没有这一个功能的描述,只有一全案例,而案例上所用到的东西,在NuGet上下载的dll根本没有这一个类)。

要使用Scheduled功能需要在服务端conf/activemq.xml中把Scheduled打开,根据官网上的提示,是需要在broker上面添加一个schedulerSupport="true"属性,配置好后,重启服务。

<!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker schedulerSupport="true" xmlns="http://activemq.apache.org/schema/core"

延时消息发送:

                IConnectionFactory factory = new ConnectionFactory("tcp://192.168.1.248:61616/");
using (IConnection connection = factory.CreateConnection())
{
using (ISession session = connection.CreateSession())
{
IMessageProducer prod = session.CreateProducer(
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); ITextMessage msg = prod.CreateTextMessage();
msg.Text = "这是一个延时消息";
msg.Properties.SetLong("AMQ_SCHEDULED_DELAY", 3600000); //这里设置一个延时发送,后面是延时单位是ms,
prod.Send(msg);
Console.WriteLine(msg.NMSMessageId);//发送成功后,NMSMessageId将会被赋值,如果需要删除单条消息需要把这个记录下来 }
}

发送成功后,在服务器上可以看到这个延时消息

ActiveMQ使用记录

删除这个延时消息:

 IConnectionFactory factory = new ConnectionFactory("tcp://192.168.1.248:61616/");
using (IConnection connection = factory.CreateConnection())
{
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination management = session.GetDestination("ActiveMQ.Scheduler.Management");
connection.Start();
IMessageProducer producer = session.CreateProducer(management);
IMessage request = session.CreateMessage(); request.Properties.SetString("AMQ_SCHEDULER_ACTION", "REMOVE");//指定当前消息是来删除某个延时消息
request.Properties.SetString("scheduledJobId", id);//指定要删除的延时消息的ID、这个ID就是刚刚发送成功时返回的ID
producer.Send(request); }
}

参考资料

http://www.programcreek.com/java-api-examples/index.php?api=javax.jms.MessageListener

http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/ScheduledMessage.html#line.64

发送延时消息的简单封装:

        /// <summary>
/// 发送文本延时消息
/// </summary>
/// <param name="messageProducer">The message producer.</param>
/// <param name="msg">消息内容</param>
/// <param name="delayTime">延时时间(单位秒)</param>
/// <returns><c>true</c> if XXXX, <c>false</c> otherwise.</returns>
public bool SendScheduledTextMessage(IMessageProducer messageProducer, string msg, long delayTime)
{
try
{
var message = messageProducer.CreateTextMessage();
message.Properties.SetLong("AMQ_SCHEDULED_DELAY", delayTime * 1000);
message.Text = msg;
messageProducer.Send(message);
return true;
}
catch (Exception)
{
return false;
}
}

3、Send参数说明

Send方法:

message.Send(msg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
//第一个参数:消息的内容(byte或者是string)
//第二个参数:消息的持久化方式,这里为不进行持久。(如果为持久,可通过ActiveMQ的配置文件,配置ActiveMQ持久化的方式和位置)
//第三个参数:优先级(优先级为0-9,0最低,9最高)。这里使用的是Normal,为5
//第四个参数:为该条消息的过期时间,默认为0,永不过期。若设置过期时间,则消息超过设置的时间不处理,则会被丢弃到DLQ(Dead Letter Queue)队列。详细说明可参考:http://sharong.iteye.com/blog/1987171

4、获取ActiveMQ队列里的消息个数

这个一直在研究,但是迟迟搞不定。中文社区的资料里,基本都是JAVA的资料,使用JMS规范去获得,暂未发现.NET方面的资料。

所以暂附上现阶段的研究成果吧。

1、在官方的文档里,提到了如何获取这些信息。地址:http://activemq.apache.org/how-can-i-monitor-activemq.html

  根据文档发现首推的就是使用JMS,然后还有网页控制台,命令行,插件等等。不过依然没什么头绪。

2、在Stack Overflow上找到了一个相关问题,链接:http://*.com/questions/7512004/activemq-with-c-sharp-and-apache-nms-count-messages-in-queue  。

  (1)、根据问题的提示,是建立一个Browser,通过他一直去取消息,取到多少个代表队列里还有多少消息。暂不评论合不合适。  当我在使用的时候,发现每次都是只能取到一个。经过仔细研究之后,发现建立队列的时候,他的Message ID不同,用这种方法去取的时候,只能获取当前Message ID下的消息,所以每次都是一个。

  (2)、全局所有关于Activemq的都是使用单例模式,然并卵。。。依旧无果

  (3)、是不是配置的问题,只能访问到当前MessageID的消息?(这是一个问题)

3、最简单粗暴的方法,既然在8161控制台里能看到个数,那么直接HTTP请求,分析RSS页面内容。。你们懂的。。   http://127.0.0.1:8161/admin/queueBrowse/[队列名]?view=rss&feedType=atom_1.0