- Posted by 微博@Yangsc_o
- 原创文章,版权声明:*转载-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0
摘要
通过温故知新-快速理解Linux网络IO的回顾,我们了解Linux下网络编程的5种I/O模型&I/O多路复用,接下来回顾一下java中的I/O模型,包括BIO、NIO、AIO,为下一篇netty做铺垫。
传统的BIO编程
传统的BIO通信模型
问题
- 该模型最大的问题就是,客户端的线程个数和客户端的并发呈1:1的关系,线程切换将满满拖垮整个系统;
- 测试代码如下
---------- server ----------
@Log4j2
public class BioServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(9090);
while (true) {
log.info("-- serverSocket before accept --");
Socket socket = serverSocket.accept();
log.info("-- serverSocket end accept --");
new Thread(() -> {
try {
// 读内容
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String readLine = bufferedReader.readLine();
log.info("thread:{} client :{}", Thread.currentThread().getName(), readLine);
} catch (IOException e) {
log.error(e);
}
}).run();
}
}
}
---------- client ----------
@Log4j2
public class BioClient {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
Integer tmp = i;
new Thread(() -> {
try {
Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), 9090);
OutputStream outputStream = socket.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write("client message index: " + tmp);
printWriter.flush();
log.info("index:{}", tmp);
} catch (Exception e) {
log.error(e);
}
}).run();
}
}
}
伪异步I/O编程
- 为了解决线程耗尽的问题,引入了线程池,没本质区别;
NIO编程
NIO库是在JDK1.4引进的,弥补了原来同步阻塞I/O的不足,先看一下通信模型,直观的感受一下;
先了解三个概念:缓冲区(buffer)、通道(channel)、多路复用器(Selector)
- 缓冲区(buffer)
在NIO厍中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
类型的缓存有很多种,eg:
- 通道(channel)
channel是一个通道,网络数据通过channel读取和写入;通道是双向的,流是单项的,流只是在一个方向移动(输入、输出),通道是双工的,可以同时读写,这个跟Unix TCP socket也是一致的;
- Selector
- 从上面的图中,最重要的就是Selector,这就是java实现I/O多路复用的核心;
- 一个Selector可以轮询多个注册到Selector的channel,如果某一个channel发送读写事件,channel就处于了就绪状态,就会被Selector轮询出来,然后通过SelectionKey获取就是Channel就绪集合,由于JDK使用了epoll()代替了传统的select轮询,所以没有1024的句柄限制。
- 跟BIO和伪异步IO相比,只用一个线程负责轮询,就可以接入成千上完的客户端,所以打好基础建设多么的重要!!
- 流程
- 测试代码
--------- server -----
@Log4j2
public class NioServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9090));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
new Thread(() -> {
try {
while (true) {
if (selector.select(1) > 0) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
try {
handle(key, selector);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
} finally {
it.remove();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}).start();
}
private static void handle(SelectionKey key, Selector selector) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
log.info("new channel ...");
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String data = new String(bytes, "UTF-8");
log.info("data:{}", data);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
}
}
}
}
}
AIO编程
NIO 2.0引入了新的异步通的概念,提供了异步文件通道和异步套接字通道的实现。
- 通过juc的Future表示异步操作的结果
- 在执行异步操作时传入channels
- CompletionHandler接口实现类作为操作完成的回调。
它是真正的异步非阻塞的IO模型,对应UNIX网络编程重的事件驱动I/O,不需要Selector对注册的通道进行轮询。
- 测试代码
@Log4j2
public class AioServer {
public static void main(String[] args) throws Exception {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel
.open();
channel.bind(new InetSocketAddress(9090));
channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(final AsynchronousSocketChannel asynchronousSocketChannel, Void attachment) {
channel.accept(null, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
asynchronousSocketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result_num, ByteBuffer attachment) {
attachment.flip();
CharBuffer charBuffer = CharBuffer.allocate(1024);
CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
decoder.decode(attachment, charBuffer, false);
charBuffer.flip();
String data = new String(charBuffer.array(), 0, charBuffer.limit());
log.info("data:{}", data);
try {
asynchronousSocketChannel.close();
} catch (Exception e) {
log.info(e);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.info("read error");
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("accept error");
}
});
while (true){
Thread.sleep(1000);
}
}
}
几种IO模型的对比
netty
- 下一篇就轮到!
参考
-《Netty 权威指南》第二版 – 李林峰
-《netty实战》–何品