IBM websphere MQ 消息发送与获取

时间:2023-03-09 06:23:23
IBM websphere MQ 消息发送与获取

一. 所需依赖包,安装 IBM websphere MQ 后,在安装目录下的 java 目录内

import java.io.IOException;
import java.util.Properties; import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.constants.MQConstants; /**
* 客户端模式开发
*
* @author Geely
*
*/
public class MQTest1 { public static void main(String[] args) throws MQException, IOException { // 发送消息给队列
put(); // 从队列读取消息
get(); // 获取队列深度
getDepth(); } @SuppressWarnings("unchecked")
static void put() throws MQException, IOException { // 配置MQ服务器连接参数
MQEnvironment.hostname = "127.0.0.1";
MQEnvironment.port = 1414;
MQEnvironment.channel = "QM_ACK";
// MQEnvironment.userID = "";
// MQEnvironment.password = "";
// 设置队列管理器字符集 编码
MQEnvironment.CCSID = 1381; // 设置应用名称,方便服务器MQ 查看应用连接
MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java"); // 设置应用名称,方便服务器MQ 查看应用连接
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); // 创建实例,连接队列管理器
MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties); // 以可写的方式访问队列管理器已定义的队列QUEUE1,当然也可以创建队列
MQQueue putQueue = queueManager.accessQueue("QUEUE_RECV", CMQC.MQOO_OUTPUT); // 新建并发送消息给队列
MQMessage myMessage = new MQMessage();
myMessage.expiry = -1; // 设置消息用不过期
String name = "MePlusPlus's 博客";
myMessage.writeUTF(name); // 使用默认的消息选项
MQPutMessageOptions pmo = new MQPutMessageOptions();
// 发送消息
putQueue.put(myMessage, pmo);
putQueue.close(); // 断开连接
queueManager.disconnect();
} @SuppressWarnings("unchecked")
static void get() throws MQException, IOException { // 配置MQ服务器连接参数
MQEnvironment.hostname = "127.0.0.1";
MQEnvironment.port = 1414;
MQEnvironment.channel = "QM_ACK";
// MQEnvironment.userID = "";
// MQEnvironment.password = "";
// 设置队列管理器字符集 编码
MQEnvironment.CCSID = 1381; // 设置应用名称,方便服务器MQ 查看应用连接
MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java"); // 设置应用名称,方便服务器MQ 查看应用连接
MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_BINDINGS); // 创建实例,连接队列管理器
MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties); // 打开队列.
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE; // 以可读的方式访问队列管理器已定义的队列QUEUE1
// MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV",CMQC.MQOO_INPUT_AS_Q_DEF);
MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null); MQGetMessageOptions gmo = new MQGetMessageOptions();
// Get messages under sync point control.
// 在同步点控制下获取消息.
//gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
// Wait if no messages on the Queue.
// 如果在队列上没有消息则等待.
//gmo.options = gmo.options + CMQC.MQGMO_WAIT;
// Fail if QeueManager Quiescing.
// 如果队列管理器停顿则失败.
//gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;
// Sets the time limit for the wait.
// 设置等待的时间限制.
gmo.waitInterval = 3000; // 从队列读取消息
MQMessage theMessage = new MQMessage();
getQueue.get(theMessage, gmo); String name = theMessage.readUTF(); System.out.println(name);
getQueue.close(); // 断开连接
queueManager.disconnect();
} static void getDepth() throws MQException, IOException { Properties props = new Properties();
props .put("hostname", "127.0.0.1");
props .put("port", 1414); //端口号
props .put("channel", "QM_ACK"); //服务器连接通道
props .put("CCSID", 1381);
props .put("transport", "MQSeries"); // 创建实例,连接队列管理器
MQQueueManager queueManager = new MQQueueManager("QM",props ); // 打开队列.
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;
// 以可读的方式访问队列管理器已定义的队列QUEUE1
MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null); MQGetMessageOptions gmo = new MQGetMessageOptions();
// Get messages under sync point control.
// 在同步点控制下获取消息.
// gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
// Wait if no messages on the Queue.
// 如果在队列上没有消息则等待.
// gmo.options = gmo.options + CMQC.MQGMO_WAIT;
// Fail if QeueManager Quiescing.
// 如果队列管理器停顿则失败.
// gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING; // Sets the time limit for the wait.
// 设置等待的时间限制.
gmo.waitInterval = 3000; int depth = getQueue.getCurrentDepth();
System.out.println("该队列当前的深度为:"+depth);
System.out.println("===========================");
while(depth-->0)
{
MQMessage msg = new MQMessage();// 要读的队列的消息
getQueue.get(msg, gmo);
System.out.println("消息的大小为:"+msg.getDataLength());
System.out.println("消息的内容:\n"+msg.readUTF());
System.out.println("---------------------------");
}
System.out.println("Finish!!!"); getQueue.close(); // 断开连接
queueManager.disconnect(); } }

JAVA从MQ读取消息的时候报错及解决

  JAVA通过writeUTF将消息写入到MQ,再通过JAVA采用MQMessage读消息的方法readUTF()去读取的时候,就不会报错,可以正常读出来。但是如果采用在MQ资源管理器中插入测试消息或者是通过另外一台MQ服务器往当前MQ服务器通过远程队例写消息过来,通过JAVA读取出会错.

MQMessage mqmsg = new MQMessage();// 要读的队列的消息
getQueue.get(mqmsg, gmo); System.out.println("消息的大小为:" + mqmsg.getDataLength()); byte[] rawData = new byte[mqmsg.getMessageLength()]; // 先转byte
mqmsg.readFully(rawData); // 读出所有数据
String msg = new String(rawData); // byte转string
System.out.println("消息的内容:\n"+msg);