基于Netty-Socket-io的无直接调用式的数据传输

时间:2022-09-29 11:24:45

实时消息的推送,PC端的推送技术可以使用socket建立一个长连接来实现。传统的web服务都是客户端发出请求,服务端给出响应。但是现在直观的要求是允许特定时间内在没有客户端发起请求的情况下服务端主动推送消息到客户端。

有哪些可以实现web消息推送的技术:

①不断地轮询(俗称“拉”,polling)是获取实时消息的一个手段:Ajax 隔一段时间(通常使用 JavaScript 的 setTimeout 函数)就去服务器查询是否有改变,从而进行增量式的更新。但是间隔多长时间去查询成了问题,因为性能和即时性造成了严重的反比关系。间隔太短,连续不断的请求会冲垮服务器,间隔太长,务器上的新数据就需要越多的时间才能到达客户机。

优点:服务端逻辑简单;

缺点:其中大多数请求可能是无效请求,在大量用户轮询很频繁的情况下对服务器的压力很大;

应用:并发用户量少,而且要求消息的实时性不高,一般很少采用;

②长轮询技术(long-polling):客户端向服务器发送Ajax请求,服务器接到请求后hold住连接,直到有新消息或超时(设置)才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。

优点:实时性高,无消息的情况下不会进行频繁的请求;

缺点:服务器维持着连接期间会消耗资源;

③基于Iframe及htmlfile的流(streaming)方式:iframe流方式是在页面中插入一个隐藏的iframe,利用其src属性在服务器和客户端之间创建一条长链接,服务器向iframe传输数据(通常是HTML,内有负责插入信息的javascript),来实时更新页面。

优点:消息能够实时到达;

缺点:服务器维持着长连接期会消耗资源;

④插件提供socket方式:比如利用Flash XMLSocket,Java Applet套接口,Activex包装的socket。

优点:原生socket的支持,和PC端和移动端的实现方式相似;

缺点:浏览器端需要装相应的插件;

⑤WebSocket:是HTML5开始提供的一种浏览器与服务器间进行全双工通讯的网络技术。

优点:更好的节省服务器资源和带宽并达到实时通讯;

缺点:目前还未普及,浏览器支持不好;

综上,考虑到浏览器兼容性和性能问题,采用长轮询(long-polling)是一种比较好的方式。
netty-socketio是一个开源的Socket.io服务器端的一个java的实现, 它基于Netty框架。 项目地址为(https://github.com/mrniko/netty-socketio)

基于项目需要,我们系统要做一个中转的服务,类似:三方系统A调用我们的接口通过参数的形式把数据传给我们,然后我们在接收到这个参数并做检验之后马上推送给三方系统B,实现数据的高实时性。但由于三方系统B是纯前端项目,类似Echarts,无任何后台,重要的是他们无法知道什么时间三方A系统调用了我们的接口传给我们数据,所以三方B系统无法通过接口形式来获取数据,鉴于此,参考了一下实时通信的案例,发现
netty-socketio是一个很好的解决方案,然后自己改造了下完成了自己的需求,改造如下:

POM依赖

<!-- socket心跳包 -->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.3</version>
</dependency>

这个包下有很多依赖包,大概七八个。

工具类封装
package com.spdb.hs.data.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOServer;

public class WebSocketService implements ApplicationListener{

private static Logger log = LoggerFactory.getLogger(WebSocketService.class);

private static Configuration config;
private static SocketIOServer server;
private String eventName;
private String data;

public WebSocketService(){};

public WebSocketService(String eventName,String data){
this.data=data;
this.eventName=eventName;
}

public void startLisener(){
try {
SocketEventListener(eventName,data);
} catch (Exception e) {
server.stop();
e.printStackTrace();
}


}


private void SocketEventListener(final String eventName, final String data) {
try {
//默认给所有连接的客户端推送数据,广播形式
server.getBroadcastOperations().sendEvent(eventName, data);
} catch (Exception e) {
log.error(e.getMessage(), e);
}


}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
Properties p = new Properties();
InputStream in = event.getApplicationContext().getClassLoader().getResourceAsStream("conf/socket.properties");
try {
p.load(in);
} catch (IOException e) {
log.error("Socket配置文件读取失败......", e);
}
String Host = p.getProperty("socket.host");
String Port = p.getProperty("socket.port");
log.info("Host="+Host+",Port="+Port);
if(server==null){
config = new Configuration();
config.setHostname(Host);
config.setPort(Integer.valueOf(Port));
server = new SocketIOServer(config);
server.start();
log.info("Socket Service was started");
}
}

}

