首页

关于flink源码包中分布式hdfs、s3文件系统FileSystem源码对比分析说明

标签:FileSystem,文件系统,分布式hdfs、s3     发布时间:2018-05-02   

一、前言

关于flink源码包中的org.apache.flink.runtime.fs.hdfs.DistributedFileSystem、org.apache.flink.runtime.fs.hdfs.S3FileSystem分布式文件系统管理类,分别基于org.apache.flink.core.fs.FileSystem(参见flink-core部分)定义的统一文件系统标准类进行管理,详情参见源码部分。

二、源码说明1.FileSystem接口

package org.apache.flink.core.fs;@b@@b@import java.io.FileNotFoundException;@b@import java.io.IOException;@b@import java.net.URI;@b@import java.net.URISyntaxException;@b@import java.util.HashMap;@b@import java.util.Map;@b@import org.apache.flink.util.ClassUtils;@b@import org.apache.flink.util.StringUtils;@b@@b@public abstract class FileSystem@b@{@b@  private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";@b@  private static final String HADOOP_DISTRIBUTED_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem";@b@  private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";@b@  private static final String S3_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.s3.S3FileSystem";@b@  private static final Object SYNCHRONIZATION_OBJECT = new Object();@b@  private static final Map<FSKey, FileSystem> CACHE = new HashMap();@b@  private static final Map<String, String> FSDIRECTORY = new HashMap();@b@@b@  public static FileSystem getLocalFileSystem()@b@    throws IOException@b@  {@b@    URI localUri;@b@    try@b@    {@b@      localUri = new URI("file:///");@b@    } catch (URISyntaxException e) {@b@      throw new IOException("Cannot create URI for local file system");@b@    }@b@@b@    return get(localUri);@b@  }@b@@b@  public static FileSystem get(URI uri)@b@    throws IOException@b@  {@b@    FileSystem fs = null;@b@@b@    synchronized (SYNCHRONIZATION_OBJECT)@b@    {@b@      if (uri.getScheme() == null)@b@        try {@b@          uri = new URI("file", null, uri.getPath(), null);@b@        }@b@        catch (URISyntaxException e)@b@        {@b@          throw new IOException("FileSystem: Scheme is null. file:// or hdfs:// are example schemes. Failed for " + uri@b@            .toString() + ".");@b@        }@b@@b@@b@      FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());@b@@b@      if (!(CACHE.containsKey(key))) break label113;@b@      return ((FileSystem)CACHE.get(key));@b@@b@      label113: if (FSDIRECTORY.containsKey(uri.getScheme()))@b@        break label175;@b@      throw new IOException("No file system found with scheme " + uri.getScheme() + ". Failed for " + uri@b@        .toString() + ".");@b@@b@      label175: Class fsClass = null;@b@      try {@b@        fsClass = ClassUtils.getFileSystemByName((String)FSDIRECTORY.get(uri.getScheme()));@b@      } catch (ClassNotFoundException e1) {@b@        throw new IOException(StringUtils.stringifyException(e1));@b@      }@b@      try@b@      {@b@        fs = (FileSystem)fsClass.newInstance();@b@      }@b@      catch (InstantiationException e) {@b@        throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);@b@      }@b@      catch (IllegalAccessException e) {@b@        throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);@b@      }@b@@b@      fs.initialize(uri);@b@@b@      CACHE.put(key, fs);@b@    }@b@@b@    return fs;@b@  }@b@@b@  public abstract Path getWorkingDirectory();@b@@b@  public abstract URI getUri();@b@@b@  public abstract void initialize(URI paramURI)@b@    throws IOException;@b@@b@  public abstract FileStatus getFileStatus(Path paramPath)@b@    throws IOException;@b@@b@  public abstract BlockLocation[] getFileBlockLocations(FileStatus paramFileStatus, long paramLong1, long paramLong2)@b@    throws IOException;@b@@b@  public abstract FSDataInputStream open(Path paramPath, int paramInt)@b@    throws IOException;@b@@b@  public abstract FSDataInputStream open(Path paramPath)@b@    throws IOException;@b@@b@  public long getDefaultBlockSize()@b@  {@b@    return 33554432L;@b@  }@b@@b@  public abstract FileStatus[] listStatus(Path paramPath)@b@    throws IOException;@b@@b@  public boolean exists(Path f)@b@    throws IOException@b@  {@b@    try@b@    {@b@      return (getFileStatus(f) != null); } catch (FileNotFoundException e) {@b@    }@b@    return false;@b@  }@b@@b@  public abstract boolean delete(Path paramPath, boolean paramBoolean)@b@    throws IOException;@b@@b@  public abstract boolean mkdirs(Path paramPath)@b@    throws IOException;@b@@b@  public abstract FSDataOutputStream create(Path paramPath, boolean paramBoolean, int paramInt, short paramShort, long paramLong)@b@    throws IOException;@b@@b@  public abstract FSDataOutputStream create(Path paramPath, boolean paramBoolean)@b@    throws IOException;@b@@b@  public abstract boolean rename(Path paramPath1, Path paramPath2)@b@    throws IOException;@b@@b@  public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory)@b@    throws IOException@b@  {@b@    if (isDistributedFS()) {@b@      return false;@b@    }@b@@b@    FileStatus status = null;@b@    try {@b@      status = getFileStatus(outPath);@b@    }@b@    catch (FileNotFoundException e)@b@    {@b@    }@b@@b@    if (status != null)@b@    {@b@      switch (1.$SwitchMap$org$apache$flink$core$fs$FileSystem$WriteMode[writeMode.ordinal()])@b@      {@b@      case 1:@b@        if ((status.isDir()) && (createDirectory)) {@b@          return true;@b@        }@b@@b@        throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + WriteMode.NO_OVERWRITE@b@          .name() + " mode. Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");@b@      case 2:@b@        if (status.isDir()) {@b@          if (createDirectory)@b@          {@b@            return true;@b@          }@b@          try@b@          {@b@            delete(outPath, true);@b@          }@b@          catch (IOException ioe)@b@          {@b@          }@b@        }@b@        else@b@        {@b@          try@b@          {@b@            delete(outPath, false);@b@          }@b@          catch (IOException ioe)@b@          {@b@          }@b@        }@b@@b@        break;@b@      default:@b@        throw new IllegalArgumentException("Invalid write mode: " + writeMode);@b@      }@b@    }@b@@b@    if (createDirectory)@b@    {@b@      try {@b@        if (!(exists(outPath))) {@b@          mkdirs(outPath);@b@        }@b@@b@      }@b@      catch (IOException ioe)@b@      {@b@      }@b@@b@      try@b@      {@b@        FileStatus check = getFileStatus(outPath);@b@        return check.isDir();@b@      } catch (FileNotFoundException check) {@b@        return false;@b@      }@b@    }@b@@b@    return (!(exists(outPath)));@b@  }@b@@b@  public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory)@b@    throws IOException@b@  {@b@    if (!(isDistributedFS())) {@b@      return false;@b@    }@b@@b@    if (exists(outPath))@b@    {@b@      switch (1.$SwitchMap$org$apache$flink$core$fs$FileSystem$WriteMode[writeMode.ordinal()])@b@      {@b@      case 1:@b@        throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + WriteMode.NO_OVERWRITE@b@          .name() + " mode. Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");@b@      case 2:@b@        try@b@        {@b@          delete(outPath, true);@b@        }@b@        catch (IOException ioe)@b@        {@b@        }@b@@b@        break;@b@      default:@b@        throw new IllegalArgumentException("Invalid write mode: " + writeMode);@b@      }@b@    }@b@@b@    if (createDirectory)@b@    {@b@      try {@b@        if (!(exists(outPath))) {@b@          mkdirs(outPath);@b@        }@b@@b@      }@b@      catch (IOException ioe)@b@      {@b@      }@b@@b@      return ((exists(outPath)) && (getFileStatus(outPath).isDir()));@b@    }@b@@b@    return (!(exists(outPath)));@b@  }@b@@b@  public abstract boolean isDistributedFS();@b@@b@  public int getNumberOfBlocks(FileStatus file)@b@    throws IOException@b@  {@b@    int numberOfBlocks = 0;@b@@b@    if (file == null) {@b@      return 0;@b@    }@b@@b@    if (!(file.isDir())) {@b@      return getNumberOfBlocks(file.getLen(), file.getBlockSize());@b@    }@b@@b@    FileStatus[] files = listStatus(file.getPath());@b@    for (int i = 0; i < files.length; ++i)@b@    {@b@      if (!(files[i].isDir()))@b@        numberOfBlocks += getNumberOfBlocks(files[i].getLen(), files[i].getBlockSize());@b@@b@    }@b@@b@    return numberOfBlocks;@b@  }@b@@b@  private int getNumberOfBlocks(long length, long blocksize)@b@  {@b@    if (blocksize != 0L)@b@    {@b@      int numberOfBlocks = (int)(length / blocksize);@b@@b@      if (length % blocksize != 0L) {@b@        ++numberOfBlocks;@b@      }@b@@b@      return numberOfBlocks;@b@    }@b@    return 1;@b@  }@b@@b@  static@b@  {@b@    FSDIRECTORY.put("hdfs", "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem");@b@    FSDIRECTORY.put("maprfs", "org.apache.flink.runtime.fs.maprfs.MapRFileSystem");@b@    FSDIRECTORY.put("file", "org.apache.flink.core.fs.local.LocalFileSystem");@b@    FSDIRECTORY.put("s3", "org.apache.flink.runtime.fs.s3.S3FileSystem");@b@  }@b@@b@  public static class FSKey@b@  {@b@    private String scheme;@b@    private String authority;@b@@b@    public FSKey(String scheme, String authority)@b@    {@b@      this.scheme = scheme;@b@      this.authority = authority;@b@    }@b@@b@    public boolean equals(Object obj)@b@    {@b@      if (obj instanceof FSKey) {@b@        FSKey key = (FSKey)obj;@b@@b@        if (!(this.scheme.equals(key.scheme))) {@b@          return false;@b@        }@b@@b@        if ((this.authority == null) || (key.authority == null))@b@        {@b@          return ((this.authority == null) && (key.authority == null));@b@        }@b@@b@        return (this.authority.equals(key.authority));@b@      }@b@@b@      return false;@b@    }@b@@b@    public int hashCode()@b@    {@b@      if (this.scheme != null) {@b@        return this.scheme.hashCode();@b@      }@b@@b@      if (this.authority != null) {@b@        return this.authority.hashCode();@b@      }@b@@b@      return super.hashCode();@b@    }@b@  }@b@@b@  public static enum WriteMode@b@  {@b@    NO_OVERWRITE, OVERWRITE;@b@  }@b@}

