简单实现Java的RMI——远程方法调用

时间:2023-11-18 12:35:20

一、RMI简介:

说到RMI就不得不说RPC了。

RPC:(Remote Procedure Call),远程过程调用。

RMI(Remote Method Invocation),远程方法调用。

RPC和RMI是有区别的,RPC中是通过网络服务协议向远程主机发送请求,RPC远程主机就去搜索与之相匹配的类和方法,找到后就执行方法并把结果编码,通过网络协议发回。

而RMI是通过客户端的对象作为远程接口进行远程方法的调用。RMI只适用于Java语言。

二、RMI的运行机理:

涉及两个网络端。其核心思想是,一个端可以通过调用另一个端的方法,实现相关功能。
一个端“执行”一个方法,而这个方法的实际执行是在另一端进行的!

当然,两个端都应该有相同的类,自然会拥有相同的方法。
一个端所谓的执行这个方法,其实是通过调用这个类的代理对象的方法,在其中拦截这个方法,在这个方法中
实际上是将执行这个方法的参数和类名称、方法名称,通过网络通讯传输给另一端;另一端根据得到的方法名称、
类名称和参数,实际执行那个方法,再将方法执行结果回传给对端。
要注意的问题:
1、实际执行方法的一端,我们可以认为是RMI服务器端,伪执行一端,自然是RMI客户端;
2、伪执行端不应该自己完成参数、方法名称和类名称的传递工作;也就是说,对于RMI客户端用户而言,他只面对一个类的一个方法,
    直接执行就好;
3、RMI服务器端可能接收多个RMI客户端有关这个方法的执行请求,每个客户端的执行当然应该是独立的,应该用线程实现;
4、RMI服务器端在执行了相关方法,并回传方法执行结果后,应该断开与RMI客户端的连接。

下面是我要实现它的一个思路。

简单实现Java的RMI——远程方法调用

上图由于截图原因,给点补充说明:RpcClientExecutor的作用是建立和服务器的连接,并接受消息和发送消息,具体是发送方法的序列号和参数类型。

1.首先:应该是RpcBeanDefinition:

 package com.xupt.rpc.core;

 import java.lang.reflect.Method;

 public class RpcBeanDefination {   //将类,类方法,类对象封装在Definition中。

     private Class<?> klass;
private Method method;
private Object object; RpcBeanDefination() {
}

给该类所有的成员都有getter和setter方法就不需要说了,这个类,将执行的哪个类的哪个方法和类的对象封装起来,以后这个类将形成Map

的值,下面来介绍的RpcBeanFactory会重点介绍。

2.RpcBeanFactory

 package com.xupt.rpc.core;

 import java.util.HashMap;
import java.util.Map; public class RpcBeanFactory { private final Map<String, RpcBeanDefination> beanMap; RpcBeanFactory() {
beanMap = new HashMap<>();
} void rpcBeanRegistry(String beanId,RpcBeanDefination defination) {
RpcBeanDefination rbd = beanMap.get(beanId);
if(rbd != null) {
return;
}
beanMap.put(beanId, defination);
} RpcBeanDefination getBean(String beanId) {
return beanMap.get(beanId);
}
}

此类是将序列号作为Map中的键,RpcBeanDeifintion作为值放入Map中,用BeanId来找对应客户端那边序列号相同的方法。

3.下来是RpcBeanRegistry:

 package com.xupt.rpc.core;

 import java.lang.reflect.Method;

 public class RpcBeanRegistry {

     RpcBeanRegistry() {
} //给客户端提供
static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces) {
doregistry(rpcBeanFactory,interfaces,null);
} //内部使用,注册
private static void doregistry(RpcBeanFactory rpcBeanFactory , Class<?> interfaces ,Object object) {
//得到接口中的所有的方法,行程方法的数组
Method[] methods = interfaces.getDeclaredMethods();
for(Method method : methods) { //遍历这些方法
String beanId = String.valueOf(method.toString().hashCode());//将方法序列化。
RpcBeanDefination rpcBeanDefination = new RpcBeanDefination(); //将得到的实现接口的那个类和它的方法以及对象放进RpcBeanDefination()中。
rpcBeanDefination.setKlass(interfaces);
rpcBeanDefination.setMethod(method);
rpcBeanDefination.setObject(object); rpcBeanFactory.rpcBeanRegistry(beanId, rpcBeanDefination);
}
} //服务端使用,知道实现类的对象。
static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Object object) {
//判断此类是否实现了这个接口。
if(!interfaces.isAssignableFrom(object.getClass())){
return;
}
doregistry(rpcBeanFactory,interfaces,object);
} //服务器端使用,知道类,创建一个对象。
static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Class<?> klass) {
//判断该类是否实现了接口。
if(!interfaces.isAssignableFrom(klass)){
return;
}
try {
doregistry(rpcBeanFactory, interfaces, klass.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}

这个类是同时给客户端和服务器使用的,所以有一个私有方法来完成类方法的获得和序列号的产生(method.toString().hashcode())。然后将类、方法、对象放进RpcBeanDefinition中,将得到的序列号作为键和rpcBeanDeifinyion作为值放进Map中,形成键值对,方便客户端和服务器的调用。

4.下来是RpcServer:

package com.xupt.rpc.core;

 import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket; public class RpcServer implements Runnable { private ServerSocket server;
private int port;
private boolean goon;
private final RpcBeanFactory rpcBeanFactory;
private static long executorId; public RpcServer() {
rpcBeanFactory = new RpcBeanFactory();
this.goon = false;
} public void setPort(int port) {
this.port = port;
} public void startRpcServer() throws Exception {
if(this.port == 0) {
return;
}
server = new ServerSocket(port);
this.goon = true;
new Thread(this,"Rpc_Server").start();//启动线程
} public void stopRpcServer() { //关闭服务器
if (this.server != null && !this.server.isClosed()) {
try {
this.server.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
this.server = null;
}
}
} RpcBeanFactory getRpcBeanFactory() {
return rpcBeanFactory;
} //用注册的方法知道类实现的接口,类的对象,通过对象执行类的方法。
public void rpcRegistry(Class<?> interfaces,Object object) {
RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces, object);
} public void rpcRegistry(Class<?> interfaces, Class<?> imClass) {
RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces,imClass);
} @Override
public void run() {
while(goon) {
try {
Socket socket = server.accept();//不断的侦听 new RpcServerExecutor(socket, this,++executorId); } catch (Exception e) {
goon = false;
e.printStackTrace();
}
}
stopRpcServer();
}
}

