netty客户端源码

时间:2023-12-16 20:42:50

随笔记录。

// Client code: create the ChannelFactory from two cached thread pools.
// Going by the four-argument constructor quoted further down, the first
// executor is presumably the boss executor and the second the worker
// executor (the two-argument overload called here is not shown).

ChannelFactory factory = new NioClientSocketChannelFactory(

Executors.newCachedThreadPool(),

Executors.newCachedThreadPool());

// Constructor of NioClientSocketChannelFactory (library excerpt; the "..."
// marks code the author left out).
// Stores both executors and builds the NioClientSocketPipelineSink that is
// assigned to the sink field.

public NioClientSocketChannelFactory(

Executor bossExecutor, Executor workerExecutor,

int bossCount, int workerCount) {

...

// Executor (thread pool) used for the boss side.

this.bossExecutor = bossExecutor;

// Executor (thread pool) used for the worker side.

this.workerExecutor = workerExecutor;

// Build the ChannelSink: a NioClientSocketPipelineSink instance.

// Author's note: bossCount defaults to 1 and workerCount defaults to
// Runtime.getRuntime().availableProcessors() * 2 (the overloads that
// supply these defaults are not part of this excerpt).

sink = new NioClientSocketPipelineSink(

bossExecutor, workerExecutor, bossCount, workerCount);

}

// NioClientSocketPipelineSink构造方法

NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor,

int bossCount, int workerCount) {

this.bossExecutor = bossExecutor;

bosses = new Boss[bossCount];

for (int i = 0; i < bosses.length; i ++) {

bosses[i] = new Boss(i + 1);

}

workers = new NioWorker[workerCount];

for (int i = 0; i < workers.length; i ++) {

workers[i] = new NioWorker(id, i + 1, workerExecutor);

}

}

// Client code: create the ClientBootstrap and hand it the ChannelFactory
// built earlier.

ClientBootstrap bootstrap = new ClientBootstrap(factory);

// Bootstrap.setFactory: stores the ChannelFactory in the factory field.
//
// factory - the ChannelFactory to use; must not be null
// Throws NullPointerException("factory") when factory is null, so a missing
// factory is reported at the point of configuration instead of surfacing
// later when the stored factory is used to create a channel.
public void setFactory(ChannelFactory factory) {
    if (factory == null) {
        throw new NullPointerException("factory");
    }
    this.factory = factory;
}

// Client code: install a ChannelPipelineFactory. Its getPipeline() builds and
// returns a ChannelPipeline holding a StringEncoder, a StringDecoder and the
// application handler, appended in that order.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        ChannelPipeline handlers = Channels.pipeline();
        handlers.addLast("encode", new StringEncoder());
        handlers.addLast("decode", new StringDecoder());
        handlers.addLast("handler1", new TimeClientHandler());
        return handlers;
    }
});

// DefaultChannelPipeline类addLast方法

public synchronized void addLast(String name, ChannelHandler handler) {

if (name2ctx.isEmpty()) {

// 初始化name2ctx,head,tail

init(name, handler);

} else {

DefaultChannelHandlerContext oldTail = tail;

DefaultChannelHandlerContext    newTail =   new DefaultChannelHandlerContext(oldTail, null, name, handler);

// 最新的DefaultChannelHandlerContext放入tail以及更新到oldTail.next中

oldTail.next = newTail;

tail = newTail;

name2ctx.put(name, newTail);

}

}

// 客户端发起连接请求(客户端代码)

bootstrap.connect (new InetSocketAddress("127.0.0.1", 8080));

// connect源代码解读

// ClientBootstrap类connect方法

// ClientBootstrap.connect: obtains a pipeline, creates a channel around it,
// applies the bootstrap options, optionally binds, then starts the connect.
//
// remoteAddress - address to connect to
// localAddress  - local address to bind first; binding is skipped when null
// Returns the ChannelFuture produced by the channel's connect call.
// Throws ChannelPipelineException when obtaining the pipeline fails.
public ChannelFuture connect(final SocketAddress remoteAddress,
        final SocketAddress localAddress) {
    // Ask the configured pipeline factory for a pipeline (a
    // DefaultChannelPipeline in this walkthrough); a failure is rethrown
    // with the original exception as its cause.
    ChannelPipeline newPipeline;
    try {
        newPipeline = getPipelineFactory().getPipeline();
    } catch (Exception cause) {
        throw new ChannelPipelineException("Failed to initialize a pipeline.", cause);
    }

    // The factory (NioClientSocketChannelFactory in this walkthrough)
    // creates the channel, a NioClientSocketChannel; the options are applied
    // to its config right afterwards.
    Channel newChannel = getFactory().newChannel(newPipeline);
    newChannel.getConfig().setOptions(getOptions());

    // Bind only when a local address was supplied.
    if (localAddress != null) {
        newChannel.bind(localAddress);
    }

    // Kick off the connect and hand its future back to the caller.
    return newChannel.connect(remoteAddress);
}

