Java NIO三大组件Buffer、Channel、Selector保姆级教程 附聊天室核心代码

时间:2021-04-12 01:15:25


1、NIO简介

NIO(non-blocking IO或new IO),是同步非阻塞IO.

关于BIO、NIO和AIO的原理与区别请看我写的这篇文章:BIO、NIO、AIO三种IO模型的理解 本文只介绍NIO三大组件的原理及使用。

NIO三大组件分别是:Buffer、Channel、Selector,是JDK1.4提供的新API,NIO相关的类都放在java.nio包下,且对java.io包下BIO相关的很多类进行了改写。

NIO是面向缓冲区(或面向块的),而BIO是面向流(字符流或字节流)的。数据读取到一个它稍后处理的缓冲区(本质是一个内存块,基于数组实现),需要时可在缓冲区前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。

而BIO是基于事件的,当有连接请求、或读事件时才会处理,否则就阻塞在那里。

NIO的非阻塞模式:

  • 非阻塞读:一个线程从某通道读取数据时,仅能读到目前准备好的数据,如果数据还没准备好,就立即返回,该线程可以继续做别的事情,而不是像BIO那样保持线程阻塞;
  • 非阻塞写:一个线程写入一些数据到某通道时,先将数据写到缓冲区,等到数据可写时,再将缓冲区的数据写到通道。而BIO调用了write之后就阻塞在这里,即使数据不可写也要一直等待,直到数据完全写出去。

2、三大组件的关系

下面来看看三大组件的关系图:

Java NIO三大组件Buffer、Channel、Selector保姆级教程 附聊天室核心代码


可以看到,服务端仅仅只有一个线程,而这个线程里面维护着一个selector,一个selector可以同时处理多个客户端连接。channel可以理解为服务端与客户端传输数据的通道。

而buffer是缓冲区。前面说到NIO是面向缓冲区的,什么意思呢?客户端写数据时总是先将数据写到缓冲区,然后再从缓冲区写到channel,而读数据时恰恰是相反的,channel先将数据读到缓冲区,而客户端直接从缓冲区中读取数据。由此可见,buffer是双向的,既可以读,又可以写,而面向流的BIO则是单向的,一般都分为输入流和输出流两种,要么只能读,要么只能写。

上图就是NIO的多路复用模式。selector可以称作选择器,或者多路复用器,它也是多路复用的关键。

下面再复述一遍对应关系:

  • 每个channel都会对应一个buffer(其实channel两头都有一个buffer,上图没有画出channel服务端那头的buffer),一个线程有一个selector,一个selector对应多个channel。

多个channel首先会注册到一个selector上面,selector会使用轮询机制,遍历这些channel,当哪个channel有事件(具体的事件类型有可接收accept、已连接connect、可读readable、可写writeable)发生了,就停下来处理这个channel。

3、Buffer入门实例

前面说到,Buffer本质是一个内存块,基于数组实现,Buffer是一个抽象类,七个基本类型(除了boolean)都有Buffer的子类,最常用的是ByteBuffer,因为网络数据都是以字节方式传输的。

上面也说到,buffer既可以读又可以写,但是读写要做切换,也就是说,如果buffer当前模式为写模式,则要显式切换到读模式才可以读数据,反之亦然。

下面以ButeBuffer为例,演示一下Buffer的使用。

// 创建新的buffer,大小是1024个字节
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 创建之后是写模式,因此可以直接写数据
for (int i = 0; i < 10; i++) {
    buffer.put((byte) i);
}
// 调用flip方法切换到读模式
buffer.flip();
// 循环读取
while (buffer.hasRemaining()){
    System.out.println(buffer.get());
}

4、Buffer核心原理

知道如何简单使用buffer之后,下面就来探究一下buffer的底层原理。

buffer的原理主要在于它的四个属性

  • mark:在缓冲区操作过程当中,可以将当前的 position的值临时存入mark属性中,需要的时候再取出,恢复到之前的position,重新从之前的position位置开始处理。调用mark()方法来设置mark=position,再调用reset()可以让 position恢复到 mark标记的位置,即 position=mark
  • position:位置,读或写都会改变位置
  • limit:表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作,极限可以修改
  • capacity:容量,在缓冲区创建时设定且不可修改,比如前面创建的缓冲区容量就是1024且不可修改

mark属性和capacity已经介绍的很详细了,mark属性一般情况下不需要用它,而capacity就是容量在创建buffer时就已经固定了,因此下面重点介绍其它属性。

一个刚创建的buffer是写模式,比如ByteBuffer.allocate(10),此时position=0,limit=10,如下图所示。

Java NIO三大组件Buffer、Channel、Selector保姆级教程 附聊天室核心代码

limit表示读或写的极限,由于当前是写模式,因此可以写10个数据,且每写入一次数据,position就会加1。如果写第11个数据,则会报错。

