Kafka 源码分析之网络层(一)

时间:2022-09-19 00:01:15

Kafka 源码分析之网络层(一)


Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。小编会给大家带来几期 Kafka 相关的源码分析文章。这一系列文章是基于kafka 0.9.1版本,今天先来网络层的第一部分-概述和网络层模型实现。

PS:丰富的一线技术、多元化的表现形式,尽在“HULK一线技术杂谈”,点关注哦!



Kafka的网络层模型概述

  • 这个模型其实一点也不神秘,很质朴,很清晰,也很好用,引用源码中的一句话:

The threading model is 1 Acceptor thread that handles new connections Acceptor has N Processor threads that each have their own selector and read requests from socketsM Handler threads that handle requests and produce responses back to the processor threads for writing

  • 再来张图:

Kafka 源码分析之网络层(一)

  • Acceptor 作两件事: 创建一堆worker线程;接受新连接, 将新的socket指派给某个 worker线程;

  • Worker线程处理若干个socket,接受请求转给各种handler处理,response再经由worker线程发送回去.

  • 总结起来就是个半同步半异步模型(https://github.com/DavidLiuXh/lightningserver).

Kafka的网络层模型实现

  • 虽然kafka用scala实现,但里面也用了大量的java类, 这部分主要是用了NIO(http://tutorials.jenkov.com/java-nio/index.html);

  • 主要实现文件:core/src/main/scal/kafka/network/SocketServer.scala,里面包括了SocketServer, Acceptor, Processor等;

  • 数据传输层实现:clients/src/main/java/org/apache/kafka/common/network,里面包括了Channel,TransportLayer,Authenticator等.

Kafka网络层一哥:SocketServer类

  • 所在文件: core/src/main/scala/kafka/network/SocketServer.scala;

  • 统筹组织所有的网络层组件;

startup()方法:

(1) 根据配置的若干endpoint创建相应的Acceptor及相关联的一组Processor线程;

 
  1. for (i <- processorBeginIndex until processorEndIndex) {

  2.          processors(i) = new Processor(i,

  3.            time,

  4.            maxRequestSize,

  5.            requestChannel,

  6.            connectionQuotas,

  7.            connectionsMaxIdleMs,

  8.            protocol,

  9.            config.values,

  10.            metrics

  11.          )

  12.        }

  13. val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,

  14. processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)

(2) 创建Acceptor运行的线程并启动;

 
  1. Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString,

  2.                          endpoint.port), acceptor, false).start()

  3. acceptor.awaitStartup()

Kafka网络层头号马仔:Acceptor类

  • 所在文件: core/src/main/scala/kafka/network/SocketServer.scala Acceptor类对象创建:

(1) 创建监听ServerSocket:

 
  1. val serverChannel = ServerSocketChannel.open()

  2.    serverChannel.configureBlocking(false)

  3.    serverChannel.socket().setReceiveBufferSize(recvBufferSize)

  4.    try {

  5.      serverChannel.socket.bind(socketAddress)

  6.      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))

  7.    } catch {

  8.      case e: SocketException =>

  9.        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)

  10.    }

(2) 开启分配到的若干Processor:

 
  1. this.synchronized {

  2.    processors.foreach { processor =>

  3.      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()

  4.    }

  5.  }

(3) run()-利用NIO的selector来接收网络连接:

 
  1. var currentProcessor = 0

  2.      while (isRunning) {

  3.        try {

  4.          val ready = nioSelector.select(500)

  5.          if (ready > 0) {

  6.            val keys = nioSelector.selectedKeys()

  7.            val iter = keys.iterator()

  8.            while (iter.hasNext && isRunning) {

  9.              try {

  10.                val key = iter.next

  11.                iter.remove()

  12.                if (key.isAcceptable)

  13.                  accept(key, processors(currentProcessor))

  14.                else

  15.                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

  16.                // round robin to the next processor thread

  17.                currentProcessor = (currentProcessor + 1) % processors.length

  18.              } catch {

  19.                case e: Throwable => error("Error while accepting connection", e)

  20.              }

  21.            }

  22.          }

  23.        }

这里面最主要的就是 accept(key,processors(currentProcessor))

(4) accept: 设置新连接socket的参数后交由Processor处理:

 
  1. socketChannel.configureBlocking(false)

  2. socketChannel.socket().setTcpNoDelay(true)

  3. socketChannel.socket().setKeepAlive(true)

  4. socketChannel.socket().setSendBufferSize(sendBufferSize)

  5. processor.accept(socketChannel)



Kafka网络层堂口扛把子:Processor类

  • 所在文件:core/src/main/scala/kafka/network/SocketServer.scala;

  • 从单个连接进来的request都由它处理;

  • 每个Processor对象会创建自己的nio selector;

  • 每个连接有唯一标识ConnectionId: $localHost:$localPort-$remoteHost:$remotePort,这个非常重要!!!

  • accept(socketChannel::SocketChannel):将新连接的SocketChannel保存到并发队列Q1中;

  • run()是核心, 包裹在一个循环里,直接线程退出, while(isRunning){},里面依次调用如下函数: 

    (1) configureNewConnections():从并发队列Q1里取出SocketChannel,添加到自身的nio selector中,监听读事件;

    (2) processNewResponses():处理当前所有处理完成的request相应的response, 这些response都是从RequestChannel获得(requestChannel.receiveResponse),根据request的类型来决定从当前连接的nio selector中暂时删除读事件监听/添加写事件/关闭当前连接;

    (3) selector.poll(300): 这个就不用解释了, 这个selector是对nio selector的又一封装,我们后面一章会讲到,它完成具体的数据接收和发送;

    (4) selector.completedReceives.asScala.foreach: 处理当前所有的从selector返回的完整request,将其put到RequestChannel的一个阻塞队列里,供应用层获取并处理;同时会暂时删除些连接上的读事件监听:selector.mute(receive.source);

    (5) selector.completedSends.asScala.foreach: 处理当前所有的从selector返回的写操作,重新将读事件添加到连接的selector监听中 selector.unmute(send.destination);

    (6) selector.disconnected.asScala.foreach: 处理当前所有将关闭的连接;

Kafka网络层中间人:RequestChannel类

  • 所在文件: core/src/main/scala/kafka/network/RequestChannel.scala;

  • 保存所有从网络层拿到的完整request和需要发送的response;

  • 一般是RequestHandler会周期性从RequestChannel获取request,并将response保存回RequestChannel;

  • processNewResponses() 处理RequestChannel中所有的response;

Kafka网络层看门小弟:ConnectionQuotas类

  • 所在文件:core/src/main/scala/kafka/network/SocketServer.scala;

  • 当某个IP到kafka的连接数过多时,将抛出 TooManyConnectionsException异常;

  • 实现就是通过加锁的加减计数;

SocketServer 图解:

Kafka 源码分析之网络层(一)

下一篇咱们来讲在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送。


扫描下方二维码了解更多内容



Kafka 源码分析之网络层(一)