1、Java NIO 是一种同步非阻塞的I/O模型
将多个IO的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下处理多个客户端请求。
NIO三个核心对象:通道(Channel)、缓冲区(Buffer)和选择器(Selector)
具体说就是Selector会不断轮询注册在其上的Channel,如果某个Channel上有新的TCP连接,读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectorKey可以获取就绪Channel的集合,进行后续I/O操作。
NIO单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。
单线程处理I/O的效率确实非常高,没有线程切换,只是拼命的读、写、选择事件。但现在的服务器,一般都是多核处理器,如果能够利用多核心进行I/O,无疑对效率会有更大的提高。
package java.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路复用器、绑定监听端口 */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); // Channel主要用来读写网络上的数据的。打开ServerSocketChannel, // 用于监听客户端的连接,它是所有客户端连接的父管道 servChannel = ServerSocketChannel.open(); // 设置为非阻塞模式 servChannel.configureBlocking(false); // 绑定监听端口 8080 servChannel.socket().bind(new InetSocketAddress(port), 1024); /* * Selector会不断地轮询在其上的Channel,如果某个Channel上面有新的TCP * 连接接入、读和写事件,这个Channel就处于就绪状态 * * 注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件 */ servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } public void run() { while (!stop) { try { // This method performs a blocking selection operation. // It returns only after at least one channel is selected, //(只有在至少有一个事件就绪后才会进行返回,所以是阻塞的) // this selector's wakeup method is invoked, the current thread is // interrupted,or the given timeout period expires, whichever comes first. selector.select(1000); // 阻塞等待,休眠时间为1s Set<SelectionKey> selectedKeys = selector.selectedKeys(); /* * 当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey * 集合,通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作 */ Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { // 事件分发器,单线程选择就绪的事件。 // I/O处理器,包括connect、read、write等,这种纯CPU操作,一般开启CPU核心 // 个线程就可以。业务线程,在处理完I/O后,业务一般还会有自己的业务逻辑,有的还 // 会有其他的阻塞I/O,如DB操作,RPC等。只要有阻塞,就需要单独的线程。 handleInput(key); // 所以在这里其实最好是用其它的线程来处理,而不要影响了事件分发器线程 } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 处理新接入的请求消息 if (key.isAcceptable()) { // 接受一个新的客户端接入请求 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); // 设置客户端为异步非阻塞 sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { /* * 读取数据 * 读取到的字节数,返回值有以下有三种结果: * (1)大于0,读取到字节,对其进行解编码 * (2)等于0,没有读取到字节,南纺股份正常场景,忽略 * (3)-1 ,链路已经关闭,需要关闭SocketChannel,释放资源 */ SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 由于设置了SocketChannel为异步非阻塞的,所以它的read是非阻塞的 int readBytes = sc.read(readBuffer); if (readBytes > 0) { /* * 将缓冲区当前的limit设置为position,position为0,用于后续对缓冲区的读取操作。 * 然后根据缓冲区可读的字节个数创建字节数组,调用get()操作将缓冲区可读的字节数 * 组复制到新创建的字节数组中 */ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else{ ; // 读到0字节,忽略 } } } } private void doWrite(SocketChannel channel, String response)throws IOException { /* * 由于SocketChannel是异步非阻塞的,并不能保证一次能够把所有需要发送的数据发送,此时会出现写半包问题。 * 需要注册写操作???,不断轮询Selector将没有发送完的bytebuffer发送完毕。可以通过byteBuffer的hasRemain() * 方法判断是否发送完毕。 */ if (response != null && response.trim().length() > 0) { // 将应答消息异步发送给客户端 byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 将缓冲区中的字节数据发送 writeBuffer.put(bytes); writeBuffer.flip(); // 缓存区复位 channel.write(writeBuffer); } } }
Selector轮询是阻塞的,而真正的I/O是异步非阻塞的。
对于NIO来说,缓存可以使用DirectByteBuffer和HeapByteBuffer。如果使用了DirectByteBuffer,一般来说可以减少一次系统空间到用户空间的拷贝。但Buffer创建和销毁的成本更高,更不宜维护,一般用来读取大文件时使用。
参考文章:http://blog.****.net/szzt_lingpeng/article/details/50612018
2、Java Reactor模式 异步非阻塞IO
下图是Reactor的多线程模型
其特点如下:
(1)有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求
(2)网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。
(3)一个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题。
下图是主从Reactor的多线程模型。
由于单独一个Acceptor线程可能会存在性能不中的问题,所以需要主从Reactor模型。
服务端用于接收客户端的不再是一个单独的NIO线程,而是一个独立的NIO线程池。
3、NIO中, 如果不显式的调用System.gc()
那会出现什么问题?
DirectBuffer是分配在操作系统的内存中的,所以省去了应用程序到内核空间的拷贝,而HeapBuffer是分配到堆上的,所以便于垃圾回收。
DirectBuffer的GC规则与堆对象的回收规则是一样的,只有垃圾对象才会被回收,而判定是否为垃圾对象依然是根据引用树中的存活节点来判定。
如果DirectByteBuffer的空间够用,那么System.gc()是不会触发FullGC的。也就是说在空间不够用时,显示调用才能进行回收,如果不显式调用,那只能是抛出内存异常了。
在垃圾收集时,虽然虚拟机会对DirectMemory进行回收,但是DirectMemory却不像新生代和老年代那样,发现空间不足了就通知收集器进行垃圾回收,它只能等待老年代满了后FullGC,然后“顺便地”帮它清理掉内存中废弃的对象。否则,只能等到抛出内存溢出异常时,在catch块里调用System.gc()。
参考:http://blog.****.net/donsonzhang/article/details/46666353