这是计算平均持有成本的情况。我们只考虑增加帐户余额的交易,而不考虑减少帐户余额的交易。
# data example: ((1,'000001'),('A',0,5000,5000)),
# (1,'000001') is the groupby key , 'A' is order by key (serialno) , '0' is the account balance
# before the trade, '5000' is trade balance, '5000' is the account balance aftre the trade. We aim
# to calculate #the average cost per unit after the trades in each group by spark rdd.
confirm = [
((1, '000001'), ('A', 0, 5000, 5000)),
((1, '000001'), ('C', 9000, 1000, 10000)),
((1, '000001'), ('B', 5000, 5000, 9000)),
((2, '000001'), ('D', 0, 3300, 3000)),
((2, '000001'), ('F', 4000, 5000, 10000)),
((2, '000001'), ('E', 3000, 4200, 6000)),
((3, '000001'), ('G', 0, 3300, 3000)),
((3, '000001'), ('H', 3000, 3300, 6300))
]
def my_partition(x):
return x[0] % 3
def partSort(x):
xlist = list(x)
a = sorted(xlist, key=lambda x: x[1][0])
return iter(a)
import pandas as pd
def udf_func(x, y):
if y is None:
result = x[2] / x[3]
df = pd.DataFrame([{'serialno': x[0], 'result': result}])
else:
result = (
(x if isinstance(x, float) else (x[2] / x[3])) * y[1] + y[2]
) / y[3]
df = pd.DataFrame([{'serialno': y[0], 'result': result}])
# this is where I want to store the intermediate result,but does not work eg:
df.to_csv("/home/zo_om/result.csv", 'a')
return result
rdd = sc.parallelize(confirm).partitionBy(3, my_partition). \
mapPartitions(partSort).reduceByKey(udf_func)
rdd.collect()
运行代码后,结果是:
[
((3, '000001'), 1.0476190476190477),
((2, '000001'), 1.0),
((1, '000001'), 1.1)
]
这是每个组的最后结果。
我只能在“ /home/zo_om/result.csv”中看到1行(仅在spark集群的一个工作节点中,zo_om是kerberos用户)。我希望看到的是8行(每个序列号('A'〜'H')一个))>
这是计算平均持有成本的情况。我们只考虑增加帐户余额的交易,而不考虑减少帐户余额的交易。 #数据示例:((1,'...
我想您只会看到一行,因为pd.DataFrame.to_csv
会覆盖现有数据,并且一直都写入同一路径