分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

时间:2022-06-07 20:35:53

RPC基础知识


什么是RPC?

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。

RPC使得开发包括网络分布式多程序在内的应用程序更加容易。


RPC的模型

C/S模式 
基于传输层协议(例如TCP/IP)  远程调用不是新的一种数据传输协议
事件响应基本模型(请求、计算、响应)


RPC设计的目的

通过固定的协议调用非本机的方法
提供不同语言程序之间通信
可以在不了解底层通信,像本地方法一样调用


RPC框架完全封装了网络传输以及其他细节,比如SpringRPC框架在调用远程对象的方法时就像调用Spring Bean对象一样使用.

RPC的应用 大量的分布式应用都使用了RPC协议,比如分布式操作系统、分布式计算、分布式软件设计


RPC过程详解

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

RPC框架封装网络传输和其他细节,消费者和生产者不用去关心底层原理

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

消费者的代理层控制了整个RPC调用的流程,生成代理对象,封装请求报文,发送请求之类的

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

服务提供者会有一个监听模块,用来监听请求,并且按照约定,应该是注册了的服务才会被消费者调用到,注册的服务需要被反射调用到,用来计算结果


RPC框架的特点和设计模型

封装网络交互

尽量不要让RPC框架的使用者涉及到过多的网络层的开发

远程调用对象的代理

将接口代理的对象放入到Spring 容器之中,方便服务端开发

支持容器(SpringJetty)

支持Spring容器,还有Jetty这样的web容器

可配置,可扩展

尽量做到可配置,可扩展


设计模型

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

Proxy代理层

用于对象的代理,对象的反射调用,RPC流程的控制

Serialize序列化层

将请求和结果做序列化和反序列化

Invoke网络模块

网络通信相关的处理

Container容器组件

支持代理层监听网络请求


代码实现

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ibigsea</groupId>
<artifactId>http-rpc</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.2.8.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
</project>
config包下面的

ConsumerConfig.java

package com.ibigsea.rpc.config;

/**
* 服务消费者配置
*
* @author bigsea
*
*/
public class ConsumerConfig {

/**
* 请求地址 服务提供者监听的地址和端口
*/
private String url;

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

}
ProviderConfig.java
package com.ibigsea.rpc.config;/** * 服务提供者配置 *  * @author bigsea * */public class ProviderConfig {	/**	 * 监听端口 服务提供者监听请求端口	 */	private int port;	public ProviderConfig() {	}	public ProviderConfig(int port) {		this.port = port;	}	public int getPort() {		return port;	}	public void setPort(int port) {		this.port = port;	}} 

序列化层

Request.java

package com.ibigsea.rpc.serizlize;

import java.io.Serializable;

import com.alibaba.fastjson.annotation.JSONType;

/**
* 请求信息
*
* @author bigsea
*
*/
public class Request implements Serializable {

private static final long serialVersionUID = -4363326153251862952L;

private Class clazz;

private String method;

private Object param;

public Request() {
}

public Request(Class clazz, String method, Object param) {
this.clazz = clazz;
this.method = method;
this.param = param;
}

public Class getClazz() {
return clazz;
}

public void setClazz(Class clazz) {
this.clazz = clazz;
}

public String getMethod() {
return method;
}

public void setMethod(String method) {
this.method = method;
}

public Object getParam() {
return param;
}

public void setParam(Object param) {
this.param = param;
}

/**
* 通过反射执行对应的方法
*
* @param bean
* @return
* @throws Exception
*/
public Object invoke(Object bean) throws Exception {
return clazz.getMethod(method, param.getClass()).invoke(bean, param);
}

}
JsonParser.java

package com.ibigsea.rpc.serizlize;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;

/**
* 反序列化
*
* @author bigsea
*
*/
public class JsonParser {
/**
* 反序列化请求 将请求反序列化成一个请求报文
*
* @param param
* @return
*/
public static Request reqParse(String param) {
return JSON.parseObject(param, Request.class);
}

/**
* 反序列化响应 将响应反序列化成一个响应报文
*
* @param result
* @return
*/
public static <T> T resbParse(String result) {
return (T) JSON.parse(result);
}

}
JsonFormatter.java

package com.ibigsea.rpc.serizlize;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;

/**
* 序列化
*
* @author bigsea
*
*/
public class JsonFormatter {

/**
* 将请求序列化成字符串
*
* @param clazz
* @param method
* @param param
* @return
*/
public static String reqFormatter(Class clazz, String method, Object param) {
Request request = new Request(clazz, method, param);
return JSON.toJSONString(request, SerializerFeature.WriteClassName);
}

/**
* 将响应序列化成字符串
*
* @param param
* @return
*/
public static String resbFormatter(Object param) {
return JSON.toJSONString(param, SerializerFeature.WriteClassName);
}

}

