首页

关于mahout源码包中的HadoopUtil工具类对hadoop常用任务创建删除、系统路径等操作

标签:HadoopUtil,工具类,hadoop,mahout     发布时间:2018-03-26   

一、前言

关于mahout源码包中对apachehadoop通过工具类org.apache.mahout.common.HadoopUtil,进行org.apache.hadoop.mapreduce.Job任务创建准备、获取分布式文件系统缓存路径getSingleCachedFile等。

二、源码说明

package org.apache.mahout.common;@b@@b@import java.io.FileNotFoundException;@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.net.URI;@b@import java.util.ArrayList;@b@import java.util.Arrays;@b@import java.util.Comparator;@b@import java.util.Iterator;@b@import java.util.List;@b@@b@import com.google.common.base.Joiner;@b@import com.google.common.base.Preconditions;@b@import org.apache.hadoop.conf.Configuration;@b@import org.apache.hadoop.filecache.DistributedCache;@b@import org.apache.hadoop.fs.FSDataInputStream;@b@import org.apache.hadoop.fs.FSDataOutputStream;@b@import org.apache.hadoop.fs.FileStatus;@b@import org.apache.hadoop.fs.FileSystem;@b@import org.apache.hadoop.fs.LocalFileSystem;@b@import org.apache.hadoop.fs.Path;@b@import org.apache.hadoop.fs.PathFilter;@b@import org.apache.hadoop.io.Writable;@b@import org.apache.hadoop.mapreduce.InputFormat;@b@import org.apache.hadoop.mapreduce.Job;@b@import org.apache.hadoop.mapreduce.JobContext;@b@import org.apache.hadoop.mapreduce.Mapper;@b@import org.apache.hadoop.mapreduce.OutputFormat;@b@import org.apache.hadoop.mapreduce.Reducer;@b@import org.apache.mahout.common.iterator.sequencefile.PathType;@b@import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;@b@import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public final class HadoopUtil {@b@@b@  private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);@b@@b@  private HadoopUtil() { }@b@@b@  @b@  public static Job prepareJob(Path inputPath,@b@                           Path outputPath,@b@                           Class<? extends InputFormat> inputFormat,@b@                           Class<? extends Mapper> mapper,@b@                           Class<? extends Writable> mapperKey,@b@                           Class<? extends Writable> mapperValue,@b@                           Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {@b@@b@    Job job = new Job(new Configuration(conf));@b@    Configuration jobConf = job.getConfiguration();@b@@b@    if (mapper.equals(Mapper.class)) {@b@      throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");@b@    }@b@    job.setJarByClass(mapper);@b@@b@    job.setInputFormatClass(inputFormat);@b@    jobConf.set("mapred.input.dir", inputPath.toString());@b@@b@    job.setMapperClass(mapper);@b@    job.setMapOutputKeyClass(mapperKey);@b@    job.setMapOutputValueClass(mapperValue);@b@    job.setOutputKeyClass(mapperKey);@b@    job.setOutputValueClass(mapperValue);@b@    jobConf.setBoolean("mapred.compress.map.output", true);@b@    job.setNumReduceTasks(0);@b@@b@    job.setOutputFormatClass(outputFormat);@b@    jobConf.set("mapred.output.dir", outputPath.toString());@b@@b@    return job;@b@  }@b@@b@  @b@  public static Job prepareJob(Path inputPath,@b@                           Path outputPath,@b@                           Class<? extends InputFormat> inputFormat,@b@                           Class<? extends Mapper> mapper,@b@                           Class<? extends Writable> mapperKey,@b@                           Class<? extends Writable> mapperValue,@b@                           Class<? extends Reducer> reducer,@b@                           Class<? extends Writable> reducerKey,@b@                           Class<? extends Writable> reducerValue,@b@                           Class<? extends OutputFormat> outputFormat,@b@                           Configuration conf) throws IOException {@b@@b@    Job job = new Job(new Configuration(conf));@b@    Configuration jobConf = job.getConfiguration();@b@@b@    if (reducer.equals(Reducer.class)) {@b@      if (mapper.equals(Mapper.class)) {@b@        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");@b@      }@b@      job.setJarByClass(mapper);@b@    } else {@b@      job.setJarByClass(reducer);@b@    }@b@@b@    job.setInputFormatClass(inputFormat);@b@    jobConf.set("mapred.input.dir", inputPath.toString());@b@@b@    job.setMapperClass(mapper);@b@    if (mapperKey != null) {@b@      job.setMapOutputKeyClass(mapperKey);@b@    }@b@    if (mapperValue != null) {@b@      job.setMapOutputValueClass(mapperValue);@b@    }@b@@b@    jobConf.setBoolean("mapred.compress.map.output", true);@b@@b@    job.setReducerClass(reducer);@b@    job.setOutputKeyClass(reducerKey);@b@    job.setOutputValueClass(reducerValue);@b@@b@    job.setOutputFormatClass(outputFormat);@b@    jobConf.set("mapred.output.dir", outputPath.toString());@b@@b@    return job;@b@  }@b@@b@@b@  public static String getCustomJobName(String className, JobContext job,@b@                                  Class<? extends Mapper> mapper,@b@                                  Class<? extends Reducer> reducer) {@b@    StringBuilder name = new StringBuilder(100);@b@    String customJobName = job.getJobName();@b@    if (customJobName == null || customJobName.trim().isEmpty()) {@b@      name.append(className);@b@    } else {@b@      name.append(customJobName);@b@    }@b@    name.append('-').append(mapper.getSimpleName());@b@    name.append('-').append(reducer.getSimpleName());@b@    return name.toString();@b@  }@b@@b@@b@  public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {@b@    if (conf == null) {@b@      conf = new Configuration();@b@    }@b@    for (Path path : paths) {@b@      FileSystem fs = path.getFileSystem(conf);@b@      if (fs.exists(path)) {@b@        log.info("Deleting {}", path);@b@        fs.delete(path, true);@b@      }@b@    }@b@  }@b@@b@  public static void delete(Configuration conf, Path... paths) throws IOException {@b@    delete(conf, Arrays.asList(paths));@b@  }@b@@b@  public static long countRecords(Path path, Configuration conf) throws IOException {@b@    long count = 0;@b@    Iterator<?> iterator = new SequenceFileValueIterator<>(path, true, conf);@b@    while (iterator.hasNext()) {@b@      iterator.next();@b@      count++;@b@    }@b@    return count;@b@  }@b@@b@ @b@  public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {@b@    long count = 0;@b@    Iterator<?> iterator = new SequenceFileDirValueIterator<>(path, pt, filter, null, true, conf);@b@    while (iterator.hasNext()) {@b@      iterator.next();@b@      count++;@b@    }@b@    return count;@b@  }@b@@b@  public static InputStream openStream(Path path, Configuration conf) throws IOException {@b@    FileSystem fs = FileSystem.get(path.toUri(), conf);@b@    return fs.open(path.makeQualified(path.toUri(), path));@b@  }@b@@b@  public static FileStatus[] getFileStatus(Path path, PathType pathType, PathFilter filter,@b@      Comparator<FileStatus> ordering, Configuration conf) throws IOException {@b@    FileStatus[] statuses;@b@    FileSystem fs = path.getFileSystem(conf);@b@    if (filter == null) {@b@      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : listStatus(fs, path);@b@    } else {@b@      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : listStatus(fs, path, filter);@b@    }@b@    if (ordering != null) {@b@      Arrays.sort(statuses, ordering);@b@    }@b@    return statuses;@b@  }@b@@b@  public static FileStatus[] listStatus(FileSystem fs, Path path) throws IOException {@b@    try {@b@      return fs.listStatus(path);@b@    } catch (FileNotFoundException e) {@b@      return new FileStatus[0];@b@    }@b@  }@b@@b@  public static FileStatus[] listStatus(FileSystem fs, Path path, PathFilter filter) throws IOException {@b@    try {@b@      return fs.listStatus(path, filter);@b@    } catch (FileNotFoundException e) {@b@      return new FileStatus[0];@b@    }@b@  }@b@@b@  public static void cacheFiles(Path fileToCache, Configuration conf) {@b@    DistributedCache.setCacheFiles(new URI[]{fileToCache.toUri()}, conf);@b@  }@b@@b@ @b@  public static Path getSingleCachedFile(Configuration conf) throws IOException {@b@    return getCachedFiles(conf)[0];@b@  }@b@@b@ @b@  public static Path[] getCachedFiles(Configuration conf) throws IOException {@b@    LocalFileSystem localFs = FileSystem.getLocal(conf);@b@    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);@b@@b@    URI[] fallbackFiles = DistributedCache.getCacheFiles(conf);@b@@b@    // fallback for local execution@b@    if (cacheFiles == null) {@b@@b@      Preconditions.checkState(fallbackFiles != null, "Unable to find cached files!");@b@@b@      cacheFiles = new Path[fallbackFiles.length];@b@      for (int n = 0; n < fallbackFiles.length; n++) {@b@        cacheFiles[n] = new Path(fallbackFiles[n].getPath());@b@      }@b@    } else {@b@@b@      for (int n = 0; n < cacheFiles.length; n++) {@b@        cacheFiles[n] = localFs.makeQualified(cacheFiles[n]);@b@        // fallback for local execution@b@        if (!localFs.exists(cacheFiles[n])) {@b@          cacheFiles[n] = new Path(fallbackFiles[n].getPath());@b@        }@b@      }@b@    }@b@@b@    Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached files!");@b@@b@    return cacheFiles;@b@  }@b@@b@  public static void setSerializations(Configuration configuration) {@b@    configuration.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"@b@        + "org.apache.hadoop.io.serializer.WritableSerialization");@b@  }@b@@b@  public static void writeInt(int value, Path path, Configuration configuration) throws IOException {@b@    FileSystem fs = FileSystem.get(path.toUri(), configuration);@b@    try (FSDataOutputStream out = fs.create(path)) {@b@      out.writeInt(value);@b@    }@b@  }@b@@b@  public static int readInt(Path path, Configuration configuration) throws IOException {@b@    FileSystem fs = FileSystem.get(path.toUri(), configuration);@b@    try (FSDataInputStream in = fs.open(path)) {@b@      return in.readInt();@b@    }@b@  }@b@@b@  /**@b@   * Builds a comma-separated list of input splits@b@   * @param fs - File System@b@   * @param fileStatus - File Status@b@   * @return list of directories as a comma-separated String@b@   * @throws IOException - IO Exception@b@   */@b@  public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {@b@    boolean containsFiles = false;@b@    List<String> directoriesList = new ArrayList<>();@b@    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {@b@      if (childFileStatus.isDir()) {@b@        String subDirectoryList = buildDirList(fs, childFileStatus);@b@        directoriesList.add(subDirectoryList);@b@      } else {@b@        containsFiles = true;@b@      }@b@    }@b@@b@    if (containsFiles) {@b@      directoriesList.add(fileStatus.getPath().toUri().getPath());@b@    }@b@    return Joiner.on(',').skipNulls().join(directoriesList.iterator());@b@  }@b@@b@  /**@b@   * Builds a comma-separated list of input splits@b@   * @param fs - File System@b@   * @param fileStatus - File Status@b@   * @param pathFilter - path filter@b@   * @return list of directories as a comma-separated String@b@   * @throws IOException - IO Exception@b@   */@b@  public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException {@b@    boolean containsFiles = false;@b@    List<String> directoriesList = new ArrayList<>();@b@    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) {@b@      if (childFileStatus.isDir()) {@b@        String subDirectoryList = buildDirList(fs, childFileStatus);@b@        directoriesList.add(subDirectoryList);@b@      } else {@b@        containsFiles = true;@b@      }@b@    }@b@@b@    if (containsFiles) {@b@      directoriesList.add(fileStatus.getPath().toUri().getPath());@b@    }@b@    return Joiner.on(',').skipNulls().join(directoriesList.iterator());@b@  }@b@@b@  /**@b@   *@b@   * @param configuration  -  configuration@b@   * @param filePath - Input File Path@b@   * @return relative file Path@b@   * @throws IOException - IO Exception@b@   */@b@  public static String calcRelativeFilePath(Configuration configuration, Path filePath) throws IOException {@b@    FileSystem fs = filePath.getFileSystem(configuration);@b@    FileStatus fst = fs.getFileStatus(filePath);@b@    String currentPath = fst.getPath().toString().replaceFirst("file:", "");@b@@b@    String basePath = configuration.get("baseinputpath");@b@    if (!basePath.endsWith("/")) {@b@      basePath += "/";@b@    }@b@    basePath = basePath.replaceFirst("file:", "");@b@    String[] parts = currentPath.split(basePath);@b@@b@    if (parts.length == 2) {@b@      return parts[1];@b@    } else if (parts.length == 1) {@b@      return parts[0];@b@    }@b@    return currentPath;@b@  }@b@@b@  /**@b@   * Finds a file in the DistributedCache@b@   *@b@   * @param partOfFilename a substring of the file name@b@   * @param localFiles holds references to files stored in distributed cache@b@   * @return Path to first matched file or null if nothing was found@b@   **/@b@  public static Path findInCacheByPartOfFilename(String partOfFilename, URI[] localFiles) {@b@    for (URI distCacheFile : localFiles) {@b@      log.info("trying find a file in distributed cache containing [{}] in its name", partOfFilename);@b@      if (distCacheFile != null && distCacheFile.toString().contains(partOfFilename)) {@b@        log.info("found file [{}] containing [{}]", distCacheFile.toString(), partOfFilename);@b@        return new Path(distCacheFile.getPath());@b@      }@b@    }@b@    return null;@b@  }@b@}