Apache Beam (Python): count how many orders are below 15 and how many are 15 or above, with k = count and v = sum


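I'm reading a .csv file; I split each line and keep only the last value.
I convert that string to a float.
If it is less than 15, I append it to lessthanlist;
otherwise, I append it to morethan_list.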

import apache_beam as beam
from apache_beam.io import ReadFromText

from google.colab import drive
drive.mount('/content/drive')

orders = '/content/drive/MyDrive/ColabNotebooks/whatever/taxiorders.csv'


class tellingApart(beam.DoFn):
  def process(self, element):
    element = float(element.split(",")[-1])
    lessthanlist = []
    morethan_list = []
    if element < 15:
      lessthanlist.append(element)
    else:
      morethan_list.append(element)
    lessthankey = len(lessthanlist)
    lessthanvalue = sum(lessthanlist)
    morethankey = len(morethan_list)
    morethanvalue = sum(morethan_list)
    return lessthankey,lessthanvalue,morethankey,morethanvalue


with beam.Pipeline() as p:
  (p
   | ReadFromText(orders, skip_header_lines=2)
   | beam.ParDo(tellingApart())
   | beam.Map(print))

Sample file

I expected to get just a K,V pair, but I messed something up. Any help is appreciated.

python apache-beam
1 Answer

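Is this what you want? process() has to return an iterable, and Beam emits each item of that iterable as a separate element, so returning a bare tuple produces four separate outputs per row. Wrapping the values in a list makes each row produce a single element: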

import apache_beam as beam
from apache_beam.io import ReadFromText


class tellingApart(beam.DoFn):
    def process(self, element):
        element = float(element.split(",")[-1])
        lessthanlist = []
        morethan_list = []
        if element < 15:
            lessthanlist.append(element)
        else:
            morethan_list.append(element)
        lessthankey = len(lessthanlist)
        lessthanvalue = sum(lessthanlist)
        morethankey = len(morethan_list)
        morethanvalue = sum(morethan_list)
        return [[lessthankey, lessthanvalue, morethankey, morethanvalue]]


with beam.Pipeline() as p:
    (
        p
        | beam.Create(["test,10", "test,20"])
        | beam.ParDo(tellingApart())
        | beam.Map(print)
    )

Output:

[1, 10.0, 0, 0]
[0, 0, 1, 20.0]
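
The output above is still one row per input line, because a DoFn only ever sees a single element at a time. If the goal is one K,V result per bucket (K = count, V = sum), the aggregation has to happen in a combine step. Below is a minimal sketch of one way to do that, not necessarily what the asker had in mind: each line is mapped to (bucket, (1, amount)) and the pairs are added up per key with beam.CombinePerKey. The helper names to_keyed_pair, add_pairs and run_pipeline, and the bucket labels "below_15" and "15_or_more", are made up for this illustration.

```python
def to_keyed_pair(line):
    """Map one CSV line to (bucket, (1, amount)), taking the last column as the amount."""
    amount = float(line.split(",")[-1])
    bucket = "below_15" if amount < 15 else "15_or_more"
    return bucket, (1, amount)


def add_pairs(pairs):
    """Add (count, total) pairs element-wise; an empty input gives (0, 0.0)."""
    count, total = 0, 0.0
    for c, t in pairs:
        count += c
        total += t
    return count, total


def run_pipeline(lines):
    # Imported here so the two helpers above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(lines)
            | beam.Map(to_keyed_pair)        # -> (bucket, (1, amount))
            | beam.CombinePerKey(add_pairs)  # -> (bucket, (count, sum))
            | beam.Map(print)
        )
```

Calling run_pipeline(["test,10", "test,20"]) should print ('below_15', (1, 10.0)) and ('15_or_more', (1, 20.0)), in no guaranteed order. To run it on the real file, the beam.Create(lines) step can be swapped for the ReadFromText step from the question. add_pairs returns the same (count, total) shape it consumes, so Beam can safely apply it to partial results as well as to raw inputs.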