How can I store intermediate results inside a PySpark reduceByKey function?


This is a scenario for calculating the average holding cost per unit. We only consider trades that increase the account balance, not trades that decrease it (a worked example of the cost recurrence follows the sample data below).

# Data example: ((1, '000001'), ('A', 0, 5000, 5000))
# (1, '000001') is the group-by key; 'A' is the order-by key (serialno); 0 is the
# account balance before the trade; the first 5000 is the trade amount; the second
# 5000 is the account balance after the trade. The goal is to compute the average
# cost per unit after the trades in each group with a Spark RDD.

confirm = [
    ((1, '000001'), ('A', 0, 5000, 5000)),
    ((1, '000001'), ('C', 9000, 1000, 10000)),
    ((1, '000001'), ('B', 5000, 5000, 9000)),
    ((2, '000001'), ('D', 0, 3300, 3000)),
    ((2, '000001'), ('F', 4000, 5000, 10000)),
    ((2, '000001'), ('E', 3000, 4200, 6000)),
    ((3, '000001'), ('G', 0, 3300, 3000)),
    ((3, '000001'), ('H', 3000, 3300, 6300))
]
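To make the recurrence concrete, here is the fold for group (1, '000001') worked by hand in plain Python (not part of the original post): the first trade sets the cost to trade_amount / balance_after, and each later trade updates it as (cost * balance_before + trade_amount) / balance_after.

# Worked example for group (1, '000001'), folding trades in serialno order:
cost = 5000 / 5000                     # trade A: 1.0
cost = (cost * 5000 + 5000) / 9000     # trade B: 1.111...
cost = (cost * 9000 + 1000) / 10000    # trade C: 1.1
print(cost)  # 1.1, matching the reduceByKey output shown further down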


def my_partition(x):
    # Route each record to a partition by the first element of its key,
    # so that every group lands entirely in one partition.
    return x[0] % 3


def partSort(iterator):
    # Sort one partition's records by serialno (the first field of the value),
    # so that reduceByKey folds each group's trades in chronological order.
    return iter(sorted(iterator, key=lambda rec: rec[1][0]))


import pandas as pd


def udf_func(x, y):
    # reduceByKey folds the sorted values of a key pairwise: x is either the
    # first raw value tuple or the float cost accumulated so far, and y is the
    # next trade tuple. Note that reduceByKey never passes None, so the
    # `y is None` branch below is never actually taken.
    if y is None:
        result = x[2] / x[3]
        df = pd.DataFrame([{'serialno': x[0], 'result': result}])
    else:
        # New average cost = (previous cost * balance before the trade
        #                     + trade amount) / balance after the trade.
        result = (
            (x if isinstance(x, float) else x[2] / x[3]) * y[1] + y[2]
        ) / y[3]
        df = pd.DataFrame([{'serialno': y[0], 'result': result}])
    # This is where I want to store the intermediate result, but it does not
    # work as expected:
    df.to_csv("/home/zo_om/result.csv", 'a')
    return result


# Partition by group key, sort each partition by serialno, then fold each
# group's trades in order with reduceByKey.
rdd = (
    sc.parallelize(confirm)
      .partitionBy(3, my_partition)
      .mapPartitions(partSort)
      .reduceByKey(udf_func)
)
rdd.collect()
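Not part of the original post, but as a quick check that the custom partitioner and the in-partition sort behave as intended, glom() can dump each partition's contents to the driver:

# Inspect partition contents after partitioning and sorting; glom() turns
# each partition into a list so the layout is visible from the driver.
for i, part in enumerate(
        sc.parallelize(confirm)
          .partitionBy(3, my_partition)
          .mapPartitions(partSort)
          .glom()
          .collect()):
    print(i, part)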

Running the reduceByKey pipeline yields:

[
 ((3, '000001'), 1.0476190476190477),
 ((2, '000001'), 1.0),
 ((1, '000001'), 1.1)
]

These are the final results for each group.

However, I can only see one row in "/home/zo_om/result.csv" (and only on one worker node of the Spark cluster; zo_om is a Kerberos user). What I expected to see is 8 rows, one per serialno ('A' through 'H').


python pyspark rdd
1 Answer

I think you only see one row because pd.DataFrame.to_csv writes in mode 'w' by default, so every call overwrites the file at the same path. Note also that in your call 'a' is passed as the second positional argument of to_csv, which is sep (the field separator), not mode; mode='a' is what appends. And since udf_func runs on the executors, each write goes to the local filesystem of whichever worker runs the task, which is why the file only shows up on one node.
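A minimal sketch of one way to get all eight intermediate rows without any file writes on the executors: fold each group inside mapPartitions and emit every intermediate (key, serialno, cost) record, then collect them on the driver. This replaces the side effect inside reduceByKey with ordinary RDD output; the helper name running_costs is illustrative, not from the original post.

def running_costs(iterator):
    # Fold each group's trades in serialno order and yield every
    # intermediate average cost instead of only the final one.
    cost = {}  # current average cost per group key
    for key, (serialno, before, trade, after) in sorted(
            iterator, key=lambda rec: rec[1][0]):
        if key not in cost:
            cost[key] = trade / after
        else:
            cost[key] = (cost[key] * before + trade) / after
        yield (key, serialno, cost[key])

intermediates = (
    sc.parallelize(confirm)
      .partitionBy(3, my_partition)
      .mapPartitions(running_costs)
)
print(intermediates.collect())
# Eight rows, one per serialno; the last row of each key matches the
# reduceByKey result above. They can then be persisted once, from the
# driver, e.g. with pandas:
# pd.DataFrame(intermediates.collect(),
#              columns=['key', 'serialno', 'cost']).to_csv(
#     '/home/zo_om/result.csv', index=False)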
