[Weaving a Messaging Framework][Netty Source Code Analysis] 7: Responsibilities and Implementation of NioSocketChannelUnsafe, the Unsafe Implementation Class

Time: 2023-03-09 00:33:47

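Unsafe is an inner interface of Channel that deals with the underlying socket. Judging by how it is written and named, it is not meant for application developers; even the final implementation, NioSocketChannelUnsafe, is never exposed publicly.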

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
        interface Unsafe {
            RecvByteBufAllocator.Handle recvBufAllocHandle();
            SocketAddress localAddress();
            SocketAddress remoteAddress();
            void register(EventLoop eventLoop, ChannelPromise promise);
            void bind(SocketAddress localAddress, ChannelPromise promise);
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
            void disconnect(ChannelPromise promise);
            void close(ChannelPromise promise);
            void closeForcibly();
            void deregister(ChannelPromise promise);
            void beginRead();
            void write(Object msg, ChannelPromise promise);
            void flush();
            ChannelPromise voidPromise();
            ChannelOutboundBuffer outboundBuffer();
        }
    }

    // NioUnsafe is declared inside AbstractNioChannel, not inside Channel
    public abstract class AbstractNioChannel extends AbstractChannel {
        public interface NioUnsafe extends Unsafe {
            SelectableChannel ch();
            void finishConnect();
            void read();
            void forceFlush();
        }
    }

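The inheritance chain of NioSocketChannelUnsafe (each arrow means "extends") is: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe

AbstractUnsafe: handles the socket link lifecycle (register, bind, disconnect, close) and the write/flush of outbound data.

Each operation is handled in roughly four stages: pre-checks, the call into the channel implementation, the notification of the pipeline, and the completion of the promise.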

    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        // Stage 1: checks before execution
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        boolean wasActive = isActive();
        // Stage 2: call the channel implementation
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        // Stage 3: call the business side by notifying the pipeline
        if (!wasActive && isActive()) {
            invokeLater(() -> pipeline.fireChannelActive());
        }
        // Stage 4: completion handling
        safeSetSuccess(promise);
    }
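The four stages in bind can be modelled without Netty. The following is a minimal sketch under stated assumptions, not Netty's code: `AbstractUnsafeSketch`, `DemoUnsafe`, the `events` list (standing in for the pipeline) and the use of `CompletableFuture` in place of `ChannelPromise` are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the four-stage pattern; not Netty code. */
abstract class AbstractUnsafeSketch {
    final List<String> events = new ArrayList<>(); // stands in for pipeline events
    private boolean open = true;
    protected boolean active = false;

    /** Hook supplied by the concrete channel, playing the role of doBind(). */
    protected abstract void doBind(String address) throws Exception;

    void close() {
        open = false;
    }

    public final void bind(String address, CompletableFuture<Void> promise) {
        // Stage 1: pre-checks (promise not cancelled, channel still open)
        if (promise.isCancelled()) {
            return;
        }
        if (!open) {
            promise.completeExceptionally(new IllegalStateException("channel closed"));
            return;
        }
        boolean wasActive = active;
        // Stage 2: delegate the real work to the implementation
        try {
            doBind(address);
        } catch (Throwable t) {
            promise.completeExceptionally(t);
            return;
        }
        // Stage 3: notify the "pipeline" only on an inactive -> active transition
        if (!wasActive && active) {
            events.add("channelActive");
        }
        // Stage 4: complete the promise
        promise.complete(null);
    }
}

/** Hypothetical concrete implementation used only for the demo. */
final class DemoUnsafe extends AbstractUnsafeSketch {
    @Override
    protected void doBind(String address) {
        if (address.isEmpty()) {
            throw new IllegalArgumentException("empty address");
        }
        active = true;
    }

    public static void main(String[] args) {
        DemoUnsafe unsafe = new DemoUnsafe();
        CompletableFuture<Void> promise = new CompletableFuture<>();
        unsafe.bind("127.0.0.1:8080", promise);
        System.out.println(promise.isDone() + " " + unsafe.events);
    }
}
```

The base class owns the checks and the ordering, while the subclass only supplies `doBind`. That mirrors the split between AbstractUnsafe and the channel's doBind, but the sketch leaves out the event loop assertion and the deferred `invokeLater` call.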
    // AbstractUnsafe.flush() and flush0()
    @Override
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }

        outboundBuffer.addFlush();
        flush0();
    }

    @SuppressWarnings("deprecation")
    protected void flush0() {
        // A flush is already in progress: avoid re-entrance
        if (inFlush0) {
            return;
        }

        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        inFlush0 = true;

        // Check the link before sending data
        if (!isActive()) {
            try {
                if (isOpen()) {
                    // true: notify handlers through channelWritabilityChanged
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            // Call the channel implementation
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }
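The inFlush0 flag in flush0 is a re-entrancy guard: if a callback triggered during the write calls flush again, the nested call returns at once and the outer loop picks up the new data. Below is a minimal sketch of that idea, assuming a plain in-memory queue. `GuardedFlusher`, `onWritten` and `guardHits` are hypothetical names, not part of Netty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a re-entrancy guarded flush; not Netty code. */
final class GuardedFlusher {
    private final Deque<String> pending = new ArrayDeque<>();
    final List<String> written = new ArrayList<>();
    private boolean inFlush; // plays the role of inFlush0
    int guardHits;           // counts nested flush calls that were rejected

    void write(String msg) {
        pending.add(msg);
    }

    void flush() {
        if (inFlush) {
            // A flush is already running further up the call stack.
            guardHits++;
            return;
        }
        if (pending.isEmpty()) {
            return;
        }
        inFlush = true;
        try {
            String msg;
            while ((msg = pending.poll()) != null) {
                written.add(msg);
                onWritten(msg);
            }
        } finally {
            // Always reset the flag, even if a write fails.
            inFlush = false;
        }
    }

    /** Simulates a listener that writes and flushes again from inside the flush. */
    private void onWritten(String msg) {
        if (msg.equals("a")) {
            write("b");
            flush(); // nested call: returns immediately because inFlush is true
        }
    }

    public static void main(String[] args) {
        GuardedFlusher flusher = new GuardedFlusher();
        flusher.write("a");
        flusher.flush();
        System.out.println(flusher.written + " guardHits=" + flusher.guardHits);
    }
}
```

Resetting the flag in `finally` matches the shape of flush0, where every exit path after `inFlush0 = true` clears it again.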

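AbstractNioUnsafe: the template class for the NioUnsafe interface; a thin wrapper.

NioByteUnsafe: mainly implements the read operation of the NioUnsafe interface.

NioSocketChannelUnsafe: only a thin wrapper; this is the class that is finally made available for internal use.

The read method of NioByteUnsafe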

    public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                // Fill byteBuf by calling the channel implementation
                int size = doReadBytes(byteBuf);
                // Record the length of the last read
                allocHandle.lastBytesRead(size);
                // Nothing was read: release byteBuf; a negative count means the link was closed
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                // Increment the number of messages read
                allocHandle.incMessagesRead(1);
                // byteBuf is filled: hand it to the business pipeline
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // If no read is pending and autoRead is off, remove the read op once done
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
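The shape of this read loop (allocate, fill, stop on an empty or negative read, deliver, repeat up to a limit) can be sketched in plain Java. This is a simplified model under stated assumptions, not Netty's implementation: `ReadLoopSketch`, `delivered` and `maxMessagesPerRead` are hypothetical names, the source is an in-memory `InputStream` that never blocks, and the only stop condition kept from `continueReading()` is a cap on the number of messages.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a bounded read loop; not Netty code. */
final class ReadLoopSketch {
    final List<byte[]> delivered = new ArrayList<>(); // stands in for fireChannelRead
    boolean readCompleteFired;                        // stands in for fireChannelReadComplete
    boolean closed;                                   // stands in for closeOnRead

    void read(InputStream source, int bufferSize, int maxMessagesPerRead) {
        int messagesRead = 0;
        boolean close = false;
        try {
            do {
                byte[] buf = new byte[bufferSize]; // allocate a fresh buffer
                int size = source.read(buf);       // fill it from the transport
                if (size <= 0) {
                    // Nothing was read; a negative count signals end of stream.
                    close = size < 0;
                    break;
                }
                messagesRead++;
                // Hand exactly the bytes that were read to the handlers.
                delivered.add(Arrays.copyOf(buf, size));
            } while (messagesRead < maxMessagesPerRead); // bound the work per read event
            readCompleteFired = true;
            if (close) {
                closed = true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ReadLoopSketch loop = new ReadLoopSketch();
        loop.read(new ByteArrayInputStream(new byte[10]), 4, 16);
        System.out.println(loop.delivered.size() + " chunks, closed=" + loop.closed);
    }
}
```

Capping the number of reads per event keeps one busy connection from monopolizing the thread. In the original listing that decision belongs to the allocator handle rather than a fixed counter.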

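Summary: Unsafe contains no computation code at all. It only implements boundary checks and flow control, and leaves the concrete work to other layers: the actual I/O goes to the channel's doXxx methods (doBind, doWrite, doReadBytes), and the data itself is handed to the pipeline.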