// NioClientSocketChannelFactory类newChannel方法

// Creates the client channel for the given pipeline.
// In this walkthrough: this is the NioClientSocketChannelFactory, pipeline is
// a DefaultChannelPipeline and sink is the NioClientSocketPipelineSink.
public SocketChannel newChannel(ChannelPipeline pipeline) {
    // sink.nextWorker() supplies the NioWorker handed to the new channel.
    NioWorker assignedWorker = sink.nextWorker();
    return new NioClientSocketChannel(this, pipeline, sink, assignedWorker);
}

// NioClientSocketChannel类构造方法

// Constructor of NioClientSocketChannel: delegates to the superclass
// constructor with a null parent and a newly created socket, then calls
// fireChannelOpen for this channel.

NioClientSocketChannel(

ChannelFactory factory, ChannelPipeline pipeline,

ChannelSink sink, NioWorker worker) {

// newSocket() supplies a new SocketChannel; per the author's note it boils
// down to SocketChannel.open() (newSocket itself is not shown here).

super(null, factory, pipeline, sink, newSocket(), worker);

fireChannelOpen(this);

}

// 继续看父类NioSocketChannel构造方法

// Constructor of NioSocketChannel: passes parent, factory, pipeline and sink
// up to the superclass, then stores the socket and the worker and creates
// the channel configuration.

public NioSocketChannel(

Channel parent, ChannelFactory factory,

ChannelPipeline pipeline, ChannelSink sink,

SocketChannel socket, NioWorker worker) {

super(parent, factory, pipeline, sink);

this.socket = socket;

this.worker = worker;

// The config object is built from socket.socket().

config = new DefaultNioSocketChannelConfig(socket.socket());

}

// 继续看父类AbstractChannel构造方法

// Constructor of AbstractChannel: records parent, factory and pipeline,
// allocates the channel id and attaches this channel and the sink to the
// pipeline.

protected AbstractChannel(

Channel parent, ChannelFactory factory,

ChannelPipeline pipeline, ChannelSink sink) {

// In the client path traced here the parent is null
// (NioClientSocketChannel calls super(null, ...)).

this.parent = parent;

// The NioClientSocketChannelFactory instance in this walkthrough.

this.factory = factory;

// The DefaultChannelPipeline instance in this walkthrough.

this.pipeline = pipeline;

id = allocateId(this);

// Hand this channel and the sink to the pipeline.

pipeline.attach(this, sink);

}

// DefaultChannelPipeline类attach方法

// DefaultChannelPipeline.attach: stores the channel and the sink this
// pipeline belongs to.

public void attach(Channel channel, ChannelSink sink) {

// The NioClientSocketChannel instance in this walkthrough.

this.channel = channel;

// The NioClientSocketPipelineSink instance in this walkthrough.

this.sink = sink;

}

// AbstractChannel.connect, reached from ch.connect(remoteAddress) in
// ClientBootstrap.connect. Hands the request to the static Channels.connect
// helper and returns the future it produces.
public ChannelFuture connect(SocketAddress remoteAddress) {
    ChannelFuture connectFuture = Channels.connect(this, remoteAddress);
    return connectFuture;
}

// Channels.connect: validates the address, then sends a CONNECTED state event
// carrying it down the channel's pipeline.
//
// Throws NullPointerException("remoteAddress") when remoteAddress is null.
// Returns the future that was attached to the event sent downstream.
public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }

    ChannelFuture connectFuture = future(channel, true);

    // In this walkthrough the pipeline is the DefaultChannelPipeline.
    ChannelPipeline targetPipeline = channel.getPipeline();

    // The event is a DownstreamChannelStateEvent built with
    // ChannelState.CONNECTED and the remote address; the sink later reads
    // these back through getState() and getValue().
    DownstreamChannelStateEvent connectRequest = new DownstreamChannelStateEvent(
            channel, connectFuture, ChannelState.CONNECTED, remoteAddress);
    targetPipeline.sendDownstream(connectRequest);

    return connectFuture;
}

