Reduce 函数不影响最终输出

问题描述 投票:2回答:2

我从Mapreduce代码获得了奇怪的输出:

输入:

aa bb  
aa cc  
bb aa  
cc dd  
dd bb  
xx aa  
ss rr

输出:

aa  org.mapreduce.userscore.UserScore$ScoreWritable@1  
aa  org.mapreduce.userscore.UserScore$ScoreWritable@0  
aa  org.mapreduce.userscore.UserScore$ScoreWritable@1  
aa  org.mapreduce.userscore.UserScore$ScoreWritable@0  
bb  org.mapreduce.userscore.UserScore$ScoreWritable@0  
bb  org.mapreduce.userscore.UserScore$ScoreWritable@0  
bb  org.mapreduce.userscore.UserScore$ScoreWritable@1  
cc  org.mapreduce.userscore.UserScore$ScoreWritable@1  
cc  org.mapreduce.userscore.UserScore$ScoreWritable@0  
dd  org.mapreduce.userscore.UserScore$ScoreWritable@1  
dd  org.mapreduce.userscore.UserScore$ScoreWritable@0  
rr  org.mapreduce.userscore.UserScore$ScoreWritable@0  
ss  org.mapreduce.userscore.UserScore$ScoreWritable@1  
xx  org.mapreduce.userscore.UserScore$ScoreWritable@1  

代码:

package org.mapreduce.userscore;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class UserScore  {

 /**
  * Custom Hadoop value type that carries two {@link IntWritable} fields, N and M.
  * The Map class below uses it as its map-output value.
  *
  * NOTE(review): this version does not override toString(), so printing an instance falls
  * back to Object.toString() (class name, '@', hex of hashCode()). That is consistent with
  * the "ScoreWritable@1" / "ScoreWritable@0" values in the output shown with the question.
  */
 public static class ScoreWritable implements Writable {
            // First counter; Map sets it to 1 for the first token of a line, otherwise 0.
            private IntWritable N;
            // Second counter; Map sets it to 1 for every token after the first, otherwise 0.
            private IntWritable M;

            // No-arg constructor: allocates both fields so readFields() has objects to fill.
            public ScoreWritable() {
                this.N = new IntWritable();
                this.M = new IntWritable();
            }

            // Convenience constructor: stores the given objects themselves (no copy is made).
            public ScoreWritable(IntWritable N, IntWritable M){
                this.N = N;
                this.M = M;
            }

            // Replaces both fields with the given objects (references are kept, not copied).
            public void set(IntWritable NN,IntWritable MM) {
                this.N = NN;
                this.M = MM;
            }

            // Returns the first field (N) of this record.
            public IntWritable getN() {
                return N;
            }

            // Returns the second field (M) of this record.
            public IntWritable getM() {
                return M;
            }

            // Deserializes this record from the stream: N first, then M
            // (the same order in which write() emits them).
            @Override
            public void readFields(DataInput in) throws IOException {
                N.readFields(in);
                M.readFields(in);
            }

            // Serializes this record to the stream: N first, then M.
            @Override
            public void write(DataOutput out) throws IOException {
                N.write(out);
                M.write(out);
            }

            // NOTE(review): equals() is disabled (commented out) while hashCode() below is
            // still overridden.
            //@Override
            //public boolean equals(Object o) {
                //if (o instanceof ScoreWritable) {
                //ScoreWritable other = (ScoreWritable) o;
                //return N.equals(other.N) && M.equals(other.M);
                //}
                //return false;
            //}

            // Hash is derived from N only; M does not contribute.
            @Override
            public int hashCode() {
                return N.hashCode();
            }

 }

 /**
  * Mapper that splits each input line into whitespace-separated tokens and emits one
  * (token, ScoreWritable) pair per token: (1,0) for the first token on the line and
  * (0,1) for every following token.
  */
 public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> {
    // Output key and value objects, reused across map() calls.
    private Text user = new Text();
    private ScoreWritable score = new ScoreWritable();
    // Holders for the pair handed to score.set(); both are reassigned for every token.
    private IntWritable NN = new IntWritable();
    private IntWritable MM = new IntWritable();

    // key: input key supplied by the framework (not used here); value: one line of input text.
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Acts as a "first token" flag: 1 until the first token has been handled, then 2.
        int iterator = 1;
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            user.set(tokenizer.nextToken());
            if (iterator == 1) {
                // First token on the line -> (N, M) = (1, 0).
                NN = new IntWritable(1);
                MM = new IntWritable(0);
                iterator += 1;
            } else {
                // Any later token on the line -> (N, M) = (0, 1).
                NN = new IntWritable(0);
                MM = new IntWritable(1);
            }
            score.set(NN,MM);
            context.write(user, score);
        }
    }
 }

 /**
  * Reducer declared to turn (Text, ScoreWritable) groups into (Text, IntWritable) pairs.
  *
  * NOTE(review): as written, reduce() never iterates over {@code values}; it always emits
  * the constant 2 for each key (debugging aid described in the question text).
  */
 public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> {
     // Output value holder; reassigned to a fresh IntWritable(2) inside reduce().
     private IntWritable resultf = new IntWritable();
     public void reduce(Text key, Iterable<ScoreWritable> values, Context context) throws IOException, InterruptedException {
        // NOTE(review): the only declaration of `result` is commented out on the next line,
        // so `result` is undeclared where it is used below. In addition, `values` is an
        // Iterable and has no getN()/getM(); those live on the individual ScoreWritable elements.
        //int result = ((values.getN().get()) * (values.getM()).get());
        resultf.set(result);
        // Emits the key with a brand-new IntWritable(2), discarding whatever was set above.
        context.write(key, resultf = new IntWritable(2));
    }
 }

 // Job driver: configures the "userscore" job and runs it.
 // args[0] is added as the input path and args[1] is set as the output path.
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Create the job named "userscore" and identify the job jar via this driver class.
    Job job = new Job(conf, "userscore");
    job.setJarByClass(UserScore.class);

    // Register the mapper and reducer; the same Reduce class is also registered as combiner.
    // NOTE(review): Reduce is declared to emit IntWritable values, while Map emits
    // ScoreWritable values and Reduce itself expects ScoreWritable input, so when used as
    // a combiner its output type does not match the reducer's input type.
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setCombinerClass(Reduce.class);

    // Final (reducer) output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // NOTE(review): the map output classes are left unset (the calls are commented out)
    // even though Map's value type (ScoreWritable) differs from the job output value
    // class (IntWritable) configured above.
    //job.setMapOutputKeyClass(Text.class);
    //job.setMapOutputValueClass(ScoreWritable.class);

    // Plain-text input and output formats.
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Request four reduce tasks.
    job.setNumReduceTasks(4);

    // Set the input and the output path from the arguments
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Run the job and wait for its completion; exit status 0 on success, 1 otherwise.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}

