Hadoop 正在使用 context.write() 写入文件,但输出文件为空

问题描述 投票:0回答:1

我正在运行 hadoop 代码,但遇到问题。

注意注释行

"debug exception 1"
"debug exception 2"
以及它们下面的行。由于我无法在
hadoop
map/reduce 中使用 System.out.println 打印消息,因此我使用的唯一调试方法是抛出带有消息的异常,如您在提到的注释下面看到的那样。

public class FindCoallocations {
      


public static String bucketName;
public static AWS aws;
public static boolean debugMode = true;
public static String uniqueWord = "43uireoaugibghui4reagf"; //This word doesn't exist in the corpus (hopefully).
public static String decade;
public static long N = 1;

 public static class MapperClassW1 extends Mapper<LongWritable, Text, Text , Text> {
    
    private Text word_1;
    private Text word_2;
    private Text years;
    private Text matchCount;

    public String decade;
    
    @Override
    public void setup(Context context) {
        Configuration config = context.getConfiguration();
        decade = config.getStrings("decade")[0];
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException,  InterruptedException {
        word_1 = new Text();
        word_2 = new Text();
        years = new Text();
        matchCount = new Text();
      
        StringTokenizer itr = new StringTokenizer(value.toString());
        word_1.set(itr.nextToken());
        word_2.set(itr.nextToken());
        years.set(itr.nextToken());
        matchCount.set(itr.nextToken()); 
        
        if(years.toString().equals(decade)){
            Text countN = new Text(uniqueWord);
        
            context.write(countN, matchCount); // used for counting N (number of words)
            context.write (word_1, value); // used for claculating c(w1)
            //throw new IOException("exception at mapper. key: "+key.toString()+" value: "+value.toString() +" word_1: "+word_1.toString());  
        }
    }
}

public static class PartitionerClass extends Partitioner<Text, Text> {
    
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
       
        return (key.toString().hashCode() % numPartitions);
    }
}

public static class ReducerClassW1 extends Reducer<Text,Text,Text,Text> {

    public String decade;

    public void setup(Context context) {
        Configuration config = context.getConfiguration();
        decade = config.getStrings("decade")[0];
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,  InterruptedException {
        if((key.toString()).equals(uniqueWord)) // count N
        {
            long sum = 0;
            for (Text value : values) {
                sum += Long.parseLong(value.toString());
            }
            //context.write(key, new Text(Long.toString(sum)));
            context.getCounter(Counter.COMBINE_INPUT_RECORDS).increment(sum);
        }
        else  // count w1 ( value is full entry)
        {
           
            long sum = 0;
             for (Text value : values) {
                
                StringTokenizer itr = new StringTokenizer(value.toString());
                itr.nextToken();
                itr.nextToken();
                itr.nextToken();
                String tk = itr.nextToken();
                // debug exception 1
                //throw new IOException ("the key: "+key.toString()+" the value: "+value.toString()+ " sum: "+sum + "itr.nextToken(): "+tk);
                sum += Long.parseLong(tk);
                
           }
            for (Text value : values) {
                 // debug exception 2
                //throw new IOException ("the key: "+key.toString()+" the value: "+value.toString()+ " sum: "+sum);
                context.write(value, new Text("w1:"+Long.toString(sum)));
            } 
        }
       
    }
}




//Receives n args: 0 - "Step1" , 1 - decade , 2 - inputFileName1, ....... n-1 inputFileName(n-2) , n outputFileName
public static void main(String[] args) throws Exception {
    System.out.println("[DEBUG] STEP 1 started!");
    aws = AWS.getInstance();
    bucketName = aws.bucketName;
    int inputs = args.length;
    System.out.println("args: ");
    for(int i=0; i<args.length;i++){
        System.out.print("args["+i+"]"+" : "+args[i] +", ");
    }
    System.out.println("\n");
    String[] inputFileKey = new String[args.length-3];
    String outputFileKey = args[args.length-1];
    for(int i=2; i<args.length - 1;i++){
       inputFileKey[i-2] = args[i];
    }
    FindCoallocations.setDecade(args[1]);
    System.out.println("decade: "+FindCoallocations.getDecade());
    for(int i=0; i<inputFileKey.length;i++){
        System.out.println("input: "+inputFileKey[i]);
     }
     System.out.println("output: "+outputFileKey);

    
    Configuration conf = new Configuration();
    conf.setQuietMode(false);
    conf.setStrings("decade",FindCoallocations.getDecade());
    
    Job job = Job.getInstance(conf, "Join w1 count");
    job.setJarByClass(FindCoallocations.class);
    job.setMapperClass(MapperClassW1.class);
    job.setPartitionerClass(PartitionerClass.class);
    job.setCombinerClass(ReducerClassW1.class);
    job.setReducerClass(ReducerClassW1.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    for(int i=0; i<inputFileKey.length; i++){
        FileInputFormat.addInputPath(job, new Path("s3://"+bucketName+"/"+inputFileKey[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path("s3://"+bucketName+"/"+outputFileKey+"_"+decade+"_w1"));
    job.waitForCompletion(true);
    job.monitorAndPrintJob();        
    
    }
}

第一个问题,有没有更好的调试方法?目前,对于每次运行试验,我需要导出一个新的 Jar,将其上传到 S3 (AWS),然后运行 hadoop,等到完成后才能看到输出(异常消息)。整个过程大约需要10分钟。这样调试要花很长时间...

第二个问题,现在我的问题是,使用给定的代码,我的输出文件变成空的。当我取消注释下面的行

"debug exception 1"
时,我收到以下消息:

错误:java.io.IOException:键:nt 值: nt should_VERB 1830 的 1 总和:0itr.nextToken():1

到目前为止一切顺利。当我对其进行评论并取消注释

"debug exception 2"
时,我根本没有收到任何异常,表明未到达/不迭代下一个 for-loop

当我评论两个调试异常(根本不尝试抛出任何异常)时,程序成功终止,并且输出文件变为空。 (意味着第一个 for 循环 中没有发生异常)。

我正在尝试找出问题所在。

java hadoop mapreduce
1个回答
0
投票

您的代码中有两个问题如下:

(1) 在reduce handler类中,

values
Iterable
类型,只能迭代一次。如果要迭代多次,可以将迭代结果添加到一个列表中,然后对列表进行迭代,如下所示:

(2)

job.setCombinerClass
使用不当,设置
job.setCombinerClass(ReducerClassW1.class)
时,
CombinerClass
的输出结果是字符串类型的值。在Reduce中执行
Long.parseLong(value.toString())
时出现错误,因此可以删除下面所示的代码:

另外,关于您的问题1: 调试方式解决方案:可以先在本地运行MapReduce程序进行调试。调试好后,打包供服务器执行,但程序需要一些配置和调整:

  1. 在本地机器上配置HADOOP环境变量
  2. 调整代码如下:
  3. 调整程序启动命令,主要是添加startup 参数:

关于您的问题2: 修复代码中的问题后,将不会出现错误消息。 最后在我的机器上执行代码后,在设置的输出目录下成功输出结果,并且有相应的输出数据:

如果您需要本地运行的源代码文件,我也可以提供。

© www.soinside.com 2019 - 2024. All rights reserved.