FileSystem实例化过程

时间:2023-08-13 10:05:49

HDFS案例代码

// Build the client configuration and obtain the FileSystem for the HDFS URI.
Configuration conf = new Configuration();
URI hdfsUri = new URI("hdfs://hadoop000:8020");
FileSystem fs = FileSystem.get(hdfsUri, conf);

// Open the remote file for reading and a local file for writing.
Path remotePath = new Path(HDFS_PATH + "/hdfsapi/test/log4j.properties");
InputStream in = fs.open(remotePath);
File localFile = new File("log4j_download.properties");
OutputStream out = new FileOutputStream(localFile);

// The last argument tells copyBytes to close the input/output streams once the copy is done.
IOUtils.copyBytes(in, out, 4096, true);

FileSystem.java

static final Cache CACHE = new Cache();

public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme(); //hdfs
String authority = uri.getAuthority(); //hadoop000:8020 return CACHE.get(uri, conf);
} FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
} private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
FileSystem fs;
synchronized (this) {
fs = map.get(key);
} //根据URI取得一个FileSystem实例,如果允许缓存,会中从缓存中取出,否则将调用createFileSystem创建一个新实例
if (fs != null) {
return fs;
} fs = createFileSystem(uri, conf);
synchronized (this) {
FileSystem oldfs = map.get(key);
... //放入到CACHE中秋
return fs;
}
} private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // 返回的是:org.apache.hadoop.hdfs.DistributedFileSystem
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf); //初始化DistributedFileSystem
return fs;
} public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
if (!FILE_SYSTEMS_LOADED) { //文件系统是否被加载过,刚开始时为false
loadFileSystems();
}
Class<? extends FileSystem> clazz = null;
if (conf != null) {
clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null); //fs.hdfs.impl ,此时我们并没有在core-default.xml和core-site.xml中配置该属性
}
if (clazz == null) {
clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem
}
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + scheme);
}
return clazz;
} private static void loadFileSystems() {
synchronized (FileSystem.class) {
if (!FILE_SYSTEMS_LOADED) {
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
for (FileSystem fs : serviceLoader) {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
}
FILE_SYSTEMS_LOADED = true; //标识为已经从系统中加载过
}
}
}

loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:

file=class org.apache.hadoop.fs.LocalFileSystem,
ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem,
hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem,
hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem,
webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem,
s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem,
viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem,
swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem,
har=class org.apache.hadoop.fs.HarFileSystem,
s3=class org.apache.hadoop.fs.s3.S3FileSystem,
hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem

DistributedFileSystem.java

// Key field: client/server interaction first needs this DFSClient.
DFSClient dfs;

/**
 * Initializes this file system: stores the configuration, creates the DFSClient
 * for the URI, and records the normalized URI and the working directory.
 *
 * @param uri  file system URI, e.g. hdfs://hadoop000:8020
 * @param conf configuration handed to the superclass and to the DFSClient
 * @throws IOException propagated from the superclass or from DFSClient creation
 */
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);

    String host = uri.getHost(); // e.g. "hadoop000"

    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir = getHomeDirectory();
}

DFSClient.java

// Key field: the RPC interface the client uses to communicate with the NameNode.
final ClientProtocol namenode;

/**
 * Creates the client by obtaining a NameNode proxy for the given URI and keeping
 * its delegation-token service and proxy object.
 * In the portion shown here, rpcNamenode and stats are not used.
 *
 * @throws IOException propagated from NameNodeProxies.createProxy
 */
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,
        FileSystem.Statistics stats) throws IOException {
    NameNodeProxies.ProxyAndInfo<ClientProtocol> nnProxyInfo =
            NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);

    this.dtService = nnProxyInfo.getDelegationTokenService();
    // Runtime type noted by the author:
    // org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
    this.namenode = nnProxyInfo.getProxy();
}

NameNodeProxies.java

public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);
} public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
Text dtService = SecurityUtil.buildTokenService(nnAddr); T proxy;
if (xface == ClientProtocol.class) {
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
} ...
return new ProxyAndInfo<T>(proxy, dtService);
} private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException { //Client与NameNode的RPC交互接口
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
.getProxy(); if (withRetries) {
//使用jdk的动态代理创建实例
proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
}
return new ClientNamenodeProtocolTranslatorPB(proxy);
}

RetryProxy.java

/**
 * Creates a dynamic proxy implementing {@code iface} whose calls are handled by a
 * RetryInvocationHandler built from the given provider and retry policy.
 *
 * @param iface         interface the returned proxy implements
 * @param proxyProvider provider handed to the invocation handler; its interface's
 *                      class loader is used to define the proxy
 * @param retryPolicy   policy handed to the invocation handler
 * @return the proxy instance
 */
public static <T> Object create(Class<T> iface, FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
    ClassLoader loader = proxyProvider.getInterface().getClassLoader();
    Class<?>[] interfaces = new Class<?>[] { iface };
    RetryInvocationHandler<T> handler = new RetryInvocationHandler<T>(proxyProvider, retryPolicy);
    return Proxy.newProxyInstance(loader, interfaces, handler);
}

获取FileSystem实例源码分析总结:

1、FileSystem.get通过反射实例化了一个DistributedFileSystem;

2、DistributedFileSystem中new DFSClient()把它作为自己的成员变量;

3、在DFSClient构造方法里面,调用了createProxy使用RPC机制得到了一个NameNode的代理对象,就可以和NameNode进行通信;

4、整个流程:FileSystem.get()--> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode的代理。