我正在尝试编写一段读取文本文件的 MapReduce 代码。文本文件的每一行包含一对用户名,表示在社交网络中第一个用户关注了第二个用户。我想统计每个用户的关注者(粉丝)总数和其关注的用户总数,然后将这两个数相乘,作为每个用户的一种得分。

我的思路是为值创建一个自定义的 Writable 类(ScoreWritable),以用户名作为 Text 类型的键、以 ScoreWritable 作为值进行传输。您可能注意到,为了排查问题,我把 Reduce 的输出改成了常量“2”,但输出仍然如上所示。

我究竟做错了什么?

我在虚拟机中使用Cloudera映像来编译和运行jar文件。

java mapreduce
2个回答
0
投票

您使用的是TextOutputFormat,它不知道如何打印(作为文本)您的自定义ScoreWritable,实际上它只输出ScoreWritable实例的字符串表示形式。我知道最快的解决方法是覆盖ScoreWritable的toString()方法

// Renders the record as "<N><TAB><M>", which is what a text output format will print.
public String toString() {
    StringBuilder text = new StringBuilder();
    text.append(N.get()).append('\t').append(M.get());
    return text.toString();
}

或者,您也可以编写自己的自定义 OutputFormat。例如,请参阅此处。

希望这对您有所帮助。


0
投票

所以我设法使代码工作。正如您所看到的,存在一些问题:

  1. 自定义类的文本输出方式(我猜),感谢 @gtosto 建议重写 toString()
  2. 在Reducer中错误使用变量。
  3. Reducer 中遍历值的方式有误。

我还添加了一个单独的 Combiner 类,以减少 Mapper 和 Reducer 之间的网络流量。

这是最终的代码:(附注释)

