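Turning Thrift into a proper service framework mostly means reworking the client side. The work breaks down as follows:
1. The server registers its services and the client discovers them automatically, so no configuration has to be edited by hand. We use zookeeper for this. Because zookeeper's native client is fairly complicated to use, service registration and discovery go through the curator-recipes library instead.
2. The client manages service calls through a connection pool to improve performance. We use commons-pool from the Apache Commons project, which greatly reduces code complexity.
3. Failover/LoadBalance. Thanks to zookeeper watchers, the client is notified promptly when a server becomes unavailable and removes the dead service node. There are many LoadBalance algorithms; we use weighted random here, which is a common choice. For other algorithms, see "Basic algorithms for common load balancing" (常见的负载均衡的基本算法).
4. Make thrift service registration and discovery configurable through spring, which is considerably more convenient.
5. Other changes, for example:
1) Use dynamic proxies to hide the details of the client/server interaction, so that users only work with the interface provided by the service side.
2) Thrift offers two ways to call a service, Client and Iface:
// *) Call through the Client API
((EchoService.Client) client).echo("hello lilei"); // (1)
// *) Call through the Service interface
((EchoService.Iface) service).echo("hello lilei"); // (2)
The Client API style is not recommended; we recommend the Service interface style (service-oriented).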
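The dynamic-proxy idea from item 5 can be sketched with the JDK alone. This is only an illustration under assumptions: `EchoIface` and `EchoClientStub` are hypothetical stand-ins for the Thrift-generated `EchoService.Iface` and `EchoService.Client`, and the handler calls a fixed target where a real implementation would borrow a client from the pool.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for a Thrift-generated EchoService.Iface.
interface EchoIface {
    String echo(String msg);
}

// Hypothetical stand-in for a generated client bound to one connection.
final class EchoClientStub implements EchoIface {
    @Override
    public String echo(String msg) {
        return "echo:" + msg;
    }
}

final class IfaceProxySketch {
    // Returns an object that implements the service interface and forwards every call to target.
    static EchoIface proxyFor(final EchoIface target) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                try {
                    // A pooled implementation would borrow a client here and return it afterwards.
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    // Rethrow the exception of the call itself, not the reflection wrapper.
                    throw e.getCause();
                }
            }
        };
        return (EchoIface) Proxy.newProxyInstance(
                EchoIface.class.getClassLoader(), new Class<?>[] { EchoIface.class }, handler);
    }
}
```

The caller only ever sees `EchoIface`: `IfaceProxySketch.proxyFor(new EchoClientStub()).echo("hello lilei")` goes through the handler before it reaches the stub.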
Let's implement these one by one:
I. Add the dependency jars to pom.xml
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.2</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.0.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
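II. Managing service node configuration with zookeeper
As RPC services move toward a platform, more and more service details are hidden (the IP addresses of the cluster, cluster scaling and migration) and only the service interface is exposed. This evolution fully decouples the server side from the client side; the two interact through a ConfigServer (MetaServer) acting as the intermediary.
Note: the diagram comes from the dubbo website.
Here Zookeeper plays that role: the server acts as the publisher and the client as the subscriber.
Zookeeper is a coordination service for distributed applications. It relies on the ZAB atomic broadcast protocol (a Paxos-like consensus protocol) and plays an important role in naming, configuration push, data synchronization and master/slave failover. Its data is organized like the directory tree of a file system:
Each node is called a znode. Depending on their characteristics, znodes fall into the following types:
1). PERSISTENT: persistent node
2). EPHEMERAL: ephemeral node, removed when its session ends (for example after the client disconnects and the session expires)
3). PERSISTENT_SEQUENTIAL: persistent node whose name carries a monotonically increasing sequence number
4). EPHEMERAL_SEQUENTIAL: ephemeral node whose name carries a monotonically increasing sequence number
Note: ephemeral nodes cannot be parent nodes.
Watcher: a client can register callbacks for changes to a node's state or content. Two attributes of the Event deserve attention: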
1). KeeperState: Disconnected,SyncConnected,Expired
2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
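To make the failover idea concrete, here is a small JDK-only sketch of how a client might keep its list of live servers in step with child-node events. It does not use the ZooKeeper or Curator API: `LiveNodeTrackerSketch` and its `ChildEvent` enum are hypothetical names, standing in for whatever callback the watcher delivers when an ephemeral child appears or disappears.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

final class LiveNodeTrackerSketch {
    // Hypothetical, simplified event types; real watcher events carry more detail.
    enum ChildEvent { ADDED, REMOVED }

    // Sorted, thread-safe set of node names such as "ip:port:weight".
    private final Set<String> live = new ConcurrentSkipListSet<String>();

    // Called from the watcher callback whenever a child node changes.
    void onChildEvent(ChildEvent type, String node) {
        if (type == ChildEvent.ADDED) {
            live.add(node);
        } else {
            // The ephemeral node vanished, so the server is considered unavailable.
            live.remove(node);
        }
    }

    // Returns a copy, so callers can iterate without seeing concurrent changes.
    List<String> snapshot() {
        return new ArrayList<String>(live);
    }
}
```

Because server nodes are ephemeral, a crashed server disappears from the tree once its session expires, and the resulting REMOVED event is what takes it out of rotation on the client.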
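RPC server:
As the publisher of a concrete business service, the RPC server describes its service with the following elements.
1). namespace: namespace, used to tell applications apart
2). service: service interface, identified by the publisher's fully qualified class name
3). version: version number
This borrows from Maven's GAV coordinates; a three-dimensional coordinate system suits a service platform well.
*) Data model design
The registration path of a concrete RPC service is /rpc/{namespace}/{service}/{version}; every node on this path is persistent.
Each cluster node of the RPC service registers at /rpc/{namespace}/{service}/{version}/{ip:port:weight}; the last node is ephemeral.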
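The two path templates above can be spelled out as plain string construction. The helper below is a hypothetical illustration (`RpcPathSketch` is not part of the project); in the actual code the `/rpc/{namespace}` prefix is supplied by the Curator namespace and only `/{service}/{version}/{address}` is passed to `forPath`.

```java
final class RpcPathSketch {
    // Path of the persistent service node: /rpc/{namespace}/{service}/{version}
    static String servicePath(String namespace, String service, String version) {
        return "/rpc/" + namespace + "/" + service + "/" + version;
    }

    // Path of one ephemeral server node: {servicePath}/{ip:port:weight}
    static String instancePath(String namespace, String service, String version,
                               String ip, int port, int weight) {
        return servicePath(namespace, service, version) + "/" + ip + ":" + port + ":" + weight;
    }
}
```

For example, `instancePath("demo", "com.example.EchoService", "1.0.0", "192.168.36.215", 9000, 1)` yields `/rpc/demo/com.example.EchoService/1.0.0/192.168.36.215:9000:1` (the namespace and service names here are made up for the example).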
1. Managing the Zookeeper client
ZookeeperFactory.java
package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * Obtains the zookeeper client connection
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
private String zkHosts;
// session timeout
private int sessionTimeout = 30000;
private int connectionTimeout = 30000;
// share a single zk connection
private boolean singleton = true;
// global path prefix, typically used to tell applications apart
private String namespace;
private final static String ROOT = "rpc";
private CuratorFramework zkClient;

public void setZkHosts(String zkHosts) {
this.zkHosts = zkHosts;
} public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
} public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
} public void setSingleton(boolean singleton) {
this.singleton = singleton;
} public void setNamespace(String namespace) {
this.namespace = namespace;
} public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
} @Override
public CuratorFramework getObject() throws Exception {
if (singleton) {
if (zkClient == null) {
zkClient = create();
zkClient.start();
}
return zkClient;
}
return create();
} @Override
public Class<?> getObjectType() {
return CuratorFramework.class;
} @Override
public boolean isSingleton() {
return singleton;
}

public CuratorFramework create() throws Exception {
// Build the full namespace in a local variable so that repeated calls
// (singleton=false) do not keep prepending ROOT to the field.
String fullNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
return create(zkHosts, sessionTimeout, connectionTimeout, fullNamespace);
}

public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
// Use the configured connectionTimeout rather than a hard-coded value
return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
.canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.defaultData(null).build();
}

public void close() {
if (zkClient != null) {
zkClient.close();
}
}
}
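2. Registering the service on the server side
The server configuration needs the local IP address, so we define an interface for obtaining it.
ThriftServerIpResolve.java
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Resolves the IP address of the thrift server, used when registering the service
 * 1) It can be parsed from a special file on a physical or virtual machine
 * 2) It can be taken from the network interface with a given index
 * 3) Other approaches
 */
public interface ThriftServerIpResolve {
String getServerIp() throws Exception;

// reset() is called when the IP changes
void reset();

static interface IpRestCalllBack{
public void rest(String newIp);
}
}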
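This interface can be implemented in different ways. Below we resolve the IP address from the network interfaces; the address can also be set directly through the serverIp property.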
ThriftServerIpLocalNetworkResolve.java
package cn.slimsmart.thrift.rpc.zookeeper; import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves the IP from the network interfaces
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
private Logger logger = LoggerFactory.getLogger(getClass());
// cached value
private String serverIp;

public void setServerIp(String serverIp) {
this.serverIp = serverIp;
} @Override
public String getServerIp() {
if (serverIp != null) {
return serverIp;
}
// A host may have several network interfaces
try {
Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
while (netInterfaces.hasMoreElements()) {
NetworkInterface netInterface = netInterfaces.nextElement();
// Each interface can carry several addresses: loopback, site-local, IPv4 or IPv6, and so on
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if(address instanceof Inet6Address){
continue;
}
if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
serverIp = address.getHostAddress();
logger.info("resolve server ip :"+ serverIp);
// Stop at the first matching address
return serverIp;
}
}
}
} catch (SocketException e) {
logger.error("failed to enumerate network interfaces", e);
}
return serverIp;
} @Override
public void reset() {
serverIp = null;
}
}
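The selection rule used above (skip IPv6, skip loopback, take a site-local address) is easier to see, and to test, when it is separated from the interface enumeration. A minimal sketch, assuming a hypothetical helper class `SiteLocalIpSketch` that is not part of the project:

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

final class SiteLocalIpSketch {
    // Returns the first site-local, non-loopback IPv4 address, or null if there is none.
    static String firstSiteLocalIpv4(List<InetAddress> candidates) {
        for (InetAddress address : candidates) {
            if (address instanceof Inet4Address
                    && address.isSiteLocalAddress()
                    && !address.isLoopbackAddress()) {
                return address.getHostAddress();
            }
        }
        return null;
    }

    // Parses an IP literal; for literals getByName only validates the format, no DNS lookup.
    static InetAddress literal(String ip) {
        try {
            return InetAddress.getByName(ip);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not an IP literal: " + ip, e);
        }
    }
}
```

Given the candidates 127.0.0.1, ::1, 8.8.8.8 and 192.168.1.5 in that order, the helper skips the loopback, IPv6 and public addresses and picks 192.168.1.5.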
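Next we define the service publishing interface and implement it so that the service information (service interface, version, IP, port, weight) is published to zookeeper.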
ThriftServerAddressRegister.java
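package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Publishes the service address and port to the service registry, here the zookeeper server
 */
public interface ThriftServerAddressRegister {
/**
 * Publishes a service
 * @param service name of the service interface; must be unique within a product
 * @param version version of the service interface, 1.0.0 by default
 * @param address address and port at which the service is published
 */
void register(String service,String version,String address);
}
Implementation: ThriftServerAddressRegisterZookeeper.java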
package cn.slimsmart.thrift.rpc.zookeeper; import java.io.UnsupportedEncodingException; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.ThriftException;

/**
 * Registers the service list in Zookeeper
 */
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
private Logger logger = LoggerFactory.getLogger(getClass());
private CuratorFramework zkClient;

public ThriftServerAddressRegisterZookeeper(){}

public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
this.zkClient = zkClient;
} public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
} @Override
public void register(String service, String version, String address) {
if(zkClient.getState() == CuratorFrameworkState.LATENT){
zkClient.start();
}
if(StringUtils.isEmpty(version)){
version="1.0.0";
}
// ephemeral node
try {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/"+service+"/"+version+"/"+address);
} catch (UnsupportedEncodingException e) {
logger.error("register service address to zookeeper exception", e);
throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
} catch (Exception e) {
logger.error("register service address to zookeeper exception", e);
throw new ThriftException("register service address to zookeeper exception", e);
}
} public void close(){
zkClient.close();
}
}
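3. Discovering the service on the client side
Define the interface for obtaining service addresses
ThriftServerAddressProvider.java
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Provides thrift server-service addresses so that the client connection pool can be built
 */
public interface ThriftServerAddressProvider {
// returns the service name
String getService();

/**
 * Returns all server addresses
 * @return
 */
List<InetSocketAddress> findServerAddressList();

/**
 * Selects a suitable address, for example at random;
 * any appropriate algorithm can be used internally.
 * @return
 */
InetSocketAddress selector();

void close();
}
Implementation with automatic address discovery through zookeeper: ThriftServerAddressProviderZookeeper.java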
package cn.slimsmart.thrift.rpc.zookeeper; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/**
 * Uses zookeeper as the "config" center, with the apache-curator library to simplify zookeeper development
 */
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {
private Logger logger = LoggerFactory.getLogger(getClass());
// registered service
private String service;
// service version
private String version = "1.0.0";
private PathChildrenCache cachedPath;
private CuratorFramework zkClient;
// Records every address this provider has seen.
// If the zookeeper cluster fails, the addresses in trace serve as a "backup".
private Set<String> trace = new HashSet<String>();
private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
private Object lock = new Object();
// default weight
private static final Integer DEFAULT_WEIGHT = 1;

public void setService(String service) {
this.service = service;
} public void setVersion(String version) {
this.version = version;
} public ThriftServerAddressProviderZookeeper() {
} public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
this.zkClient = zkClient;
} public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
} @Override
public void afterPropertiesSet() throws Exception {
// start the zk client if it has not been started yet
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
buildPathChildrenCache(zkClient, getServicePath(), true);
cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
} private String getServicePath(){
return "/" + service + "/" + version;
}
private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
cachedPath = new PathChildrenCache(client, path, cacheData);
cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
logger.info("Connection is reconection.");
break;
case CONNECTION_SUSPENDED:
logger.info("Connection is suspended.");
break;
case CONNECTION_LOST:
logger.warn("Connection error,waiting...");
return;
default:
//
}
// Rebuild on any change to a child node; this is the "simple" approach.
cachedPath.rebuild();
rebuild();
}

protected void rebuild() throws Exception {
List<ChildData> children = cachedPath.getCurrentData();
if (children == null || children.isEmpty()) {
// All thrift servers may have lost their connection to zookeeper
// while the network between thrift client and thrift server is still fine,
// so whether container should be cleared here deserves careful thought.
synchronized (lock) {
container.clear();
}
logger.error("thrift server-cluster error....");
return;
}
List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
List<String> addresses = new ArrayList<String>();
for (ChildData data : children) {
String path = data.getPath();
logger.debug("get path:"+path);
// The child name after the service path is the ip:port:weight address
String address = path.substring(getServicePath().length()+1);
logger.debug("get serviceAddress:"+address);
current.addAll(transfer(address));
addresses.add(address);
}
Collections.shuffle(current);
// Update all shared collections under the same lock that selector() takes
synchronized (lock) {
trace.addAll(addresses);
container.clear();
container.addAll(current);
inner.clear();
inner.addAll(current);
}
}
});
} private List<InetSocketAddress> transfer(String address) {
String[] hostname = address.split(":");
Integer weight = DEFAULT_WEIGHT;
if (hostname.length == 3) {
weight = Integer.valueOf(hostname[2]);
}
String ip = hostname[0];
Integer port = Integer.valueOf(hostname[1]);
List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
// Add ip:port to the address list once per unit of weight; picking from the shuffled list then balances load by weight
for (int i = 0; i < weight; i++) {
result.add(new InetSocketAddress(ip, port));
}
return result;
} @Override
public List<InetSocketAddress> findServerAddressList() {
return Collections.unmodifiableList(container);
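}

@Override
public InetSocketAddress selector() {
// Guard inner/container/trace with the same lock that rebuild() uses
synchronized (lock) {
if (inner.isEmpty()) {
if (!container.isEmpty()) {
inner.addAll(container);
} else if (!trace.isEmpty()) {
// Nothing from zookeeper: fall back to the addresses seen so far
for (String hostname : trace) {
container.addAll(transfer(hostname));
}
Collections.shuffle(container);
inner.addAll(container);
}
}
// null when no address is known at all; callers must check
return inner.poll();
}
}

@Override
public void close() {
try {
cachedPath.close();
zkClient.close();
} catch (Exception e) {
logger.warn("error while closing zookeeper resources", e);
}
}

@Override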
public String getService() {
return service;
} }
A second implementation of this interface takes the service addresses from configuration; see the attachment: FixedAddressProvider.java
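The weighting scheme used by the provider (repeat each address once per unit of weight, then pick from the expanded list) can be isolated in a few lines. The sketch below is an assumption-laden simplification: `WeightedAddressSketch` is a hypothetical class, it picks uniformly with `Random` instead of shuffling and polling a queue as the provider does, and it uses unresolved socket addresses so that no DNS lookup takes place.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class WeightedAddressSketch {
    static final int DEFAULT_WEIGHT = 1;

    // Expands "ip:port" or "ip:port:weight" into weight copies of the address.
    static List<InetSocketAddress> expand(String node) {
        String[] parts = node.split(":");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("expected ip:port[:weight] but got " + node);
        }
        int port = Integer.parseInt(parts[1]);
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : DEFAULT_WEIGHT;
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < weight; i++) {
            result.add(InetSocketAddress.createUnresolved(parts[0], port));
        }
        return result;
    }

    // Picks uniformly from the expanded list, so an address with weight w is w times as likely.
    static InetSocketAddress pick(List<InetSocketAddress> expanded, Random random) {
        if (expanded.isEmpty()) {
            throw new IllegalStateException("no live server address");
        }
        return expanded.get(random.nextInt(expanded.size()));
    }
}
```

With the fixed address list from the client configuration, `192.168.36.215:9001:1`, `:9002:2` and `:9003:3` expand to six entries in total, so the third server is chosen about half of the time.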
III. Server-side service registration
ThriftServiceServerFactory.java
package cn.slimsmart.thrift.rpc; import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor; import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils; import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;

/**
 * Factory that starts a thrift server and registers its service
 */
public class ThriftServiceServerFactory implements InitializingBean {
// local port the service listens on
private Integer port = 8299;
// weight
private Integer weight = 1;// default
// service implementation
private Object service;
// service version
private String version;
// resolves the local IP
private ThriftServerIpResolve thriftServerIpResolve;
// service registration
private ThriftServerAddressRegister thriftServerAddressRegister;
private ServerThread serverThread;

public void setPort(Integer port) {
this.port = port;
} public void setWeight(Integer weight) {
this.weight = weight;
} public void setService(Object service) {
this.service = service;
} public void setVersion(String version) {
this.version = version;
} public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
this.thriftServerIpResolve = thriftServerIpResolve;
} public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
this.thriftServerAddressRegister = thriftServerAddressRegister;
} @Override
public void afterPropertiesSet() throws Exception {
if (thriftServerIpResolve == null) {
thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
}
String serverIP = thriftServerIpResolve.getServerIp();
if (StringUtils.isEmpty(serverIP)) {
throw new ThriftException("cant find server ip...");
} String hostname = serverIP + ":" + port + ":" + weight;
Class<?> serviceClass = service.getClass();
// interfaces implemented by the service class
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
TProcessor processor = null;
String serviceName = null;
for (Class<?> clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
serviceName = clazz.getEnclosingClass().getName();
String pname = serviceName + "$Processor";
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> pclass = classLoader.loadClass(pname);
if (!TProcessor.class.isAssignableFrom(pclass)) {
continue;
}
Constructor<?> constructor = pclass.getConstructor(clazz);
processor = (TProcessor) constructor.newInstance(service);
break;
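} catch (Exception e) {
// Keep the cause visible; the loop then moves on to the next interface
org.slf4j.LoggerFactory.getLogger(getClass()).warn("failed to load processor " + pname, e);
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// serve() blocks, so the server needs its own thread.
serverThread = new ServerThread(processor, port);
serverThread.start();
// register the service
if (thriftServerAddressRegister != null) {
thriftServerAddressRegister.register(serviceName, version, hostname);
}
}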
class ServerThread extends Thread {
private TServer server;
ServerThread(TProcessor processor, int port) throws Exception {
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
TProcessorFactory processorFactory = new TProcessorFactory(processor);
tArgs.processorFactory(processorFactory);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
server = new TThreadedSelectorServer(tArgs);
} @Override
public void run(){
try{
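// start serving (blocks until the server is stopped)
server.serve();
}catch(Exception e){
org.slf4j.LoggerFactory.getLogger(getClass()).error("thrift server stopped unexpectedly", e);
}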
} public void stopServer(){
server.stop();
}
} public void close() {
serverThread.stopServer();
}
}
IV. Client-side service proxy and connection pool
Client connection pool implementation: ThriftClientPoolFactory.java
package cn.slimsmart.thrift.rpc; import java.net.InetSocketAddress; import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * Connection pool, thrift-client for spring
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
private final ThriftServerAddressProvider serverAddressProvider;
private final TServiceClientFactory<TServiceClient> clientFactory;
private PoolOperationCallBack callback; protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
this.serverAddressProvider = addressProvider;
this.clientFactory = clientFactory;
} protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
PoolOperationCallBack callback) throws Exception {
this.serverAddressProvider = addressProvider;
this.clientFactory = clientFactory;
this.callback = callback;
}

static interface PoolOperationCallBack {
// invoked before a client is destroyed
void destroy(TServiceClient client);
// invoked after a client has been created
void make(TServiceClient client);
}

public void destroyObject(TServiceClient client) throws Exception {
if (callback != null) {
try {
callback.destroy(client);
} catch (Exception e) {
//
}
}
TTransport pin = client.getInputProtocol().getTransport();
pin.close();
} public boolean validateObject(TServiceClient client) {
TTransport pin = client.getInputProtocol().getTransport();
return pin.isOpen();
}

@Override
public TServiceClient makeObject() throws Exception {
InetSocketAddress address = serverAddressProvider.selector();
if (address == null) {
// selector() yields null when no server address is known
throw new ThriftException("no thrift server address available");
}
TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
TTransport transport = new TFramedTransport(tsocket);
TProtocol protocol = new TBinaryProtocol(transport);
TServiceClient client = this.clientFactory.getClient(protocol);
transport.open();
if (callback != null) {
try {
callback.make(client);
} catch (Exception e) {
// a failing callback must not prevent the client from being used
}
}
return client;
}
}
Client service proxy factory implementation: ThriftServiceClientProxyFactory.java
package cn.slimsmart.thrift.rpc; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean; import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * Client proxy
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
// maximum number of active connections
private Integer maxActive = 32;
// connection idle time in ms, default 3 min
// -1 disables idle eviction
private Integer idleTime = 180000;
private ThriftServerAddressProvider serverAddressProvider;
private Object proxyClient;
private Class<?> objectClass;
private GenericObjectPool<TServiceClient> pool;
private PoolOperationCallBack callback = new PoolOperationCallBack() {
@Override
public void make(TServiceClient client) {
System.out.println("create");
} @Override
public void destroy(TServiceClient client) {
System.out.println("destroy");
}
}; public void setMaxActive(Integer maxActive) {
this.maxActive = maxActive;
} public void setIdleTime(Integer idleTime) {
this.idleTime = idleTime;
} public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
this.serverAddressProvider = serverAddressProvider;
} @Override
public void afterPropertiesSet() throws Exception {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// load the Iface interface
objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
// load the Client.Factory class
Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
poolConfig.maxActive = maxActive;
poolConfig.minIdle = 0;
poolConfig.minEvictableIdleTimeMillis = idleTime;
poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
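proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
TServiceClient client = pool.borrowObject();
boolean broken = false;
try {
return method.invoke(client, args);
} catch (java.lang.reflect.InvocationTargetException e) {
// Unwrap so that callers see the exception thrown by the service call itself
Throwable cause = e.getCause();
// After a transport failure the connection is unusable and must not go back into the pool
broken = cause instanceof org.apache.thrift.transport.TTransportException;
throw cause;
} finally {
if (broken) {
pool.invalidateObject(client);
} else {
pool.returnObject(client);
}
}
}
});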
} @Override
public Object getObject() throws Exception {
return proxyClient;
} @Override
public Class<?> getObjectType() {
return objectClass;
} @Override
public boolean isSingleton() {
return true;
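}

public void close() {
// Close pooled connections before releasing the address provider
if (pool != null) {
try {
pool.close();
} catch (Exception e) {
// best effort: continue shutting down
}
}
if (serverAddressProvider != null) {
serverAddressProvider.close();
}
}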
}
Now let's look at the server and client configuration.
Server: spring-context-thrift-server.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
default-lazy-init="false"> <!-- zookeeper -->
<bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
destroy-method="close">
<property name="zkHosts"
value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
<property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
<property name="connectionTimeout" value="3000" />
<property name="sessionTimeout" value="3000" />
<property name="singleton" value="true" />
</bean>
<bean id="sericeAddressRegister"
class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
destroy-method="close">
<property name="zkClient" ref="thriftZookeeper" />
</bean>
<bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" /> <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
destroy-method="close">
<property name="service" ref="echoSerivceImpl" />
<property name="port" value="9000" />
<property name="version" value="1.0.0" />
<property name="weight" value="1" />
<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
</bean> <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
destroy-method="close">
<property name="service" ref="echoSerivceImpl" />
<property name="port" value="9001" />
<property name="version" value="1.0.0" />
<property name="weight" value="1" />
<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
</bean> <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
destroy-method="close">
<property name="service" ref="echoSerivceImpl" />
<property name="port" value="9002" />
<property name="version" value="1.0.0" />
<property name="weight" value="1" />
<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
</bean>
</beans>
Client: spring-context-thrift-client.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
default-lazy-init="false"> <!-- fixedAddress -->
<!--
<bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
<property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
<property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
</bean>
<bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
<property name="maxActive" value="5" />
<property name="idleTime" value="10000" />
<property name="serverAddressProvider" ref="fixedAddressProvider" />
</bean>
-->
<!-- zookeeper -->
<bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
destroy-method="close">
<property name="zkHosts"
value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
<property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
<property name="connectionTimeout" value="3000" />
<property name="sessionTimeout" value="3000" />
<property name="singleton" value="true" />
</bean>
<bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
<property name="maxActive" value="5" />
<property name="idleTime" value="1800000" />
<property name="serverAddressProvider">
<bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
<property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
<property name="version" value="1.0.0" />
<property name="zkClient" ref="thriftZookeeper" />
</bean>
</property>
</bean>
</beans>
After starting the server, we can see that several service addresses have been registered in zookeeper.