Apache thrift RPC 双向通信

时间:2022-12-27 08:26:28

      在上一篇介绍Apache thrift 安装和使用,写了一个简单的demo,讲解thrift服务的发布和客户端调用,但只是单向的客户端发送消息,服务端接收消息。而客户端却得不到服务器的响应。

在不涉及语言平台的制约,WebService可胜任做这些服务端的处理。

     基于大部分业务需求,更需要服务端能够响应处理数据。下面我通过一个demo案例,介绍下Apache thrift 双向通信的使用。

一.首先我们还是需要安装好Apache thrift。这里不再赘述,戳这里查看我上篇文章的介绍:http://www.cnblogs.com/sumingk/articles/6073105.html

二.其次准备好thrift 所需的jar包:

    Apache thrift RPC 双向通信

三.新建一个Java web项目,编写thrift脚本,命名为student.thrift  如下:

namespace java com.zhj.student

typedef i32 int  
typedef i16 short
typedef i64 long

//Student Entity
struct Student { 
   1: string name
} 


service Zthrift { 
   oneway void send(1:Student msg)
}

四.执行student.thrift 文件,thrift  --gen java  student.thrift (该文件我还是放在c盘根目录下执行),随后生产gen-java文件,如下:

Apache thrift RPC 双向通信

五.将新生成的两文件拷入项目中,其中Student.java 是实体类,Zthrift.java是生成的类。

六.编写thrift服务端类。

package com.zhj.server;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.zhj.student.Student;
import com.zhj.student.Zthrift;
import com.zhj.student.Zthrift.Iface;

public class ZServer {

        public static void main(String[] args){
            try {
                TServerSocket tServerSocket=new TServerSocket(9999);
                TThreadPoolServer.Args targs=new TThreadPoolServer.Args(tServerSocket);
                TBinaryProtocol.Factory factory=new TBinaryProtocol.Factory();
                //获取processFactory
                TProcessorFactory tProcessorFactory= getProcessorFactory();
                targs.protocolFactory(factory);
                targs.processorFactory(tProcessorFactory);
                TThreadPoolServer tThreadPoolServer=new TThreadPoolServer(targs);
                System.out.println("start server...");
                tThreadPoolServer.serve();
                
            } catch (TTransportException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
        
        /**
         * 内部类获取 getProcessorFactory
         * @return
         */
        public static int tt= 0;
        public static TProcessorFactory getProcessorFactory(){
            
            TProcessorFactory tProcessorFactory=new TProcessorFactory(null){
                public TProcessor getProcessor(final TTransport tTransport){
                    Thread thread = new Thread(new Runnable() {
                        
                        @Override
                        public void run() {
                            try {
                                
                                System.out.println("服务端休眠5秒后,执行响应......");
                                //延时五秒回复(延迟执行给客户端发送消息)
                                Thread.sleep(5000);
                                tt +=100;
                                System.out.println("延时五秒回复时,tt = " +tt);
                                 //这里可以把client提取作为成员变量来多次使用
                                Zthrift.Client client = new Zthrift.Client(new TBinaryProtocol(tTransport));
                                //给客户端响应消息
                                client.send(new Student("....test"));
                                
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            } catch (TException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    });
                    thread.start();
                    
                    return new Zthrift.Processor<Iface>(new Iface() {
                        
                        @Override
                        public void send(Student msg) throws TException {
                            // TODO Auto-generated method stub
                            tt+=10;
                            System.out.println("接收客户端消息时,tt = " +tt);
                            //接受客户端消息
                             System.out.println("....."+msg.toString());
                        }
                    });
                    
                }
            };
            return tProcessorFactory;
        }
}

此处,内部类使用比较频繁,阅读会有些困难。Zthrift,Processor构造方法需要传入一个Iface 接口,该接口有一个接收客户端的方法send(), msg 是一个Student对象。

 七.实现的客户端调用。如下:

package com.zhj.client;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import com.zhj.student.Student;
import com.zhj.student.Zthrift.Iface;
import com.zhj.student.Zthrift;

public class ZClient {

    public static void main(String[]args){
        final TSocket tSocket=new TSocket("127.0.0.1",9999);
        Zthrift.Client client=new Zthrift.Client(new TBinaryProtocol(tSocket));
        try {
            tSocket.open();
            runMethod(tSocket);
            //向服务端发送消息
            client.send(new Student("小明1"));
           
        } catch (TTransportException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (TException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public static void runMethod(final TSocket tSocket){
        Thread thread = new Thread(new Runnable() {
            
            @Override
            public void run() {
                Zthrift.Processor<Iface> mp = new Zthrift.Processor<Zthrift.Iface>(new Iface() {
                    
                    @Override
                    public void send(Student msg) throws TException {
                        // TODO Auto-generated method stub
                        Long start = System.currentTimeMillis();
                        try {
                            while(true){
                                //具体接收时间待定
                                if((System.currentTimeMillis()-start)>0.1*60*1000){
                                    System.out.println("响应消息超时...");
                                    break;
                                }
                                else {
                                    System.out.println("收到服务端响应消息: "+msg);
                                }
                                //休眠两秒
                                Thread.sleep(2000L);
                            }
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    
                });
                
                try {
                    while(mp.process(new TBinaryProtocol(tSocket), new TBinaryProtocol(tSocket))){
                        //阻塞式方法,不需要内容
                        System.out.println("走阻塞式方法");
                        //关闭tScoket
                        // tSocket.close();
                    }
                } catch (TException e) {
                    System.out.println("连接已断开...");
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }
}

在这里,我加入了一个超时响应的死循环,用于接收服务端返回的消息,控制台可以查看服务端给的响应消息。

八.运行服务端和客户端main方法,控制台打印如下:

Apache thrift RPC 双向通信

Apache thrift RPC 双向通信

代码阅读有些困难,有困难或不合理之处,请小伙伴指出。Thank you!