实现PHP服务器+Android客户端(Retrofit+RxJava)第六天推送的实现

时间:2022-09-05 15:25:46

废话不多说,今天来说说近几天的成果。

  1. 如何实现推送
  2. android客户端nio的使用
  3. 为啥要使用长连接
  4. 长连接的建立
  5. php服务器如何实现长连接
  6. 后期需要优化的部分

如何实现推送

推送的原理其实也很简单,服务器和客户端实现长连接,实现了长连接之后就想干嘛干嘛了。

android客户端nio的使用

如果对nio不了解,大家可以自己再去了解,我这里简单说下:你就是非阻塞的io,相对于我们一般使用的bio来说使用可以更加的灵活吧,nio是事件驱动的,也就是可以读取的时候读取的事件响应,可写的时候写的事件响应,在读写的时候是非阻塞的,也就是如果有100个字节要读取,它不会等到你全部读完再执行下一句,他会立马返回,这个时候你可能只读了50个字节,我们要做的事情就是在下一次读事件来的时候把另50个字节读出来。nio一般是用在服务器这边的,它对高并发的支持比较好,客户端的话就没有那么大的意义,不过我这里还是在客户端用了(具体是因为我还不知道怎么再php服务器上用nio,只能在客户端这里先练练手),要把nio讲清楚的话可以讲一整篇,大家自己去慢慢理解,坑还是有的。

为啥要使用长连接

看过我这个系列之前的几篇博客的人应该都知道我这个app使用的基本都是http协议,也就是短连接去请求数据,其实看我这里用的retrofit就知道是http协议,其实我的app写下来基本http协议就已经可以满足我的要求,那为什么还要长连接呢,老实说,我只是看了这篇博客携程App的网络性能优化实践,有两点收获,一个就是网络服务多使用tcp连接,其次就是在业务逻辑多的时候用hybrid(web 和native混合),web就是直接用webview请求html5页面,native就是使用android原生控件写的界面,hybrid好处很多,但是web部分的效果不是那么理想,毕竟只是一个页面,无法达到android原生控件的性能。扯了那么多其他的,进入正题,其实我自己现在总结实现长连接的好处就是服务器可以推动信息给客户端,而且http协议也是tcp连接,而一次连接的建立需要三次握手,消耗还是比较大的,长连接就可以减少http的这种建立连接和断开连接的操作。

长连接的建立

上面已经说了使用nio,另外还有一点需要注意的是需要一个心跳去维护这个长连接,心跳就是每个一段时间发送消息给服务器,服务器返回信息,确保这个长连接没有中断。其实最好还得有一个长连接池,进一步减少建立连接的消耗,我这里只实现了如何用nio实现长连接,还没有实现长连接池,心跳的话明白什么意思之后还是挺简单的,就自己去实现,下面的主要代码:

package com.sqq.tcpdemo.nioconnect;

import android.util.Log;
import com.sqq.tcpdemo.rxbus.ConnectedEvent;
import com.sqq.tcpdemo.rxbus.DisConnectEvent;
import com.sqq.tcpdemo.rxbus.GetResponseEvent;
import com.sqq.tcpdemo.rxbus.RxBus;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.AccessControlException;
import java.util.Iterator;

/**
* Created by sqq on 2016/8/2.
* rxbus+nio
*/