// NioClientSocketPipelineSink.eventSunk: handles an event that reached the
// sink. State events are dispatched on their ChannelState; message events
// are queued on the channel's write buffer and handed to its worker.
// Events of any other type are ignored.
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    if (e instanceof ChannelStateEvent) {
        ChannelStateEvent stateEvent = (ChannelStateEvent) e;
        NioClientSocketChannel target =
                (NioClientSocketChannel) stateEvent.getChannel();
        ChannelFuture result = stateEvent.getFuture();
        ChannelState kind = stateEvent.getState();
        Object arg = stateEvent.getValue();

        switch (kind) {
        case OPEN:
            // Only an explicit FALSE value closes the channel.
            if (Boolean.FALSE.equals(arg)) {
                target.worker.close(target, result);
            }
            break;
        case BOUND:
            // A null value closes the channel; otherwise the value is the
            // address to bind to.
            if (arg == null) {
                target.worker.close(target, result);
            } else {
                bind(target, result, (SocketAddress) arg);
            }
            break;
        case CONNECTED:
            // A null value closes the channel; otherwise the value is the
            // remote address. This is the branch taken when the client
            // first initiates its connection.
            if (arg == null) {
                target.worker.close(target, result);
            } else {
                connect(target, result, (SocketAddress) arg);
            }
            break;
        case INTEREST_OPS:
            target.worker.setInterestOps(target, result, ((Integer) arg).intValue());
            break;
        }
        return;
    }

    if (e instanceof MessageEvent) {
        MessageEvent message = (MessageEvent) e;
        NioSocketChannel target = (NioSocketChannel) message.getChannel();
        // Queue the message, then let the channel's worker write it.
        boolean accepted = target.writeBuffer.offer(message);
        assert accepted;
        target.worker.writeFromUserCode(target);
    }
}

// NioClientSocketPipelineSink.connect: starts the NIO connect for a channel.
// When the connect completes immediately the channel is registered with its
// worker; otherwise the pending connect is handed to a Boss. Any Throwable
// fails the future, fires exceptionCaught and closes the channel.

private void connect(

final NioClientSocketChannel channel, final ChannelFuture cf,

SocketAddress remoteAddress) {

try {

// channel.socket was created when the NioClientSocketChannel was constructed.

// Per the author's note the socket was put in non-blocking mode (the
// configureBlocking(false) call is not part of this excerpt), so connect(...)
// returns right away: true when the connection is already established,
// false when it is still in progress.

// NOTE(review): the original note said connect always returns false and that
// the server has already processed the request at this point; neither is
// guaranteed. The pending case is completed later through finishConnect()
// in connect(SelectionKey).

if (channel.socket.connect(remoteAddress)) {

channel.worker.register(channel, cf);

} else {

// If the channel closes before the connect future completes, fail that
// future with a ClosedChannelException.

channel.getCloseFuture().addListener(new ChannelFutureListener() {

public void operationComplete(ChannelFuture f)

throws Exception {

if (!cf.isDone()) {

cf.setFailure(new ClosedChannelException());

}

}

});

cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

// Remember the future; connect(SelectionKey) passes it on once the connect finishes.

channel.connectFuture = cf;

// Hand the channel to a Boss for registration. nextBoss() yields a Boss,
// which is also what gets run as a Runnable (see Boss.register / Boss.run).

nextBoss().register(channel);

}

} catch (Throwable t) {

cf.setFailure(t);

fireExceptionCaught(channel, t);

channel.worker.close(channel, succeededFuture(channel));

}

}

// Boss.register (Boss is an inner class of NioClientSocketPipelineSink):
// queues a RegisterTask for the channel, opening the selector and starting
// the boss loop on first use, then wakes the selector.

void register(NioClientSocketChannel channel) {

Runnable registerTask = new RegisterTask(this, channel);

Selector selector;

synchronized (startStopLock) {

if (!started) {

// Open a selector if this worker didn't start yet.

try {

// Open a new selector and keep it in both the field and the local.

this.selector = selector =  Selector.open();

} catch (Throwable t) {

throw new ChannelException(

"Failed to create a selector.", t);

}

// Start the worker thread with the new Selector.

boolean success = false;

try {

// Start this Boss on bossExecutor; its run() loop consumes the register task queue.

// bossExecutor is the Executors.newCachedThreadPool() created in the client code.

// That run() loop is where selector.select(500) is called.

DeadLockProofWorker.start(

bossExecutor,

new ThreadRenamingRunnable(

this, "New I/O client boss #" + id + '-' + subId));

success = true;

} finally {

if (!success) {

// Release the Selector if the execution fails.

try {

selector.close();

} catch (Throwable t) {

logger.warn("Failed to close a selector.", t);

}

this.selector = selector = null;

// The method will return to the caller at this point.

}

}

} else {

// Use the existing selector if this worker has been started.

selector = this.selector;

}

assert selector != null && selector.isOpen();

started = true;

// Queue the register task for the boss loop to pick up.

boolean offered = registerTaskQueue.offer(registerTask);

assert offered;

}

// Wake the selector unless a wake-up has already been requested.

if (wakenUp.compareAndSet(false, true)) {

selector.wakeup();

}

}

// DeadLockProofWorker.start: runs the given runnable on the parent executor,
// with PARENT set to that executor for the duration of the run and removed
// afterwards. The "......" marker is code the author left out.

public static void start(final Executor parent, final Runnable runnable) {

// In the path traced here parent is bossExecutor, i.e. a thread pool.

......

// Submit a wrapper task to the executor.

parent.execute(new Runnable() {

public void run() {

PARENT.set(parent);

try {

// runnable is the ThreadRenamingRunnable passed in by Boss.register.

runnable.run();

} finally {

PARENT.remove();

}

}

});

}

// ThreadRenamingRunnable.run: runs the wrapped runnable and, if the current
// thread was renamed, restores its old name afterwards. The "......" marker
// is code the author left out (where renamed, currentThread and
// oldThreadName are set up).

public void run() {

......

// Run the actual runnable and revert the name back when it ends.

try {

// In the path traced here runnable is the Boss instance.

runnable.run();

} finally {

if (renamed) {

// Revert the name back if the current thread was renamed.

// We do not check the exception here because we know it works.

currentThread.setName(oldThreadName);

}

}

}

// Boss.run (Boss is an inner class of NioClientSocketPipelineSink): the boss
// loop. Each pass clears wakenUp, waits on the selector, runs the queued
// register tasks and then handles any selected keys. The "……" markers are
// code the author left out.

public void run() {

boolean shutdown = false;

Selector selector = this.selector;

long lastConnectTimeoutCheckTimeNanos = System.nanoTime();

for (;;) {

wakenUp.set(false);

try {

// Block in select with a timeout of 500.

int selectedKeyCount = selector.select(500);

// A wake-up was requested while selecting: wake the selector again.

if (wakenUp.get()) {

selector.wakeup();

}

// Drain the queued register tasks.

// This is where the NIO register call is performed (see RegisterTask.run).

processRegisterTaskQueue();

if (selectedKeyCount > 0) {

// Handle the keys the selector reported as ready.

processSelectedKeys(selector.selectedKeys());

}

……

} catch (Throwable t) {

……

}

}

}

// Drains registerTaskQueue, running each queued task in order until poll()
// reports the queue is empty. The tasks are the RegisterTask instances
// offered by Boss.register, so each run() executes RegisterTask.run.
private void processRegisterTaskQueue() {
    Runnable pending;
    while ((pending = registerTaskQueue.poll()) != null) {
        pending.run();
    }
}

// RegisterTask.run (RegisterTask is an inner class of
// NioClientSocketPipelineSink): performs the selector registration queued by
// Boss.register. The "……" marker is code the author left out.

public void run() {

try {

// Register the channel's socket with the boss selector for OP_CONNECT,
// attaching the channel so it can be read back from the key later
// (connect(SelectionKey) retrieves it via k.attachment()).

channel.socket.register(

boss.selector, SelectionKey.OP_CONNECT, channel);

} catch (ClosedChannelException e) {

// Registration failed because the socket is closed: close the channel
// through its worker.

channel.worker.close(channel, succeededFuture(channel));

}

……

}

// Walks the selected keys, removing each from the set as it is taken.
// Invalid keys are closed; valid keys that are connectable go on to
// connect(SelectionKey), which completes the client connection.
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
    Iterator<SelectionKey> keys = selectedKeys.iterator();
    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        keys.remove();

        if (!key.isValid()) {
            close(key);
        } else if (key.isConnectable()) {
            connect(key);
        }
    }
}

// connect(SelectionKey): completes a pending connect for the channel attached
// to the key. The "......." marker is error handling the author left out.

private void connect(SelectionKey k) {

NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();

try {

// Finish the NIO connect; only when it reports success is the key
// cancelled and the channel handed on.

if (ch.socket.finishConnect()) {

k.cancel();

// Register the channel with its NioWorker, passing along the connect
// future stored on the channel.

ch.worker.register(ch, ch.connectFuture);

}

} catch (Throwable t) {

.......

}

}

类NioWorker负责读写事件注册处理

未完待续...