zk请求和响应对

时间:2023-03-09 20:27:52
zk请求和响应对

zk的请求和响应是通过id对应上的:

请求头(RequestHeader)和响应头(ReplyHeader)共用一个xid,它的本质是ClientCnxn类中的一个计数器。

1. 首先看客户端:

Packet类封装了客户端的请求头、请求体、响应头、响应体。

org.apache.zookeeper.ClientCnxn.Packet

static class Packet {
RequestHeader requestHeader;
ReplyHeader replyHeader;
Record request;
Record response;
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
boolean finished;
AsyncCallback cb;
Object ctx;
WatchRegistration watchRegistration;
public boolean readOnly; Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration) {
this(requestHeader, replyHeader, request, response,
watchRegistration, false);
} Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration, boolean readOnly) {
this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
this.response = response;
this.readOnly = readOnly;
this.watchRegistration = watchRegistration;
} public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
}

RequestHeader

public class RequestHeader implements Record {
private int xid;
private int type;
}

GetDataRequest

public class GetDataRequest implements Record {
private String path;
private boolean watch;
}

ReplyHeader

public class ReplyHeader implements Record {
private int xid;
private long zxid;
private int err;
}

GetDataResponse

public class GetDataResponse implements Record {
private byte[] data;
private org.apache.zookeeper.data.Stat stat;
}

客户端发送请求:

//ClientCnxnSocketNIO
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
//省略其他代码
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
//设置连接id
p.requestHeader.setXid(cnxn.getXid());
}
//创建发送的ByteBuffer
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
enableWrite();
}
}
}
}
//ClientCnxn类
private int xid = 1;
synchronized public int getXid() {
return xid++;
}

2. 然后看服务端:

服务端用Request类封装了请求的相关信息,

//org.apache.zookeeper.server.Request
public Request(ServerCnxn cnxn, long sessionId, int xid, int type,
ByteBuffer bb, List<Id> authInfo) {
this.cnxn = cnxn;
this.sessionId = sessionId;
//对应RequestHeader的xid
this.cxid = xid;
this.type = type;
this.request = bb;
this.authInfo = authInfo;
}

在ZooKeeperServer.processPacket方法中有:

//反序列化获取RequestHeader
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header"); //把RequestHeader的xid赋给Request的cxid
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(),
               incomingBuffer, cnxn.getAuthInfo());

在FinalRequestProcessor.processRequest方法中有:

long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
//xid从RequstHeader传到Request,然后传到ReplyHeader
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());