深入理解NIO(三)—— NIO原理及部分源码的解析

时间:2022-02-07 03:23:50

深入理解NIO(三)—— NIO原理及部分源码的解析

欢迎回到淦™的源码看爆系列

在看完前面两个系列之后,相信大家对NIO也有了一定的理解,接下来我们就来深入源码去解读它,我这里的是OpenJDK-8u60版本,建议大家也下一份放ide里和我一起看会比较好理解。(这里主要介绍Selector,Buffer第一篇有提到一点,Channel也不过是些Buffer的操作方法而已,这里就不提及了,大家感兴趣可以自己去看)

老哥行行好,转载和我说一声好吗,我不介意转载的,但是请把原文链接贴大点好吗

open()

// 1. 创建Selector
Selector selector = Selector.open();

首先我们来分析open方法:

// Selector
public static Selector open() throws IOException {
// 这里的静态方法provider会使用DefaultSelectorProvider.create();方法根据系统选择一个SelectorProvider
// windows平台的话是WindowsSelectorProvider,
// Linux平台是一个EPollSelectorProvider,这里主要分析Linux平台下的
// 之后openSelector方法(一会看下面)会返回一个EPollSelectorImpl作为Selector的实现,我们一般提及的Selector就是它了
return SelectorProvider.provider().openSelector();
} // EPollSelectorProvider
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

之后是EPollSelectorImpl的构造方法:

EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 其他的我也看不太懂,我们直接进去这个EPollArrayWrapper的构造方法
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} // EPollArrayWrapper
EPollArrayWrapper() throws IOException {
// 直接看这里,这里调用了一个封装出来的Linux的api:epoll_create,这个东西大概可以理解成一个selector,详细的我们下一章再讲解
epfd = epollCreate();
}

所以其实Selector方法大抵上就是封装了一个epoll_create() 方法,当然还调用了一下epoll_ctl(),把serverchannel给注册进去。

register()

// 5. 将channel注册到selector上,监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

接下来我们分析把channel注册到Selector上的register方法

// SelectableChannel
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
{
return register(sel, ops, null);
} // AbstractSelectableChannel
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
// 212行,剩下的删掉了
k = ((AbstractSelector)sel).register(this, ops, att); } // SelectorImpl
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
//生成SelectorKey来存储到hashmap中,一共之后获取
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
//attach用户想要存储的对象
k.attach(attachment);
//调用子类的implRegister方法,接下来进去这里
synchronized (publicKeys) {
implRegister(k);
}
//设置关注的option
k.interestOps(ops);
return k;
}
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
//获取Channel所对应的fd,因为在linux下socket会被当作一个文件,也会有fd
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
//调用pollWrapper的add方法,将channel的fd添加到监控列表中
pollWrapper.add(fd);
//保存到HashSet中,keys是SelectorImpl的成员变量
keys.add(ski);
}

调用register方法并没有涉及到EpollArrayWrapper中的native方法epollCtl的调用,这是因为他们将这个方法的调用推迟到Select方法中去了.

select()

// 获取可用channel数量
int readyChannels = selector.select();

接下来我们来分析select()方法

// SelectorImpl
public int select(long timeout)
throws IOException
{
. . . .
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
} // SelectorImpl
private int lockAndDoSelect(long timeout) throws IOException {
. . . .
return doSelect(timeout);
}
// EPollSelectorImpl
protected int doSelect(long timeout) throws IOException {
.....
try {
....
//调用了poll方法,底层调用了native的epollCtl和epollWait方法
pollWrapper.poll(timeout);
} finally {
....
}
....
//更新selectedKeys,为之后的selectedKeys函数做准备
int numKeysUpdated = updateSelectedKeys();
....
return numKeysUpdated;
}
// EPollArrayWrapper
int poll(long timeout) throws IOException {
// 这里面的实现就是调用epoll_ctl()方法注册先前在register方法中保存的Channel的fd和感兴趣的事件类型
updateRegistrations();
// 这里是调用epollWait方法等待感兴趣事件的生成,导致线程阻塞
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
. . . .
}

上面提到的epollCtl和epollWait方法在下一章我们会详细讲,这里先不讲。

总之我们可以知道Selector其实就是封装了Linux提供的api而已,也就是epollCreateepollCtlepollWait方法。

selectedKeys()

// 获取可用channel的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();

接下来我们来看看selectedKeys()方法:

// SelectorImpl
//是通过Util.ungrowableSet生成的,不能添加,只能减少
private Set<SelectionKey> publicSelectedKeys;
public Set<SelectionKey> selectedKeys() {
....
return publicSelectedKeys;
}

很奇怪啊,怎麽直接就返回publicSelectedKeys了,难道在select函数的执行过程中有修改过这个变量吗?publicSelectedKeys这个对象其实是selectedKeys变量的一份副本,你可以在SelectorImpl的构造函数中找到它们俩的关系,我们再回头看一下select中updateSelectedKeys方法:

private int updateSelectedKeys() {
//更新了的keys的个数,或在说是产生的事件的个数
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
//对应的channel的fd
int nextFD = pollWrapper.getDescriptor(i);
//通过fd找到对应的SelectionKey
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
//更新selectedKey变量,并通知响应的channel来做响应的处理
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
// 这里加进去
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}

不知道大家有没有留意到,如果我们不先调用select(),直接selectedKeys()是不会获得任何Channel的,因为里面没有更新publicSelectedKeys的方法

还有一点是,publicSelectedKeys是selectedKeys的引用,所以我们获得的是它的引用,而不是每次返回一个新对象,这个引用里面的Channel我们处理完后要记得remove掉,不然下次还是会返回给你的。

顺便一提这里publicSelectedKeys是采用 publicSelectedKeys = Util.ungrowableSet(selectedKeys); 的方式创建出来的,这个方法创建出来的set如方法名ungrowableSet,是不能调用add方法的,只能remove

为什么Netty自己又从新实现了一边native相关的NIO底层方法? 听听Netty的创始人是怎麽说的吧链接

因为Java的版本使用的epoll的LT模式,而Netty则希望使用ET模式(详情看第四篇的两种触发模式),而且Java版本没有将epoll的部分配置项暴露出来,比如说TCP_CORK和SO_REUSEPORT。

下一篇:epoll的实现原理

深入理解NIO(三)—— NIO原理及部分源码的解析


参考资料:

https://segmentfault.com/a/1190000017798684?utm_source=tag-newest