Thrift学习笔记(5)--Thrift 半同步半异步的服务端模型

时间:2022-08-28 23:30:04

Thrift 半同步半异步的服务端模型

1、创建服务端

package com.thriftServer;

import com.service.demo.Hello;
import com.service.demo.impl.HelloServiceImpl;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;

/**
 * Launches the Hello Thrift service on a {@link THsHaServer}
 * (half-sync/half-async server model).
 *
 * <p>The server listens on a non-blocking server socket and is configured
 * with {@link TFramedTransport} and {@link TBinaryProtocol}; clients have to
 * use a matching transport framing and protocol to talk to it.
 */
public class HelloTHsHaServer {

    /** TCP port the server listens on; also reported in the startup message. */
    private static final int PORT = 7911;

    /**
     * Starts the Thrift server and serves requests.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        try {
            // Non-blocking server socket bound to the service port.
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(PORT);

            // Bind the generated processor to the Hello service implementation.
            // NOTE(review): raw Hello.Processor type kept as in the original; the generated
            // class may be generic depending on the Thrift version - confirm before changing.
            TProcessor tprocessor = new Hello.Processor(new HelloServiceImpl());

            // Server arguments for the half-sync/half-async model.
            THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport);
            tArgs.processor(tprocessor);
            // Framed transport, as required for this non-blocking server setup.
            tArgs.transportFactory(new TFramedTransport.Factory());
            // Binary wire protocol.
            tArgs.protocolFactory(new TBinaryProtocol.Factory());

            TServer server = new THsHaServer(tArgs);
            System.out.println("Start server on port " + PORT + "...");
            server.serve();
        } catch (TTransportException e) {
            // Raised when the server socket cannot be created (e.g. port unavailable).
            e.printStackTrace();
        }
    }
}

2、创建客户端

package com.thriftClient;

import com.service.demo.Hello;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by ssjk on 2016/10/21.
 *
 * <p>Asynchronous Thrift client for the Hello service: issues a single
 * {@code helloString} call over a non-blocking socket and waits for the
 * callback to signal completion.
 */
public class HelloAsyncClient {
    public static final String SERVER_IP = "127.0.0.1";
    public static final int SERVER_PORT = 7911;
    public static final int TIMEOUT = 30000;

    /**
     * Connects to the server, performs one asynchronous call and waits up to
     * 30 seconds for its callback before releasing the client resources.
     *
     * @param args command-line arguments (unused)
     * @throws TException if the asynchronous call cannot be issued
     * @throws IOException if the client manager or the socket cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting for the callback
     */
    public static void main(String[] args) throws TException, IOException, InterruptedException {
        // Manager that drives the asynchronous calls.
        TAsyncClientManager clientManager = new TAsyncClientManager();
        try {
            // Non-blocking transport channel to the server.
            TNonblockingTransport transport = new TNonblockingSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
            try {
                // The protocol must be the same one the server uses.
                TProtocolFactory tprotocol = new TBinaryProtocol.Factory();

                Hello.AsyncClient asyncClient = new Hello.AsyncClient(tprotocol, clientManager, transport);
                CountDownLatch latch = new CountDownLatch(1);
                AsynCallback callBack = new AsynCallback(latch);

                System.out.println("call method sayHello start ...");
                // Issue the call; the outcome is delivered to the callback.
                asyncClient.helloString("你好", callBack);
                System.out.println("call method sayHello .... end");

                // Wait for the callback to count the latch down (false means the wait timed out).
                boolean wait = latch.await(30, TimeUnit.SECONDS);
                System.out.println("latch.await =:" + wait);
            } finally {
                // Always release the socket, whether the call completed, failed or timed out.
                transport.close();
            }
        } finally {
            // Always stop the client manager so it does not outlive this call.
            clientManager.stop();
        }
    }
}
/**
 * Callback for the asynchronous {@code helloString} call. Prints the outcome
 * and counts down the supplied latch exactly once per invocation, on both the
 * success and the failure path, so a thread waiting on the latch is released.
 */
class AsynCallback implements AsyncMethodCallback<Hello.AsyncClient.helloString_call> {
    /** Latch counted down once the call has completed or failed. */
    private final CountDownLatch latch;

    /**
     * @param latch latch to count down when the call finishes
     */
    public AsynCallback(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Invoked when the call completes; fetches and prints the result.
     *
     * @param helloString_call the finished call object holding the result
     */
    @Override
    public void onComplete(Hello.AsyncClient.helloString_call helloString_call) {
        System.out.println("onComplete");
        try {
            String result = helloString_call.getResult();
            System.out.println("AsynCall result :" + result);
        } catch (Exception e) {
            // Fetching the result failed; report it and still release the waiter below.
            e.printStackTrace();
        } finally {
            latch.countDown();
        }
    }

    /**
     * Invoked when the call fails; reports the failure.
     *
     * @param exception the failure reported for the call
     */
    @Override
    public void onError(Exception exception) {
        try {
            System.out.println("onError :" + exception.getMessage());
            // The message alone can be null and hides the exception type; print the full trace too.
            exception.printStackTrace();
        } finally {
            // Release the waiter even if reporting the error itself fails.
            latch.countDown();
        }
    }
}