2.DistributedFileSystem实现类

package org.apache.flink.runtime.fs.hdfs;@b@@b@import java.io.File;@b@import java.io.IOException;@b@import java.lang.reflect.Method;@b@import java.net.URI;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@import org.apache.flink.configuration.GlobalConfiguration;@b@import org.apache.flink.core.fs.FSDataOutputStream;@b@import org.apache.flink.util.InstantiationUtil;@b@import org.apache.hadoop.conf.Configuration;@b@import org.apache.hadoop.fs.FileSystem;@b@@b@public final class DistributedFileSystem extends org.apache.flink.core.fs.FileSystem@b@{@b@  private static final Log LOG = LogFactory.getLog(DistributedFileSystem.class);@b@  private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";@b@  private static final String HDFS_IMPLEMENTATION_KEY = "fs.hdfs.impl";@b@  private final Configuration conf;@b@  private final FileSystem fs;@b@@b@  public DistributedFileSystem()@b@    throws IOException@b@  {@b@    this.conf = getHadoopConfiguration();@b@@b@    Class fsClass = null;@b@@b@    LOG.debug("Trying to load HDFS class Hadoop 2.x style.");@b@@b@    Object fsHandle = null;@b@    try {@b@      Method newApi = FileSystem.class.getMethod("getFileSystemClass", new Class[] { String.class, Configuration.class });@b@      fsHandle = newApi.invoke(null, new Object[] { "hdfs", this.conf });@b@    }@b@    catch (Exception e)@b@    {@b@    }@b@@b@    if (fsHandle != null) {@b@      if ((fsHandle instanceof Class) && (FileSystem.class.isAssignableFrom((Class)fsHandle))) {@b@        fsClass = ((Class)fsHandle).asSubclass(FileSystem.class);@b@@b@        if (LOG.isDebugEnabled())@b@          LOG.debug(new StringBuilder().append("Loaded '").append(fsClass.getName()).append("' as HDFS class.").toString());@b@      }@b@      else@b@      {@b@        LOG.debug("Unexpected return type from 'org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration)'.");@b@        throw new RuntimeException("The value returned from org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration) is not a valid subclass of org.apache.hadoop.fs.FileSystem.");@b@      }@b@@b@    }@b@@b@    if (fsClass == null)@b@    {@b@      if (LOG.isDebugEnabled()) {@b@        LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry 'fs.hdfs.impl'.");@b@      }@b@@b@      Class classFromConfig = this.conf.getClass("fs.hdfs.impl", null);@b@@b@      if (classFromConfig != null)@b@      {@b@        if (FileSystem.class.isAssignableFrom(classFromConfig)) {@b@          fsClass = classFromConfig.asSubclass(FileSystem.class);@b@@b@          if (!(LOG.isDebugEnabled())) break label432;@b@          LOG.debug(new StringBuilder().append("Loaded HDFS class '").append(fsClass.getName()).append("' as specified in configuration.").toString()); break label432:@b@        }@b@@b@        if (LOG.isDebugEnabled()) {@b@          LOG.debug("HDFS class specified by fs.hdfs.impl is of wrong type.");@b@        }@b@@b@        throw new IOException("HDFS class specified by fs.hdfs.impl cannot be cast to a FileSystem type.");@b@      }@b@@b@      if (LOG.isDebugEnabled())@b@        LOG.debug("Trying to load default HDFS implementation org.apache.hadoop.hdfs.DistributedFileSystem");@b@@b@      try@b@      {@b@        Class reflectedClass = Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");@b@        if (FileSystem.class.isAssignableFrom(reflectedClass)) {@b@          fsClass = reflectedClass.asSubclass(FileSystem.class);@b@        } else {@b@          if (LOG.isDebugEnabled()) {@b@            LOG.debug("Default HDFS class is of wrong type.");@b@          }@b@@b@          throw new IOException("The default HDFS class 'org.apache.hadoop.hdfs.DistributedFileSystem' cannot be cast to a FileSystem type.");@b@        }@b@      }@b@      catch (ClassNotFoundException e)@b@      {@b@        if (LOG.isDebugEnabled()) {@b@          LOG.debug("Default HDFS class cannot be loaded.");@b@        }@b@@b@        throw new IOException("No HDFS class has been configured and the default class 'org.apache.hadoop.hdfs.DistributedFileSystem' cannot be loaded.");@b@      }@b@@b@    }@b@@b@    label432: this.fs = instantiateFileSystem(fsClass);@b@  }@b@@b@  public static Configuration getHadoopConfiguration()@b@  {@b@    Configuration retConf = new Configuration();@b@@b@    String hdfsDefaultPath = GlobalConfiguration.getString("fs.hdfs.hdfsdefault", null);@b@    if (hdfsDefaultPath != null)@b@      retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));@b@    else {@b@      LOG.debug("Cannot find hdfs-default configuration file");@b@    }@b@@b@    String hdfsSitePath = GlobalConfiguration.getString("fs.hdfs.hdfssite", null);@b@    if (hdfsSitePath != null)@b@      retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));@b@    else {@b@      LOG.debug("Cannot find hdfs-site configuration file");@b@    }@b@@b@    String[] possibleHadoopConfPaths = new String[4];@b@    possibleHadoopConfPaths[0] = GlobalConfiguration.getString("fs.hdfs.hadoopconf", null);@b@    possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");@b@@b@    if (System.getenv("HADOOP_HOME") != null) {@b@      possibleHadoopConfPaths[2] = new StringBuilder().append(System.getenv("HADOOP_HOME")).append("/conf").toString();@b@      possibleHadoopConfPaths[3] = new StringBuilder().append(System.getenv("HADOOP_HOME")).append("/etc/hadoop").toString();@b@    }@b@@b@    for (int i = 0; i < possibleHadoopConfPaths.length; ++i) {@b@      if (possibleHadoopConfPaths[i] == null) {@b@        break label427:@b@      }@b@@b@      if (new File(possibleHadoopConfPaths[i]).exists()) {@b@        if (new File(new StringBuilder().append(possibleHadoopConfPaths[i]).append("/core-site.xml").toString()).exists()) {@b@          retConf.addResource(new org.apache.hadoop.fs.Path(new StringBuilder().append(possibleHadoopConfPaths[i]).append("/core-site.xml").toString()));@b@@b@          if (LOG.isDebugEnabled())@b@            LOG.debug(new StringBuilder().append("Adding ").append(possibleHadoopConfPaths[i]).append("/core-site.xml to hadoop configuration").toString());@b@        }@b@@b@        if (new File(new StringBuilder().append(possibleHadoopConfPaths[i]).append("/hdfs-site.xml").toString()).exists()) {@b@          retConf.addResource(new org.apache.hadoop.fs.Path(new StringBuilder().append(possibleHadoopConfPaths[i]).append("/hdfs-site.xml").toString()));@b@@b@          if (LOG.isDebugEnabled())@b@            LOG.debug(new StringBuilder().append("Adding ").append(possibleHadoopConfPaths[i]).append("/hdfs-site.xml to hadoop configuration").toString());@b@        }@b@      }@b@    }@b@@b@    label427: return retConf;@b@  }@b@@b@  private FileSystem instantiateFileSystem(Class<? extends FileSystem> fsClass) throws IOException@b@  {@b@    try@b@    {@b@      return ((FileSystem)fsClass.newInstance());@b@    }@b@    catch (ExceptionInInitializerError e) {@b@      throw new IOException(new StringBuilder().append("The filesystem class '").append(fsClass.getName()).append("' throw an exception upon initialization.").toString(), e.getException());@b@    }@b@    catch (Throwable t) {@b@      String errorMessage = InstantiationUtil.checkForInstantiationError(fsClass);@b@      if (errorMessage != null)@b@        throw new IOException(new StringBuilder().append("The filesystem class '").append(fsClass.getName()).append("' cannot be instantiated: ").append(errorMessage).toString());@b@@b@      throw new IOException(new StringBuilder().append("An error occurred while instantiating the filesystem class '").append(fsClass.getName()).append("'.").toString(), t);@b@    }@b@  }@b@@b@  public org.apache.flink.core.fs.Path getWorkingDirectory()@b@  {@b@    return new org.apache.flink.core.fs.Path(this.fs.getWorkingDirectory().toUri());@b@  }@b@@b@  public URI getUri()@b@  {@b@    return this.fs.getUri();@b@  }@b@@b@  public void initialize(URI path)@b@    throws IOException@b@  {@b@    if (path.getAuthority() == null)@b@    {@b@      String configEntry = this.conf.get("fs.default.name", null);@b@      if (configEntry == null)@b@      {@b@        configEntry = this.conf.get("fs.defaultFS", null);@b@      }@b@@b@      if (LOG.isDebugEnabled()) {@b@        LOG.debug(new StringBuilder().append("fs.defaultFS is set to ").append(configEntry).toString());@b@      }@b@@b@      if (configEntry == null)@b@        throw new IOException(new StringBuilder().append(getMissingAuthorityErrorPrefix(path)).append("Either no default hdfs configuration was registered, ").append("or that configuration did not contain an entry for the default hdfs.").toString());@b@@b@      try@b@      {@b@        URI initURI = URI.create(configEntry);@b@@b@        if (initURI.getAuthority() == null)@b@          throw new IOException(new StringBuilder().append(getMissingAuthorityErrorPrefix(path)).append("Either no default hdfs configuration was registered, ").append("or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port.").toString());@b@@b@        if (!(initURI.getScheme().equalsIgnoreCase("hdfs")))@b@          throw new IOException(new StringBuilder().append(getMissingAuthorityErrorPrefix(path)).append("Either no default hdfs configuration was registered, ").append("or the provided configuration describes a file system with scheme '").append(initURI.getScheme()).append("' other than the Hadoop Distributed File System (HDFS).").toString());@b@@b@        try@b@        {@b@          this.fs.initialize(initURI, this.conf);@b@        }@b@        catch (Exception e) {@b@          throw new IOException(".", e);@b@        }@b@@b@      }@b@      catch (IllegalArgumentException e)@b@      {@b@        throw new IOException(new StringBuilder().append(getMissingAuthorityErrorPrefix(path)).append("The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS): ").append(configEntry).toString());@b@      }@b@    }@b@    else@b@    {@b@      try@b@      {@b@        this.fs.initialize(path, this.conf);@b@      }@b@      catch (Exception e) {@b@        throw new IOException(new StringBuilder().append("The given file URI (").append(path.toString()).append(") described the host and port of an HDFS Namenode, but the File System could not be initialized with that address").append(".").toString(), e);@b@      }@b@    }@b@  }@b@@b@  private static final String getMissingAuthorityErrorPrefix(URI path)@b@  {@b@    return new StringBuilder().append("The given HDFS file URI (").append(path.toString()).append(") did not describe the HDFS Namenode.").append(" The attempt to use a default HDFS configuration, as specified in the '").append("fs.hdfs.hdfsdefault").append("' or '").append("fs.hdfs.hdfssite").append("' config parameter failed due to the following problem: ").toString();@b@  }@b@@b@  public org.apache.flink.core.fs.FileStatus getFileStatus(org.apache.flink.core.fs.Path f)@b@    throws IOException@b@  {@b@    org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));@b@    return new DistributedFileStatus(status);@b@  }@b@@b@  public org.apache.flink.core.fs.BlockLocation[] getFileBlockLocations(org.apache.flink.core.fs.FileStatus file, long start, long len)@b@    throws IOException@b@  {@b@    if (!(file instanceof DistributedFileStatus)) {@b@      throw new IOException("file is not an instance of DistributedFileStatus");@b@    }@b@@b@    DistributedFileStatus f = (DistributedFileStatus)file;@b@@b@    org.apache.hadoop.fs.BlockLocation[] blkLocations = this.fs.getFileBlockLocations(f.getInternalFileStatus(), start, len);@b@@b@    DistributedBlockLocation[] distBlkLocations = new DistributedBlockLocation[blkLocations.length];@b@    for (int i = 0; i < distBlkLocations.length; ++i) {@b@      distBlkLocations[i] = new DistributedBlockLocation(blkLocations[i]);@b@    }@b@@b@    return distBlkLocations;@b@  }@b@@b@  public org.apache.flink.core.fs.FSDataInputStream open(org.apache.flink.core.fs.Path f, int bufferSize)@b@    throws IOException@b@  {@b@    org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(new org.apache.hadoop.fs.Path(f.toString()), bufferSize);@b@@b@    return new DistributedDataInputStream(fdis);@b@  }@b@@b@  public org.apache.flink.core.fs.FSDataInputStream open(org.apache.flink.core.fs.Path f) throws IOException@b@  {@b@    org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(new org.apache.hadoop.fs.Path(f.toString()));@b@    return new DistributedDataInputStream(fdis);@b@  }@b@@b@  public FSDataOutputStream create(org.apache.flink.core.fs.Path f, boolean overwrite, int bufferSize, short replication, long blockSize)@b@    throws IOException@b@  {@b@    org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);@b@@b@    return new DistributedDataOutputStream(fdos);@b@  }@b@@b@  public FSDataOutputStream create(org.apache.flink.core.fs.Path f, boolean overwrite)@b@    throws IOException@b@  {@b@    org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);@b@@b@    return new DistributedDataOutputStream(fdos);@b@  }@b@@b@  public boolean delete(org.apache.flink.core.fs.Path f, boolean recursive) throws IOException@b@  {@b@    return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);@b@  }@b@@b@  public org.apache.flink.core.fs.FileStatus[] listStatus(org.apache.flink.core.fs.Path f) throws IOException@b@  {@b@    org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));@b@    org.apache.flink.core.fs.FileStatus[] files = new org.apache.flink.core.fs.FileStatus[hadoopFiles.length];@b@@b@    for (int i = 0; i < files.length; ++i) {@b@      files[i] = new DistributedFileStatus(hadoopFiles[i]);@b@    }@b@@b@    return files;@b@  }@b@@b@  public boolean mkdirs(org.apache.flink.core.fs.Path f) throws IOException@b@  {@b@    return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));@b@  }@b@@b@  public boolean rename(org.apache.flink.core.fs.Path src, org.apache.flink.core.fs.Path dst) throws IOException@b@  {@b@    return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()), new org.apache.hadoop.fs.Path(dst.toString()));@b@  }@b@@b@  public long getDefaultBlockSize()@b@  {@b@    return this.fs.getDefaultBlockSize();@b@  }@b@@b@  public boolean isDistributedFS()@b@  {@b@    return true;@b@  }@b@}

