在reduce阶段之后合并输出文件

问题描述 投票:73回答:10

在mapreduce中,每个reduce任务将其输出写入名为part-r-nnnnn的文件,其中nnnnn是与reduce任务关联的分区ID。 map / reduce是否合并这些文件?如果有,怎么样?

hadoop mapreduce
10个回答
117
投票

您可以通过调用以下命令来委托reduce输出文件的整个合并,而不是自己合并文件。

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

注意这会在本地组合HDFS文件。确保在运行之前有足够的磁盘空间


0
投票

。 map / reduce是否合并这些文件?

不,它没有合并。

您可以使用IdentityReducer来实现您的目标。

不执行缩减,将所有输入值直接写入输出。

public void reduce(K key,
                   Iterator<V> values,
                   OutputCollector<K,V> output,
                   Reporter reporter)
            throws IOException

将所有键和值直接写入输出。

看看相关的SE帖子:

hadoop: difference between 0 reducer and identity reducer?


27
投票

不,这些文件不会被Hadoop合并。您获得的文件数与reduce任务数相同。

如果您需要它作为下一个作业的输入,那么不要担心有单独的文件。只需将整个目录指定为下一个作业的输入。

如果确实需要群集外的数据,那么我通常会在从群集中提取数据时将它们合并到接收端。

即这样的事情:

hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt

8
投票

这是您可以用于在HDFS中合并文件的功能

public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException {
    FileSystem fs = FileSystem.get(config);
    Path srcPath = new Path(src);
    Path dstPath = new Path(dest);

    // Check if the path already exists
    if (!(fs.exists(srcPath))) {
        logger.info("Path " + src + " does not exists!");
        return false;
    }

    if (!(fs.exists(dstPath))) {
        logger.info("Path " + dest + " does not exists!");
        return false;
    }
    return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null);
}

7
投票

仅对于文本文件和HDFS作为源和目标,请使用以下命令:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

这将连接input_hdfs_dir中的所有文件,并将输出写回output_hdfs_file的HDFS。请记住,所有数据都将被带回本地系统,然后再次上传到hdfs,尽管没有创建临时文件,这使用UNIX pe即时发生。

此外,这不适用于非文本文件,如Avro,ORC等。

对于二进制文件,您可以执行类似的操作(如果您在目录上映射了Hive表):

insert overwrite table tbl select * from tbl

根据您的配置,这可能还会创建多个文件。要创建单个文件,请使用mapreduce.job.reduces=1将reducers的数量设置为1,或将hive属性设置为hive.merge.mapredfiles=true


4
投票

part-r-nnnnn文件是在'r'之间指定的reduce阶段之后生成的。现在的事实是,如果你有一个减速器运行,你将有一个输出文件,如part-r-00000。如果减速器的数量是2,那么你将得到part-r-00000和part-r-00001,依此类推。看,如果输出文件太大而无法放入机器内存中,因为hadoop框架设计为在商品机器上运行,那么文件会被分割。根据MRv1,您可以使用20个减速器来限制逻辑。您可能需要在配置文件mapred-site.xml中自定义更多但相同的需求。谈论你的问题;您可以使用getmerge,也可以通过将以下语句嵌入到驱动程序代码中来将reducers的数量设置为1

job.setNumReduceTasks(1);

希望这能回答你的问题。


3
投票

您可以运行其他map / reduce任务,其中map和reduce不会更改数据,分区程序会将所有数据分配给单个reducer。


1
投票

除了我之前的回答,我还有一个答案,我几分钟前就试过了。您可以使用CustomOutputFormat,它看起来像下面给出的代码

public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> {

    @Override
    public RecordWriter<StudentKey,PassValue> getRecordWriter(
            TaskAttemptContext tac) throws IOException, InterruptedException {
        //step 1: GET THE CURRENT PATH
        Path currPath=FileOutputFormat.getOutputPath(tac);

        //Create the full path
        Path fullPath=new Path(currPath,"Aniruddha.txt");

        //create the file in the file system
        FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
        FSDataOutputStream fileOut=fs.create(fullPath,tac);
        return new VictorRecordWriter(fileOut);
    }

}

只是,看看最后一行的第四行。我使用自己的名字作为输出文件名,我用15个reducer测试了程序。文件仍然保持不变。因此,可以获得单个输出文件而不是两个或更多文件,但是非常清楚输出文件的大小不得超过主存储器的大小,即输出文件必须适合商品机器的内存,否则可能存在输出文件拆分出现问题。谢谢!!


0
投票

为什么不使用像这样的猪脚本来合并分区文件:

stuff = load "/path/to/dir/*"

store stuff into "/path/to/mergedir"

0
投票

如果文件有标题,你可以通过这样做摆脱它:

hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv

然后手动为output.csv添加标题

© www.soinside.com 2019 - 2024. All rights reserved.