TypeError: cannot unpack non-iterable float object - MapReduce - mrjob


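I am testing a simple example to learn MapReduce and mrjob.

The goal is to sum the logarithms of all the numbers, then divide the count of all the numbers by that sum.

The code is short and straightforward: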

# mrMedian.py

from mrjob.job import MRJob
from mrjob.step import MRStep

import math

class MrMedian(MRJob):

    def __init__(self, *args, **kwargs):
        super(MrMedian, self).__init__(*args, **kwargs)
        self.inCount = 0
        self.inLogSum = 0.0

    #increment the count of elements and add the 
    # logarithm of the current number to the summation
    def map(self, key, val):
        inVal = float(val)
        self.inCount += 1
        self.inLogSum += math.log(inVal)

    # return the count and summation after all numbers are processed     
    def map_final(self):
        yield (1, [self.inCount, self.inLogSum])

    # aggregate the count and summation values and yield the result
    def reduce(self, key, packedValues):
        cumLogSum=1.0
        cumN=0

        for valArr in packedValues:
            nj = int(valArr[0])
            cumN += nj        
            cumLogSum += float(valArr[1])
        
        median = cumN/cumLogSum
        yield (median)

    # define mapper and reducer
    def steps(self):
        return ([
            MRStep(mapper=self.map, reducer=self.reduce, mapper_final=self.map_final)
        ])
    
# to run:
# python MrMedian.py < inputFile.txt     
if __name__ == '__main__':
    MrMedian.run()

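In the map_final method, I yield (1, [self.inCount, self.inLogSum]). The value 1 is a key that is ignored, and the list [self.inCount, self.inLogSum] is the value. In the reduce method, these values arrive as packedValues, which we should treat as an iterable and walk through with a for loop.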
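The data flow described here can be sketched without mrjob. In this minimal illustration, partial_stats and combine_partials are hypothetical helper names and the input numbers are made up: each chunk stands in for the lines seen by one mapper, and the combining function stands in for the reducer.

```python
import math


def partial_stats(chunk):
    """Mimic map + map_final for one mapper: return [count, sum of logs]."""
    count = 0
    log_sum = 0.0
    for val in chunk:
        count += 1
        log_sum += math.log(float(val))
    return [count, log_sum]


def combine_partials(partials):
    """Mimic the reducer: add up the per-mapper counts and log sums."""
    total_n = 0
    total_log_sum = 0.0
    for n, log_sum in partials:
        total_n += n
        total_log_sum += log_sum
    return total_n / total_log_sum


# Made-up input, split into two chunks as if two mappers had processed it.
chunks = [["2", "4", "8"], ["16", "32"]]
partials = [partial_stats(chunk) for chunk in chunks]
result = combine_partials(partials)
print(partials, result)
```

Combining the per-chunk partials should give the same ratio as computing the count and the log sum over all the numbers at once.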

I get this error:

(venv) shahriar@Lenovo:/media/shahriar/01D779182B58B9D0$ python mrMedian.py < inputFile.txt > outFile.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mrMedian.shahriar.20221113.152412.029427
Running step 1 of 1...
reading from STDIN

Error while reading from /tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000/input:

Traceback (most recent call last):   
    File "/media/shahriar/01D779182B58B9D0/assignment2/mrMedian.py", line 43, in <module>
    MrMedian.run()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 616, in run
    cls().execute()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 687, in execute
    self.run_job()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 636, in run_job
    runner.run()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/runner.py", line 503, in run
    self._run()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 161, in _run
    self._run_step(step, step_num)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 170, in _run_step
    self._run_streaming_step(step, step_num)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 187, in _run_streaming_step
    self._run_reducers(step_num, num_reducer_tasks)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 289, in _run_reducers
    self._run_multiple(   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 130, in _run_multiple
    func()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 746, in _run_task
    invoke_task(   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/inline.py", line 133, in invoke_task
    task.execute()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 681, in execute
    self.run_reducer(self.options.step_num)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 795, in run_reducer
    for k, v in self.reduce_pairs(read_lines(), step_num=step_num):   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 866, in reduce_pairs
    for k, v in self._combine_or_reduce_pairs(pairs, 'reducer', step_num):   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 889, in _combine_or_reduce_pairs
    for k, v in task(key, values) or (): 
    TypeError: cannot unpack non-iterable float object

The reducer input file, which holds the output of the map_final method, looks fine:

shahriar@Lenovo-:/tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000$ cat input 
1   [13, 78.5753201837955]
1   [13, 77.20894832945609]
1   [12, 75.70546637672973]
1   [12, 73.97942285230064]
1   [13, 78.7642193551817]
1   [13, 74.83203774429285]
1   [13, 72.28868623927899]
1   [11, 67.51370208632588]

I commented out the for loop in the reducer method to check whether the error was caused by packedValues, but I got the same error again.

Any ideas are appreciated.

python mapreduce hadoop-streaming mrjob
1 Answer

I solved a similar problem by having the reducer function yield a (key, value) pair.

def reduce(self, key, packed_values):
    ...
    # must yield k, v here
    yield 1, cumulative_n / cumulative_ln_val
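A fuller version of that reducer might look like the sketch below. It is written as a plain generator function so it can be run without mrjob; inside the job it would be a method that also takes self. The name reduce_counts is hypothetical, the starting value of 0.0 for the log sum is an assumption (the code in the question starts from 1.0), and the sample pairs are the first two rows of the reducer input file shown in the question.

```python
def reduce_counts(key, packed_values):
    """Combine [count, log_sum] pairs and yield one (key, value) pair."""
    cumulative_n = 0
    cumulative_ln_val = 0.0  # assumption: start the sum at 0.0, not 1.0
    for n, ln_val in packed_values:
        cumulative_n += int(n)
        cumulative_ln_val += float(ln_val)
    # Yield a 2-tuple so the caller can unpack it into a key and a value.
    yield key, cumulative_n / cumulative_ln_val


# First two rows of the reducer input file from the question.
sample = [[13, 78.5753201837955], [13, 77.20894832945609]]
for k, v in reduce_counts(1, sample):
    print(k, v)
```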

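Looking at the source code of job.py, the code at "line 889, in _combine_or_reduce_pairs" expects the reducer function to yield key-value pairs. It unpacks each yielded item into k, v, so a bare float cannot be unpacked and raises the TypeError.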

def _combine_or_reduce_pairs(self, pairs, mrc, step_num=0):
    """Helper for :py:meth:`combine_pairs` and :py:meth:`reduce_pairs`."""
    step = self._get_step(step_num, MRStep)

    task = step[mrc]
    task_init = step[mrc + '_init']
    task_final = step[mrc + '_final']
    if task is None:
        raise ValueError('No %s in step %d' % (mrc, step_num))

    if task_init:
        for k, v in task_init() or ():
            yield k, v

    # group all values of the same key together, and pass to the reducer
    #
    # be careful to use generators for everything, to allow for
    # very large groupings of values
    for key, pairs_for_key in itertools.groupby(pairs, lambda k_v: k_v[0]):
        values = (value for _, value in pairs_for_key)
        for k, v in task(key, values) or ():
            yield k, v
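The failure can be reproduced without mrjob. The sketch below imitates the grouping loop above with itertools.groupby; run_reducer, bad_reducer and good_reducer are hypothetical names used only for illustration, and the input pairs are made up.

```python
import itertools


def run_reducer(task, pairs):
    """Imitate the grouping loop: group by key, unpack each result as (k, v)."""
    out = []
    for key, pairs_for_key in itertools.groupby(pairs, lambda k_v: k_v[0]):
        values = (value for _, value in pairs_for_key)
        for k, v in task(key, values) or ():
            out.append((k, v))
    return out


def bad_reducer(key, values):
    total = sum(n for n, _ in values)
    yield float(total)  # a bare float, not a (key, value) pair


def good_reducer(key, values):
    total = sum(n for n, _ in values)
    yield key, float(total)  # a 2-tuple that can be unpacked


pairs = [(1, [13, 78.5]), (1, [12, 75.7])]

print(run_reducer(good_reducer, pairs))

try:
    run_reducer(bad_reducer, pairs)
except TypeError as err:
    print(err)
```

Note that yield (median) is the same as yield median: parentheses alone do not create a tuple, so the reducer in the question yields a single float.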