实现了ApplicationListener接口,这么做的原因是让项目在spring容器加载完成后默认会去做的事,目的在于项目一启动就创建唯一的SocketService服务端。后续可以已调用方法的形式自定义的给客户端发信息(具体服务端与特定客户端通信省略)

配置文件

Socket.properties

socket.host=xxx.xxx.xxx.xxx
socket.port=9001

spring-bean.xml

<bean id="webSocketService" class="com.spdb.hs.data.utils.WebSocketService"/>

这个东西必须在bean中配置好,不然不起作用。

调用实例

package com.spdb.hs.data.controller.app;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import com.mchange.v2.lang.StringUtils;
import com.spdb.hs.data.parameter.EventType;
import com.spdb.hs.data.utils.ApiService;
import com.spdb.hs.data.utils.WebSocketService;
import com.spdb.hs.data.vo.RspVo;
import com.spdb.hs.oms.vo.PacketHead;

@Controller
@RequestMapping("/drill")
public class DrillController {

private Logger log = LoggerFactory.getLogger(DrillController.class);

/**
*
* 方法描述:
* 创建人:zhy
* 创建时间:2017年9月21日 下午2:27:44
* 修改时间:2017年9月21日 下午2:27:44
* Description: 启动
*
*/

@RequestMapping("quickStart")
@ResponseBody
public RspVo getReady(){
RspVo vo = new RspVo();
String drillUrl = "xxx.xxx.xxx.xxx";
String data = null;
try {
data = new ApiService().doGet(drillUrl);
} catch (Exception e) {
log.error(e.getMessage(),e);
}
if(null==data){
vo.setCode(PacketHead.JSON_ERR_FORMAT);
vo.setMsg("请求参数错误,请检查数据......");
}else{
vo.setData(data);
}
return vo;
}

/**
*
* 方法描述:
* 创建人:zhy
* 创建时间:2017年9月21日 下午2:27:30
* 修改时间:2017年9月21日 下午2:27:30
* Description: 定时更新接口
*
*/

@RequestMapping(value="timerUpd")
@ResponseBody
public RspVo timingUpdate(@RequestParam(required=true)String data){
RspVo vo = new RspVo();
log.info(" 定时更新接口接收到的参数为:data="+data);
if(StringUtils.nonEmptyString(data)){
try {
WebSocketService socketService = new WebSocketService(EventType.EventType_Timer,data);
socketService.startLisener();
} catch (Exception e) {
log.error(e.getMessage(), e);
vo.setCode(PacketHead.JSON_ERR_SYS);
vo.setMsg("系统繁忙请稍候再试......");
return vo;
}
}
return vo;

}

/* @RequestMapping(value="timerUpd2")
@ResponseBody
public RspVo timingUpdate2(@RequestParam(required=true)String beginTime,@RequestParam(required=true)String endTime,
@RequestParam(required=true)String processStatus,@RequestParam(required=true)String onlineUserCount,
@RequestParam(required=true)String allUserCount,@RequestParam(required=true)String facilityCount,
@RequestParam(required=true)String allTaskCount,@RequestParam(required=true)String autoTaskCount,
@RequestParam(required=true)String doneTaskCount,@RequestParam(required=true) String customFields){

log.info(" 定时更新接口接收到的参数为:beginTime="+beginTime+",endTime="+endTime+",processStatus="+processStatus+
",onlineUserCount="+onlineUserCount+",allUserCount="+allUserCount+",facilityCount="+facilityCount+
",allTaskCount="+allTaskCount+",autoTaskCount="+autoTaskCount+",doneTaskCount="+doneTaskCount+
",customFields="+customFields);

RspVo vo = new RspVo();
return vo;
}*/



/**
*
* 方法描述:
* 创建人:zhy
* 创建时间:2017年9月21日 下午2:45:38
* 修改时间:2017年9月21日 下午2:45:38
* Description: 任务更新接口
*
*/

@RequestMapping(value="tasksUpd")
@ResponseBody
public RspVo tasksUpdate(@RequestParam(required=true)String data){
RspVo vo = new RspVo();
log.info(" 任务更新接口接收到的参数为:data="+data);
if(StringUtils.nonEmptyString(data)){
try {
WebSocketService socketService = new WebSocketService(EventType.EventType_Tasks,data);
socketService.startLisener();
} catch (Exception e) {
log.error(e.getMessage(), e);
vo.setCode(PacketHead.JSON_ERR_SYS);
vo.setMsg("系统繁忙请稍候再试......");
return vo;
}
}
return vo;
}

/* @RequestMapping(value="tasksUpd2")
@ResponseBody
public RspVo tasksUpdate2(@RequestParam(required=true)String taskId,@RequestParam(required=true)String taskType,
@RequestParam(required=true)String taskStatus,@RequestParam(required=true)String beginTime,
@RequestParam(required=true)String acceptTime,@RequestParam(required=true)String endTime,
@RequestParam(required=true)String duration){
log.info("任务更新接口接收到的参数为:taskId="+taskId+",taskType="+taskType+",taskStatus="+taskStatus+",beginTime="+beginTime+
",endTime="+endTime+",acceptTime="+acceptTime+",duration="+duration);
RspVo vo = new RspVo();
return vo;
}*/


/**
*
* 方法描述:
* 创建人:zhy
* 创建时间:2017年9月21日 下午2:46:13
* 修改时间:2017年9月21日 下午2:46:13
* Description: 日志更新接口
*
*/

@RequestMapping(value="logsUpd")
@ResponseBody
public RspVo logsUpdate(@RequestParam(required=true)String data){
RspVo vo = new RspVo();
log.info(" 日志更新接口接收到的参数为:data="+data);
if(StringUtils.nonEmptyString(data)){
try {
WebSocketService socketService = new WebSocketService(EventType.EventType_Logs,data);
socketService.startLisener();
} catch (Exception e) {
log.error(e.getMessage(), e);
vo.setCode(PacketHead.JSON_ERR_SYS);
vo.setMsg("系统繁忙请稍候再试......");
return vo;
}
}
return vo;
}

/*@RequestMapping(value="logsUpd2")
@ResponseBody
public RspVo logsUpdate2(@RequestParam(required=true)String id,@RequestParam(required=true)String content) {
log.info("日志更新接口接收到的参数为:id="+id+",content="+content);
RspVo vo = new RspVo();
return vo;
}*/


/**
*
* 方法描述:
* 创建人:zhy
* 创建时间:2017年9月21日 下午2:46:13
* 修改时间:2017年9月21日 下午2:46:13
* Description: 状态更新接口
*
*/

@RequestMapping(value="statusUpd")
@ResponseBody
public RspVo statusUpdate(@RequestParam String data){
RspVo vo = new RspVo();
log.info(" 状态更新接口接收到的参数为:data="+data);
if(StringUtils.nonEmptyString(data)){
try {
WebSocketService socketService = new WebSocketService(EventType.EventType_Status,data);
socketService.startLisener();
} catch (Exception e) {
log.error(e.getMessage(), e);
vo.setCode(PacketHead.JSON_ERR_SYS);
vo.setMsg("系统繁忙请稍候再试......");
return vo;
}
}
return vo;
}
/**
*
* 方法描述:
* 创建人:zhy
* 创建时间:2017年9月21日 下午2:46:13
* 修改时间:2017年9月21日 下午2:46:13
* Description: 人员更新接口
*
*/

@RequestMapping(value="usersUpd")
@ResponseBody
public RspVo usersUpdate(@RequestParam(required=true)String data){
RspVo vo = new RspVo();
log.info(" 人员更新接口接收到的参数为:data="+data);
if(StringUtils.nonEmptyString(data)){
try {
WebSocketService socketService = new WebSocketService(EventType.EventType_Users,data);
socketService.startLisener();
} catch (Exception e) {
log.error(e.getMessage(), e);
vo.setCode(PacketHead.JSON_ERR_SYS);
vo.setMsg("系统繁忙请稍候再试......");
return vo;
}
}
return vo;
}


}

