netty学习(6)---NIO服务器端实现

时间:2022-09-05 11:59:00

NIO服务器创建的步骤:

步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道:

serverSocketChannel = ServerSocketChannel.open()

 步骤二:绑定监听端口,设置连接为非阻塞模式:

serverSocketChannel.configureBlocking(false);//设置连接为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024)

步骤三:创建Reactor线程,创建多路复用器并启动线程:

selector = Selector.open(); //创建多路复用器Selector
new Thread(new ReactorTask()).start();

步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件:

 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key

步骤六:多路复用器接收到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路:

SocketChannel sc = ssc.accept(); //处理新的请求,完成TCP三次握手,建立物理链路

步骤七:设置客户端链路为非阻塞模式

 sc.configureBlocking(false); //设置客户端链路为非阻塞模式

步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息:

                //add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ); //将客户端连接注册到多路复用器上,监听读操作,用来监听客户端发送的网络消息
步骤九:异步读取客户端的请求消息到缓冲区:

int readBytes = sc.read(readBuffer);

步骤十:对ByteBuffer消息进行编码,如果有半包消息指针reset,继续读取后续的保温,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排.

步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端:

channel.write(writeBuffer);

下面用NIO实现TimeServer:


TimeServer

package com.panther.dong.netty.nio;

/**
* Created by panther on 15-8-26.
*/
public class TimeServer {

public static void main(String[] args) {
int port = 12306;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (Exception e) {

}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();

}
}

MultiplexerTimeServer:


package com.panther.dong.netty.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

/**
* Created by panther on 15-8-26.
*/
public class MultiplexerTimeServer implements Runnable {

private Selector selector;

private ServerSocketChannel serverSocketChannel;

private volatile boolean stop;

/**
* 初始化多路复用器、绑定监听端口
*
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open(); //创建多路复用器Selector
serverSocketChannel = ServerSocketChannel.open(); //打开ServerSocketChannel,用于监听客户端的连接
serverSocketChannel.configureBlocking(false);//设置连接为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);//绑定监听端口,设置连接为非阻塞模式
serverSocketChannel
.register(selector, SelectionKey.OP_ACCEPT); //将serversocketchannel注册到多路复用器Selector上,监听ACCEPT事件
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

public void stop() {
this.stop = true;
}

@Override
public void run() {
//多路复用器无限轮询准备就绪的key
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
handleInput(key); //多路复用器处理ACCEPT事件
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
//处理新接入的请求消息
if (key.isAcceptable()) {
//Accept the new Connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept(); //处理新的请求,完成TCP三次握手,建立物理链路
sc.configureBlocking(false); //设置客户端链路为非阻塞模式
//add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ); //将客户端连接注册到多路复用器上,监听读操作,用来监听客户端发送的网络消息
}
if (key.isReadable()) {
//read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("the time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
new Date(System.currentTimeMillis()).toString() :
"BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
; //读到0字节,忽略
}
}
}
}

private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}