MapReduce wrong output


I am trying to collect the maximum and minimum temperatures for a specific station, and then find the sum of the temperatures for each distinct day, but I keep getting an error in the mapper. I have tried many alternatives, such as using a StringTokenizer, but it's the same: I get an error.

Sample input:

Station Date(YYYYMMDD) Element Temperature Flag1 Flag2 OtherValue

From the input I only need the station, the date (the key), the element, and the temperature.

USW00003889,20180101,TMAX,122,7,1700
USW00003889,20180101,TMIN,-67,7,1700
UK000056225,20180101,TOBS,56,7,1700
UK000056225,20180101,PRCP,0,7,1700
UK000056225,20180101,SNOW,0,7
USC00264341,20180101,SNWD,0,7,1700
USC00256837,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
UK000056225,20180101,SNWD,0,7,800
USW00003889,20180102,TMAX,12,E
USW00003889,20180102,TMIN,3,E
UK000056225,20180101,PRCP,42,E
SWE00138880,20180101,PRCP,50,E
UK000056225,20180101,PRCP,0,a
USC00256480,20180101,PRCP,0,7,700
USC00256480,20180101,SNOW,0,7
USC00256480,20180101,SNWD,0,7,700
SWE00138880,20180103,TMAX,-228,7,800
SWE00138880,20180103,TMIN,-328,7,800
USC00247342,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
SWE00137764,20180101,PRCP,63,E
UK000056225,20180101,SNWD,0,E
USW00003889,20180104,TMAX,-43,W
USW00003889,20180104,TMIN,-177,W
    public static class MaxMinMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private Text newDate = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String stationID = "USW00003889";
            String[] tokens = value.toString().split(",");
            String station = "";
            String date = "";
            String element = "";
            int data = 0;

            station = tokens[0];
            date = tokens[1];
            element = tokens[2];
            data = Integer.parseInt(tokens[3]);

            if (stationID.equals(station)
                    && (element.equals("TMAX") || element.equals("TMIN"))) {
                newDate.set(date);
                context.write(newDate, new IntWritable(data));
            }
        }
    }

    public static class MaxMinReducer
            extends Reducer<Text, Text, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int sumResult = 0;
            int val1 = 0;
            int val2 = 0;

            while (values.iterator().hasNext()) {
                val1 = values.iterator().next().get();
                val2 = values.iterator().next().get();
                sumResult = val1 + val2;
            }

            result.set(sumResult);
            context.write(key, result);
        }
    }

        }

Please help me, thank you.

Update: I now validate each line with a condition, and I changed the data variable to a String (I will change it back to an Integer -> IntWritable later).

    if (tokens.length <= 5) {
        station = tokens[0];
        date = tokens[1];
        element = tokens[2];
        data = tokens[3];
        otherValue = tokens[4];
    } else {
        station = tokens[0];
        date = tokens[1];
        element = tokens[2];
        data = tokens[3];
        otherValue = tokens[4];
        otherValue2 = tokens[5];
    }

Update 2: OK, I am now writing output to a file, but the output is wrong. I need it to add the two values that share the same date (the key). What am I doing wrong?

Output:

20180101    -67
20180101    122
20180102    3
20180102    12
20180104    -177
20180104    -43
Desired output:
20180101    55
20180102    15
20180104    -220

This is also the error I receive, even though I still get output.

    ERROR: (gcloud.dataproc.jobs.submit.hadoop) Job [8e31c44ccd394017a4a28b3b16471aca] failed with error:
    Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at 'https://console.cloud.google.com/dataproc/jobs/8e31c44ccd394017a4a28b3b16471aca?project=driven-airway-257512&region=us-central1' and in 'gs://dataproc-261a376e-7874-4151-b6b7-566c18758206-us-central1/google-cloud-dataproc-metainfo/f912a2f0-107f-40b6-9456-b6a72cc8bfc4/jobs/8e31c44ccd394017a4a28b3b16471aca/driveroutput'.
    19/11/14 12:53:24 INFO client.RMProxy: Connecting to ResourceManager at cluster-1e8f-m/10.128.0.12:8032
19/11/14 12:53:25 INFO client.AHSProxy: Connecting to Application History server at cluster-1e8f-m/10.128.0.12:10200
19/11/14 12:53:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/11/14 12:53:26 INFO input.FileInputFormat: Total input files to process : 1
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: number of splits:1
19/11/14 12:53:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573654432484_0035
19/11/14 12:53:27 INFO impl.YarnClientImpl: Submitted application application_1573654432484_0035
19/11/14 12:53:27 INFO mapreduce.Job: The url to track the job: http://cluster-1e8f-m:8088/proxy/application_1573654432484_0035/
19/11/14 12:53:27 INFO mapreduce.Job: Running job: job_1573654432484_0035
19/11/14 12:53:35 INFO mapreduce.Job: Job job_1573654432484_0035 running in uber mode : false
19/11/14 12:53:35 INFO mapreduce.Job:  map 0% reduce 0%
19/11/14 12:53:41 INFO mapreduce.Job:  map 100% reduce 0%
19/11/14 12:53:52 INFO mapreduce.Job:  map 100% reduce 20%
19/11/14 12:53:53 INFO mapreduce.Job:  map 100% reduce 40%
19/11/14 12:53:54 INFO mapreduce.Job:  map 100% reduce 60%
19/11/14 12:53:56 INFO mapreduce.Job:  map 100% reduce 80%
19/11/14 12:53:57 INFO mapreduce.Job:  map 100% reduce 100%
19/11/14 12:53:58 INFO mapreduce.Job: Job job_1573654432484_0035 completed successfully
19/11/14 12:53:58 INFO mapreduce.Job: Counters: 55
    File System Counters
        FILE: Number of bytes read=120
        FILE: Number of bytes written=1247665
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        GS: Number of bytes read=846
        GS: Number of bytes written=76
        GS: Number of read operations=0
        GS: Number of large read operations=0
        GS: Number of write operations=0
        HDFS: Number of bytes read=139
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=1
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=5
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=17348
        Total time spent by all reduces in occupied slots (ms)=195920
        Total time spent by all map tasks (ms)=4337
        Total time spent by all reduce tasks (ms)=48980
        Total vcore-milliseconds taken by all map tasks=4337
        Total vcore-milliseconds taken by all reduce tasks=48980
        Total megabyte-milliseconds taken by all map tasks=8882176
        Total megabyte-milliseconds taken by all reduce tasks=100311040
    Map-Reduce Framework
        Map input records=25
        Map output records=6
        Map output bytes=78
        Map output materialized bytes=120
        Input split bytes=139
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=120
        Reduce input records=6
        Reduce output records=6
        Spilled Records=12
        Shuffled Maps =5
        Failed Shuffles=0
        Merged Map outputs=5
        GC time elapsed (ms)=1409
        CPU time spent (ms)=6350
        Physical memory (bytes) snapshot=1900220416
        Virtual memory (bytes) snapshot=21124952064
        Total committed heap usage (bytes)=1492123648
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=846
    File Output Format Counters 
        Bytes Written=76
Job output is complete

Update 3:

I updated the reducer (following what LowKey said) and it gives me the same output as above. It does not do the addition I want; it ignores the operation entirely. Why?

    public static class MaxMinReducer
            extends Reducer<Text, Text, Text, IntWritable> {

        public IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int value = 0;
            int sumResult = 0;
            Iterator<IntWritable> iterator = values.iterator();

            while (values.iterator().hasNext()) {
                value = iterator.next().get();
                sumResult = sumResult + value;
            }

            result.set(sumResult);
            context.write(key, result);
        }
    }
Tags: java, dictionary, hadoop, mapreduce, indexoutofboundsexception
2 Answers
0 votes
Are those columns tab separated? If so, don't expect to find spaces in them.
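For what it's worth, the sample rows in the question look comma separated. If the real input were tab separated, the mapper's split would have to match the actual delimiter (a one-line sketch using the same value variable as the mapper above):

    String[] tokens = value.toString().split("\t"); // "\t" instead of "," for tab-separated input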

0 votes
What are you doing wrong? Well, for one thing, why do you have:

final int missing = -9999;

That makes no sense.

Further down, you seem to have some code that is apparently supposed to add two values, but it looks like you are accidentally throwing items from the list away. Look at where you have:

if (values.iterator().next().get() != missing)

Hmm... you never saved that value anywhere, which means you threw it away.

The other problem is that you are adding incorrectly... for some reason you are trying to add two values on every iteration of the loop. You should be adding one, so your loop should look like this:

    int value;
    Iterator<IntWritable> iterator = values.iterator();
    while (iterator.hasNext()) {
        value = iterator.next().get();
        if (value != missing) {
            sumResult = sumResult + value;
        }
    }

The next obvious problem is that you put your output line inside the while loop:

    while (values.iterator().hasNext()) {
        [...]
        context.write(key, result);
    }

That means that every time you read an item into the reducer, you write an item out. I think what you are trying to do is read in all the items for a given key, and then write out a single reduced value (the sum). In that case, you should not put your output inside the loop. It should come after it:

    while ([...]) {
        [...]
    }
    result.set(sumResult);
    context.write(key, result);
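Putting those fixes together, a complete corrected reducer could look like the minimal sketch below (imports assumed: java.io.IOException, org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Reducer). Two details here go beyond the answer itself and are worth checking against the posted code: the second type parameter is IntWritable so that it matches the mapper's output value type, and @Override is added, because a reduce method that does not actually override the base class falls back to Hadoop's default implementation, which writes every value through unchanged, which is exactly the per-record output shown in Update 2.

    public static class MaxMinReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sumResult = 0;
            // Sum every temperature reading emitted for this date
            // (for station USW00003889 that is TMAX + TMIN).
            for (IntWritable value : values) {
                sumResult += value.get();
            }
            result.set(sumResult);
            // Write one (date, sum) pair per key, after the loop.
            context.write(key, result);
        }
    }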