http容器  httpContainer.java

package com.ibigsea.rpc.container;

import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.handler.AbstractHandler;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibigsea.rpc.config.ProviderConfig;

/**
* 利用Jetty实现简单的嵌入式Httpserver
*
* @author bigsea
*
*/
public class HttpContainer {

private Logger LOG = LoggerFactory.getLogger(HttpContainer.class);

private AbstractHandler httpHandler;
private ProviderConfig providerConfig;

/**
* 构造方法
*
* @param httpHandler
*/
public HttpContainer(AbstractHandler httpHandler) {
this(httpHandler, new ProviderConfig(8080));
}

/**
* 构造方法
*
* @param httpHandler
* @param providerConfig
*/
public HttpContainer(AbstractHandler httpHandler, ProviderConfig providerConfig) {
this.httpHandler = httpHandler;
this.providerConfig = providerConfig;
}

public void start() {
// 进行服务器配置
Server server = new Server();
try {
SelectChannelConnector connector = new SelectChannelConnector();
// 设置监听端口
connector.setPort(providerConfig.getPort());
// 设置handler,请求过来之后通过该handler来处理请求
server.setHandler(httpHandler);
server.setConnectors(new Connector[] { connector });
server.start();
LOG.info("容器启动~");
} catch (Exception e) {
LOG.error("容器启动异常~", e);
}

}

}
RpcException.java

package com.ibigsea.rpc.exception;

/**
* 异常
*
* @author bigsea
*
*/
public class RpcException extends Throwable {
private Object data;

public RpcException(String message, Throwable cause, Object data) {
super(message, cause);
this.data = data;
}

public RpcException(Object data) {
super();
this.data = data;
}

public Object getData() {
return data;
}
}
HttpInvoke.java

package com.ibigsea.rpc.invoke;

import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import com.ibigsea.rpc.config.ConsumerConfig;
import com.ibigsea.rpc.exception.RpcException;

/**
* http请求和响应处理
*
* @author bigsea
*
*/
public class HttpInvoke {

private static final HttpClient httpClient = getHttpClient();

/**
* 单例
*/
private static HttpInvoke httpInvoke;

private HttpInvoke() {

}

public static synchronized HttpInvoke getInstance() {
if (httpInvoke == null) {
httpInvoke = new HttpInvoke();
}
return httpInvoke;
}

/**
* 发送请求
*
* @param request
* 服务消费者将 (类信息、方法、参数)封装成请求报文,序列化后的字符串
* @param consumerConfig
* 服务消费者请求的地址
* @return 请求结果
* @throws RpcException
*/
public String request(String request, ConsumerConfig consumerConfig) throws RpcException {
HttpPost post = new HttpPost(consumerConfig.getUrl());
// 使用长连接
post.setHeader("Connection", "Keep-Alive");
List<NameValuePair> params = new ArrayList<NameValuePair>();
params.add(new BasicNameValuePair("data", request));

try {
post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
HttpResponse response = httpClient.execute(post);
if (response.getStatusLine().getStatusCode() == 200) {
return EntityUtils.toString(response.getEntity(), "UTF-8");
}
throw new RpcException(request);
} catch (Exception e) {
throw new RpcException("http调用异常", e, request);
}
}

/**
* 响应结果 服务提供者根据服务消费者的请求报文执行后返回结果信息
*
* @param response
* @param outputStream
* @throws RpcException
*/
public void response(String response, OutputStream outputStream) throws RpcException {
try {
outputStream.write(response.getBytes("UTF-8"));
outputStream.flush();
} catch (Exception e) {
e.printStackTrace();
}
}

private static HttpClient getHttpClient() {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
// 连接池最大生成连接数200
cm.setMaxTotal(200);
// 默认设置route最大连接数为20
cm.setDefaultMaxPerRoute(20);
// 指定专门的route,设置最大连接数为80
HttpHost localhost = new HttpHost("localhost", 8080);
cm.setMaxPerRoute(new HttpRoute(localhost), 50);
// 创建httpClient
return HttpClients.custom().setConnectionManager(cm).build();
}

}
接下来就是代理成了,因为我们使用了jetty容器,所以这里对服务提供者的代理我们通过jetty的AbstractHandler来实现请求的处理

ProviderProxyFactory.java