package org.mapreduce.userscore;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class UserScore  {
    // Defines a custom class that contains two IntWritable values.
    // This custom class holds the value part of the key-value pairs passed between the mapper and the reducers.

 /**
  * Value type passed from the mapper (and combiner) to the reducer: a pair of counters.
  * <p>
  * In this job N counts how often a user appeared first on a line ("is following") and
  * M counts how often the user appeared second ("is being followed").
  * <p>
  * Values handed to the constructor or to {@link #set} are copied into this instance's
  * own IntWritable fields, so later reuse or mutation of the caller's objects cannot
  * silently change this record.
  */
 public static class ScoreWritable implements Writable {
    private final IntWritable N = new IntWritable();
    private final IntWritable M = new IntWritable();

    /** No-arg constructor; both counters start at 0. */
    public ScoreWritable() {
    }

    /**
     * Creates a record holding copies of the given counter values.
     *
     * @param N value for the first counter
     * @param M value for the second counter
     */
    public ScoreWritable(IntWritable N, IntWritable M) {
        this.N.set(N.get());
        this.M.set(M.get());
    }

    /**
     * Overwrites both counters with copies of the given values.
     *
     * @param NN new value for the first counter
     * @param MM new value for the second counter
     */
    public void set(IntWritable NN, IntWritable MM) {
        this.N.set(NN.get());
        this.M.set(MM.get());
    }

    /** @return the first counter (N) */
    public IntWritable getN() {
        return N;
    }

    /** @return the second counter (M) */
    public IntWritable getM() {
        return M;
    }

    /** Deserializes the record: N first, then M (same order as {@link #write}). */
    @Override
    public void readFields(DataInput in) throws IOException {
        N.readFields(in);
        M.readFields(in);
    }

    /** Serializes the record: N first, then M. */
    @Override
    public void write(DataOutput out) throws IOException {
        N.write(out);
        M.write(out);
    }

    /** Renders the record as {@code "<N>\t<M>"} so text output is human-readable. */
    @Override
    public String toString() {
        return "" + N.get() + "\t" + M.get();
    }

    /** Two records are equal when both counters hold the same values. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ScoreWritable)) {
            return false;
        }
        ScoreWritable other = (ScoreWritable) o;
        return N.get() == other.N.get() && M.get() == other.M.get();
    }

    /** Hash over both counters, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return 31 * N.get() + M.get();
    }

 }

 public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> {
    private Text user = new Text();
    private ScoreWritable score = new ScoreWritable();  //variabe sscore will hold the pair (N,M) for eatch user
    private IntWritable NN = new IntWritable();
    private IntWritable MM = new IntWritable();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        int iterator = 1;
        // tokenizing: variable tokenizer will hold the first username then the second username in each ine of the input text file
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            user.set(tokenizer.nextToken());
            if (iterator == 1) {                         // here variabe tokenizer holds the first username
                NN = new IntWritable(1);                 // saying that this user (username1) is folowing ssomeone
                MM = new IntWritable(0);
                iterator += 1;
            } else {                                     // here variabe tokenizer will hold the second username
                NN = new IntWritable(0);
                MM = new IntWritable(1);                 // saying that this user (username2) is being followed by someone
            }
            score.set(NN,MM);                            // giving eiter (1,0) or (0,1) to variable score
            context.write(user, score);                  // assigning variable score for each user in each line
        }   // emitting [Ali, (1,0)] or [Ali, (0,1)] means that Ali is following someone or being followed by someone, respectively.
    }       // next: the Reducer will go through all the values for each key, sum the total internal values of the key.
 }

    public static class Combine extends Reducer<Text, ScoreWritable, Text, ScoreWritable> {
        private IntWritable resultf = new IntWritable();
        private IntWritable NNN = new IntWritable();
        private IntWritable MMM = new IntWritable();
        public void reduce(Text key, Iterable<ScoreWritable> values, Reducer<Text, ScoreWritable, Text, ScoreWritable>.Context context)
                throws IOException, InterruptedException {
            int sum1 = 0;
            int sum2 = 0;
            for (ScoreWritable val:values) {
                sum1 += val.getN().get();
                sum2 += val.getM().get();
            }
            NNN = new IntWritable(sum1);
            MMM = new IntWritable(sum2);
            context.write(key, new ScoreWritable(NNN, MMM));    // this will combine all the values for each key before emitting the new pairs to Reduce function
        }
    }

 public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> {
     private IntWritable resultf = new IntWritable();
     public void reduce(Text key, Iterable<ScoreWritable> values, Reducer<Text, ScoreWritable, Text, IntWritable>.Context context)
             throws IOException, InterruptedException {
         int sum3 = 0;
         int sum4 = 0;
         for (ScoreWritable val:values) {
             sum3 = val.getN().get();                // if the current user is following 20 people, then Sum3 = 20
             sum4 = val.getM().get();                // if the current user is being followed by 30 people, then Sum4 = 30
         }
         int result = sum3 * sum4;
         resultf.set(result);
         context.write(key, resultf);                // this will emit the current user and his/her corresponding score
    }
 }

 /**
  * Job driver: configures and runs the "userscore" job.
  *
  * @param args args[0] = input path, args[1] = output path (must not exist yet)
  * @throws Exception if job setup or execution fails
  */
 public static void main(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
        System.err.println("Usage: UserScore <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();

    // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
    Job job = Job.getInstance(conf, "userscore");
    job.setJarByClass(UserScore.class);     // locate the job jar via this driver class

    // Mapper, reducer, and a dedicated combiner. The combiner is a separate class because
    // it must emit the same types it consumes (Text, ScoreWritable), unlike the reducer.
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setCombinerClass(Combine.class);

    // Final (reducer) output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Map output types must be set explicitly because they differ from the reducer's output types.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ScoreWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setNumReduceTasks(4);               // four reducers -> four output files

    // Input and output paths come from the command line.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Run the job and wait for completion; exit status 0 on success, 1 on failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}

这是4个输出文本文件之一的一部分:

user0   2745
user1001    18724
user1005    2405
user1009    16577
user1012    1710
user1016    10074
user1023    2173
user1027    791
© www.soinside.com 2019 - 2024. All rights reserved.