kafka源码解析之六SocketServer

时间:2022-06-25 16:46:32
/**
 * Socket server for a broker. `startup()` launches `numProcessorThreads` processor threads
 * plus one acceptor thread; requests and responses travel through `requestChannel`.
 */
class SocketServer(val brokerId: Int,
                   val host: String,
                   val port: Int,
                   val numProcessorThreads: Int,
                   val maxQueuedRequests: Int,
                   val sendBufferSize: Int,
                   val recvBufferSize: Int,
                   val maxRequestSize: Int = Int.MaxValue,
                   val maxConnectionsPerIp: Int = Int.MaxValue,
                   val connectionsMaxIdleMs: Long,
                   val maxConnectionsPerIpOverrides: Map[String, Int]) extends Logging with KafkaMetricsGroup {
  this.logIdent = "[Socket Server on Broker " + brokerId + "], "

  private val time = SystemTime
  private val processors = new Array[Processor](numProcessorThreads)
  @volatile private var acceptor: Acceptor = null
  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)

  /* a meter to track the average free capacity of the network processors */
  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  /**
   * Start the socket server: processors first, then the acceptor. Returns once the
   * acceptor has reported that its startup is complete.
   */
  def startup(): Unit = {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    // one Processor thread per configured network thread (num.network.threads)
    (0 until numProcessorThreads).foreach { idx =>
      val idleMeter = newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> idx.toString))
      val processor = new Processor(idx,
                                    time,
                                    maxRequestSize,
                                    aggregateIdleMeter,
                                    idleMeter,
                                    numProcessorThreads,
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
      processors(idx) = processor
      Utils.newThread("kafka-network-thread-%d-%d".format(port, idx), processor, false).start()
    }

    // gauge: sum over all processors of the keys currently registered for OP_WRITE
    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.iterator.map(_.countInterestOps(SelectionKey.OP_WRITE)).sum
    })

    // register the processor threads for notification of responses: when a response is
    // available for processor `processorId`, that processor is woken up
    requestChannel.addResponseListener((processorId: Int) => processors(processorId).wakeup())

    // start accepting connections
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }
}

Acceptor作为一个独立的线程存在,当接收到网络连接请求的时候,以轮询(round-robin)的方式把连接交给其中一个Processor线程,由该线程处理这个连接之后的所有request

/**
 * Thread that accepts incoming connections on `host:port` and hands each one to a
 * processor, cycling through `processors` in round-robin order.
 */