package com.ibigsea.rpc.proxy;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.mortbay.jetty.handler.AbstractHandler;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibigsea.rpc.config.ProviderConfig;
import com.ibigsea.rpc.container.HttpContainer;
import com.ibigsea.rpc.exception.RpcException;
import com.ibigsea.rpc.invoke.HttpInvoke;
import com.ibigsea.rpc.serizlize.JsonFormatter;
import com.ibigsea.rpc.serizlize.JsonParser;
import com.ibigsea.rpc.serizlize.Request;

/**
* 服务提供者代理
*
* @author bigsea
*
*/
public class ProviderProxyFactory extends AbstractHandler {

private Logger LOG = LoggerFactory.getLogger(ProviderProxyFactory.class);

/**
* 提供服务需要注册,这里使用map类实现简单的注册 约定俗成的,暴漏服务是需要注册的
*/
private Map<Class, Object> providers = new ConcurrentHashMap<Class, Object>();

/**
* 这里用来获取暴露的服务
*/
private static ProviderProxyFactory factory;

private static HttpInvoke invoke = HttpInvoke.getInstance();

/**
* 构造方法 注册服务 创建http容器,并启动
*
* @param providers
* @param config
*/
public ProviderProxyFactory(Map<Class, Object> providers, ProviderConfig config) {
this.providers = providers;
HttpContainer container = new HttpContainer(this, config);
container.start();
factory = this;
for (Map.Entry<Class, Object> entry : providers.entrySet()) {
Log.info(entry.getKey().getSimpleName() + " register");
}
}

/**
* 处理请求 服务消费者发送请求报文过来,服务提供者解析请求报文,通过反射执行方法
*/
public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch)
throws IOException, ServletException {
// 获取请求报文
String data = request.getParameter("data");

try {
// 反序列化
Request req = JsonParser.reqParse(data);
// 获取到注册的服务,并通过反射执行方法
Object res = req.invoke(ProviderProxyFactory.getInstance().getBeanByClass(req.getClazz()));
// 返回结果
invoke.response(JsonFormatter.resbFormatter(res), response.getOutputStream());
} catch (Exception e) {
e.printStackTrace();
} catch (RpcException e) {
e.printStackTrace();
}

}

public Object getBeanByClass(Class clazz) throws RpcException {
Object bean = providers.get(clazz);
if (bean != null) {
return bean;
}
throw new RpcException("service no register", new NullPointerException(), clazz);
}

public static ProviderProxyFactory getInstance() {
return factory;
}

}

对于服务消费者,我们通过jdk的invocationHandler来生成代理对象,对于生成的代理对象都会去执行invoke方法

ConsumerProxyFatory.java

package com.ibigsea.rpc.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import com.ibigsea.rpc.config.ConsumerConfig;
import com.ibigsea.rpc.invoke.HttpInvoke;
import com.ibigsea.rpc.serizlize.JsonFormatter;
import com.ibigsea.rpc.serizlize.JsonParser;

/**
* 服务消费者代理
*
* @author bigsea
*
*/
public class ConsumerProxyFactory implements InvocationHandler {

/**
* 消费者配置
*/
private ConsumerConfig config;

/**
* 需要通过远程调用的服务
*/
private String clazz;

private static HttpInvoke invoke = HttpInvoke.getInstance();

/**
* 创建一个动态代理对象,创建出来的动态代理对象会执行invoke方法
*
* @return
* @throws ClassNotFoundException
*/
public Object create() throws ClassNotFoundException {
Class interfaceClass = Class.forName(clazz);
return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[] { interfaceClass }, this);
}

/**
* 动态代理对象执行该方法 获取(类信息,方法,参数)通过序列化封装成请求报文,通过http请求发送报文到服务提供者
*/
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 获取类信息
Class interfaceClass = proxy.getClass().getInterfaces()[0];
// 封装成请求报文
String req = JsonFormatter.reqFormatter(interfaceClass, method.getName(), args[0]);
// 发送请求报文
String resb = invoke.request(req, config);
// 解析响应报文
return JsonParser.resbParse(resb);
}

public ConsumerConfig getConfig() {
return config;
}

public void setConfig(ConsumerConfig config) {
this.config = config;
}

public String getClazz() {
return clazz;
}

public void setClazz(String clazz) {
this.clazz = clazz;
}

}

简单的RPC框架已经写好

然后我们准备一个公共的接口jar

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

pom里面什么依赖都没有

HelloInterface.java

package com.ibigsea.facade;

import com.ibigsea.vo.People;

/**
* 定义一个接口,如此而已
* @author bigsea
*
*/
public interface HelloInterface {

public String speak(People people);

}

