mqtt消息推送-推送服务简单实现

时间:2023-01-04 13:25:31

上一章已经部署了mosquito服务器。这里实现一个简单的推送服务功能。主要包括两个部分。一个上传推送消息的API接口。连接mosquito并推送消息的服务。

简单的例子可以使用springboot快速开发,使用默认配置即可。
新建maven项目mqtt-server,pom.xml配置文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>com.mob</groupId>
<artifactId>mqtt-server</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>mqtt-server Maven Webapp</name>
<url>http://maven.apache.org</url>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
<relativePath/>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>mqtt-server</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

新建springboot启动类Application.java

@SpringBootApplication
@RestController
public class Application {

@RequestMapping("/")
public String greeting() {
return "welcome, mqtt server!";
}

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

新建mqtt推送服务代码MqttServer.java

@Component
@ConfigurationProperties(prefix = "my") // 配置文件中的前缀
public class MqttServer {
private String hostName; //地址
private String clientId; //clientId
private String topicName; //主题
private MqttClient client ;

public MqttServer(){
try {
//获取客户端连接
client=new MqttClient(hostName,clientId);

//设置参数
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
// options.setUserName(userName);
// options.setPassword(passWord.toCharArray());
// 设置超时时间
//options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setAutomaticReconnect(true);
options.setKeepAliveInterval(20);
client.setCallback(new PushCallback());
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}

/***
* 发送消息
* @param sendMsg 消息内容
*/

public void publish(String sendMsg){
try {
MqttTopic topic = client.getTopic(topicName);
System.out.println("发送的消息内容为:"+sendMsg);
MqttMessage message = new MqttMessage(sendMsg.getBytes("utf-8"));
message.setQos(1);
topic.publish(message);
} catch (Exception e) {
e.printStackTrace();
}
}

public String getHostName() {
return hostName;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}

public String getTopicName() {
return topicName;
}

public void setTopicName(String topicName) {
this.topicName = topicName;
}

public String getClientId() {
return clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public MqttClient getClient() {
return client;
}

public void setClient(MqttClient client) {
this.client = client;
}
}

/***
* 回调函数
* @author ouyjm
*
*/

class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// publish后会执行到这里
System.out.println("deliveryComplete---------"+ token.isComplete());

}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:"+topic);
System.out.println("接收消息Qos:"+message.getQos());
System.out.println("接收消息内容:"+new String(message.getPayload()));

}
}

新建接收推送消息的API接口MqttApi.java。

@RestController
@RequestMapping("/mqttServer")
public class MqttApi {
@Autowired
private MqttServer mqttServer;

@RequestMapping(value = "/send", method = RequestMethod.GET)
public String sendMessage(String message){
mqttServer.publish(message);
return "send \""+message+"\" to client success!";
}

}

配置文件application.yml中修改了默认端口,以及mqtt服务器相关配置。如下:

server:
port: 9090
my:
hostName: tcp://localhost:1883
clientId: "ibm-mqisdp"
topicName: mqtt

以上为maven工程的代码。


使用方式1:
不考虑客户端实现调试。
- 启动mqtt服务,见上一章。
- 服务器上启动一个对mqtt topic的订阅,见上一章。
- 访问http://location:9090/mqttServer/send?message=8945782
即可在订阅的控制台看到8945782的输出。

使用方式2:
Android机器联调。
可以到IBM网站上下载Android的demo。
填入ClientID(通过查看端口占用状态可以知道lsof -i tcp:1883)、
Host(需要加上tcp://)、
Port(默认1883)后连接。
然后填入Topic订阅。
这时通过访问API接口,就能在手机上收到提醒了。
mqtt消息推送-推送服务简单实现
mqtt消息推送-推送服务简单实现