private[kafka] class Acceptor(val host: String,
                              val port: Int,
                              private val processors: Array[Processor],
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
  val serverChannel = openServerSocket(host, port)

  /**
   * Accept loop that checks for new connection attempts
   */
  def run(): Unit = {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
    startupComplete()
    // index of the processor that receives the next accepted connection
    var nextProcessor = 0
    while (isRunning) {
      // wait at most 500 ms so the isRunning flag is re-checked regularly
      if (selector.select(500) > 0) {
        val pending = selector.selectedKeys().iterator()
        while (pending.hasNext && isRunning) {
          try {
            val key = pending.next()
            pending.remove()
            if (!key.isAcceptable)
              throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            // hand the connection to the chosen processor (per the original note: it is queued
            // in that processor's newConnections, and the processor then serves every request
            // arriving on this connection)
            accept(key, processors(nextProcessor))
            // round robin to the next processor thread; skipped when accept fails
            nextProcessor = (nextProcessor + 1) % processors.length
          } catch {
            // a failure on one connection is logged and must not stop the accept loop
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }
}
那么Processor线程是如何处理request的呢?关键在于requestChannel,它作为request和response的传输通道,使得Processor线程只负责接收connection的request和发送相应的response,而和真实的业务逻辑无关,且看requestChannel:
/**
 * Holds the queues used to pass requests and responses around: one shared, bounded
 * request queue and one response queue per processor.
 */
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil

  // a single blocking request queue bounded to queueSize entries; per the original note,
  // the KafkaRequestHandler threads take requests from it
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)

  // numProcessors (num.network.threads) blocking response queues, indexed by processor id;
  // per the original note, the KafkaRequestHandler threads put responses into them
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  responseQueues.indices.foreach { processorId =>
    responseQueues(processorId) = new LinkedBlockingQueue[RequestChannel.Response]()
  }
}
即Processor线程将各自对应的connection的request都存放进requestQueue中,然后分别从对应的responseQueues(i)中获取这些request对应的response,如下图:

kafka源码解析之六SocketServer

代码如下:
/**
 * Network thread that serves the connections assigned to it: it reads requests from the
 * sockets, forwards each complete request to `requestChannel`, and writes the attached
 * responses back to the sockets.
 *
 * NOTE(review): configureNewConnections, processNewResponses, maybeCloseOldestConnection,
 * close, closeAll, channelFor, wakeup, lruConnections and currentTimeNanos are used below
 * but are not part of this excerpt; they are presumably defined in the rest of this class
 * or in AbstractServerThread.
 *
 * @param id                    index of this processor; stamped on every request it forwards
 * @param time                  clock used for the request start timestamp
 * @param maxRequestSize        bound passed to the receive buffer of each request
 * @param aggregateIdleMeter    idle-time meter shared by all processors
 * @param idleMeter             idle-time meter of this processor only
 * @param totalProcessorThreads number of processor threads, used to scale the aggregate meter
 * @param requestChannel        channel that complete requests are sent to
 * @param connectionQuotas      passed through to AbstractServerThread
 * @param connectionsMaxIdleMs  not read in this excerpt
 */
private[kafka] class Processor(val id: Int,
                               val time: Time,
                               val maxRequestSize: Int,
                               val aggregateIdleMeter: Meter,
                               val idleMeter: Meter,
                               val totalProcessorThreads: Int,
                               val requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {

  /**
   * Main loop: set up new connections, register pending responses, select for at most
   * 300 ms, then dispatch every ready key to read/write/close. Any exception raised while
   * handling a key closes that key's connection; the loop itself keeps running.
   */
  override def run(): Unit = {
    startupComplete()
    while (isRunning) {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing (per the original note: these are taken
      // from this processor's response queue)
      processNewResponses()
      val startSelectTime = SystemTime.nanoseconds
      val ready = selector.select(300)
      currentTimeNanos = SystemTime.nanoseconds
      val idleTime = currentTimeNanos - startSelectTime
      idleMeter.mark(idleTime)
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

      trace("Processor id " + id + " selection time = " + idleTime + " ns")
      if (ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while (iter.hasNext && isRunning) {
          // declared outside the try because the handlers below need it to close the socket
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if (key.isReadable)
              read(key) // pull (part of) a request from the connection
            else if (key.isWritable)
              write(key) // push (part of) the response for an earlier request
            else if (!key.isValid)
              close(key)
            else
              throw new IllegalStateException("Unrecognized key state for processor thread.")
          } catch {
            case e: EOFException =>
              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
              close(key)
            case e: InvalidRequestException =>
              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
              close(key)
            // deliberately broad: whatever went wrong, this connection is closed and the
            // processor moves on to the next key
            case e: Throwable =>
              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
              close(key)
          }
        }
      }
      maybeCloseOldestConnection
    }
    debug("Closing selector.")
    closeAll()
    swallowError(selector.close())
    shutdownComplete()
  }

  /**
   * Read from the channel behind `key`. The in-progress receive is kept as the key's
   * attachment between calls. When the receive is complete, the request is sent to
   * `requestChannel`, the attachment is cleared and read interest is dropped for this key.
   * A negative read count closes the connection.
   */
  def read(key: SelectionKey): Unit = {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)
    var receive = key.attachment.asInstanceOf[Receive]
    if (key.attachment == null) {
      // first bytes of a new request: start a fresh receive bounded by maxRequestSize
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }
    val read = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress()
    trace(read + " bytes read from " + address)
    if (read < 0) {
      close(key)
    } else if (receive.complete) {
      // assemble the request and hand it to the request channel
      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }

  /**
   * Write the response attached to `key` to its channel. Once the send is complete the
   * request metrics are updated, the attachment is cleared and the key goes back to read
   * interest; otherwise the key stays registered for writing.
   *
   * @throws IllegalStateException if the key has no response attached, or the attached
   *                               response carries no `responseSend`
   */
  def write(key: SelectionKey): Unit = {
    val socketChannel = channelFor(key)
    val response = key.attachment().asInstanceOf[RequestChannel.Response]
    // Check the attachment itself before dereferencing it: a key without an attachment
    // used to fail with a bare NullPointerException instead of the intended error below.
    if (response == null || response.responseSend == null)
      throw new IllegalStateException("Registered for write interest but no response attached to key.")
    val responseSend = response.responseSend // payload of the response
    val written = responseSend.writeTo(socketChannel) // send it on this connection's socket
    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
    if (responseSend.complete) {
      response.request.updateRequestMetrics()
      key.attach(null)
      trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
    } else {
      trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_WRITE)
      wakeup()
    }
  }
}