Java IO学习笔记六:NIO到多路复用

时间:2023-03-09 08:56:24
Java IO学习笔记六:NIO到多路复用

作者:Grey

原文地址:Java IO学习笔记六:NIO到多路复用

虽然NIO性能上比BIO要好,参考:Java IO学习笔记五:BIO到NIO

但是NIO也有问题,NIO服务端的示例代码中往往会包括如下代码:即:遍历所有的SocketChannel,获取能读写数据的客户端,当客户端数量非常多的时候,服务端要轮询所有连接的客户端拿数据(recv调用),很多调用是无意义的,这样会导致频繁的用户态切换成内核态,导致性能变差。

....
//遍历已经链接进来的客户端能不能读写数据
for (SocketChannel c : clients) {
int num = c.read(buffer);
if (num > 0) {
buffer.flip();
byte[] aaa = new byte[buffer.limit()];
buffer.get(aaa); String b = new String(aaa);
System.out.println(c.socket().getPort() + " : " + b);
buffer.clear();
}
}
...

多路复用技术可以解决NIO的这个问题,多个IO通过一个系统调用获得其中的IO状态,然后由程序对有状态的IO进行读写操作。在Linux系统中,多路复用的实现有:

  • 基于POSIX标准的SELECT
  • POLL (select只支持最大fd < 1024,如果单个进程的文件句柄数超过1024,select就不能用了。poll在接口上无限制)
  • EPOLL

其中SELECT和POLL类似,但是有一些区别,参考select和poll的区别

无论NIO,SELECT还是POLL,都是要遍历所有IO,询问状态,只不过遍历这件事到底是内核来做还是应用程序来做而已。

而epoll,可以看成是SELECT和POLL的增强,在调用select/poll时候,都需要把fd集合从用户态拷贝到内核态,但是epoll调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不做拷贝,而且epoll采用的是事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)。

更多内容可以参考:

Java的Selector封装了底层epoll和poll的API,可以通过指定如下参数来调用执行的内核调用, 在Linux平台,如果指定

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider

则底层调用poll,

指定为:

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider

或者不指定,则底层调用epoll

源码参考:jdk8u-jdk

Java IO学习笔记六:NIO到多路复用

接下来,我们使用一套服务端代码,在Linux服务器上运行,分别指定底层用epoll和poll,并用strace来追踪其内核调用。

