How can I supply a subclass as a value type in a Hadoop Mapper and Reducer?


I have a child class that extends a parent (super) class. I want a way to give the Mapper's input value a general type, so that both the child and the parent are accepted as valid values, like this:

public static class MyMapper extends Mapper<..., MyParentClass, ..., ...>

I want MyChildClass, which extends MyParentClass, to be valid here as well.

But when I run the program, I get an exception whenever the value is the child class:

Type mismatch in value from map: expected MyParentClass, received MyChildClass

How can I make both the child and the parent class valid input/output values for the Mapper?

Update:

package hipi.examples.dumphib;

import hipi.image.FloatImage;
import hipi.image.ImageHeader;
import hipi.imagebundle.mapreduce.ImageBundleInputFormat;
import hipi.util.ByteUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class DumpHib extends Configured implements Tool {

  public static class DumpHibMapper extends Mapper<ImageHeader, FloatImage, IntWritable, Text> {

    @Override
    public void map(ImageHeader key, FloatImage value, Context context) throws IOException, InterruptedException  {

      String outputStr = null;

      if (key == null) {
        outputStr = "Failed to read image header.";
      } else if (value == null) {
        outputStr = "Failed to decode image data.";
      } else {
        // Read the dimensions only after the null check to avoid an NPE.
        int imageWidth = value.getWidth();
        int imageHeight = value.getHeight();
        String camera = key.getEXIFInformation("Model");
        String hexHash = ByteUtils.asHex(ByteUtils.FloatArraytoByteArray(value.getData()));
        outputStr = imageWidth + "x" + imageHeight + "\t(" + hexHash + ")\t  " + camera;
      }

      context.write(new IntWritable(1), new Text(outputStr));
    }

  }

  public static class DumpHibReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      for (Text value : values) {
        context.write(key, value);
      }
    }

  }

  public int run(String[] args) throws Exception {

    if (args.length < 2) {
      System.out.println("Usage: dumphib <input HIB> <output directory>");
      System.exit(0);
    }

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "dumphib");

    job.setJarByClass(DumpHib.class);
    job.setMapperClass(DumpHibMapper.class);
    job.setReducerClass(DumpHibReducer.class);

    job.setInputFormatClass(ImageBundleInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    String inputPath = args[0];
    String outputPath = args[1];

    removeDir(outputPath, conf);

    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    job.setNumReduceTasks(1);

    return job.waitForCompletion(true) ? 0 : 1;

  }

  private static void removeDir(String path, Configuration conf) throws IOException {
    Path output_path = new Path(path);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output_path)) {
      fs.delete(output_path, true);
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new DumpHib(), args);
    System.exit(res);
  }

}

FloatImage is the superclass, and I have a ChildFloatImage class that extends it. When a ChildFloatImage is returned from the RecordReader, it throws the exception above.
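For context (this is not from the question itself): the exception comes from an exact-class comparison in Hadoop's map-output collector, which compares concrete classes with `!=` rather than `isAssignableFrom`. A simplified, hypothetical stand-in for that check shows why a subclass instance fails even though it is assignable:

```java
public class TypeCheckDemo {
    static class MyParentClass {}
    static class MyChildClass extends MyParentClass {}

    // Simplified stand-in for the check Hadoop's map-output collector
    // performs: an exact class comparison, not an assignability check.
    static void collect(Class<?> expected, Object value) {
        if (value.getClass() != expected) {
            throw new RuntimeException("Type mismatch in value from map: expected "
                + expected.getName() + ", received " + value.getClass().getName());
        }
    }

    public static void main(String[] args) {
        collect(MyParentClass.class, new MyParentClass()); // passes
        try {
            collect(MyParentClass.class, new MyChildClass()); // throws
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```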

2 Answers

The workaround I settled on was to create a container/wrapper class that delegates all the required methods to the wrapped object, like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;

public class FloatImageContainer implements Writable, RawComparator<BinaryComparable> {

    private FloatImage floatImage;

    public FloatImage getFloatImage() {
        return floatImage;
    }

    public void setFloatImage(FloatImage floatImage) {
        this.floatImage = floatImage;
    }

    public FloatImageContainer() {
        this.floatImage = new FloatImage();
    }

    public FloatImageContainer(FloatImage floatImage) {
        this.floatImage = floatImage;
    }

    @Override
    public int compare(BinaryComparable o1, BinaryComparable o2) {
        // Delegate to the wrapped FloatImage.
        return floatImage.compare(o1, o2);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Delegate to the wrapped FloatImage.
        return floatImage.compare(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize whatever FloatImage (or a subclass) writes.
        floatImage.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        floatImage.readFields(in);
    }

}

In the Mapper:

public static class MyMapper extends Mapper<..., FloatImageContainer, ..., ...> {

This way both FloatImage and ChildFloatImage can be wrapped in a FloatImageContainer, and the underlying problem in Hadoop goes away, because only a single class, FloatImageContainer, is used directly, and it is neither a parent nor a child of anything.
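Why the wrapper works can be demonstrated without Hadoop at all: the class the framework sees at runtime is always the container's, no matter what it wraps, while behavior still dispatches to the wrapped subclass. A minimal sketch, using hypothetical Parent/Child stand-ins for FloatImage/ChildFloatImage:

```java
// Hypothetical stand-ins for FloatImage and ChildFloatImage.
class Parent {
    int width() { return 100; }
}

class Child extends Parent {
    @Override
    int width() { return 200; }
}

// The container delegates to whatever instance it wraps.
class Container {
    private final Parent wrapped;

    Container(Parent wrapped) {
        this.wrapped = wrapped;
    }

    int width() {
        return wrapped.width();
    }
}

public class WrapperDemo {
    public static void main(String[] args) {
        Container a = new Container(new Parent());
        Container b = new Container(new Child());

        // The runtime check compares concrete classes; both values now
        // present the same class, yet calls reach the wrapped subclass.
        System.out.println(a.getClass() == b.getClass()); // true
        System.out.println(a.width()); // 100
        System.out.println(b.width()); // 200
    }
}
```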



Background

The reason is that type erasure makes it impossible for Java to check at runtime whether your MyMapper actually extends the correct type (in terms of the generic type parameters on Mapper).

Java essentially compiles this:

List<String> list = new ArrayList<String>();
list.add("Hi");
String x = list.get(0);

down to this:

List list = new ArrayList();
list.add("Hi");
String x = (String) list.get(0);
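Erasure is easy to see in action (a standalone sketch, not part of the original answer): at runtime, two differently parameterized lists share the very same class object, because the type parameters are gone after compilation.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<String>();
        List<Integer> ints = new ArrayList<Integer>();

        // The type parameters are erased at compile time, so at runtime
        // both lists are the same raw ArrayList class.
        System.out.println(strings.getClass() == ints.getClass()); // true
        System.out.println(strings.getClass().getName()); // java.util.ArrayList
    }
}
```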

Credit for this example goes here.

So when you hand over MyMapper, Java would need to verify the concrete A, B, C, D of Mapper<A, B, C, D> — which is impossible at runtime. The check therefore has to be forced at compile time.

Solution

Instead of doing the following for all of your custom subclasses:

job.setMapperClass(DumpHibMapper.class);

use java.lang.Class#asSubclass

and do this instead:

job.setMapperClass(DumpHibMapper.class.asSubclass(Mapper.class));
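For reference, Class#asSubclass performs the subtype check eagerly: it returns the same Class object viewed as a subclass of the argument, and fails fast with a ClassCastException if the class is not actually a subclass. A standalone sketch:

```java
public class AsSubclassDemo {
    public static void main(String[] args) {
        // Succeeds: Integer is a subclass of Number; we get back the same
        // Class object typed as Class<? extends Number>.
        Class<? extends Number> ok = Integer.class.asSubclass(Number.class);
        System.out.println(ok.getName()); // java.lang.Integer

        // Fails fast: String is not a Number.
        try {
            String.class.asSubclass(Number.class);
        } catch (ClassCastException e) {
            System.out.println("not a subclass");
        }
    }
}
```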