Performing a 3-way join with MapReduce in Python


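I have the following tables stored as separate CSV files:

customers(c_id, gender, address, date_of_birth)

meals(r_id, c_id, date) (so customers eat meals at restaurants)

restaurants(type, r_id)

The restaurants table has 10,000 rows, meals has 1,000,000, and customers has 2,000,000.

I need the following MapReduce job: for every restaurant of type 'bistro', show the number of meals eaten there by male customers.

This translates to the following SQL query: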

SELECT r.r_id, COUNT(*) AS count_meals 
FROM restaurants r 
INNER JOIN meals m ON r.r_id = m.r_id 
INNER JOIN customers c ON m.c_id = c.c_id 
WHERE c.gender = 'MALE' AND r.type = 'bistro' 
GROUP BY r.r_id

The conditions above shrink the tables as follows: 300 restaurants, 900,000 customers, and meals stays unchanged.
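For checking the MapReduce output on a small sample, the query can be mirrored in plain Python. This is a minimal sketch, not part of the original job: the function name `count_meals` and the sample rows are made up for illustration, and it assumes `c_id` is unique in customers and `r_id` is unique in restaurants.

```python
from collections import Counter

def count_meals(restaurants, meals, customers):
    """Plain-Python equivalent of the SQL query, for checking results."""
    # Apply the WHERE filters first, as the query does.
    bistro_ids = {r_id for r_type, r_id in restaurants if r_type == "bistro"}
    male_ids = {c_id for c_id, gender, _addr, _dob in customers if gender == "MALE"}
    # Count each meal whose restaurant and customer both survive the filters.
    counts = Counter(
        r_id for r_id, c_id, _date in meals
        if r_id in bistro_ids and c_id in male_ids
    )
    return dict(counts)

# Hypothetical sample rows, in the column order given in the question.
restaurants = [("bistro", "r1"), ("pizzeria", "r2"), ("bistro", "r3")]
customers = [
    ("c1", "MALE", "addr1", "1990-01-01"),
    ("c2", "FEMALE", "addr2", "1985-05-05"),
    ("c3", "MALE", "addr3", "1978-09-09"),
]
meals = [
    ("r1", "c1", "2020-01-01"),
    ("r1", "c2", "2020-01-02"),
    ("r1", "c3", "2020-01-03"),
    ("r2", "c1", "2020-01-04"),
    ("r3", "c1", "2020-01-05"),
]

result = count_meals(restaurants, meals, customers)
print(result)
```

As with the inner join in the query, a bistro with no meals by male customers does not appear in the result at all.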

The MapReduce job is started with the following command:

python3 mrCustomers.py < "data/restaurants.csv" "data/customers.csv" "data/meals.csv" > output.csv

To tell the files apart, I rely on the number of fields in each record. My mapper looks like this:

from mrjob.job import MRJob
from mrjob.step import MRStep
from mr3px.csvprotocol import CsvProtocol
import csv

class MRCustomers(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol
    def mapper(self, _, line):
        if line.startswith('c_id') or line.startswith('r_id') or line.startswith('type'):
            return
        reader = csv.reader([line])
        columns = next(reader)
        if len(columns) == 4:
            if str(columns[1]) != 'MALE':
                return
            c_id = columns[0]
            yield c_id, "customer"
        elif len(columns) == 3:
            r_id = columns[0]
            c_id = columns[1]
            yield r_id, ("M", c_id)
        else:
            type = columns[0]
            if type == 'bistro':
                r_id = columns[1]
                yield r_id, "restaurant"

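The problem I run into is that in the reducer I always receive chunks of each table separately; for example, in the first round I get 30 customers and 20,000 meals, but no restaurants. If I don't get a matching set out of each mapper round, I can't perform the join. Is my mapper logic flawed? And how should I write the reducer?

In theory, the reducer should receive the tuples that share a key grouped together, so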

yield r_id, "restaurant"
yield r_id, ("M", c_id)
should be grouped and arrive at the reducer as one match, but that does not happen.
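The grouping described above can be imitated locally. The sketch below is only a stand-in for the shuffle phase, not mrjob's implementation; the helper name `shuffle` and the sample pairs are hypothetical. It illustrates two things about the mapper shown earlier: customers are keyed by `c_id` while meals and restaurants are keyed by `r_id`, so a customer group never contains meals in the first step, and the tagging record is not necessarily the first value in its group.

```python
from collections import defaultdict

def shuffle(pairs):
    """Group mapper output by key, like the shuffle phase between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# Hypothetical mapper output in the shape the question's mapper yields.
mapper_output = [
    ("r1", ("M", "c1")),   # meal at r1 by customer c1
    ("c1", "customer"),    # male customer, keyed by c_id
    ("r1", "restaurant"),  # bistro, keyed by r_id
    ("r1", ("M", "c2")),
    ("r2", ("M", "c1")),   # meal at a restaurant that is not a bistro
]

grouped = shuffle(mapper_output)
for key in sorted(grouped):
    print(key, grouped[key])
```

In this run the `"restaurant"` marker sits in the middle of the `r1` group, so a reducer that only inspects the first value would miss it.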

Edit: in the first job step I managed to join meals and restaurants on the reducer side, as follows:

    def reducer1(self, key, values):
        joins = [x for x in values]
        if len(joins) > 1:
            if joins[0][0] == "restaurants":
                for tup in joins[1:]:
                    c_id = tup[1]
                    yield c_id, (tup[0], key)
        elif joins[0][0] == "customer":
            for customer in joins:
                yield key, ("customer", key)
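A reducer like the one above depends on the restaurant record being the first value in the group. Note also that the mapper shown earlier emits the bare string "restaurant", so `joins[0][0]` would be its first character rather than a tag. A minimal sketch of an order-independent variant follows; it assumes every value is a `(tag, payload)` pair such as `("restaurant", None)` or `("meal", c_id)`, which is a convention introduced here and not the one used in the question.

```python
def join_meals_with_restaurant(r_id, values):
    """Reduce-side join for one r_id that does not depend on value order."""
    is_bistro = False
    customer_ids = []
    # Single pass over the values: remember whether a bistro record was seen
    # and buffer the customers of all meals for this restaurant.
    for tag, payload in values:
        if tag == "restaurant":
            is_bistro = True
        elif tag == "meal":
            customer_ids.append(payload)
    # Emit the joined rows only if the restaurant side of the join exists,
    # re-keyed by c_id so the next step can join against customers.
    if is_bistro:
        for c_id in customer_ids:
            yield c_id, ("meal", r_id)

# The restaurant record arrives between two meals and is still found.
matched = list(join_meals_with_restaurant(
    "r1", [("meal", "c1"), ("restaurant", None), ("meal", "c2")]
))
# No restaurant record for this key, so nothing is emitted.
unmatched = list(join_meals_with_restaurant("r2", [("meal", "c1")]))
print(matched, unmatched)
```

Buffering the meals per key is a trade-off: it keeps the logic simple, at the cost of holding one restaurant's meals in memory.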

Then, in the second job step, I pass the results through the mapper to the reducer and try to join them with customers:

    def mapper2(self, key, value):
        yield key, value

    def reducer2(self, key, values):
        joins = [x for x in values]
        if len(joins) > 1:
            if joins[0][0] == "customer":
                for tup in joins[1:]:
                    yield tup[1], 1

    def reducer3(self, key, values):
        yield None, (key, sum(values))

    def steps(self):
        first_step = MRStep(
            mapper=self.mapper1,
            reducer=self.reducer1,
        )
        second_step = MRStep(
            mapper=self.mapper2,
            reducer=self.reducer2,
        )
        third_step = MRStep(
            reducer=self.reducer3
        )

        return [first_step, second_step, third_step]

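Now I have the following problem: the tuples in the output are only part of the full result of the SQL query, so this solution is incomplete. I don't understand why the join does not work as expected.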
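One possible cause of a partial result, offered as a guess rather than a diagnosis, is that both reducers only join when the tagging record happens to be the first value of its group. The three steps can be simulated locally with tag-based reducers to see what the full result should look like. Everything below is a hypothetical sketch: `run_step` merely imitates one group-and-reduce step, and the `(tag, payload)` value convention and sample data are assumptions, not taken from the question.

```python
from collections import defaultdict

def run_step(pairs, reducer):
    """Local stand-in for one job step: group by key, then reduce each group."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    output = []
    for key, values in groups.items():
        output.extend(reducer(key, values))
    return output

def join_on_r_id(key, values):
    values = list(values)  # a real reducer receives a one-pass iterator
    is_bistro = any(tag == "restaurant" for tag, _ in values)
    for tag, payload in values:
        if tag == "customer":
            yield key, ("customer", None)  # forward male customers unchanged
        elif tag == "meal" and is_bistro:
            yield payload, ("meal", key)   # re-key the meal by c_id

def join_on_c_id(c_id, values):
    values = list(values)
    # Keep a meal only if a male customer record exists for this c_id.
    if any(tag == "customer" for tag, _ in values):
        for tag, payload in values:
            if tag == "meal":
                yield payload, 1           # payload is the r_id

def count_meals(r_id, ones):
    yield r_id, sum(ones)

# Hypothetical mapper output: bistros and male customers only, all meals.
mapped = [
    ("r1", ("restaurant", None)), ("r3", ("restaurant", None)),
    ("c1", ("customer", None)), ("c3", ("customer", None)),
    ("r1", ("meal", "c1")), ("r1", ("meal", "c2")), ("r1", ("meal", "c3")),
    ("r2", ("meal", "c1")), ("r3", ("meal", "c1")),
]

step1 = run_step(mapped, join_on_r_id)
step2 = run_step(step1, join_on_c_id)
result = dict(run_step(step2, count_meals))
print(result)
```

Because every value carries its tag, the reducers give the same result regardless of the order in which values arrive.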

python sql shell mapreduce mrjob
1 Answer

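This approach was "pure gold" for me: it let me understand how to join three tables in MapReduce for a very similar task I had to do. I hope it helps others too! Note: logging or printing the intermediate results (commented out in the code below) and then stopping the kernel helped me a lot in understanding how the MapReduce job proceeds.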

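The original task

Write a MapReduce job that takes the three datasets as input and produces the following output: for all customers with fashion_news_frequency = 'Regularly', show the number of their transactions on articles with graphical_appearance_name == 'Solid' and color_group_name equal to 'Light Beige'.
Make sure the final output has the following format: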

customer_id,count_transactions

My solution

Written in a Jupyter notebook:
%%file mymrjob3.py
# This will create a local file to run your MapReduce program

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv
import logging
from collections import defaultdict

class MyMRJob3(MRJob):

    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

    def mapper(self, _, line):
        # Process each line in transactions.csv
        data = line.strip().split(',')
        # 1. There are 25 column names for articles.csv
        # 2. There are 5 column names for transactions.csv: t_dat,customer_id,article_id,price,sales_channel_id
        # 3. There are 7 column names for customers.csv: customer_id,FN,Active,club_member_status,fashion_news_frequency,age,postal_code
        if len(data) == 5:
            customer_id = data[1]
            articles_id = data[2]
            yield articles_id, ("transaction", customer_id)
        elif len(data) == 7:
            if data[4] == 'Regularly':
                customer_id = data[0]
                yield customer_id, ("customer",2)
        else:
            if data[7] == 'Solid' and data[9] == 'Light Beige':
                articles_id = data[0]
                yield articles_id, ("articles",1)

    def reducer(self, key, value):
        logger = logging.getLogger('__main__')
        # First all values are collected within the same key
        joins = [x for x in value]
        # If there is more than one tuple, there must be matching article_ids
        if len(joins) > 1:
            # We only care about the matching article_ids from the articles.csv
            # This if-statement assures that no duplicate article_ids from the transactions.csv are taken
            if joins[0][0] == "articles":
                #logger.info(joins)
                for tup in joins[1:]:
                    #logger.info(tup)
                    customer_id = tup[1]
                    # Yield the matching number of transactions, remove the first one, which is from the articles.csv
                    yield customer_id, (tup[0], len(joins) - 1)
        elif joins[0][0] == "customer":
            yield key, ("customer", key)

    def final_reducer(self, key, value):
        logger = logging.getLogger('__main__')
        customer_dict = defaultdict(int)
        joins = [x for x in value]
        # Same as above
        if len(joins) > 1:
            if joins[0][0] == "customer":
                #logger.info(joins[1:])
                for transaction, count in joins[1:]:
                    customer_dict[key] += count
                yield None, (key, customer_dict[key])

    def steps(self):
        first_step = MRStep(
            mapper=self.mapper,
            reducer=self.reducer
        )
        second_step = MRStep(
            reducer=self.final_reducer
        )
        #return [first_step]
        return [first_step, second_step]

if __name__ == '__main__':
    MyMRJob3.run()
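Since the mapper routes each line by its number of fields, how the line is split matters. The sketch below compares a plain `split(',')` with the standard `csv` module on a made-up line containing a quoted comma. Whether the real datasets contain such fields is not stated here, so treat this as a precaution and not as a known bug; the helper name `parse_fields` is hypothetical.

```python
import csv

def parse_fields(line):
    """Parse one CSV line, honoring quoted fields that contain commas."""
    return next(csv.reader([line]))

# Hypothetical line with a comma inside a quoted field.
line = 'c42,"12 Main St, Springfield",Regularly'

naive = line.strip().split(',')   # splits inside the quoted field as well
parsed = parse_fields(line)       # keeps the quoted field in one piece

print(len(naive), len(parsed))
```

With a field-count dispatch like the one in the mapper, the extra field produced by the naive split would send such a line down the wrong branch.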

After that I simply run

! python3 mymrjob3.py path/to/articles.csv path/to/transactions.csv path/to/customers.csv