简单的说就是我通过HttpClient去三方A获取一个启动数据,然后三方A会不断调用我的接口给我传数据,我一接收到数据就穿给三方系统B

package com.spdb.hs.data.utils;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

public class ApiService{

/**
*
* @return 响应体的内容
* @throws IOException
* @throws ClientProtocolException
*/

public String doGet(String url) throws ClientProtocolException, IOException{

// 创建http GET请求
HttpGet httpGet = new HttpGet(url);
httpGet.setConfig(this.getConfig());//设置请求参数
CloseableHttpResponse response = null;
try {
// 执行请求
response = this.getHttpClient().execute(httpGet);
// 判断返回状态是否为200
if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
// System.out.println("内容长度:"+content.length());
return content;
}
} finally {
if (response != null) {
response.close();
}
//httpClient.close();
}
return null;
}

/**
* 带有参数的get请求
* @param url
* @return
* @throws URISyntaxException
* @throws IOException
* @throws ClientProtocolException
*/

public String doGet(String url , Map<String, String> params) throws URISyntaxException, ClientProtocolException, IOException{
URIBuilder uriBuilder = new URIBuilder(url);
if(params != null){
for(String key : params.keySet()){
uriBuilder.setParameter(key, params.get(key));
}
}//http://xxx?ss=ss
return this.doGet(uriBuilder.build().toString());
}
/**
* 带有参数的post请求
* @param url
* @param params
* @return
* @throws IOException
* @throws ClientProtocolException
*/