public class Client implements Runnable {

static {
try {
// Needed for NIO selectors on Android 2.2.
// 其实就是防止读取到ipv6的地址
System.setProperty("java.net.preferIPv6Addresses", "false");
} catch (AccessControlException ignored) {
}
}

final int connectTimeout;
final InetAddress connectHost;
final int connectTcpPort;
final ByteBuffer readBuffer, writeBuffer;

private Selector selector;
private SelectionKey selectionKey;
private SocketChannel clientChannel;

private int emptySelects;
// private boolean isClientClose = false;
private boolean isConnected;

public Client(Client.Builder builder) {

connectTimeout = builder.timeout;
connectHost = builder.host;
connectTcpPort = builder.tcpPort;
writeBuffer = ByteBuffer.allocate(builder.writeBufferSize);
readBuffer = ByteBuffer.allocate(builder.objectBufferSize);
isConnected = false;
//isClientClose = false;

}

public void Connect() {
Thread clientThread = new Thread(this);
clientThread.setDaemon(true);
clientThread.start();
}

@Override
public void run() {
try {
clientChannel = SocketChannel.open();

Socket socket = clientChannel.socket();
//确保数据及时发出
socket.setTcpNoDelay(true);
//表示发送请求之后10s内没有接受到数据就断开客户端
//socket.setSoTimeout(10000);
//下面两句不能改变顺序,为了方便就直接是阻塞模式连接,连接之后再设置非阻塞模式
socket.connect(new InetSocketAddress(connectHost, connectTcpPort), connectTimeout);
//boolean connect = clientChannel.connect(new InetSocketAddress(connectHost, connectTcpPort));
clientChannel.configureBlocking(false);

try {
selector = Selector.open();
} catch (IOException ex) {
throw new RuntimeException("Error opening selector.", ex);
}


if (socket.isConnected()) {
//已经连接
selectionKey = clientChannel.register(selector, SelectionKey.OP_READ);
isConnected = true;
RxBus.getRxBus().send(new ConnectedEvent());
} else {
selectionKey = clientChannel.register(selector, SelectionKey.OP_CONNECT);
}
} catch (IOException e) {
e.printStackTrace();
Log.d("Client", e.toString());
//建立连接失败,这里应该返回,或者重试之类的
return;
}


try {
checkSelector();
} catch (IOException e) {
e.printStackTrace();
Log.d("Client", e.toString());
return;
}
}

public void checkSelector() throws IOException {
while (isConnected) {
selector.select();
// 获得selector中选中的项的迭代器
Iterator ite = selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key,以防重复处理
ite.remove();
// 连接事件发生
if (key.isConnectable()) {
//这里其实不会走到这一步,前面的连接时阻塞的
connected(key);

} else if (key.isReadable()) {
canRead(key);

} else if (key.isWritable()) {
canWrite(key);

}
}
}
}

public void send(String message) {
Log.d("Client", message);
try {
writeBuffer.put(message.getBytes("UTF-8"));
writeBuffer.flip();

selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
selector.wakeup();

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
Log.d("Client","解析失败");
}


}

private void connected(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key
.channel();
// 如果正在连接,则完成连接
if (channel.isConnectionPending()) {
channel.finishConnect();
}
// 设置成非阻塞
channel.configureBlocking(false);
}

private void canRead(SelectionKey key) throws IOException{
Log.d("Client", "read");
SocketChannel channel = (SocketChannel) key
.channel();
int read = channel.read(readBuffer);
if (read == -1) {
Log.d("Client", "客户端已经关闭");
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
RxBus.getRxBus().send(new DisConnectEvent());
return;
}
byte[] data = readBuffer.array();
String msg = new String(data).trim();
RxBus.getRxBus().send(new GetResponseEvent(msg));
Log.d("Client", "服务端收到信息:" + msg);

}

private void canWrite(SelectionKey key) throws IOException{
Log.d("Client", "write");
SocketChannel channel = (SocketChannel) key
.channel();
//channel.write(ByteBuffer.wrap(new String("客户端:1").getBytes()));
//判断当前位置到上界(limit)是否还有数据没有写到通道
while(writeBuffer.hasRemaining()){
channel.write(writeBuffer);
writeBuffer.compact();
writeBuffer.flip();
}
writeBuffer.clear();
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
}

public void close() {
isConnected = false;
try {
if(selector!=null)
selector.close();
if(clientChannel!=null)
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}

}


public static final class Builder {
int writeBufferSize;
int objectBufferSize;
/**
* 默认5s连接超时
*/

int timeout;
InetAddress host;
int tcpPort;

public Builder() {
writeBufferSize = 8192;
objectBufferSize = 2048;
timeout = 5000;
host = null;
tcpPort = -1;
}

public Client.Builder setHost(String host) {
try {
this.host = InetAddress.getByName(host);
} catch (UnknownHostException e) {
e.printStackTrace();
}
return this;
}

public Client.Builder setTcpPort(int tcpPort) {
this.tcpPort = tcpPort;
return this;
}

public Client build() {
return new Client(this);
}
}
}

