Hadoop MapReduce无法产生所需的输出

问题描述 投票:2回答:1

我有一个包含专利信息的大文件。标题如下:“ PATENT”,“ GYEAR”,“ GDATE”,“ APPYEAR”,“ COUNTRY”,“ POSTATE”,“ ASSIGNEE”,“ ASSCODE”,“ CLAIMS”。

我想按年份计算每项专利的平均权利要求,其中关键是年份,价值是平均金额。但是,reducer输出显示我的平均数量始终是1.0。我的程序哪里出错了?

主类

 public static void main(String [] args) throws Exception{
    int res = ToolRunner.run(new Configuration(), new AvgClaimsByYear(), args);
    System.exit(res);
}

驱动程序类

    Configuration config = this.getConf();  
    Job job = Job.getInstance(config, "average claims per year"); 
    job.setJarByClass(AvgClaimsByYear.class);
    job.setMapperClass(TheMapper.class);
    job.setPartitionerClass(ThePartitioner.class);
    job.setNumReduceTasks(4);
    job.setReducerClass(TheReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;

Mapper类

    public static class TheMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
      private IntWritable yearAsKeyOut = new IntWritable();
      private IntWritable claimsAsValueOut = new IntWritable(1);
      @Override
      public void map(LongWritable keyIn, Text valueIn, Context context) throws IOException,InterruptedException {
        String line = valueIn.toString();
        if(line.contains("PATENT")) {
            return; //skip header
        }
        else {
            String [] patentData = line.split(","); 
            yearAsKeyOut.set(Integer.parseInt(patentData[1])); 
            if (patentData[8].length() > 0) {
                claimsAsValueOut.set(Integer.parseInt(patentData[8]));
            }
        }
        context.write(yearAsKeyOut, claimsAsValueOut);
    }   
}

分区程序类

    public static class ThePartitioner extends Partitioner<IntWritable, IntWritable> {
      public int getPartition(IntWritable keyIn, IntWritable valueIn, int totalNumPartition) {
        int theYear = keyIn.get();

        if (theYear <= 1970) {
            return 0;
        }
        else if(theYear > 1970 && theYear <= 1979) {
            return 1;
        }
        else if(theYear > 1979 && theYear <=1989) {
            return 2;
        }
        else{
            return 3;
        }
    }

}

Reducer类

 public static class TheReducer extends Reducer<IntWritable,IntWritable,IntWritable,FloatWritable> {
    @Override
    public void reduce(IntWritable yearKey, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
        int totalClaimsThatYear = 0;
        int totalPatentCountThatYear = 0;
        FloatWritable avgClaim = new FloatWritable();

        for(IntWritable value : values) {

            totalClaimsThatYear += value.get();
            totalPatentCountThatYear += 1;      
        }
        avgClaim.set(calculateAvgClaimPerPatent (totalPatentCountThatYear, totalClaimsThatYear)); 
        context.write(yearKey, avgClaim);
    }

    public float calculateAvgClaimPerPatent (int totalPatentCount, int totalClaims) {
        return (float)totalClaims/totalPatentCount;
    }
}

输入

  3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
  3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
  3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
  3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
  3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,

输出

1963 1.0 
1964 1.0
1965 1.0 
1966 1.0 
1967 1.0 
1968 1.0 
1969 1.0 
1970 1.0
hadoop mapreduce
1个回答
1
投票

在calculateAvgClaimPerPatent()中,您的表达式在转换为浮点数之前执行整数除法。将两个整数转换为除法前的float。

-编辑-

而且,再次查看代码,平均写出的实际上是每条记录的平均索偿数,由分区程序定义的4个间隔进行分组。换句话说,1972年一项专利的权利要求数与1975年另一项专利的权利要求数平均。与您的问题描述不符。

© www.soinside.com 2019 - 2024. All rights reserved.