上述服务器端的基本任务已经完成了。

下来我们来看看客户端:

首先还是:RpcClientExecutor,它是建立与服务器端的连接,以及接收,发送消息,具体发送的是方法的序列号和参数类型。

package com.xupt.rpc.core;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket; public class RpcClientExecutor { private String rpcServerIp; //ip地址
private int rpcServerPort; //端口号 RpcClientExecutor() {
} RpcClientExecutor(String rpcServerIp, int rpcServerPort) {
this.rpcServerIp = rpcServerIp;
this.rpcServerPort = rpcServerPort;
} String getRpcServerIp() {
return rpcServerIp;
} void setRpcServerIp(String rpcServerIp) {
this.rpcServerIp = rpcServerIp;
} int getRpcServerPort() {
return rpcServerPort;
} void setRpcServerPort(int rpcServerPort) {
this.rpcServerPort = rpcServerPort;
} //关闭客户端
private void closeSocket(ObjectInputStream ois, ObjectOutputStream oos, Socket socket) {
try {
if (ois != null) {
ois.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ois = null;
}
try {
if (oos != null) {
oos.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
oos = null;
}
try {
if (socket != null && !socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
socket = null;
}
} @SuppressWarnings("unchecked")
<T> T rpExecutor(String beanId,Object[] params) throws Exception {
//连接服务器端
Socket socket = new Socket(rpcServerIp, rpcServerPort);
//发送方法的序列号和参数类型。
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeUTF(beanId);
oos.writeObject(params); //必须将这句放在前面三句的后面。
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
//接收服务器端返回的结果。
Object result = ois.readObject(); closeSocket(ois,oos,socket); return (T) result; }
}

下来是:RpcClient:这个类是产生代理,用代理对象伪执行相关的方法,然后在真正的来连接服务器。

 package com.xupt.rpc.core;

 import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy; public class RpcClient { private RpcClientExecutor rpcClientExecutor; public RpcClient(String rpcServerIp,int rpcServerport) {
this.rpcClientExecutor = new RpcClientExecutor(rpcServerIp, rpcServerport);
} @SuppressWarnings("unchecked")
public <T> T getProxy(Class<?> klass) {
return (T) Proxy.newProxyInstance(
klass.getClassLoader(),
new Class[] {klass},
new InvocationHandler() { @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String BeanId = String.valueOf(method.toString().hashCode());
Object result = rpcClientExecutor.rpExecutor(BeanId, args); return result;
}
}
);
}
}

上述方法产生代理就不多做解释,不明白请看上一篇的代理机制之初见吧。。。。

以上就是我的RMI的简单实现,入如果有错误,请指正!也希望它对你有所帮助。