demo地址:demo地址
大家把demo下载下来之后,可以先写个java服务器先测试一下,代码如下:

`package cn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO服务端
*/
public class NIOServer {
//通道管理器
private Selector selector;

/**
* 获得一个ServerSocket通道,并对该通道做一些初始化的工作
* @param port 绑定的端口号
* @throws IOException
*/
public void initServer(int port) throws IOException {
// 获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverChannel.configureBlocking(false);
// 将该通道对应的ServerSocket绑定到port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 获得一个通道管理器
this.selector = Selector.open();
//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
//当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}

/**
* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
System.out.println("服务端启动成功!");
// 轮询访问selector
while (true) {
//当注册的事件到达时,方法返回;否则,该方法会一直阻塞
selector.select();
// 获得selector中选中的项的迭代器,选中的项为注册的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key,以防重复处理
ite.remove();
// 客户端请求连接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 获得和客户端连接的通道
SocketChannel channel = server.accept();
// 设置成非阻塞
channel.configureBlocking(false);

//在这里可以给客户端发送信息哦
channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
//在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ);

// 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}

}

}
}
/**
* 处理读取客户端发来的信息 的事件
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException{
// 服务器可读取消息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服务端收到信息:"+msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);// 将消息回送给客户端
}

/**
* 启动服务端测试
* @throws IOException
*/
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8000);
server.listen();
}

}
`
上面的代码摘自Java NIO原理图文分析及代码实现 ,写完之后自己再去配置一下端口号、ip之类的就可以运行

php服务器如何实现长连接

上面的例子运行成功之后最后再来说说怎么用php实现长连接,php建立长连接的代码网上有一堆,这里我也贴一个:

 <?php 
//确保在连接客户端时不会超时
set_time_limit(0);

//设置IP和端口号
$address='10.0.3.114';
$port=13448; //调试的时候,可以多换端口来测试程序!

//创建一个SOCKET
if(($sock=socket_create(AF_INET,SOCK_STREAM,SOL_TCP))<0)
{
echo "socket_create() failed reason:".socket_strerror($sock)."<br>";
}

//绑定到socket端口
if(($ret=socket_bind($sock,$address,$port))<0)
{
echo "socket_bind() failed reason:".socket_strerror($ret)."<br>";
}

//开始监听
if(($ret=socket_listen($sock,4))<0)
{
echo "socket_listen() failed reason:".socket_strerror($ret)."<br>";
}

do {
if (($msgsock = socket_accept($sock)) < 0)
{
echo "socket_accept() failed: reason: " . socket_strerror($msgsock) . "/n";
echo "/nThe Server is Stop……/n";
break;
}

//发到客户端
$msg ="Welcome To Server!";
socket_write($msgsock, $msg, strlen($msg));
ob_flush();
flush();
socket_close($msgsock);

echo "/nThe Server is running……/n";
printf("/nThe Server is running……/n");
} while (true);

socket_close($sock);

上面的代码只是简单的例子,并没有建立长连接,发送了消息之后就关闭了连接。
其次需要注意的就两点,一点是需要先在php.ini这个配置文件中打开socket的支持,具体做法:在php.ini找到extension=php_sockets.dll一句将前面的;去掉,还有一点要注意的就是ip地址,如果你是在本机上做的测试只需要写127.0.0.1就可以,如果是和android客户端(不是模拟器)建立连接就需要写本机的ip。
最后要说的是,我自己服务器端的实现是用了workerman框架,还是比较好用的

后期需要优化的部分

大家下载我的demo之后,主要看Client.java就可以,里面已经实现长连接池,但总觉得有问题,还需要测试,心跳的话已经在服务器端做了(框架中自带了)。其次就是数据解析,我一直在考虑使用json还是用其他的,目前我用的是json解析,解析之后再做二次判断,有些麻烦。目前整个项目我也已经开源。地址:我的项目