Hadoop RPC Source Code Reading: The Client

Time: 2023-03-09 04:09:06

Hadoop version: Hadoop 2.6

Hadoop RPC consists of three main parts: (1) the interaction protocol, (2) the client, and (3) the server.

(2) Client

First, here is a sample RPC client:

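import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class LoginController {
    public static void main(String[] args) throws IOException {
        // Obtain a proxy object for the RPC protocol interface LoginServiceInterface
        LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L,
                new InetSocketAddress("localhost", 10000), new Configuration());
        String msg = proxy.login("xiaoming", "123123");
        System.out.println(msg);
        // Release the proxy (and its underlying client) when done
        RPC.stopProxy(proxy);
    }
}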
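The example above uses LoginServiceInterface without showing its definition. Here is a minimal sketch of what such a protocol interface might look like. It assumes a single login method that returns a String, which is inferred from the call site. Hadoop's RPC.getProtocolVersion() falls back to a public static versionID field when the interface has no @ProtocolInfo annotation, and 1L here matches the clientVersion passed to getProxy(). LoginServiceImpl and its reply text are hypothetical.

```java
// Hypothetical protocol interface matching the client example's call site.
// RPC.getProtocolVersion() reads this public static versionID field when the
// interface has no @ProtocolInfo annotation; 1L matches getProxy(..., 1L, ...).
interface LoginServiceInterface {
    long versionID = 1L;

    String login(String username, String password);
}

// Hypothetical server-side implementation, for illustration only.
class LoginServiceImpl implements LoginServiceInterface {
    @Override
    public String login(String username, String password) {
        return username + " logged in";
    }
}
```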

1. Stepping into RPC.getProxy shows that it obtains an RpcEngine (the default implementation is WritableRpcEngine) and calls WritableRpcEngine.getProxy() to create the proxy, as shown below:

public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
                         AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  if (connectionRetryPolicy != null) {
    throw new UnsupportedOperationException(
        "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
  }

  T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
      new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
          factory, rpcTimeout, fallbackToSimpleAuth));
  return new ProtocolProxy<T>(protocol, proxy, true);
}

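2. That is how the client obtains its proxy. The proxy is created through the JDK dynamic proxy mechanism, with Invoker as its invocation handler, and is then wrapped in a ProtocolProxy. In this article's example, ProtocolProxy does nothing further: its getProxy() method simply returns the wrapped proxy to the client.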
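Under the hood this is the standard JDK dynamic proxy mechanism. The following plain-Java sketch has no Hadoop dependency. It makes the same Proxy.newProxyInstance call using a hypothetical Greeter interface. The handler is a hypothetical RecordingInvoker that stands in for Invoker: instead of calling client.call(), it only records the method name and arguments and returns a canned reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical protocol interface used only for this sketch.
interface Greeter {
    String greet(String name);
}

// Hypothetical stand-in for Invoker: a real Invoker would wrap (method, args)
// in an Invocation and call client.call(...); this one only records the call.
// Only greet() is expected here; a real handler must also cover Object methods.
class RecordingInvoker implements InvocationHandler {
    String lastCall;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // args is null for no-argument methods, so guard before indexing.
        lastCall = method.getName() + Arrays.toString(args);
        return "hello, " + (args == null ? "" : args[0]);
    }
}

class ProxyDemo {
    // The same JDK call that WritableRpcEngine.getProxy() makes.
    static <T> T newProxy(Class<T> protocol, InvocationHandler handler) {
        return protocol.cast(Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler));
    }
}
```

Calling greet("xiaoming") on the proxy runs RecordingInvoker.invoke(). In the same way, proxy.login(...) in the client example ends up in Invoker.invoke().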

Next, let's look at the dynamic proxy handler Invoker.

Invoker holds a Client member, and Client instances are cached in a global (static) ClientCache.

Whenever a method is called on the proxy object, the JDK automatically dispatches it to Invoker's invoke() method, shown below:

public Object invoke(Object proxy, Method method, Object[] args)
  throws Throwable {
  long startTime = 0;
  if (LOG.isDebugEnabled()) {
    startTime = Time.now();
  }
  TraceScope traceScope = null;
  if (Trace.isTracing()) {
    traceScope = Trace.startSpan(
        method.getDeclaringClass().getCanonicalName() +
        "." + method.getName());
  }
  ObjectWritable value;
  try {
    value = (ObjectWritable)
      client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
        remoteId, fallbackToSimpleAuth);
  } finally {
    if (traceScope != null) traceScope.close();
  }
  if (LOG.isDebugEnabled()) {
    long callTime = Time.now() - startTime;
    LOG.debug("Call: " + method.getName() + " " + callTime);
  }
  return value.get();
}

3. As shown above, the dynamic proxy sends the request to the server and obtains the return value through client.call().

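Note also that the Invocation class wraps the method and its arguments. Invocation implements Writable so that it can be serialized and sent over the network. It serves as the data-transfer object, similar to a VO.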
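Here is a minimal sketch of the Writable idea. It assumes a simplified invocation that carries only a method name and String arguments. MiniWritable mirrors the shape of Hadoop's Writable (write(DataOutput) / readFields(DataInput)) so the example compiles without Hadoop. The real Invocation also writes fields such as the RPC version, protocol name, and client version, and it serializes arbitrary argument types through ObjectWritable. MiniWritable and MiniInvocation are hypothetical names.

```java
import java.io.*;

// Hypothetical interface with the same shape as Hadoop's Writable.
interface MiniWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical, simplified Invocation: a method name plus String arguments.
class MiniInvocation implements MiniWritable {
    String methodName;
    String[] args;

    // No-arg constructor: the receiver creates an empty instance, then calls readFields().
    MiniInvocation() {}

    MiniInvocation(String methodName, String... args) {
        this.methodName = methodName;
        this.args = args;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(methodName);
        out.writeInt(args.length);            // parameter count first
        for (String a : args) out.writeUTF(a);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        methodName = in.readUTF();
        args = new String[in.readInt()];      // read fields in the order they were written
        for (int i = 0; i < args.length; i++) args[i] = in.readUTF();
    }

    static byte[] toBytes(MiniWritable w) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            w.write(out);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static MiniInvocation fromBytes(byte[] bytes) {
        MiniInvocation inv = new MiniInvocation();
        try {
            inv.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return inv;
    }
}
```

The key design point is symmetry: readFields() must read exactly what write() wrote, in the same order. That is why both sides agree on the parameter count before the parameters themselves.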

So next we step into the Client class to see what call() does.

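First, let's look at the structure of Client. It contains several inner classes:

Call: wraps the Invocation object as a VO that is written to the server, and also stores the data returned by the server
Connection: handles the connection to the remote server; extends Thread
ConnectionId: uniquely identifies a connection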

The call() method in Client looks like this:

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  final Call call = createCall(rpcKind, rpcRequest); // wrap the request data in a Call object
  Connection connection = getConnection(remoteId, call, serviceClass,
    fallbackToSimpleAuth); // obtain a connection (cached or newly created)
  try {
    connection.sendRpcRequest(call); // send the rpc request (the Call object) to the server
  } catch (RejectedExecutionException e) {
    throw new IOException("connection has been closed", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("interrupted waiting to send rpc request to server", e);
    throw new IOException(e);
  }

  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait(); // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
                address.getPort(),
                NetUtils.getHostname(),
                0,
                call.error);
      }
    } else {
      return call.getRpcResponse();
    }
  }
}

4. As you can see, rpcRequest is the serializable object that wraps the method and its arguments. It is sent to the server as the request payload.

The method above relies mainly on two classes, Call and Connection.

Call encapsulates the state of one request to the server, including:

final int id;               // call id: identifies this request
final int retry;            // retry count: how many times this request has been retried
final Writable rpcRequest;  // the serialized rpc request (the request arguments)
Writable rpcResponse;       // null if rpc has error (otherwise the return value)
IOException error;          // exception, null if success
final RPC.RpcKind rpcKind;  // Rpc EngineKind: which RpcEngine type is used
boolean done;               // true when call is done
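The waiting mechanism around Call can be sketched in plain Java. It has three parts: the done flag, the call.wait() loop in Client.call(), and the notify() that fires when the response arrives. MiniCall is a hypothetical, simplified stand-in for Client.Call. It stores an Object response and an Exception rather than a Writable and an IOException.

```java
// Hypothetical, simplified stand-in for Client.Call (not Hadoop's class).
class MiniCall {
    final int id;               // call id
    private Object response;    // null if the call failed
    private Exception error;    // null on success
    private boolean done;       // true once a response or error has arrived

    MiniCall(int id) {
        this.id = id;
    }

    // Receiver thread: store the value, then mark the call complete.
    synchronized void setResponse(Object value) {
        response = value;
        callComplete();
    }

    // Receiver thread: store the error, then mark the call complete.
    synchronized void setException(Exception e) {
        error = e;
        callComplete();
    }

    // Always called with this object's monitor held: set done and wake the caller.
    private void callComplete() {
        done = true;
        notify();
    }

    // Caller thread: the same wait loop as Client.call(), remembering interrupts.
    synchronized Object await() throws Exception {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException ie) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (error != null) {
            throw error;
        }
        return response;
    }
}
```

Because done is checked in a while loop, a spurious wakeup simply waits again. If the response arrives before the caller reaches wait(), done is already true and the caller returns immediately.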

Connection establishes the connection to the server, sends requests, and receives responses.

5. The Connection class

Connection extends Thread.

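As step 3 showed, Client.call() obtains a Connection through getConnection(), shown below.

Client caches every connection in the connections map, keyed by ConnectionId.

setupIOstreams() then opens the socket to the server and creates the input and output streams (connection.in and connection.out).

It also calls start() to launch the thread, which runs Connection.run() and waits for data returned by the server.

In other words, Connection receives data in run() and sends requests to the server through sendRpcRequest().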

private Connection getConnection(ConnectionId remoteId,
    Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  do {
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        connection = new Connection(remoteId, serviceClass);
        connections.put(remoteId, connection);
      }
    }
  } while (!connection.addCall(call));

  //we don't invoke the method below inside "synchronized (connections)"
  //block above. The reason for that is if the server happens to be slow,
  //it will take longer to establish a connection and that will slow the
  //entire system down.
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}
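The caching logic in getConnection() can be sketched as follows. MiniConnectionId, MiniConnection and MiniClient are hypothetical simplifications: the id holds only an address and a protocol name, and there are no sockets or threads. MiniClient.close() removes a closed connection from the map. In Hadoop this removal happens inside Connection.close(), and it is what lets the do/while loop eventually create a fresh connection instead of retrying a closing one forever.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified ConnectionId: value-based equals/hashCode so that
// ids built from the same address and protocol hit the same cache entry.
final class MiniConnectionId {
    final InetSocketAddress address;
    final String protocol;

    MiniConnectionId(InetSocketAddress address, String protocol) {
        this.address = address;
        this.protocol = protocol;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MiniConnectionId)) return false;
        MiniConnectionId other = (MiniConnectionId) o;
        return address.equals(other.address) && protocol.equals(other.protocol);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, protocol);
    }
}

// Hypothetical stand-in for Client.Connection (no socket, no receiver thread).
class MiniConnection {
    final MiniConnectionId remoteId;
    private boolean closing;
    private int pendingCalls;  // Hadoop stores the Call in its calls table instead

    MiniConnection(MiniConnectionId remoteId) {
        this.remoteId = remoteId;
    }

    // Like Connection.addCall(): refuse new calls once the connection is closing.
    synchronized boolean addCall() {
        if (closing) return false;
        pendingCalls++;
        return true;
    }

    synchronized void markClosing() {
        closing = true;
    }
}

class MiniClient {
    final Map<MiniConnectionId, MiniConnection> connections = new HashMap<>();

    MiniConnection getConnection(MiniConnectionId remoteId) {
        MiniConnection connection;
        do {
            synchronized (connections) {
                connection = connections.get(remoteId);
                if (connection == null) {
                    connection = new MiniConnection(remoteId);
                    connections.put(remoteId, connection);
                }
            }
        } while (!connection.addCall());  // retry if the cached one is closing
        // Hadoop calls setupIOstreams() here, outside the lock.
        return connection;
    }

    // Mirrors the cleanup in Connection.close(): drop the connection from the
    // cache, but only if the map still points at this exact instance.
    void close(MiniConnection connection) {
        connection.markClosing();
        synchronized (connections) {
            connections.remove(connection.remoteId, connection);
        }
    }
}
```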

5.1 Connection.sendRpcRequest(): sending the request to the server

public void sendRpcRequest(final Call call)
    throws InterruptedException, IOException {
  if (shouldCloseConnection.get()) {
    return;
  }

  // Serialize the call to be sent. This is done from the actual
  // caller thread, rather than the sendParamsExecutor thread,
  // so that if the serialization throws an error, it is reported
  // properly. This also parallelizes the serialization.
  //
  // Format of a call on the wire:
  // 0) Length of rest below (1 + 2)
  // 1) RpcRequestHeader  - is serialized Delimited hence contains length
  // 2) RpcRequest
  //
  // Items '1' and '2' are prepared here.
  final DataOutputBuffer d = new DataOutputBuffer();
  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      clientId);
  header.writeDelimitedTo(d);
  call.rpcRequest.write(d);

  synchronized (sendRpcRequestLock) {
    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
      @Override
      public void run() {
        try {
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }

            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " sending #" + call.id);

            byte[] data = d.getData();
            int totalLength = d.getLength();
            out.writeInt(totalLength); // Total Length
            out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
            out.flush();
          }
        } catch (IOException e) {
          // exception at this point would leave the connection in an
          // unrecoverable state (eg half a call left on the wire).
          // So, close the connection, killing any outstanding calls
          markClosed(e);
        } finally {
          //the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });

    try {
      senderFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();

      // cause should only be a RuntimeException as the Runnable above
      // catches IOException
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        throw new RuntimeException("unexpected checked exception", cause);
      }
    }
  }
}
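The comments in sendRpcRequest() describe a wire format: a 4-byte total length, then the varint-delimited RpcRequestHeader, then the request. That format can be reproduced with plain java.io. In this sketch the header is an opaque byte array standing in for RpcRequestHeaderProto. writeVarint32 hand-codes the base-128 varint length prefix that protobuf's writeDelimitedTo() emits. MiniFraming is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of the request framing built in sendRpcRequest():
//   [int totalLength][varint headerLength][header bytes][request bytes]
// The header is opaque bytes standing in for RpcRequestHeaderProto.
class MiniFraming {
    // Base-128 varint: the length prefix protobuf's writeDelimitedTo() emits.
    static void writeVarint32(OutputStream out, int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);  // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value);                       // last byte, continuation bit clear
    }

    static byte[] frame(byte[] header, byte[] request) {
        try {
            // Like DataOutputBuffer d: delimited header followed by the request.
            ByteArrayOutputStream d = new ByteArrayOutputStream();
            writeVarint32(d, header.length);
            d.write(header);
            d.write(request);

            // Like out.writeInt(totalLength) followed by out.write(data, 0, totalLength).
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            out.writeInt(d.size());
            d.writeTo(out);
            out.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            // Not expected with in-memory streams; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
    }
}
```

Take a 3-byte header and a 2-byte request. The varint prefix is 1 byte, so the leading int is 1 + 3 + 2 = 6 and the whole frame is 4 + 6 = 10 bytes. The receiver reads that leading int first, so it knows exactly how many bytes belong to this call.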

5.2 Connection.run(): receiving the data returned by the server

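As the code below shows, receiveRpcResponse() reads the server's response from the input stream in that was created earlier. It then hands the value to the corresponding Call object via call.setRpcResponse(value).

Inside call.setRpcResponse(value), callComplete() marks the Call as done and calls notify() to wake the thread waiting on that Call object.

Back in Client.call(), once the waiting thread sees that the Call is done, it returns the response data held in the Call to the caller.

This completes one full remote procedure call.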

public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());

  try {
    while (waitForWork()) {//wait here for work - read or close connection (loops waiting for server data)
      receiveRpcResponse(); // reads one response from the server and dispatches it
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }

  close();

  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}

private void receiveRpcResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();

  try {
    int totalLen = in.readInt();
    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);
    checkResponse(header);

    int headerLen = header.getSerializedSize();
    headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

    int callId = header.getCallId();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + callId);

    Call call = calls.get(callId);
    RpcStatusProto status = header.getStatus();
    if (status == RpcStatusProto.SUCCESS) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in);                 // read value
      calls.remove(callId);
      call.setRpcResponse(value);

      // verify that length was correct
      // only for ProtobufEngine where len can be verified easily
      if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
        ProtobufRpcEngine.RpcWrapper resWrapper =
            (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
        if (totalLen != headerLen + resWrapper.getLength()) {
          throw new RpcClientException(
              "RPC response length mismatch on rpc success");
        }
      }
    } else { // Rpc Request failed
      // Verify that length was correct
      if (totalLen != headerLen) {
        throw new RpcClientException(
            "RPC response length mismatch on rpc error");
      }

      final String exceptionClassName = header.hasExceptionClassName() ?
            header.getExceptionClassName() :
              "ServerDidNotSetExceptionClassName";
      final String errorMsg = header.hasErrorMsg() ?
            header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
      final RpcErrorCodeProto erCode =
                (header.hasErrorDetail() ? header.getErrorDetail() : null);
      if (erCode == null) {
         LOG.warn("Detailed error code not set by server on rpc error");
      }
      RemoteException re =
          ( (erCode == null) ?
              new RemoteException(exceptionClassName, errorMsg) :
          new RemoteException(exceptionClassName, errorMsg, erCode));
      if (status == RpcStatusProto.ERROR) {
        calls.remove(callId);
        call.setException(re);
      } else if (status == RpcStatusProto.FATAL) {
        // Close the connection
        markClosed(re);
      }
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
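The receive side can be sketched the same way. The receiver reads the 4-byte length, consumes exactly one frame, and looks up the pending call by its id. It then completes that call with either a value or an error. MiniReceiver is a hypothetical simplification. The header is reduced to an int call id plus a success flag instead of RpcResponseHeaderProto. CompletableFuture replaces Call's wait/notify; this is a swapped-in technique, not what Hadoop uses. An IOException on the stream fails every pending call, loosely mirroring markClosed(e).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the receive path (Connection.run() -> receiveRpcResponse()).
// Simplifications: the header is just (int callId, boolean success), the body is a
// UTF string, and CompletableFuture replaces Call's wait/notify.
class MiniReceiver {
    // Pending calls keyed by call id, like Connection.calls.
    final Map<Integer, CompletableFuture<String>> calls = new ConcurrentHashMap<>();

    CompletableFuture<String> register(int callId) {
        CompletableFuture<String> call = new CompletableFuture<>();
        calls.put(callId, call);
        return call;
    }

    // Reads exactly one length-prefixed response and completes the matching call.
    void receiveOne(DataInputStream in) {
        try {
            int totalLen = in.readInt();
            byte[] frame = new byte[totalLen];
            in.readFully(frame);  // consume exactly one response from the stream
            DataInputStream body = new DataInputStream(new ByteArrayInputStream(frame));
            int callId = body.readInt();
            boolean success = body.readBoolean();
            String payload = body.readUTF();  // value on success, error text otherwise

            CompletableFuture<String> call = calls.remove(callId);
            if (call == null) {
                return;  // no pending call with this id
            }
            if (success) {
                call.complete(payload);
            } else {
                call.completeExceptionally(new IOException("remote error: " + payload));
            }
        } catch (IOException e) {
            // Loosely mirrors markClosed(e): a broken stream fails every pending call.
            calls.values().forEach(c -> c.completeExceptionally(e));
            calls.clear();
        }
    }

    // Test helper: encode one response in the simplified format above.
    static byte[] encodeResponse(int callId, boolean success, String payload) {
        try {
            ByteArrayOutputStream frame = new ByteArrayOutputStream();
            DataOutputStream f = new DataOutputStream(frame);
            f.writeInt(callId);
            f.writeBoolean(success);
            f.writeUTF(payload);
            f.flush();

            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream w = new DataOutputStream(wire);
            w.writeInt(frame.size());
            frame.writeTo(w);
            w.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reading the whole frame with readFully() before parsing keeps the stream aligned on the next response even if a body is parsed incorrectly. This plays a role similar to the totalLen checks in receiveRpcResponse().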