I am trying to write a MapReduce program that runs two job cycles. The first cycle produces a file containing two key|value pairs, which a helper method then reads to compute an average. The second cycle uses that average to do some further computation and writes the results to the output file. The problem I am facing: the first file is created correctly, but when the program calls the method that computes the average, a FileNotFoundException is thrown.
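After the first cycle, avOutput/part-r-00000 should contain exactly two tab-separated lines, something like this (the counts here are made up):

cases	10234
records	87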
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.util.StringTokenizer;
public class ge3ex3 {
    private final static String PATH = "avOutput";
    public static class ge3ex3AverageMapper extends Mapper<Object, Text, Text, LongWritable> {
        // constant ONE used to count records
        private final LongWritable one = new LongWritable(1);
        // reusable writable for the case counts
        private final LongWritable cases = new LongWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // read the row and split it into columns
            String row = value.toString();
            String[] items = row.split(",");
            // only process rows that have 12 columns
            if (items.length == 12) {
                try {
                    // read the cases column
                    int eachCase = Integer.parseInt(items[4]);
                    cases.set(eachCase);
                    // write the case count and a record marker to the context
                    context.write(new Text("cases"), cases);
                    context.write(new Text("records"), one);
                } catch (NumberFormatException x) {
                    // skip the header and malformed rows
                }
            }
        }
    }
    public static class ge3ex3Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable sum = new LongWritable();

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long tempSum = 0;
            // iterate through the values and add up the sum
            for (LongWritable val : values) {
                tempSum += val.get();
            }
            sum.set(tempSum);
            // write the sum into context
            context.write(key, sum);
        }
    }
    public static class ge3ex3Mapper extends Mapper<Object, Text, Text, LongWritable> {
        private final static LongWritable ONE = new LongWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // read the average passed in from the driver
            Configuration conf = context.getConfiguration();
            double av = Double.parseDouble(conf.get("av"));
            // split each row into columns
            String row = value.toString();
            String[] items = row.split(",");
            if (items.length == 12) {
                try {
                    int eachCase = Integer.parseInt(items[4]);
                    // keep only rows with more cases than the average
                    if (eachCase > av) {
                        // take year, month, country and write them into context
                        String year = items[3];
                        String month = items[2];
                        String country = items[6];
                        context.write(new Text(year + " / " + month + " / " + country), ONE);
                    }
                } catch (NumberFormatException x) {
                    // skip the header and malformed rows
                }
            }
        }
    }
    // calculate the average from the first job's output
    private static double calculateAv() throws IOException {
        // sums of cases and records
        long cases = 0;
        long records = 0;
        // read the file produced by the first map-reduce phase
        File fs = new File(PATH + "/part-r-00000");
        if (!fs.exists()) throw new FileNotFoundException("File not Found: " + fs.getPath());
        BufferedReader bt = new BufferedReader(new FileReader(fs));
        String line;
        while ((line = bt.readLine()) != null) {
            // parse each line into a key token and a value token
            StringTokenizer sr = new StringTokenizer(line);
            String value = sr.nextToken();
            // check whether the key is "cases" or "records"
            // and assign the count to the respective variable
            if (value.equals("cases")) {
                cases = Long.parseLong(sr.nextToken());
            } else if (value.equals("records")) {
                records = Long.parseLong(sr.nextToken());
            }
        }
        // calculate the average and return it
        return cases / (double) records;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ge3ex3_get_Average");
        job.setJarByClass(ge3ex3.class);
        job.setMapperClass(ge3ex3AverageMapper.class);
        job.setReducerClass(ge3ex3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(PATH));
        // wait for the first map-reduce phase to complete before reading its output
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
        // calculate the average based on the first cycle's output
        double av = calculateAv();
        // second mapping phase: pass the average through the configuration
        conf.setDouble("av", av);
        Job job1 = Job.getInstance(conf, "ge3ex3_result");
        job1.setJarByClass(ge3ex3.class);
        job1.setMapperClass(ge3ex3Mapper.class);
        job1.setReducerClass(ge3ex3Reducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        // wait for the second map-reduce phase to complete
        System.exit(job1.waitForCompletion(true) ? 0 : 1);
    }
}
Here is the error:
Exception in thread "main" java.io.FileNotFoundException: File not Found: avOutput/part-r-00000
    at ge3ex3.calculateAv(ge3ex3.java:95)
    at ge3ex3.main(ge3ex3.java:134)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
And here is the configuration in core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
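With fs.defaultFS set to hdfs://localhost:9000, a relative job path such as avOutput resolves to HDFS (under the user's home directory, typically /user/<username>/avOutput), not to the local working directory. Running hdfs dfs -ls avOutput lists the part-r-00000 file there, while a plain ls avOutput on the local machine finds nothing.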
Apparently the problem was in the calculateAv method, where

File fs = new File(PATH + "/part-r-00000");

does not work: java.io.File only looks at the local filesystem, while the first job wrote its output to HDFS. Instead, I used
// additionally requires: import org.apache.hadoop.fs.FSDataInputStream;
FileSystem fs = FileSystem.get(new Configuration());
Path outputPath = new Path(PATH + "/part-r-00000");
if (!fs.exists(outputPath)) {
    throw new FileNotFoundException("File not Found: " + outputPath.toString());
}
FSDataInputStream inputStream = fs.open(outputPath);
// read from the file on HDFS
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
and it works like a charm!
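For completeness, here is roughly what the whole calculateAv method looks like with that change folded in. This is a sketch of my working version; it assumes the same PATH constant and the tab-separated key/value lines that TextOutputFormat writes, and uses try-with-resources so the stream is always closed:

// calculate the average from the first job's output on HDFS
private static double calculateAv() throws IOException {
    long cases = 0;
    long records = 0;
    // resolve the path through the Hadoop FileSystem, not java.io.File
    FileSystem hdfs = FileSystem.get(new Configuration());
    Path outputPath = new Path(PATH + "/part-r-00000");
    if (!hdfs.exists(outputPath)) {
        throw new FileNotFoundException("File not Found: " + outputPath);
    }
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(hdfs.open(outputPath)))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // TextOutputFormat writes "key<TAB>value"
            String[] parts = line.split("\t");
            if (parts[0].equals("cases")) {
                cases = Long.parseLong(parts[1]);
            } else if (parts[0].equals("records")) {
                records = Long.parseLong(parts[1]);
            }
        }
    }
    return cases / (double) records;
}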