准备服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set; public class SocketMultiplexingV1 { private Selector selector = null;
int port = 9090; public void initServer() {
try {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
} public void start() {
initServer();
System.out.println("服务器启动了。。。。。");
try {
while (true) {
Set<SelectionKey> keys = selector.keys();
System.out.println(keys.size() + " size");
while (selector.select() > 0) {
//返回的有状态的fd集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
} public void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept();
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(8192);
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:" + client.getRemoteAddress());
System.out.println("-------------------------------------------");
} catch (IOException e) {
e.printStackTrace();
}
} public void readHandler(SelectionKey key) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read;
try {
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else {
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace(); }
} public static void main(String[] args) {
SocketMultiplexingV1 service = new SocketMultiplexingV1();
service.start();
}
}

和服务端代码在同一目录下准备一个脚本 SocketMultiplexingV1.sh

rm -rf ${1}*
/usr/local/jdk/bin/javac SocketMultiplexingV1.java
strace -ff -o $1 /usr/local/jdk/bin/java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.${1}SelectorProvider SocketMultiplexingV1

执行:

./SocketMultiplexingV1 Poll

底层调用Poll

重新打开一个控制台,通过nc工具连接这个服务端

nc localhost 9090

服务端可以正常接收到连接

[root@io io]# ./SocketMultiplexingV1.sh Poll
服务器启动了。。。。。
1 size
-------------------------------------------
新客户端:/0:0:0:0:0:0:0:1:39724
-------------------------------------------

暂时先不要发送数据,此时,查看服务端的进程:

[root@io io]# jps
1712 Jps
1659 SocketMultiplexingV1

查看服务端目前关联的文件描述符

[root@io io]# lsof -p 1659
...
java 1659 root 4u IPv6 25831 0t0 TCP *:websm (LISTEN)
...
java 1659 root 7u IPv6 22508 0t0 TCP localhost:websm->localhost:39724 (ESTABLISHED)

其中4u为服务端监听的Socket文件描述符,7u为新连接进来的客户端Socket文件描述符。

通过nc客户端给服务端发送一些数据,客户端也可以正常收到服务端返回的数据

[root@io io]# nc localhost 9090
sdfasdfasd
sdfasdfasd

接下来停掉服务端和客户端, 查看追踪日志

[root@io io]# ll
total 2444
-rwxr-xr-x. 1 root root 106 Jun 10 19:25 mysh.sh
-rw-r--r--. 1 root root 1714 Jun 12 16:35 OSFileIO.java
-rw-r--r--. 1 root root 9572 Jun 17 19:58 Poll.1659
-rw-r--r--. 1 root root 215792 Jun 17 19:58 Poll.1660
-rw-r--r--. 1 root root 1076 Jun 17 19:58 Poll.1661
-rw-r--r--. 1 root root 983 Jun 17 19:58 Poll.1662
-rw-r--r--. 1 root root 850 Jun 17 19:58 Poll.1663
-rw-r--r--. 1 root root 940 Jun 17 19:58 Poll.1664
-rw-r--r--. 1 root root 948 Jun 17 19:58 Poll.1665
-rw-r--r--. 1 root root 885 Jun 17 19:58 Poll.1666
-rw-r--r--. 1 root root 948 Jun 17 19:58 Poll.1667
-rw-r--r--. 1 root root 1080 Jun 17 19:58 Poll.1668
-rw-r--r--. 1 root root 124751 Jun 17 19:58 Poll.1669
-rw-r--r--. 1 root root 1245 Jun 17 19:58 Poll.1670
-rw-r--r--. 1 root root 1210 Jun 17 19:58 Poll.1671
-rw-r--r--. 1 root root 2416 Jun 17 19:58 Poll.1672
-rw-r--r--. 1 root root 27498 Jun 17 19:58 Poll.1673
-rw-r--r--. 1 root root 27326 Jun 17 19:58 Poll.1674
-rw-r--r--. 1 root root 27602 Jun 17 19:58 Poll.1675
-rw-r--r--. 1 root root 26866 Jun 17 19:58 Poll.1676
-rw-r--r--. 1 root root 1141 Jun 17 19:58 Poll.1677
-rw-r--r--. 1 root root 1953818 Jun 17 19:58 Poll.1678
-rw-r--r--. 1 root root 2204 Jun 17 19:58 Poll.1831
-rw-r--r--. 1 root root 3440 Jun 17 19:48 SocketMultiplexingV1.class
-rw-r--r--. 1 root root 3315 Jun 17 19:13 SocketMultiplexingV1.java
-rwxr-xr-x. 1 root root 199 Jun 17 19:19 SocketMultiplexingV1.sh

其中Poll.1678为主线程日志, 我们一一看下整个调用过程

...
2535 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
...
2793 bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INE T6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
2794 listen(4, 50) = 0
...

以上两个调用对应了代码中建立Socket并绑定9090端口进行监听这个逻辑。

...
2772 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
...
2883 poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{f d=4, revents=POLLIN}])

以上调用对应了:

server.configureBlocking(false);

调用的poll方法表示一个新的文件描述符4u有POLLIN(POLLIN:There is data to read)的事件

...
2893 accept(4, {sa_family=AF_INET6, sin6_port=htons(39724), inet_pton(AF_ INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 7 ... 2935 poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=PO LLIN}], 3, -1) = 1 ([{fd=7, revents=POLLIN}])

