自己实现一个简单的RPC框架

时间:2022-08-14 20:38:25

RPC的全称是Remote Procedure Call,它是一种进程间的通信方式。允许像调用本地服务一样调用远程服务。


对于RPC的总结:

  1. 简单:RPC概念的语义十分简单和清晰,这样建立分布式计算更容易。
  2. 高效:过程调用看起来十分简单而且十分高效。
  3. 通用:在单机计算过程中往往是不同算法和API,在跨进程调用最重要的是通用的通信机制。

对于RPC框架实现的技术点总结如下:

  1. 远程服务提供者需要一某种形式提供服务调用的相关信息,包括但不限于服务接口的定义、数据结构,或者中间状态服务定义的文件。
  2. 远程代理对象:服务调用者调用的服务实际上市远程服务的代理。
  3. 通信:RPC框架的与具体的通信协议无关
  4. 序列化:远程通信需要将对象转换成二进制码流进行传输,不同的序列化框架支持的数据类型,数据包大小,异常类型及性能等都不相同。

最简单的RPC框架实现:

  1. 服务提供者,运行在服务端,负责服务接口定义和服务接口实现。
  2. 服务发布者,运行在RPC服务端,负责将本地服务发布成远程服务,供其他消费者调用
  3. 本地服务代理,运行在RPC客户端,通过代理调用远程服务提供者,然后对结果进行封装返回给本地消费者

下面是具体的实现,
定义EchoService接口,代码如下

/**
* @author zhuowen_pan.
* @version 1.0.0
*/

public interface EchoService {

String sayHello(String name);

}

EchoServiceImpl定义该接口的实现类

/**
* @author zhuowen_pan.
* @version 1.0.0
*/

public class EchoServiceImpl implements EchoService {

@Override
public String sayHello(String name) {
return name == null ? "hello nobody" : "hello " + name;
}
}

RPC服务端服务发布者代码的实现

/**
* @author zhuowen_pan.
* @version 1.0.0
*/

public class RpcExporter {

private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

public static void exporter(String hostName, int port) throws Exception {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(hostName, port));
try {
while (true) {
executor.execute(new ExporterTasks(serverSocket.accept()));
}
} finally {
serverSocket.close();
}
}

private static class ExporterTasks implements Runnable {
Socket client = null;

public ExporterTasks(Socket client) {
this.client = client;
}

@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream out = null;
try {

input = new ObjectInputStream(client.getInputStream());
String interfaceName = input.readUTF();
Class<?> service = Class.forName(interfaceName);
String methodName = input.readUTF();
Class<?>[] paramsTypes = (Class<?>[]) input.readObject();
Object[] args = (Object[]) input.readObject();
Method method = service.getMethod(methodName, paramsTypes);
Object result = method.invoke(service.newInstance(), args);
out = new ObjectOutputStream(client.getOutputStream());
out.writeObject(result);

} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (out != null) {
out.close();
}
if (input != null) {
input.close();
}
if (client != null) {
client.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

}

服务发布者的主要职责

  1. 作为服务端,监听客户端的TCP链接,接收到客户端的饿链接之后封装成Tsak交给线程池执行
  2. 将客户端发送的码流反序列化成对象,反射调用服务的实现者,获取执行结果
  3. 将结果反序列化,通过Socket发送给客户端
  4. 远程服务调用结束后,释放资源

RPC客户端本地服务代理源码如下

/**
* @author zhuowen_pan.
* @version 1.0.0
*/
public class RpcImporter<S> {

public S importer(final Class<?> serviceClass, final InetSocketAddress address) {
return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{serviceClass.getInterfaces()[0]}, (proxy, method, args) -> {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
socket = new Socket();
socket.connect(address);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(serviceClass.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (socket != null) {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
})
;
}

}

本地服务的代理的功能如下

  1. 将本地接口调用转换成JDK的动态代理,在动态代理中实现接口的调用
  2. 创建Socket客户端,根据指定地址调用远程服务的提供者
  3. 将远程接口调用所需要的接口类,方法名,参数列表等编码后发送给服务者
  4. 同步阻塞等待服务端返回应答,获取执行结果

测试代码

/**
* @author zhuowen_pan.
* @version 1.0.0
*/

public class Client {

public static void main(String[] args) {
new Thread(() -> {
try {
RpcExporter.exporter("127.0.0.1", 8088);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
RpcImporter<EchoService> importer = new RpcImporter<>();
EchoService echoService = importer.importer(EchoServiceImpl.class, new InetSocketAddress("127.0.0.1", 8088));
System.out.println(echoService.sayHello(peter));
}

}

运行结果
自己实现一个简单的RPC框架


到这里一个简单RPC服务框架就实现了。