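I'm reading a .csv file; I split each line and keep only the last value.
I convert that string to a float.
I check whether it is less than 15 and, if so, append it to lessthanlist;
otherwise I append it to morethan_list.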
import apache_beam as beam
from apache_beam.io import ReadFromText
from google.colab import drive

drive.mount('/content/drive')
orders = '/content/drive/MyDrive/ColabNotebooks/whatever/taxiorders.csv'

class tellingApart(beam.DoFn):
    def process(self, element):
        element = float(element.split(",")[-1])
        lessthanlist = []
        morethan_list = []
        if element < 15:
            lessthanlist.append(element)
        else:
            morethan_list.append(element)
        lessthankey = len(lessthanlist)
        lessthanvalue = sum(lessthanlist)
        morethankey = len(morethan_list)
        morethanvalue = sum(morethan_list)
        return lessthankey,lessthanvalue,morethankey,morethanvalue

with beam.Pipeline() as p:
    (p
     | ReadFromText(orders, skip_header_lines=2)
     | beam.ParDo(tellingApart())
     | beam.Map(print))
Sample file
I expected to get just a K,V pair. I messed something up; any help is appreciated.
Is this what you want?
import apache_beam as beam
from apache_beam.io import ReadFromText

class tellingApart(beam.DoFn):
    def process(self, element):
        element = float(element.split(",")[-1])
        lessthanlist = []
        morethan_list = []
        if element < 15:
            lessthanlist.append(element)
        else:
            morethan_list.append(element)
        lessthankey = len(lessthanlist)
        lessthanvalue = sum(lessthanlist)
        morethankey = len(morethan_list)
        morethanvalue = sum(morethan_list)
        return [[lessthankey, lessthanvalue, morethankey, morethanvalue]]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(["test,10", "test,20"])
        | beam.ParDo(tellingApart())
        | beam.Map(print)
    )
Output:
[1, 10.0, 0, 0]
[0, 0, 1, 20.0]
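The reason the extra list matters: Beam iterates over whatever process returns and emits each item as a separate element, so a bare tuple of four numbers becomes four elements, while a list containing one list becomes a single element. Note also that both lists are recreated for every input line, so the counts above are per row, not totals for the file. If the goal is one K,V pair per group across the whole file, a possible sketch is to label each row with its group and let beam.CombinePerKey do the totalling. The names to_bucket, add_pairs, run and the labels "less_than" / "at_least" are made up for illustration, and treating 15 as a parameter is an assumption.

```python
def to_bucket(line, threshold=15.0):
    """Map one CSV line to (bucket, (count, amount)) using its last field."""
    amount = float(line.split(",")[-1])
    # Hypothetical bucket labels; any hashable key would work.
    bucket = "less_than" if amount < threshold else "at_least"
    return bucket, (1, amount)


def add_pairs(pairs):
    """Element-wise sum of (count, amount) pairs."""
    count, total = 0, 0.0
    for c, a in pairs:
        count += c
        total += a
    return count, total


def run(lines):
    # Imported here so the two helpers above stay usable without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(lines)
            | beam.Map(to_bucket)            # one (bucket, (1, amount)) per row
            | beam.CombinePerKey(add_pairs)  # one (bucket, (count, total)) per bucket
            | beam.Map(print)
        )
```

Calling run(["test,10", "test,20"]) should print one (bucket, (count, total)) pair per bucket; the order in which the pairs appear is not guaranteed. To read the real file, beam.Create(lines) could be swapped for ReadFromText as in the question.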