MRJob在Python中排序

问题描述 投票:1回答:1

我有一项任务,要求我在python中使用mapper / reducer来完成客户数据的MapReduce。我有一个CSV文件,其中包含CustomerID,ProductID和已用金额。第一项任务是确定每个客户的总花费,这是我轻松完成的。下一部分要求我采用此列表,并按降序排列的总金额进行排序。我在这里苦苦挣扎...建议在另一个MapReduce之上使用MapReduce。这是我的代码:

PART 1:

from mrjob.job import MRJob

class TotalAmountCust(MRJob):

    def mapper(self, _, line):
        (customerid, idno, amount) = line.split(',')
        yield customerid, float(amount)

    def reducer(self, customerid, amount):
        yield customerid, sum(amount)

if __name__ == '__main__':
    TotalAmountCust.run()

PART 2:

from mrjob.job import MRJob
from mrjob.step import MRStep

class TotalAmountCustSort(MRJob):
    def steps(self):
        return [ MRStep(mapper = self.map_by, reducer = self.red_by),
                MRStep(mapper = self.map_sort, reducer = self.red_sort) ]
    def map_by(self, _, line):
        (customerid, idno, amount) = line.split(',')
        yield customerid.zfill(2), float(amount)
    def red_by(self, customerid, amount):
        yield customerid, '%04.02f' % sum(amount)
    def map_sort(self, customerid, total):
        yield float(total), customerid
    def red_sort(self, total, customerid):
        yield total, customerid
if __name__ == '__main__':
    TotalAmountCustSort.run()

第2部分有问题,根本不会给我一个结果。任何建议都会被推荐......我尝试研究MRJob.SORT_VALUES = True,但这并没有给我我希望得到的结果。

python sorting mapreduce
1个回答
0
投票

我解决了,输出现在订购了

from mrjob.job import MRJob
from mrjob.step import MRStep

class SpendByCustomerSorted(MRJob):

    MRJob.SORT_VALUES = True

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_orders,
                   reducer=self.reducer_totals_by_customer),
            MRStep(mapper=self.mapper_make_amounts_key,
                   reducer=self.reducer_output_results_for_single_reducer)
        ]
    def mapper_get_orders(self, _, line):
        (customerID, itemID, orderAmount) = line.split(',')
        yield customerID, float(orderAmount)

    def reducer_totals_by_customer(self, customerID, orders):
        yield customerID, sum(orders)

    def mapper_make_amounts_key(self, customerID, orderTotal):
        yield None, ("%07.02f"%float(orderTotal), customerID)

    def reducer_output_results(self, n, orderTotalCustomerIDs):
        for c in orderTotalCustomerIDs:
            yield c[1], c[0]

if __name__ == '__main__':
    SpendByCustomerSorted.run()
© www.soinside.com 2019 - 2024. All rights reserved.