People.java

package com.ibigsea.vo;

import java.io.Serializable;

/**
* 实体
* @author bigsea
*
*/
public class People implements Serializable {

private static final long serialVersionUID = 1L;

private String name;

public People() {
}

public People(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

}

接口定义好了 ,我们可以开始弄服务提供者和服务消费者了

服务提供者

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架
pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ibigsea</groupId>
<artifactId>demo-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.ibigsea</groupId>
<artifactId>http-rpc</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.ibigsea</groupId>
<artifactId>demo-facade</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
HelloService.java

package com.ibigsea.service;

import org.springframework.stereotype.Service;

import com.ibigsea.facade.HelloInterface;
import com.ibigsea.vo.People;

/**
* 实现接口,通过spring配置文件,暴漏出一个服务
* @author bigsea
*
*/
@Service("helloInterface")
public class HelloService implements HelloInterface {

/**
* 方法实现,服务消费者最终执行到该方法
*/
public String speak(People people) {
return "Hello " + people.getName();
}

}
启动类App.java

package com.ibigsea;

import java.util.concurrent.CountDownLatch;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
* 启动类
* @author bigsea
*
*/
public class App {

public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:spring-*.xml");
context.start();
CountDownLatch countDownLatch = new CountDownLatch(1);
countDownLatch.await();
}

}
spring-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">


<!-- 扫描包 -->
<context:component-scan base-package="com.ibigsea" />
<!-- 支持注解配置 -->
<context:annotation-config />

<!-- 构造注入,声明需要暴漏的服务. 还有需要监听的地址 -->
<bean class="com.ibigsea.rpc.proxy.ProviderProxyFactory">
<constructor-arg name="providers">
<map key-type="java.lang.Class" value-type="java.lang.Object">
<!-- 注册服务,类信息,和接口实现 -->
<entry key="com.ibigsea.facade.HelloInterface" value-ref="helloInterface"/>
</map>
</constructor-arg>
<constructor-arg name="config">
<bean id="providerConfig" class="com.ibigsea.rpc.config.ProviderConfig">
<property name="port" value="8888"/>
</bean>
</constructor-arg>
</bean>

</beans>
服务消费者

分布式学习笔记1通过Java自己实现简单的HTTP RPC框架

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ibigsea</groupId>
<artifactId>demo-comsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.ibigsea</groupId>
<artifactId>http-rpc</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.ibigsea</groupId>
<artifactId>demo-facade</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>3.2.8.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
RefService.java

package com.ibigsea.comsumer;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

import com.ibigsea.facade.HelloInterface;
import com.ibigsea.vo.People;

@Service("refService")
public class RefService {

//这里引用到的是java生成的代理对象
@Resource
private HelloInterface helloInterface;

public void sayHello(String name) {
System.out.println(helloInterface.speak(new People(name)));
}

}
spring-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<context:component-scan base-package="com.ibigsea" />
<context:annotation-config />

<!-- 服务消费者请求的地址 -->
<bean id="consumerConfig" class="com.ibigsea.rpc.config.ConsumerConfig">
<property name="url" value="http://localhost:8888/invoke" />
</bean>

<!-- 设置请求地址,需要生成代理的代理对象 -->
<bean id="helloInterfaceInvoke" class="com.ibigsea.rpc.proxy.ConsumerProxyFactory">
<property name="config" ref="consumerConfig"/>
<property name="clazz" value="com.ibigsea.facade.HelloInterface"/>
</bean>
<!-- 产生代理对象,服务消费者可以直接通过@Resource注解引用到该对象,通过http-rpc框架调用到服务消费者 -->
<bean id="helloInterface" factory-bean="helloInterfaceInvoke" factory-method="create"/>
</beans>
测试类HttpRpcTest.java

package com.zto.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.ibigsea.comsumer.RefService;
import com.ibigsea.facade.HelloInterface;
import com.ibigsea.rpc.serizlize.JsonFormatter;