public HttpResult doPost(String url , Map<String, String> params) throws ClientProtocolException, IOException{


// 创建http POST请求
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(this.getConfig());
if(params != null){

// 设置2个post参数,一个是scope、一个是q
List<NameValuePair> parameters = new ArrayList<NameValuePair>(0);

for(String key : params.keySet()){
parameters.add(new BasicNameValuePair(key, params.get(key)));
}
// 构造一个form表单式的实体
UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
// 将请求实体设置到httpPost对象中
httpPost.setEntity(formEntity);
}

CloseableHttpResponse response = null;
try {
// 执行请求
response = this.getHttpClient().execute(httpPost);
// 判断返回状态是否为200
/*if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
System.out.println(content);
}*/

return new HttpResult(response.getStatusLine().getStatusCode(),EntityUtils.toString(response.getEntity(), "UTF-8"));
} finally {
if (response != null) {
response.close();
}
//httpClient.close();
}
}

public HttpResult doPostJson(String url , String json) throws ClientProtocolException, IOException{
// 创建http POST请求
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(this.getConfig());
if(StringUtils.isNotBlank(json)){
//标识出传递的参数是 application/json
StringEntity stringEntity = new StringEntity(json, ContentType.APPLICATION_JSON);
httpPost.setEntity(stringEntity);
}

CloseableHttpResponse response = null;
try {
// 执行请求
response =this.getHttpClient().execute(httpPost);
// 判断返回状态是否为200
/*if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
System.out.println(content);
}*/

return new HttpResult(response.getStatusLine().getStatusCode(),EntityUtils.toString(response.getEntity(), "UTF-8"));
} finally {
if (response != null) {
response.close();
}
//httpClient.close();
}
}

/**
* 没有参数的post请求
* @throws IOException
* @throws ClientProtocolException
*/

public HttpResult doPost(String url) throws ClientProtocolException, IOException{
return this.doPost(url, null);
}

private CloseableHttpClient getHttpClient(){
return HttpClientBuilder.create().setMaxConnTotal(200)
.setMaxConnPerRoute(100)
.build();
}

private RequestConfig getConfig(){
return RequestConfig.custom().setConnectionRequestTimeout(500)
.setSocketTimeout(30000)
.setConnectTimeout(5000)
.build();

}

}
package com.spdb.hs.data.utils;

public class HttpResult {

private int statusCode;
private String result;

public HttpResult(int statusCode, String result) {
super();
this.statusCode = statusCode;
this.result = result;
}

public int getStatusCode() {
return statusCode;
}

public void setStatusCode(int statusCode) {
this.statusCode = statusCode;
}

public String getResult() {
return result;
}

public void setResult(String result) {
this.result = result;
}


}

调试实例

Index.html

<html>

</html>
<script src="./socket.io.js"></script>
<script>
var socket = io.connect('http://10.112.5.60:9001');
</script>

socket.io.js这个需要自己去网上下载

在10.112.5.60这个IP的服务器上我们一起吧项目启动起来了,9001端口也开通了,那我我们现在要做的就是测试客户端连接上服务端后会有什么数据。效果如下:

基于Netty-Socket-io的无直接调用式的数据传输

图片表明测试成功。