我正在运行 hadoop 代码,但遇到问题。
注意注释行
"debug exception 1"
和 "debug exception 2"
以及它们下面的行。由于我无法在 hadoopmap/reduce 中使用
System.out.println
打印消息,因此我使用的唯一调试方法是抛出带有消息的异常,如您在提到的注释下面看到的那样。
public class FindCoallocations {
public static String bucketName;
public static AWS aws;
public static boolean debugMode = true;
public static String uniqueWord = "43uireoaugibghui4reagf"; //This word doesn't exist in the corpus (hopefully).
public static String decade;
public static long N = 1;
public static class MapperClassW1 extends Mapper<LongWritable, Text, Text , Text> {
private Text word_1;
private Text word_2;
private Text years;
private Text matchCount;
public String decade;
@Override
public void setup(Context context) {
Configuration config = context.getConfiguration();
decade = config.getStrings("decade")[0];
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
word_1 = new Text();
word_2 = new Text();
years = new Text();
matchCount = new Text();
StringTokenizer itr = new StringTokenizer(value.toString());
word_1.set(itr.nextToken());
word_2.set(itr.nextToken());
years.set(itr.nextToken());
matchCount.set(itr.nextToken());
if(years.toString().equals(decade)){
Text countN = new Text(uniqueWord);
context.write(countN, matchCount); // used for counting N (number of words)
context.write (word_1, value); // used for claculating c(w1)
//throw new IOException("exception at mapper. key: "+key.toString()+" value: "+value.toString() +" word_1: "+word_1.toString());
}
}
}
public static class PartitionerClass extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
return (key.toString().hashCode() % numPartitions);
}
}
public static class ReducerClassW1 extends Reducer<Text,Text,Text,Text> {
public String decade;
public void setup(Context context) {
Configuration config = context.getConfiguration();
decade = config.getStrings("decade")[0];
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if((key.toString()).equals(uniqueWord)) // count N
{
long sum = 0;
for (Text value : values) {
sum += Long.parseLong(value.toString());
}
//context.write(key, new Text(Long.toString(sum)));
context.getCounter(Counter.COMBINE_INPUT_RECORDS).increment(sum);
}
else // count w1 ( value is full entry)
{
long sum = 0;
for (Text value : values) {
StringTokenizer itr = new StringTokenizer(value.toString());
itr.nextToken();
itr.nextToken();
itr.nextToken();
String tk = itr.nextToken();
// debug exception 1
//throw new IOException ("the key: "+key.toString()+" the value: "+value.toString()+ " sum: "+sum + "itr.nextToken(): "+tk);
sum += Long.parseLong(tk);
}
for (Text value : values) {
// debug exception 2
//throw new IOException ("the key: "+key.toString()+" the value: "+value.toString()+ " sum: "+sum);
context.write(value, new Text("w1:"+Long.toString(sum)));
}
}
}
}
//Receives n args: 0 - "Step1" , 1 - decade , 2 - inputFileName1, ....... n-1 inputFileName(n-2) , n outputFileName
public static void main(String[] args) throws Exception {
System.out.println("[DEBUG] STEP 1 started!");
aws = AWS.getInstance();
bucketName = aws.bucketName;
int inputs = args.length;
System.out.println("args: ");
for(int i=0; i<args.length;i++){
System.out.print("args["+i+"]"+" : "+args[i] +", ");
}
System.out.println("\n");
String[] inputFileKey = new String[args.length-3];
String outputFileKey = args[args.length-1];
for(int i=2; i<args.length - 1;i++){
inputFileKey[i-2] = args[i];
}
FindCoallocations.setDecade(args[1]);
System.out.println("decade: "+FindCoallocations.getDecade());
for(int i=0; i<inputFileKey.length;i++){
System.out.println("input: "+inputFileKey[i]);
}
System.out.println("output: "+outputFileKey);
Configuration conf = new Configuration();
conf.setQuietMode(false);
conf.setStrings("decade",FindCoallocations.getDecade());
Job job = Job.getInstance(conf, "Join w1 count");
job.setJarByClass(FindCoallocations.class);
job.setMapperClass(MapperClassW1.class);
job.setPartitionerClass(PartitionerClass.class);
job.setCombinerClass(ReducerClassW1.class);
job.setReducerClass(ReducerClassW1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for(int i=0; i<inputFileKey.length; i++){
FileInputFormat.addInputPath(job, new Path("s3://"+bucketName+"/"+inputFileKey[i]));
}
FileOutputFormat.setOutputPath(job, new Path("s3://"+bucketName+"/"+outputFileKey+"_"+decade+"_w1"));
job.waitForCompletion(true);
job.monitorAndPrintJob();
}
}
第一个问题,有没有更好的调试方法?目前,对于每次运行试验,我需要导出一个新的 Jar,将其上传到 S3 (AWS),然后运行 hadoop,等到完成后才能看到输出(异常消息)。整个过程大约需要10分钟。这样调试要花很长时间...
第二个问题,现在我的问题是,使用给定的代码,我的输出文件变成空的。当我取消注释下面的行
"debug exception 1"
时,我收到以下消息:
错误:java.io.IOException:键:nt 值: nt should_VERB 1830 的 1 总和:0itr.nextToken():1
到目前为止一切顺利。当我对其进行评论并取消注释
"debug exception 2"
时,我根本没有收到任何异常,表明未到达/不迭代下一个 for-loop。
当我评论两个调试异常(根本不尝试抛出任何异常)时,程序成功终止,并且输出文件变为空。 (意味着第一个 for 循环 中没有发生异常)。
我正在尝试找出问题所在。
您的代码中有两个问题如下:
(1) 在reduce handler类中,
values
是Iterable
类型,只能迭代一次。如果要迭代多次,可以将迭代结果添加到一个列表中,然后对列表进行迭代,如下所示:
(2)
job.setCombinerClass
使用不当,设置job.setCombinerClass(ReducerClassW1.class)
时,CombinerClass
的输出结果是字符串类型的值。在Reduce中执行Long.parseLong(value.toString())
时出现错误,因此可以删除下面所示的代码:
另外,关于您的问题1: 调试方式解决方案:可以先在本地运行MapReduce程序进行调试。调试好后,打包供服务器执行,但程序需要一些配置和调整:
关于您的问题2: 修复代码中的问题后,将不会出现错误消息。 最后在我的机器上执行代码后,在设置的输出目录下成功输出结果,并且有相应的输出数据:
如果您需要本地运行的源代码文件,我也可以提供。