limit是可以修改的,比如改成5,则最多只能放5个数据,写入第6个时就会报错,如下图所示。

Java NIO三大组件Buffer、Channel、Selector保姆级教程 附聊天室核心代码


当调用flip方法时,会从写模式切换到读模式。我们看看flip方法做了什么。

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

我们假设一下,新创建的buffer容量为10,且设置了limit=8,那么当写了5个数据后(未写满)就调用一下flip方法,调用之前limit=8,position=5,调用之后limit=5,而position重置为0,此时已经进入读模式,且会读取0~5之间的数据,因为你只写了5个啊。

以上的分析就是position和limit属性的含义,它会控制读或写的区间永远在position到limit的之间。

如何从写模式切换到读模式我们已经知道了,那么如何从读模式切换到写模式呢?答案是调用clear方法或compact方法。下面看一下clear方法做了什么。

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

其这么做的含义你们可以自行研究,前面道理已经讲得比较清楚了。

下面是读写模式切换方式图。

Java NIO三大组件Buffer、Channel、Selector保姆级教程 附聊天室核心代码

5、Channel核心原理与实战

常见的channel有如下几种:

  • FileChannel(实现为FileChannelImpl)
  • DatagramChannel(用于UDP)
  • ServerSocketChannel(用于TCP,实现为ServerSocketChannelImpl)
  • SocketChannel(用于TCP,实现为SocketChannelImpl)

下面主要演示一下FileChannel的使用。

FileChannel,顾名思义,是用于处理文件输入输出的。

向本地文件写数据代码示例:

FileOutputStream out = new FileOutputStream("E:/tmp/nio.txt");
FileChannel channel = out.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("测试nio往文件写数据".getBytes());
// 需要切换为读模式,因为下面调用write方法时相当于从buffer里面读数据
buffer.flip();
// 向channel写数据
int len = channel.write(buffer);
System.out.println("字节数:"+len);

out.close();

这里用到了BIO的类FileOutputStream,说明NIO也是基于BIO的。

从本地文件读数据代码示例:

FileInputStream in = new FileInputStream("E:/tmp/nio.txt");
FileChannel channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(8);

int len = -1;
List<Byte> list = new ArrayList<>();
byte[] bytes = new byte[8];
// 循环读取数据
while ((len=channel.read(buffer))!=-1){
	// 下面要从buffer中读数据,因此切换为读模式
    buffer.flip();
    buffer.get(bytes,0,len);
    for (int i = 0; i < len; i++) {
        list.add(bytes[i]);
    }
    // 下一个循环需要先向buffer写数据,因此切换为写模式
    buffer.clear();
}
in.close();
// 转为byte数组
byte[] resBytes = new byte[list.size()];
for (int i = 0; i < list.size(); i++) {
    resBytes[i] = list.get(i);
}
// 以字符串形式打印
System.out.println(new String(resBytes));

文件拷贝代码示例:

FileInputStream in = new FileInputStream("E:/tmp/nio.txt");
FileOutputStream out = new FileOutputStream("E:/tmp/nio2.txt");
FileChannel src = in.getChannel();
FileChannel dst = out.getChannel();

src.transferTo(0,src.size(),dst);
// 或dst.transferFrom(src,0,src.size());

in.close();
out.close();

transferTo和transferFrom方法底层是零拷贝机制,会用到堆外物理内存,谨慎使用。关键零拷贝后面会有文章介绍,请耐心等待。

6、Selector原理与聊天室代码

前面已经介绍过selector了,它是多路复用的核心,主要用在网络编程模式中,服务端采用多路复用IO提升并发处理效率。

下面介绍一下selector的API。

selector几个核心方法:

  • select:注册的通道上至少有一个通道有事件就返回,并将产生事件的selectionKey加入到内部集合,否则一直阻塞
  • select(timeout):超过timeout就返回
  • selectNow:立马返回,不阻塞
  • selectedKeys:获取所有有事件发生的selectionKey,每个selectionKey对象聚合了一个selectableChannel对象
  • keys :获取注册在selector上面所有的selectionKey
  • wakeup:唤醒selector,比如调用了select方法,就会阻塞。如果selector正在阻塞中,则调用wakeup方法会唤醒,否则wakeup就在下次阻塞时生效

通过阅读SelectorImpl类(Selector的实现)的源码发现,keys方法和selectedKeys方法分别返回的是SelectorImpl的publicKeys属性和publicSelectedKeys属性,且这两个属性都是Set<SelectionKey>类型

SelectionKey类内部维护的几个事件常量:

  • SelectionKey.OP_ACCEPT:可接收的
  • SelectionKey.OP_CONNECT:已连接
  • SelectionKey.OP_READ:可读
  • SelectionKey.OP_WRITE:可写

如何创建一个Selector呢?答案是调用Selector.open()方法。

