Below is a sample PySpark snippet where I am trying a sanity check: counting how many orders remain after the "filter" transformation. I defined an accumulator and used it as a counter for the number of order records processed.
orders = inputpath + "/orders"  # build the full input path for the orders file
counter = sc.accumulator(0)     # define the accumulator

def OrderTuples(order):
    # increment the counter for every record that passes from the
    # filter transformation into the map transformation
    counter.add(1)
    return (int(order.split(",")[0]), 1)

ordersFiltered = sc.textFile(orders). \
    filter(lambda order: month in order.split(",")[1]). \
    map(lambda order: OrderTuples(order))  # calling the function here

print(f"NO OF ORDERS PROCESSED:{counter}")  # printing the accumulator's final value
But the final output is still zero, so where am I going wrong? This is my first time using an accumulator. sc.textFile(orders) has the default 2 partitions, and I am running with --num-executors 2 on a 13-node cluster. A helping hand would be appreciated here :)
ordersFiltered is only a transformation, and Spark evaluates transformations lazily: the filter and map lambdas (and the accumulator updates inside them) do not run until an action such as collect() or count() is executed. Call an action on ordersFiltered before printing the accumulator, and the count will be non-zero.
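The same lazy-evaluation behaviour can be sketched in plain Python without Spark, using generators as stand-ins for transformations and a plain integer as a stand-in for the accumulator. The sample data and names below are illustrative, not from the original post:

```python
# Minimal sketch of lazy evaluation: a counter incremented inside a lazy
# pipeline stays at zero until the pipeline is actually consumed, just as
# a Spark accumulator stays at zero until an action runs.
counter = 0  # stands in for the Spark accumulator

def order_tuple(order):
    global counter
    counter += 1  # runs only when the element is actually pulled
    return (int(order.split(",")[0]), 1)

orders = ["1,2023-07,100", "2,2023-08,200", "3,2023-07,300"]
month = "2023-07"

# Generator expressions are lazy, like Spark transformations: nothing runs yet.
filtered = (o for o in orders if month in o.split(",")[1])
mapped = (order_tuple(o) for o in filtered)

print(counter)  # 0 -- order_tuple has not executed yet

result = list(mapped)  # the "action": forces evaluation of the pipeline
print(counter)  # 2 -- now the counter reflects the records processed
```

The fix in the original code follows the same pattern: trigger evaluation (e.g. `ordersFiltered.count()`) before reading the accumulator.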