3.S3FileSystem实现类

package org.apache.flink.runtime.fs.s3;@b@@b@import com.amazonaws.AmazonClientException;@b@import com.amazonaws.AmazonServiceException;@b@import com.amazonaws.auth.AWSCredentials;@b@import com.amazonaws.auth.BasicAWSCredentials;@b@import com.amazonaws.services.s3.AmazonS3Client;@b@import com.amazonaws.services.s3.model.Bucket;@b@import com.amazonaws.services.s3.model.ObjectListing;@b@import com.amazonaws.services.s3.model.ObjectMetadata;@b@import com.amazonaws.services.s3.model.Owner;@b@import com.amazonaws.services.s3.model.S3ObjectSummary;@b@import java.io.FileNotFoundException;@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.net.MalformedURLException;@b@import java.net.URI;@b@import java.net.URISyntaxException;@b@import java.net.URL;@b@import java.net.URLDecoder;@b@import java.util.ArrayList;@b@import java.util.Date;@b@import java.util.Iterator;@b@import java.util.List;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@import org.apache.flink.configuration.GlobalConfiguration;@b@import org.apache.flink.core.fs.BlockLocation;@b@import org.apache.flink.core.fs.FSDataInputStream;@b@import org.apache.flink.core.fs.FSDataOutputStream;@b@import org.apache.flink.core.fs.FileStatus;@b@import org.apache.flink.core.fs.FileSystem;@b@import org.apache.flink.core.fs.Path;@b@import org.apache.flink.util.StringUtils;@b@@b@public final class S3FileSystem extends FileSystem@b@{@b@  private static final Log LOG = LogFactory.getLog(S3FileSystem.class);@b@  public static final String S3_HOST_KEY = "fs.s3.host";@b@  public static final String S3_PORT_KEY = "fs.s3.port";@b@  public static final String S3_RRS_KEY = "fs.s3.rrs";@b@  public static final String S3_ACCESS_KEY_KEY = "fs.s3.accessKey";@b@  public static final String S3_SECRET_KEY_KEY = "fs.s3.secretKey";@b@  private static final String DEFAULT_S3_HOST = "s3.amazonaws.com";@b@  private static final boolean DEFAULT_S3_RRS = 1;@b@  private static final int DEFAULT_S3_PORT = 80;@b@  private static final String HTTP_PREFIX = "http";@b@  private static final int HTTP_RESOURCE_NOT_FOUND_CODE = 404;@b@  private static final char S3_DIRECTORY_SEPARATOR = 47;@b@  public static final String S3_SCHEME = "s3";@b@  private static final String URL_ENCODE_CHARACTER = "UTF-8";@b@  private String host = null;@b@  private int port = -1;@b@  private URI s3Uri = null;@b@  private AmazonS3Client s3Client = null;@b@  private S3DirectoryStructure directoryStructure = null;@b@  private final boolean useRRS;@b@@b@  public S3FileSystem()@b@  {@b@    this.useRRS = GlobalConfiguration.getBoolean("fs.s3.rrs", true);@b@    LOG.info(new StringBuilder().append("Creating new S3 file system binding with Reduced Redundancy Storage ").append((this.useRRS) ? "enabled" : "disabled").toString());@b@  }@b@@b@  public Path getWorkingDirectory()@b@  {@b@    return new Path(this.s3Uri);@b@  }@b@@b@  public URI getUri()@b@  {@b@    return this.s3Uri;@b@  }@b@@b@  public void initialize(URI name)@b@    throws IOException@b@  {@b@    this.host = name.getHost();@b@    if (this.host == null) {@b@      LOG.debug("Provided URI does not provide a host to connect to, using configuration...");@b@      this.host = GlobalConfiguration.getString("fs.s3.host", "s3.amazonaws.com");@b@    }@b@@b@    this.port = name.getPort();@b@    if (this.port == -1) {@b@      LOG.debug("Provided URI does not provide a port to connect to, using configuration...");@b@      this.port = GlobalConfiguration.getInteger("fs.s3.port", 80);@b@    }@b@@b@    String userInfo = name.getUserInfo();@b@@b@    String awsAccessKey = null;@b@    String awsSecretKey = null;@b@@b@    if (userInfo != null)@b@    {@b@      String[] splits = userInfo.split(":");@b@      if (splits.length > 1) {@b@        awsAccessKey = URLDecoder.decode(splits[0], "UTF-8");@b@        awsSecretKey = URLDecoder.decode(splits[1], "UTF-8");@b@      }@b@    }@b@@b@    if (awsAccessKey == null) {@b@      LOG.debug("Provided URI does not provide an access key to Amazon S3, using configuration...");@b@      awsAccessKey = GlobalConfiguration.getString("fs.s3.accessKey", null);@b@      if (awsAccessKey == null)@b@        throw new IOException("Cannot determine access key to Amazon S3");@b@@b@    }@b@@b@    if (awsSecretKey == null) {@b@      LOG.debug("Provided URI does not provide a secret key to Amazon S3, using configuration...");@b@      awsSecretKey = GlobalConfiguration.getString("fs.s3.secretKey", null);@b@      if (awsSecretKey == null)@b@        throw new IOException("Cannot determine secret key to Amazon S3");@b@@b@    }@b@@b@    AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);@b@    this.s3Client = new AmazonS3Client(credentials);@b@@b@    initializeDirectoryStructure(name);@b@  }@b@@b@  private void initializeDirectoryStructure(URI name) throws IOException@b@  {@b@    String basePath = name.getPath();@b@    try@b@    {@b@      String endpoint = new URL("http", this.host, this.port, basePath).toString();@b@      if (LOG.isDebugEnabled()) {@b@        LOG.debug(new StringBuilder().append("Trying S3 endpoint ").append(endpoint).toString());@b@      }@b@@b@      this.s3Client.setEndpoint(endpoint);@b@      Owner owner = this.s3Client.getS3AccountOwner();@b@      LOG.info(new StringBuilder().append("Successfully established connection to Amazon S3 using the endpoint ").append(endpoint).toString());@b@      LOG.info(new StringBuilder().append("Amazon S3 user is ").append(owner.getDisplayName()).toString());@b@    }@b@    catch (MalformedURLException e)@b@    {@b@      throw new IOException(StringUtils.stringifyException(e));@b@    }@b@    catch (AmazonClientException e) {@b@      while (true) {@b@        if (basePath.isEmpty()) {@b@          throw new IOException(new StringBuilder().append("Cannot establish connection to Amazon S3: ").append(StringUtils.stringifyException(e)).toString());@b@        }@b@@b@        int pos = basePath.lastIndexOf("/");@b@        if (pos < 0)@b@          basePath = "";@b@        else@b@          basePath = basePath.substring(0, pos);@b@@b@      }@b@@b@    }@b@@b@    try@b@    {@b@      this.s3Uri = new URI("s3", (String)null, this.host, this.port, basePath, null, null);@b@    } catch (URISyntaxException e) {@b@      throw new IOException(StringUtils.stringifyException(e));@b@    }@b@@b@    this.directoryStructure = new S3DirectoryStructure(basePath);@b@  }@b@@b@  public FileStatus getFileStatus(Path f)@b@    throws IOException@b@  {@b@    S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);@b@@b@    if ((!(bop.hasBucket())) && (!(bop.hasObject())))@b@      return new S3FileStatus(f, 0L, true, 0L, 0L);@b@@b@    try@b@    {@b@      if ((bop.hasBucket()) && (!(bop.hasObject())))@b@      {@b@        List buckets = this.s3Client.listBuckets();@b@        Iterator it = buckets.iterator();@b@@b@        while (it.hasNext())@b@        {@b@          Bucket bucket = (Bucket)it.next();@b@          if (bop.getBucket().equals(bucket.getName()))@b@          {@b@            long creationDate = dateToLong(bucket.getCreationDate());@b@@b@            return new S3FileStatus(f, 0L, true, creationDate, 0L);@b@          }@b@        }@b@@b@        throw new FileNotFoundException(new StringBuilder().append("Cannot find ").append(f.toUri()).toString());@b@      }@b@      try@b@      {@b@        ObjectMetadata om = this.s3Client.getObjectMetadata(bop.getBucket(), bop.getObject());@b@        long modificationDate = dateToLong(om.getLastModified());@b@@b@        if (objectRepresentsDirectory(bop.getObject(), om.getContentLength()))@b@          return new S3FileStatus(f, 0L, true, modificationDate, 0L);@b@@b@        return new S3FileStatus(f, om.getContentLength(), false, modificationDate, 0L);@b@      }@b@      catch (AmazonServiceException e)@b@      {@b@        if (e.getStatusCode() == 404)@b@          throw new FileNotFoundException(new StringBuilder().append("Cannot find ").append(f.toUri()).toString());@b@@b@        throw e;@b@      }@b@    }@b@    catch (AmazonClientException e) {@b@      throw new IOException(StringUtils.stringifyException(e));@b@    }@b@  }@b@@b@  private static long dateToLong(Date date)@b@  {@b@    if (date == null) {@b@      return 0L;@b@    }@b@@b@    return date.getTime();@b@  }@b@@b@  public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)@b@    throws IOException@b@  {@b@    if (start + len > file.getLen()) {@b@      return null;@b@    }@b@@b@    S3BlockLocation bl = new S3BlockLocation(this.host, file.getLen());@b@@b@    return new BlockLocation[] { bl };@b@  }@b@@b@  public FSDataInputStream open(Path f, int bufferSize)@b@    throws IOException@b@  {@b@    return open(f);@b@  }@b@@b@  public FSDataInputStream open(Path f)@b@    throws IOException@b@  {@b@    FileStatus fileStatus = getFileStatus(f);@b@@b@    if (fileStatus.isDir()) {@b@      throw new IOException(new StringBuilder().append("Cannot open ").append(f.toUri()).append(" because it is a directory").toString());@b@    }@b@@b@    S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);@b@    if ((!(bop.hasBucket())) || (!(bop.hasObject()))) {@b@      throw new IOException(new StringBuilder().append(f.toUri()).append(" cannot be opened").toString());@b@    }@b@@b@    return new S3DataInputStream(this.s3Client, bop.getBucket(), bop.getObject());@b@  }@b@@b@  public FileStatus[] listStatus(Path f)@b@    throws IOException@b@  {@b@    S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);@b@    try@b@    {@b@      if (!(bop.hasBucket()))@b@      {@b@        List list = this.s3Client.listBuckets();@b@        S3FileStatus[] array = new S3FileStatus[list.size()];@b@        Iterator it = list.iterator();@b@        int i = 0;@b@        while (it.hasNext()) {@b@          Bucket bucket = (Bucket)it.next();@b@          long creationDate = dateToLong(bucket.getCreationDate());@b@@b@          S3FileStatus status = new S3FileStatus(extendPath(f, new StringBuilder().append(bucket.getName()).append('/').toString()), 0L, true, creationDate, 0L);@b@@b@          array[(i++)] = status;@b@        }@b@@b@        return array;@b@      }@b@@b@      if ((bop.hasBucket()) && (!(bop.hasObject())))@b@      {@b@        if (!(this.s3Client.doesBucketExist(bop.getBucket()))) {@b@          throw new FileNotFoundException(new StringBuilder().append("Cannot find ").append(f.toUri()).toString());@b@        }@b@@b@        return listBucketContent(f, bop);@b@      }@b@@b@      ObjectMetadata omd = this.s3Client.getObjectMetadata(bop.getBucket(), bop.getObject());@b@      if (objectRepresentsDirectory(bop.getObject(), omd.getContentLength()))@b@      {@b@        return listBucketContent(f, bop);@b@      }@b@@b@      S3FileStatus fileStatus = new S3FileStatus(f, omd.getContentLength(), false, dateToLong(omd.getLastModified()), 0L);@b@@b@      return new FileStatus[] { fileStatus };@b@    }@b@    catch (AmazonClientException e)@b@    {@b@      throw new IOException(StringUtils.stringifyException(e));@b@    }@b@  }@b@@b@  private S3FileStatus[] listBucketContent(Path f, S3BucketObjectPair bop) throws IOException@b@  {@b@    ObjectListing listing = null;@b@    List resultList = new ArrayList();@b@@b@    int depth = (bop.hasObject()) ? getDepth(bop.getObject()) + 1 : 0;@b@    while (true)@b@    {@b@      if (listing == null)@b@        if (bop.hasObject())@b@          listing = this.s3Client.listObjects(bop.getBucket(), bop.getObject());@b@        else@b@          listing = this.s3Client.listObjects(bop.getBucket());@b@@b@      else {@b@        listing = this.s3Client.listNextBatchOfObjects(listing);@b@      }@b@@b@      List list = listing.getObjectSummaries();@b@      Iterator it = list.iterator();@b@      while (true) { S3ObjectSummary os;@b@        String key;@b@        S3FileStatus fileStatus;@b@        while (true) { while (true) { if (!(it.hasNext()))@b@              break label274;@b@            os = (S3ObjectSummary)it.next();@b@            key = os.getKey();@b@@b@            int childDepth = getDepth(os.getKey());@b@@b@            if (childDepth == depth)@b@              break;@b@@b@          }@b@@b@          if (!(bop.hasObject())) break;@b@          if (key.startsWith(bop.getObject())) {@b@            key = key.substring(bop.getObject().length());@b@          }@b@@b@          if (!(key.isEmpty()))@b@            break;@b@@b@        }@b@@b@        long modificationDate = dateToLong(os.getLastModified());@b@@b@        if (objectRepresentsDirectory(os))@b@          fileStatus = new S3FileStatus(extendPath(f, key), 0L, true, modificationDate, 0L);@b@        else {@b@          fileStatus = new S3FileStatus(extendPath(f, key), os.getSize(), false, modificationDate, 0L);@b@        }@b@@b@        resultList.add(fileStatus);@b@      }@b@@b@      if (!(listing.isTruncated())) {@b@        label274: break;@b@      }@b@@b@    }@b@@b@    return ((S3FileStatus[])resultList.toArray(new S3FileStatus[0]));@b@  }@b@@b@  private static int getDepth(String key)@b@  {@b@    int depth = 0;@b@    int nextStartPos = 0;@b@@b@    int length = key.length();@b@@b@    while (nextStartPos < length)@b@    {@b@      int sepPos = key.indexOf(47, nextStartPos);@b@      if (sepPos < 0)@b@        break;@b@@b@      ++depth;@b@      nextStartPos = sepPos + 1;@b@    }@b@@b@    if ((length > 0) && @b@      (key.charAt(length - 1) == '/')) {@b@      --depth;@b@    }@b@@b@    return depth;@b@  }@b@@b@  public boolean delete(Path f, boolean recursive) throws IOException@b@  {@b@    FileStatus fileStatus;@b@    try@b@    {@b@      fileStatus = getFileStatus(f);@b@      S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);@b@@b@      if (fileStatus.isDir())@b@      {@b@        FileStatus[] arr$;@b@        int i$;@b@        boolean retVal = false;@b@        FileStatus[] dirContent = listStatus(f);@b@        if (dirContent.length > 0)@b@        {@b@          if (!(recursive)) {@b@            throw new IOException(new StringBuilder().append("Found non-empty directory ").append(f).append(" while performing non-recursive delete").toString());@b@          }@b@@b@          arr$ = dirContent; int len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { FileStatus entry = arr$[i$];@b@@b@            if (delete(entry.getPath(), true)) {@b@              retVal = true;@b@            }@b@@b@          }@b@@b@        }@b@@b@        if (!(bop.hasBucket()))@b@        {@b@          return retVal;@b@        }@b@@b@        if (!(bop.hasObject()))@b@        {@b@          this.s3Client.deleteBucket(bop.getBucket());@b@        }@b@        else@b@          this.s3Client.deleteObject(bop.getBucket(), bop.getObject());@b@      }@b@      else@b@      {@b@        this.s3Client.deleteObject(bop.getBucket(), bop.getObject());@b@      }@b@    } catch (AmazonClientException e) {@b@      throw new IOException(StringUtils.stringifyException(e));@b@    }@b@@b@    return true; } @b@ @b@  public boolean mkdirs(Path f) throws IOException {   } @b@  private void createEmptyObject(String bucketName, String objectName) { InputStream im = new InputStream(this)@b@    {@b@      public int read()@b@        throws IOException@b@      {@b@        return -1;@b@      }@b@@b@    };@b@    ObjectMetadata om = new ObjectMetadata();@b@    om.setContentLength(0L);@b@@b@    this.s3Client.putObject(bucketName, objectName, im, om);@b@  }@b@@b@  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)@b@    throws IOException@b@  {@b@    if ((!(overwrite)) && (exists(f))) {@b@      throw new IOException(new StringBuilder().append(f.toUri()).append(" already exists").toString());@b@    }@b@@b@    S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);@b@    if ((!(bop.hasBucket())) || (!(bop.hasObject()))) {@b@      throw new IOException(new StringBuilder().append(f.toUri()).append(" is not a valid path to create a new file").toString());@b@    }@b@@b@    if (bufferSize < 5242880) {@b@      throw new IOException("Provided buffer must be at least 5242880 bytes");@b@    }@b@@b@    byte[] buf = new byte[bufferSize];@b@@b@    return new S3DataOutputStream(this.s3Client, bop.getBucket(), bop.getObject(), buf, this.useRRS);@b@  }@b@@b@  public FSDataOutputStream create(Path f, boolean overwrite)@b@    throws IOException@b@  {@b@    return create(f, overwrite, 5242880, 1, 1024L);@b@  }@b@@b@  private boolean objectRepresentsDirectory(S3ObjectSummary os)@b@  {@b@    return objectRepresentsDirectory(os.getKey(), os.getSize());@b@  }@b@@b@  private boolean objectRepresentsDirectory(String name, long size)@b@  {@b@    if (name.isEmpty()) {@b@      return false;@b@    }@b@@b@    return ((name.charAt(name.length() - 1) == '/') && (size == 0L));@b@  }@b@@b@  static Path extendPath(Path parent, String extension)@b@    throws IOException@b@  {@b@    String extendedPath;@b@    URI parentUri = parent.toUri();@b@@b@    if (extension.isEmpty()) {@b@      return parent;@b@    }@b@@b@    String path = parentUri.getPath();@b@@b@    if (path.isEmpty()) {@b@      if (extension.charAt(0) == '/')@b@        extendedPath = extension;@b@      else@b@        extendedPath = new StringBuilder().append("/").append(extension).toString();@b@@b@    }@b@    else if (path.charAt(path.length() - 1) == '/') {@b@      if (extension.charAt(0) == '/')@b@        if (extension.length() > 1)@b@          extendedPath = new StringBuilder().append(path).append(extension.substring(1)).toString();@b@        else@b@          extendedPath = path;@b@@b@      else@b@        extendedPath = new StringBuilder().append(path).append(extension).toString();@b@@b@    }@b@    else if (extension.charAt(0) == '/')@b@      extendedPath = new StringBuilder().append(path).append(extension).toString();@b@    else {@b@      extendedPath = new StringBuilder().append(path).append("/").append(extension).toString();@b@    }@b@@b@    try@b@    {@b@      URI extendedUri = new URI(parentUri.getScheme(), "", extendedPath, parentUri.getQuery(), parentUri.getFragment());@b@@b@      return new Path(extendedUri);@b@    } catch (URISyntaxException e) {@b@      throw new IOException(StringUtils.stringifyException(e));@b@    }@b@  }@b@@b@  public boolean rename(Path src, Path dst)@b@    throws IOException@b@  {@b@    throw new UnsupportedOperationException("This method is not yet implemented");@b@  }@b@@b@  public boolean isDistributedFS()@b@  {@b@    return true;@b@  }@b@}