服务端多路复用IO核心流程:

  1. 服务端绑定一个端口(比如8888),开始监听
  2. 将ServerSocketChannel注册到Selector上,服务端线程开始调用selector的select方法,进入阻塞
  3. 一旦发生了事件,select方法苏醒,通过调用selector的selectedKeys方法获取有事件发生的selectionKey,并遍历这些selectionKey对象,拿到每个selectionKey对象维护的Channel,根据它们响应的事件做不同处理
  4. 如果发生的是Accept事件,则调用ServerSocketChannel的accept方法获取SocketChannel,并将SocketChannel注册到Selector上
  5. 如果发生的是Read事件,则从SocketChannel中读数据 如果发生的是Write事件,则往SocketChannel中写数据
  6. 如果发生的是Connect事件,则说明连接已建立

下面就来实现上述流程中的关键步骤。

服务端绑定端口:

// 1.构造一个ServerSocketChannel,并将其设为非阻塞模式
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2.绑定端口号
serverSocketChannel.bind(new InetSocketAddress("localhost",6666));

通用事件处理模型:

Selector selector = Selector.open();
// 将通道注册到reactor上,并只关注accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 服务端开始无限循环处理与客户端的通信
for(;;){
	// 阻塞等待,也可以使用selectNow非阻塞等待或select(timeout)超时等待
	selector.select();
	// 获取本次有事件发生的SelectionKey集合
	Set<SelectionKey> selectionKeys = selector.selectedKeys();
	// 遍历
	Iterator<SelectionKey> iterator = selectionKeys.iterator();
	while (iterator.hasNext()){
    	SelectionKey selectionKey = iterator.next();
    	// 处理相应事件。因为这里是服务端代码,所以就没必要判断是否为Connect事件了
    	if(selectionKey.isAcceptable()){
        	// 接收新连接逻辑
    	}else if(selectionKey.isReadable()){
        	// 读通道数据逻辑
    	}else if(selectionKey.isWritable()){
        	// 往通道写数据逻辑
    	}
    	// 最后需要移除,防止重复处理
    	iterator.remove();
	}
}

这里说明一下:ServerSocketChannel是专门接收连接的,因此它的有效事件是OP_ACCEPT,而SocketChannel才是用于服务端和客户端之间数据传输的,因此它的有效事件有OP_READ、OP_WRITE和OP_CONNECT。

OP_CONNECT比较特别,它表示客户端与服务端连接已建立,该事件发生于客户端的SocketChannel。为什么这么说呢,因为服务端也有一个SocketChannel对象。

register方法是SelectableChannel类的方法,前面说到的FileChannel不是该类的子类,因为FileChannel无法注册到Selector上面。

接收新连接,并注册到Selector:

// 通过事件,已经确定有新连接进来了,因为这里并不会阻塞
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置非阻塞模式
socketChannel.configureBlocking(false);
// 可以打印一句话,表示有新的客户端已上线
System.out.println(String.format("客户端上线[%s]",socketChannel.getRemoteAddress().toString().substring(1)));
// 新的socketChannel注册到Selector上面,并主要关心Read事件
// 第三个参数为与之绑定的缓冲区,可以将读取的数据放到该缓冲区中
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));

读事件发生,通道可读,读取数据:
下面是读数据的核心代码,这个很重要。

SocketChannel channel = (SocketChannel)selectionKey.channel();
// 拿到与之绑定的缓冲区
ByteBuffer buffer = (ByteBuffer)selectionKey.attachment();
// 先清空一下,因为可能有上一次的残留数据
buffer.clear();
// 当客户端主动关闭连接时,read方法会抛出异常,此时需要try catch掉,避免服务端因异常挂掉
try {
    // 将channel中的数据读到buffer中,对于buffer是写,因为需要调用clear方法切换到写模式
    channel.read(buffer);
} catch (IOException e) {
    System.out.println(String.format("客户端下线[%s]",channel.getRemoteAddress().toString().substring(1)));
    selectionKey.cancel();
    channel.close();
    continue;
}
// 将收到的消息转为string
String message = new String(buffer.array(), 0, buffer.position());

服务端收到客户端发送的消息之后该怎么办呢?
如果是群聊,则将消息转发给其它所有的客户端,也就是将message写到除自己之外的其它所有的SocketChannel。如果是私聊,则需要往指定的那个SocketChannel里写数据。这里可以任你发挥。

写事件发生,通道可写:

SocketChannel channel = (SocketChannel)selectionKey.channel();
// 拿到与之绑定的缓冲区,此时缓冲区是有数据的
ByteBuffer att = (ByteBuffer) selectionKey.attachment();
// 需要切换到读模式,因为往channel里写数据对于buffer而言是读
att.flip();
// 写数据
channel.write(att);
// 写完之后记得要取消write事件,否则CPU空转、利用率达到100%
selectionKey.interestOps(selectionKey.OP_READ);

好了,服务端的核心代码就写到这里了,喜欢本文的朋友们请点个关注,谢谢!