FileNotFoundException in Hadoop although the file is there

Problem description

I am trying to write a MapReduce program that runs two cycles. The first MapReduce cycle creates a file containing 2 key|value pairs. I then need to read that file in a method that computes the average. After that, a second MapReduce cycle does some calculations based on that average and writes data to the output file. The problem: the first file is created correctly, but when the program calls the method that computes the average, a FileNotFoundException is thrown.
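For reference, the file produced by the first cycle (avOutput/part-r-00000) is plain text with one tab-separated key/value pair per line; the keys come from the reducer below and the numbers here are only illustrative:

cases	1284930
records	45210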

import org.apache.commons.text.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;

import java.io.IOException;

public class ge3ex3 {
    private final static String PATH = "avOutput";
    public static class ge3ex3AverageMapper extends Mapper<Object, Text, Text, LongWritable>{
        //reusable writable holding the constant value one
        private final LongWritable one = new LongWritable(1);
        //reusable writable for the case count
        private final LongWritable cases = new LongWritable();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //read the rows
            String row = value.toString();
            //Take the items from each row
            String[] items = row.split(",");
            //check if there are 12 columns
            if (items.length == 12){
                try {
                    //read the case column
                    int eachCase = Integer.parseInt(items[4]);
                    //set the count of the cases
                    cases.set(eachCase);
                    //write to context the cases' count and how many records
                    context.write(new Text("cases"), cases);
                    context.write(new Text("records"), one);
                }catch (NumberFormatException x){
                    //skip rows where the case column is not a number
                }

            }
        }
    }
    public static class ge3ex3Reducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        private LongWritable sum = new LongWritable();
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException{
            int tempSum = 0;
            //Iterate through values and add up the sum
            for(LongWritable val : values){
                tempSum += val.get();
            }
            sum.set(tempSum);
            //write the sum into context
            context.write(key, sum);
        }
    }
    public static class ge3ex3Mapper extends Mapper<Object, Text, Text, LongWritable>{
        private final static LongWritable ONE = new LongWritable(1);
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException{
            //read the average passed in through the job configuration
            Configuration conf = context.getConfiguration();
            double av = Double.parseDouble(conf.get("av"));
            //convert the input value to a string
            String row = value.toString();
            //split it into an array on commas
            String[] items = row.split(",");
            //check if there are 12 columns
            if(items.length == 12){
                try {
                    int eachCase = Integer.parseInt(items[4]);
                    if(eachCase > av){
                        //take year, month, country and write them into context
                        String year = items[3];
                        String month = items[2];
                        String country = items[6];
                        context.write(new Text(year + " / " + month + " / " + country), ONE);
                    }
                }catch (NumberFormatException x){
                    //skip rows where the case column is not a number
                }
            }
        }
    }
    //calculate the average
    private static double calculateAv()throws IOException{
        //store the sum of cases & records
        long cases = 0;
        long records = 0;
        //read from the file from the first map-reduce phase
        File fs = new File(PATH + "/part-r-00000");
        if (!fs.exists()) throw new FileNotFoundException("File not Found: " + fs.getPath());

        //read from file
        BufferedReader bt = new BufferedReader(new FileReader(fs));
        String line;
        //while the line is not empty
        while ((line = bt.readLine()) != null){
            //Parse each line
            StringTokenizer sr = new StringTokenizer(line);
            //the first token is the key
            String value = sr.nextToken();
            //check whether the key is "cases" or "records"
            //and assign the count to the respective variable
            if(value.equals("cases")){
                String countCases = sr.nextToken();
                cases = Long.parseLong(countCases);
            }else if (value.equals("records")){
                String countRecords = sr.nextToken();
                records = Long.parseLong(countRecords);
            }
        }
        //calculate the average and return it
        double av = cases / (double) records;
        return av;
    }
    public static void main(String[] args)throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"ge3ex3_get_Average");
        job.setJarByClass(ge3ex3.class);
        job.setMapperClass(ge3ex3AverageMapper.class);
        job.setReducerClass(ge3ex3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(PATH));
        //wait for the first map-reduce phase to complete
        job.waitForCompletion(true);

        //calculate the average based on the first map-reduce cycle
        double av = calculateAv();

        //Second Map-Reduce Phase
        //pass the average to the second job through the configuration
        conf.setDouble("av", av);
        Job job1 = Job.getInstance(conf, "ge3ex3_result");
        job1.setJarByClass(ge3ex3.class);
        job1.setMapperClass(ge3ex3Mapper.class);
        job1.setReducerClass(ge3ex3Reducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        //wait for the second map-reduce phase to complete
        job1.waitForCompletion(true);
    }
}

Below is the error:

Exception in thread "main" java.io.FileNotFoundException: File not Found: avOutput/part-r-00000
    at ge3ex3.calculateAv(ge3ex3.java:95)
    at ge3ex3.main(ge3ex3.java:134)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

And the configuration in core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
java hadoop hdfs
1 Answer

Apparently the problem was in the calculateAv method, where

File fs = new File(PATH + "/part-r-00000");

does not work: java.io.File only looks at the local filesystem, while with fs.defaultFS set to hdfs://localhost:9000 the job output is written into HDFS. Instead, I used

    FileSystem fs = FileSystem.get(new Configuration());
    Path outputPath = new Path(PATH + "/part-r-00000");
    if (!fs.exists(outputPath)) {
        throw new FileNotFoundException("File not Found: " + outputPath.toString());
    }
    FSDataInputStream inputStream = fs.open(outputPath);

    //read from file
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

It works like a charm!
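For completeness, here is a sketch of the whole calculateAv method reworked around the FileSystem API, assuming the rest of the class stays as posted. It keeps the original parsing logic and only swaps the local-file I/O; no extra imports are needed beyond the ones already in the class (InputStreamReader and FileNotFoundException are covered by java.io.*).

    //calculate the average by reading the first job's output through the HDFS FileSystem API
    private static double calculateAv() throws IOException {
        long cases = 0;
        long records = 0;
        //resolve the output path against the configured filesystem (HDFS here, per fs.defaultFS)
        FileSystem fs = FileSystem.get(new Configuration());
        Path outputPath = new Path(PATH + "/part-r-00000");
        if (!fs.exists(outputPath)) {
            throw new FileNotFoundException("File not Found: " + outputPath.toString());
        }
        //read the tab-separated key/value pairs written by the reducer
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputPath)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                StringTokenizer sr = new StringTokenizer(line);
                String key = sr.nextToken();
                if (key.equals("cases")) {
                    cases = Long.parseLong(sr.nextToken());
                } else if (key.equals("records")) {
                    records = Long.parseLong(sr.nextToken());
                }
            }
        }
        //average number of cases per record
        return cases / (double) records;
    }

With fs.defaultFS pointing at hdfs://localhost:9000, the relative path avOutput resolves to a directory inside HDFS (typically under the user's home directory there), which is why the java.io.File check failed even though the first job had written the file successfully.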