这里说明接收了一个新的Socket连接,就是我们刚才用lsof看到的7u这个文件描述符。,调用了poll方法,说明一个新的文件描述符7u有POLLIN(POLLIN:There is data to read)的事件。

我们的代码中对于每次接收的客户端,也会把客户端设置为非阻塞,即:

client.configureBlocking(false);

对应的内核调用就是:

2926 fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0

以上就是poll调用对应内核函数的调用。

接下来切换成epoll模式,重新执行脚本

[root@io io]# ./SocketMultiplexingV1.sh EPoll
服务器启动了。。。。。
1 size

用nc连接服务端

nc localhost 9090

服务端响应正常

[root@io io]# ./SocketMultiplexingV1.sh EPoll
服务器启动了。。。。。
1 size
-------------------------------------------
新客户端:/0:0:0:0:0:0:0:1:39726
-------------------------------------------

通过nc发送一些数据

[root@io io]# nc localhost 9090
asdfasd
asdfasd

也可以正常接收

接下来停掉服务端和客户端,查看主线程调用情况

[root@io io]# ll -h EPoll.*
-rw-r--r--. 1 root root 9.4K Jun 17 20:33 EPoll.2067
-rw-r--r--. 1 root root 212K Jun 17 20:33 EPoll.2068
-rw-r--r--. 1 root root 1.1K Jun 17 20:33 EPoll.2069
-rw-r--r--. 1 root root 983 Jun 17 20:33 EPoll.2070
-rw-r--r--. 1 root root 850 Jun 17 20:33 EPoll.2071
-rw-r--r--. 1 root root 983 Jun 17 20:33 EPoll.2072
-rw-r--r--. 1 root root 948 Jun 17 20:33 EPoll.2073
-rw-r--r--. 1 root root 983 Jun 17 20:33 EPoll.2074
-rw-r--r--. 1 root root 850 Jun 17 20:33 EPoll.2075
-rw-r--r--. 1 root root 1.1K Jun 17 20:33 EPoll.2076
-rw-r--r--. 1 root root 31K Jun 17 20:33 EPoll.2077
-rw-r--r--. 1 root root 1.4K Jun 17 20:33 EPoll.2078
-rw-r--r--. 1 root root 1.3K Jun 17 20:33 EPoll.2079
-rw-r--r--. 1 root root 2.4K Jun 17 20:33 EPoll.2080
-rw-r--r--. 1 root root 9.0K Jun 17 20:33 EPoll.2081
-rw-r--r--. 1 root root 8.7K Jun 17 20:33 EPoll.2082
-rw-r--r--. 1 root root 8.6K Jun 17 20:33 EPoll.2083
-rw-r--r--. 1 root root 8.2K Jun 17 20:33 EPoll.2084
-rw-r--r--. 1 root root 1.2K Jun 17 20:33 EPoll.2085
-rw-r--r--. 1 root root 400K Jun 17 20:33 EPoll.2086
-rw-r--r--. 1 root root 2.2K Jun 17 20:33 EPoll.2109
vi EPoll.2068

其中新建Socket,Bind 9090端口,设置非阻塞和Poll都是相同的调用

...
2539 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
....
2776 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
....
2797 bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INE T6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
2798 listen(4, 50) = 0
....

但是一旦有新的连接进来

···
2852 epoll_create(256) = 7 ··· 2862 epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=140462610448389 }}) = 0 ···
2888 epoll_wait(7, [{EPOLLIN, {u32=4, u64=140462610448388}}], 4096, -1) = 1 ···

epoll_create1: 创建一个epoll实例,文件描述符

epoll_ctl: 将监听的文件描述符添加到epoll实例中,实例代码为将标准输入文件描述符添加到epoll中

epoll_wait: 等待epoll事件从epoll实例中发生, 并返回事件以及对应文件描述符

调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个 红黑树 用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件.

当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

源码:Github

参考资料:

深入理解 Epoll

Select、Poll、Epoll详解

select和poll的区别