/**
* 测试类
* @author bigsea
*
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath*:spring-*.xml"})
public class HttpRpcTest
{
private static final Logger logger = LoggerFactory.getLogger(HttpRpcTest.class);

@Autowired
private RefService service;

@Test
public void test() throws InterruptedException {
service.sayHello("张三");
}
}

我们可以启动程序看看

先启动服务提供者,服务提供者控制台

五月 01, 2017 2:43:43 下午 org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh
信息: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@685f4c2e: startup date [Mon May 01 14:43:43 CST 2017]; root of context hierarchy
五月 01, 2017 2:43:43 下午 org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
信息: Loading XML bean definitions from file [E:\workspace\rpcworkspace\demo-provider\target\classes\spring-context.xml]
五月 01, 2017 2:43:44 下午 org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
信息: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@26be92ad: defining beans [helloInterface,org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,com.ibigsea.rpc.proxy.ProviderProxyFactory#0,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor]; root of factory hierarchy
14:43:45.608 [main] INFO org.mortbay.log - Logging to Logger[org.mortbay.log] via org.mortbay.log.Slf4jLog
14:43:45.622 [main] DEBUG org.mortbay.log - Container Server@4562e04d + ProviderProxyFactory@2a65fe7c as handler
14:43:45.622 [main] DEBUG org.mortbay.log - Container Server@4562e04d + SelectChannelConnector@0.0.0.0:8888 as connector
14:43:45.622 [main] INFO org.mortbay.log - jetty-6.1.26
14:43:45.638 [main] DEBUG org.mortbay.log - Container Server@4562e04d + org.mortbay.thread.QueuedThreadPool@235834f2 as threadpool
14:43:45.639 [main] DEBUG org.mortbay.log - started org.mortbay.thread.QueuedThreadPool@235834f2
14:43:45.640 [main] DEBUG org.mortbay.log - starting ProviderProxyFactory@2a65fe7c
14:43:45.640 [main] DEBUG org.mortbay.log - started ProviderProxyFactory@2a65fe7c
14:43:45.640 [main] DEBUG org.mortbay.log - starting Server@4562e04d
14:43:45.691 [main] DEBUG org.mortbay.log - started org.mortbay.jetty.nio.SelectChannelConnector$1@2b71e916
14:43:45.692 [main] INFO org.mortbay.log - Started SelectChannelConnector@0.0.0.0:8888
14:43:45.693 [main] DEBUG org.mortbay.log - started SelectChannelConnector@0.0.0.0:8888
14:43:45.693 [main] DEBUG org.mortbay.log - started Server@4562e04d
14:43:45.693 [main] INFO c.i.rpc.container.HttpContainer - 容器启动~
14:43:45.693 [main] INFO org.mortbay.log - HelloInterface register

然后执行服务消费者的测试方法,服务消费者控制台:

五月 01, 2017 2:44:31 下午 org.springframework.test.context.TestContextManager retrieveTestExecutionListeners
信息: Could not instantiate TestExecutionListener class [org.springframework.test.context.web.ServletTestExecutionListener]. Specify custom listener classes or make the default listener classes (and their dependencies) available.
五月 01, 2017 2:44:31 下午 org.springframework.test.context.TestContextManager retrieveTestExecutionListeners
信息: Could not instantiate TestExecutionListener class [org.springframework.test.context.transaction.TransactionalTestExecutionListener]. Specify custom listener classes or make the default listener classes (and their dependencies) available.
五月 01, 2017 2:44:31 下午 org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
信息: Loading XML bean definitions from file [E:\workspace\rpcworkspace\demo-comsumer\target\classes\spring-context.xml]
五月 01, 2017 2:44:32 下午 org.springframework.context.support.GenericApplicationContext prepareRefresh
信息: Refreshing org.springframework.context.support.GenericApplicationContext@7946e1f4: startup date [Mon May 01 14:44:32 CST 2017]; root of context hierarchy
五月 01, 2017 2:44:32 下午 org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
信息: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@4f51b3e0: defining beans [refService,org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,consumerConfig,helloInterfaceInvoke,helloInterface,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor]; root of factory hierarchy
Hello 张三
五月 01, 2017 2:44:33 下午 org.springframework.context.support.GenericApplicationContext doClose
信息: Closing org.springframework.context.support.GenericApplicationContext@7946e1f4: startup date [Mon May 01 14:44:32 CST 2017]; root of context hierarchy

服务提供者这里输出了消费者请求的日志

14:44:33.117 [348984985@qtp-592983282-1 - /invoke] DEBUG org.mortbay.log - REQUEST /invoke on org.mortbay.jetty.HttpConnection@6e01b90
14:44:33.213 [348984985@qtp-592983282-1 - /invoke] DEBUG org.mortbay.log - RESPONSE /invoke 200

演示成功


注意


因为这里使用了fastjson,  而我们的Request里面有类信息,进行序列化和反序列的时候我们要在启动类增加参数

 -Dfastjson.parser.autoTypeSupport=true

其他解决方案看这里

https://github.com/alibaba/fastjson/wiki/enable_autotype


分布式学习笔记1通过Java自己实现简单的HTTP RPC框架


好了,这里的代码我会上传到我的github上面