使用RPC 调用NameNode中的方法

时间:2022-03-10 17:32:14

用户在Client 端是很难对 NameNode中的信息进行直接访问的,

所以 ,在Hadoop系统中为 Client端 提供了一系列的方法调用,这些方法调用是通过RPC 方法来实现的,

根据RPC 的访问机制,可以将代码分为两个部分 : Server 端 和 Client端。

Server 端 和 Client 端 共同拥有一个 interface : XXXProtocol

其中 ,Server 端 是对 XXXProtocol进行 implements

Client 端 是对XXXProtocol  以动态代理的方式进行调用的。

不过对于应用于实现RPC 调用的接口来说,都是要继承一个 VersionedProtocol 类的实例。

并且,在接口中定义好 final static long versionID 这个 ID值, 这个ID值 是用来 在RPC 初始化的时候 作为参数进行 传入的。

拿NameNodeRpcServer(org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer)为例,

在Hadoop系统中,它是作为远程调用RPC 的Server端的存在。 它implements NamenodeProtocols

而对于 NamenodeProtocols 这个接口来说, 它extends NameProtocol,ClientProtocol 等等一系列 遵照RPC 通信机制 而继承了

VersionedProtocol 这个接口的多个接口。

在本片文章中LZ 主要介绍一下,如何实现自己的RPC Client端 来通过RPC调用 NameNodeRpcServer

来获取正在运行的NameNode中的相关信息。

===========实现思路=====================

1. 遵照 RPC 通信原理 将代码分为 Client 端 和Server 端 ,

其中Client端 由自己编写 MyClient 来实现,

2. Server端 ,由可以获取 NameNode 运行信息的NameNodeRpcServer 充当。

3. 公共调用的接口 可以由 ClientProtocol 来充当。

============实现代码=========================

package myclient;

import org.apache.hadoop.hdfs.protocol.ClientProtocol ;
import org.apache.hadoop.conf.Configuration ;
import org.apache.hadoop.fs.FileSystem ;
import org.apache.hadoop.ipc.RPC ;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.log4j.Level;
import org.apache.hadoop.net.NetUtils ;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode ; import com.google.common.collect.Maps; import java.net.InetSocketAddress;
import java.io.IOException ;
import java.util.EnumSet;
import java.net.URI;
import java.net.InetAddress; //method 2 :
import org.apache.hadoop.fs.FsServerDefaults;
import com.google.protobuf.Message; /**
*
* @author inuyasha1027
*/
public class MyClient
{
private ClientProtocol proxy ; public MyClient ()throws Exception
{
Configuration conf = new Configuration () ; InetSocketAddress addr = new InetSocketAddress("node0", 9000) ; System.out.println("address default:"+addr.getHostName()+" port: "+addr.getPort()); try
{
proxy = (ClientProtocol) RPC.waitForProxy(ClientProtocol.class,
ClientProtocol.versionID, addr, conf ) ; if (proxy != null)
System.out.println("now we start RPC service") ;
else
System.out.println("fail~") ;
}
catch (IOException e)
{
e.printStackTrace();
}
} public void createFile (String src)
{
EnumSetWritable<CreateFlag> flags = new EnumSetWritable(EnumSet.of(CreateFlag.OVERWRITE)) ;; FsPermission permission = new FsPermission ( FsAction.ALL, FsAction.ALL, FsAction.ALL) ;
try
{
proxy.create(src, permission, ClientProtocol.class.getName(), flags, false, (short)2, 4096);
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void clientGetFsServerDefaults() throws IOException
{
FsServerDefaults def = proxy.getServerDefaults();
System.out.println("blockSize"+def.getBlockSize());
}
public void closeRPC()
{
System.out.println("now RPC will be stopped") ;
RPC.stopProxy(proxy);
}
public static void main(String[] args) throws Exception
{
MyClient client = new MyClient () ;
String dst = new String("hdfs://192.168.163.100:9000/user") ;
client.createFile(dst);
client.clientGetFsServerDefaults() ;
client.closeRPC();
}
}

使用RPC 调用NameNode中的方法

create那个方法,实现起来有些复杂,如果是单纯的验证一下RPC 程序的调用的话, 调用ClientProtocol中的

getServerDefaults()

这个方法就可以,这个方法会调用NameNode 中的方法,返回一个FsServerDefault 对象,

通过的个实例对象 我们可以访问 或是查询一些 NameNode中的变量和属性值等等,

在这个例子中,LZ实现的是对BlockSize 大小 这一属性值的访问。

按照这个思路的话,其实可以自己实现接口,通过这种通信方式,

借助于NamenodeRpcServer 这个类对NameNode中的各个属性进行访问的。