【Netty源码分析】发送数据过程

时间:2023-03-09 22:45:15
【Netty源码分析】发送数据过程

前面两篇博客【Netty源码分析】Netty服务端bind端口过程【Netty源码分析】客户端connect服务端过程中我们分别介绍了服务端绑定端口和客户端连接到服务端的过程,接下来我们分析一下数据发送的过程。

future.channel().writeAndFlush("Hello Netty Server ,I am a common client");  

调用AbstractChannel的writeAndFlush函数

@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
}

调用AbstractChannelHandlerContext的writeAndFlush函数

@Override
public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
	........

	write(msg, true, promise);

	.......

}

需要注意的一点是,写数据的过程其实是分为两步的,第一步是将要写的数据写到buffer中,第二步是flush其实就是从buffer中读取数据然后发送给服务端。

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

首先是调用write函数,将数据写到buffer中。

private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

调用HeadContext的write函数

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
}

AbstractUnsafe中调用write函数,这一步就可以认为将数据写到buffer中了,接下来buffer的东西我们会分析。

@Override
public final void write(Object msg, ChannelPromise promise) {

	.......

    outboundBuffer.addMessage(msg, size, promise);

	......
}

接下来是flush过程,将数据写到服务端

private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

HeadContext中调用flush过程

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
     unsafe.flush();
}

AbstractUnsafe中调用flush过程,在这里我们可以看到之前写入数据的buffer(outboundBuffer)

@Override
public final void flush() {
   assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
         return;
     }

    outboundBuffer.addFlush();
    flush0();
}

调用AbstractNioUnsafe的flush0函数

@Override
protected void flush0() {

	........

    doWrite(outboundBuffer);

	.......

}

AbstractUnsafe中调用flush0函数

protected void flush0() {

	........

    doWrite(outboundBuffer);

	.......

}

调用NioSocketChannel中的doWrite函数,在doWrite函数中会看到调用NIO中的socketChannel中的写数据操作。

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                // All written so clear OP_WRITE
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    super.doWrite(in);
                    return;
                case 1:
                    // Only one ByteBuf so use non-gathering write
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }