java netty之ServerBootstrap的启动

时间:2023-03-09 01:56:38
java netty之ServerBootstrap的启动

通过前面的几篇文章,对整个netty部分的架构已经运行原理都有了一定的了解,那么这篇文章来分析一个经常用到的类:ServerBootstrap,一般对于服务器端的编程它用到的都还算是比较的多。。看一看它的初始化,以及它的运行原理。。。

首先我们还是引入一段代码,通过分析这段代码来分析ServerBootstrap的运行。。。

  1. EventLoopGroup bossGroup = new NioEventLoopGroup();   //这个是用于serversocketchannel的eventloop
  2. EventLoopGroup workerGroup = new NioEventLoopGroup();    //这个是用于处理accept到的channel
  3. try {
  4. ServerBootstrap b = new ServerBootstrap();    //构建serverbootstrap对象
  5. b.group(bossGroup, workerGroup);   //设置时间循环对象,前者用来处理accept事件,后者用于处理已经建立的连接的io
  6. b.channel(NioServerSocketChannel.class);   //用它来建立新accept的连接,用于构造serversocketchannel的工厂类
  7. b.childHandler(new ChannelInitializer<SocketChannel>(){      //为accept channel的pipeline预添加的inboundhandler
  8. @Override     //当新连接accept的时候,这个方法会调用
  9. protected void initChannel(SocketChannel ch) throws Exception {
  10. // TODO Auto-generated method stub
  11. ch.pipeline().addLast(new MyChannelHandler());   //为当前的channel的pipeline添加自定义的处理函数
  12. }
  13. });
  14. //bind方法会创建一个serverchannel,并且会将当前的channel注册到eventloop上面,
  15. //会为其绑定本地端口,并对其进行初始化,为其的pipeline加一些默认的handler
  16. ChannelFuture f = b.bind(80).sync();
  17. f.channel().closeFuture().sync();  //相当于在这里阻塞,直到serverchannel关闭
  18. } finally {
  19. bossGroup.shutdownGracefully();
  20. workerGroup.shutdownGracefully();
  21. }

这段代码在前面的文章也有用到,基本上其意思也都在上面的注释中说的比较清楚了,那么我们接下来具体的分析其中的方法调用,首先是ServerBootstrap的group方法:

  1. //这里parent用于执行server的accept时间事件,child才是用于执行获取的channel连接的事件
  2. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
  3. super.group(parentGroup);
  4. if (childGroup == null) {
  5. throw new NullPointerException("childGroup");
  6. }
  7. if (this.childGroup != null) {
  8. throw new IllegalStateException("childGroup set already");
  9. }
  10. this.childGroup = childGroup;
  11. return this;
  12. }

这个方法是用来设置eventloopgroup,首先调用了父类的group方法(abstractbootstrap),就不将父类的方法列出来了,其实意思都差不多,eventloopgroup属性的值。。。

好了,接下来我们再来看一下channel方法:

  1. //构造serversocketchannel factory
  2. public B channel(Class<? extends C> channelClass) {
  3. if (channelClass == null) {
  4. throw new NullPointerException("channelClass");
  5. }
  6. return channelFactory(new BootstrapChannelFactory<C>(channelClass));  //构造工厂类
  7. }
  8. /**
  9. * {@link ChannelFactory} which is used to create {@link Channel} instances from
  10. * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
  11. * is not working for you because of some more complex needs. If your {@link Channel} implementation
  12. * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
  13. * simplify your code.
  14. */
  15. @SuppressWarnings("unchecked")
  16. public B channelFactory(ChannelFactory<? extends C> channelFactory) {
  17. if (channelFactory == null) {
  18. throw new NullPointerException("channelFactory");
  19. }
  20. if (this.channelFactory != null) {
  21. throw new IllegalStateException("channelFactory set already");
  22. }
  23. this.channelFactory = channelFactory;   //设置
  24. return (B) this;
  25. }

该方法主要是用于构造用于产生channel的工厂类,在我们这段代码说白了就是用于实例化serversocketchannel的工厂类。。。

接下来我们再来看一下childHandler方法:

  1. //设置childHandler,这个是当有channel accept之后为其添加的handler
  2. public ServerBootstrap childHandler(ChannelHandler childHandler) {
  3. if (childHandler == null) {
  4. throw new NullPointerException("childHandler");
  5. }
  6. this.childHandler = childHandler;
  7. return this;
  8. }

这个很简单吧,就是一个赋值,具体说他有什么用,前面的注释有说明,不过以后的分析会说明它有什么用的。。。

接下来我们来看一下bind方法,这个比较重要吧:

  1. //最终将会创建serverchannel,然后会将其绑定到这个地址,然后对其进行初始化
  2. public ChannelFuture bind(int inetPort) {
  3. return bind(new InetSocketAddress(inetPort));
  4. }

好吧,接下来再来看bind方法:

  1. public ChannelFuture bind(SocketAddress localAddress) {
  2. validate();
  3. if (localAddress == null) {
  4. throw new NullPointerException("localAddress");
  5. }
  6. return doBind(localAddress);
  7. }

好吧,再来看看doBind方法:

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. final ChannelFuture regPromise = initAndRegister();   //在这里创建serverchanel,并对其进行初始化,并将其注册到eventloop当中去
  3. final Channel channel = regPromise.channel();
  4. final ChannelPromise promise = channel.newPromise();
  5. if (regPromise.isDone()) {
  6. doBind0(regPromise, channel, localAddress, promise);   //将当前的serverchannel绑定地址
  7. } else {
  8. regPromise.addListener(new ChannelFutureListener() {
  9. @Override
  10. public void operationComplete(ChannelFuture future) throws Exception {
  11. doBind0(future, channel, localAddress, promise);
  12. }
  13. });
  14. }
  15. return promise;
  16. }

这里调用了一个比较重要的方法:initAndRegister,我们来看看它的定义:

  1. //创建初始化以及注册serverchanel
  2. final ChannelFuture initAndRegister() {
  3. //利用工厂类创建channel
  4. final Channel channel = channelFactory().newChannel();
  5. try {
  6. init(channel);  //init函数留给了后面来实现,用于初始化channel,例如为其的pipeline加上handler
  7. } catch (Throwable t) {
  8. channel.unsafe().closeForcibly();
  9. return channel.newFailedFuture(t);
  10. }
  11. ChannelPromise regPromise = channel.newPromise();
  12. group().register(channel, regPromise);  //将当前创建的serverchannel注册到eventloop上面去
  13. if (regPromise.cause() != null) {
  14. if (channel.isRegistered()) {
  15. channel.close();
  16. } else {
  17. channel.unsafe().closeForcibly();
  18. }
  19. }
  20. // If we are here and the promise is not failed, it's one of the following cases:
  21. // 1) If we attempted registration from the event loop, the registration has been completed at this point.
  22. //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
  23. // 2) If we attempted registration from the other thread, the registration request has been successfully
  24. //    added to the event loop's task queue for later execution.
  25. //    i.e. It's safe to attempt bind() or connect() now:
  26. //         because bind() or connect() will be executed *after* the scheduled registration task is executed
  27. //         because register(), bind(), and connect() are all bound to the same thread.
  28. return regPromise;
  29. }

代码还是很简单,而且也相对比较好理解,无非就是利用前面说到过的channel工厂类来创建一个serversocketchannel,然后调用init方法对这个刚刚生成的channel进行一些初始化的操作,然后在调用eventloopgroup的register方法,将当前这个channel的注册到group上,那么以后这个channel的事件都在这个group上面执行,说白了也就是一些accept。、。。

好,我们先来看看这个init方法吧:

  1. @Override
  2. //初始化chanel,当用channel factory构造channel以后,会调用这个函数来初始化,说白了就是为当前的channel的pipeline加入一些handler
  3. void init(Channel channel) throws Exception {
  4. //先初始化一些配置
  5. final Map<ChannelOption<?>, Object> options = options();
  6. synchronized (options) {
  7. channel.config().setOptions(options);
  8. }
  9. //初始化一些属性
  10. final Map<AttributeKey<?>, Object> attrs = attrs();
  11. synchronized (attrs) {
  12. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
  13. @SuppressWarnings("unchecked")
  14. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
  15. channel.attr(key).set(e.getValue());
  16. }
  17. }
  18. //获取当前channel的pipeline
  19. ChannelPipeline p = channel.pipeline();
  20. if (handler() != null) {
  21. p.addLast(handler());
  22. }
  23. final EventLoopGroup currentChildGroup = childGroup;
  24. final ChannelHandler currentChildHandler = childHandler;
  25. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  26. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  27. synchronized (childOptions) {
  28. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
  29. }
  30. synchronized (childAttrs) {
  31. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
  32. }
  33. p.addLast(new ChannelInitializer<Channel>() {
  34. @Override
  35. public void initChannel(Channel ch) throws Exception {
  36. //这是一个inboundher,将其加入到serverchannel的pipeline上面去
  37. ch.pipeline().addLast(new ServerBootstrapAcceptor(
  38. currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  39. }
  40. });
  41. }

代码还是相对很简单,首先初始化一些配置参数,然后初始化属性,最后还要为当前的channel的pipeline添加一个handler,这个handler用来当channel注册到eventloop上面之后对其进行一些初始化,我们还是来看看channelInitalizer的定义吧:

  1. public abstract class ChannelInitializer<C extends Channel> extends ChannelStateHandlerAdapter {
  2. private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
  3. /**
  4. * This method will be called once the {@link Channel} was registered. After the method returns this instance
  5. * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
  6. *
  7. * @param ch            the {@link Channel} which was registered.
  8. * @throws Exception    is thrown if an error occours. In that case the {@link Channel} will be closed.
  9. */
  10. protected abstract void initChannel(C ch) throws Exception;
  11. @SuppressWarnings("unchecked")
  12. @Override
  13. public final void channelRegistered(ChannelHandlerContext ctx)
  14. throws Exception {
  15. boolean removed = false;
  16. boolean success = false;
  17. try {
  18. //调用用户定义的init函数对当前的channel进行初始化
  19. initChannel((C) ctx.channel());
  20. ctx.pipeline().remove(this);
  21. removed = true;
  22. ctx.fireChannelRegistered();
  23. success = true;
  24. } catch (Throwable t) {
  25. logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
  26. } finally {
  27. if (!removed) {
  28. ctx.pipeline().remove(this);
  29. }
  30. if (!success) {
  31. ctx.close();
  32. }
  33. }
  34. }
  35. @Override
  36. public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
  37. ctx.fireInboundBufferUpdated();
  38. }
  39. }

它有一个channelRegistered方法,这个方法是在当前pipeline所属的channel注册到eventloop上面之后会激活的方法,它则是调用了用户自定义的函数来初始化channel,然后在将当前handler移除。。。也就是执行

  1. ch.pipeline().addLast(new ServerBootstrapAcceptor(
  2. currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

这里又为当前的serversocketchannel添加了另外一个handler,来看看该类型的定义吧:

  1. private static class ServerBootstrapAcceptor
  2. extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<Channel> {
  3. private final EventLoopGroup childGroup;
  4. private final ChannelHandler childHandler;
  5. private final Entry<ChannelOption<?>, Object>[] childOptions;
  6. private final Entry<AttributeKey<?>, Object>[] childAttrs;
  7. @SuppressWarnings("unchecked")
  8. ServerBootstrapAcceptor(
  9. EventLoopGroup childGroup, ChannelHandler childHandler,
  10. Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
  11. this.childGroup = childGroup;  //这个是用于管理accept的channel的eventloop
  12. this.childHandler = childHandler;
  13. this.childOptions = childOptions;
  14. this.childAttrs = childAttrs;
  15. }
  16. @Override
  17. public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
  18. return Unpooled.messageBuffer();
  19. }
  20. @Override
  21. @SuppressWarnings("unchecked")
  22. //当有数据进来的时候,会调用这个方法来处理数据,这里进来的数据就是accept的channel
  23. public void inboundBufferUpdated(ChannelHandlerContext ctx) {
  24. MessageBuf<Channel> in = ctx.inboundMessageBuffer(); //获取buf
  25. for (;;) {
  26. Channel child = in.poll();
  27. if (child == null) {
  28. break;
  29. }
  30. child.pipeline().addLast(childHandler);   //为accept的channel的pipeline加入用户定义的初始化handler
  31. for (Entry<ChannelOption<?>, Object> e: childOptions) {
  32. try {
  33. if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
  34. logger.warn("Unknown channel option: " + e);
  35. }
  36. } catch (Throwable t) {
  37. logger.warn("Failed to set a channel option: " + child, t);
  38. }
  39. }
  40. for (Entry<AttributeKey<?>, Object> e: childAttrs) {
  41. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  42. }
  43. try {
  44. childGroup.register(child);   //将当前accept的channel注册到eventloop
  45. } catch (Throwable t) {
  46. child.unsafe().closeForcibly();
  47. logger.warn("Failed to register an accepted channel: " + child, t);
  48. }
  49. }
  50. }
  51. @Override
  52. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  53. final ChannelConfig config = ctx.channel().config();
  54. if (config.isAutoRead()) {
  55. // stop accept new connections for 1 second to allow the channel to recover
  56. // See https://github.com/netty/netty/issues/1328
  57. config.setAutoRead(false);
  58. ctx.channel().eventLoop().schedule(new Runnable() {
  59. @Override
  60. public void run() {
  61. config.setAutoRead(true);
  62. }
  63. }, 1, TimeUnit.SECONDS);
  64. }
  65. // still let the exceptionCaught event flow through the pipeline to give the user
  66. // a chance to do something with it
  67. ctx.fireExceptionCaught(cause);
  68. }
  69. }

主要是有一个比较重要的方法,inboundBufferUpdated,这个方法是在有数据进来的时候会调用的,用于处理进来的数据,也就是accept到的channel,这里就知道我们定义的chidHandler的用处了吧,netty会将这个handler直接加入到刚刚accept到的channel的pipeline上面去。。。最后还要讲当前accept到的channel注册到child eventloop上面去,这里也就完完全全的明白了最开始定义的两个eventloopgroup的作用了。。。

好了,serversocketchannel的init以及register差不多了,然后会调用doBind0方法,将当前的serversocketchannel绑定到一个本地端口,

  1. //将chanel绑定到一个本地地址
  2. private static void doBind0(
  3. final ChannelFuture regFuture, final Channel channel,
  4. final SocketAddress localAddress, final ChannelPromise promise) {
  5. // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
  6. // the pipeline in its channelRegistered() implementation.
  7. channel.eventLoop().execute(new Runnable() {
  8. @Override
  9. //匿名内部类想要访问外面的参数,那么外面的参数必须是要final的才行
  10. public void run() {
  11. if (regFuture.isSuccess()) {
  12. //调用channel的bind方法,将当前的channl绑定到一个本地地址,其实是调用的是pipeline的bind方法,但是最终又是调用的当前
  13. //channel的unsafe对象的bind方法
  14. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  15. } else {
  16. promise.setFailure(regFuture.cause());
  17. }
  18. }
  19. });
  20. }

其实这里调用bind方法最终还是调用serversocketchannel的unsafe对象的bind方法。。。。

到这里,整个serverbootstrap 就算初始化完成了,而且也可以开始运行了。。。

  1. b.childHandler(new ChannelInitializer<SocketChannel>(){      //为accept channel的pipeline预添加的inboundhandler
  2. @Override     //当新连接accept的时候,这个方法会调用
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. // TODO Auto-generated method stub
  5. ch.pipeline().addLast(new MyChannelHandler());   //为当前的channel的pipeline添加自定义的处理函数
  6. }
  7. });

这段代码的意思是对于刚刚accept到的channel,将会在它的pipeline上面添加handler,这个handler的用处主要是就是用户自定义的initChannel方法,就是初始化这个channel,说白了就是为它的pipeline上面添加自己定义的handler。。。

这样整个serverbootstrap是怎么运行的也就差不多了。。。

刚开始接触到netty的时候觉得这里一头雾水,通过这段时间对其代码的阅读,总算搞懂了其整个运行的原理,而且觉得其设计还是很漂亮的,虽然有的时候会觉得有那么一点点的繁琐。。。。

整个运行过程总结为一下几个步骤:

(1)创建用于两个eventloopgroup对象,一个用于管理serversocketchannel,一个用于管理accept到的channel

(2)创建serverbootstrap对象,

(3)设置eventloopgroup

(4)创建用于构建用到的channel的工厂类

(5)设置childhandler,它的主要功能主要是用户定义代码来初始化accept到的channel

(6)创建serversocketchannel,并对它进行初始化,绑定端口,以及register,并为serversocketchannel的pipeline设置默认的handler

通过这几个步骤,整个serverbootstrap也就算是运行起来了。。。