很新的数据流,我一直在寻找天为一个解决我的问题。我需要运行一个事故是由以下格式的CSV文件读取日期:2019010420300033,通过它通过不同的流量和在BigQuery中的时间戳结束。有没有办法先做到这一点或输入文件必须转换为换股日期(我知道这样的格式作品:2019年1月1日20:30:00.331)。或者说,是有可能有数据流输出以某种方式与该日期一新的管道转换?
谢谢
这是数据流一件容易的事。您可以使用一个帕尔多或地图。
在从CSV每行下面的例子将被传递到MAP(convertDate)。功能convertDate,你需要修改,以适应您的日期转换,然后返回修改后的行。然后将整个转换CSV被写入输出文件集。
实施例(简化的),使用地图:
def convertDate(line):
# convert date to desired format
# Split line into columns, change date format for desired column
# Rejoin columns into line and return
cols = line.split(',') # change for your column seperator
cols[2] = my_change_method_for_date(cols[2]) # code the date conversion here
return ",".join(cols)
with beam.Pipeline(argv=pipeline_args) as p:
lines = p | 'ReadCsvFile' >> beam.io.ReadFromText(args.input)
lines = lines | 'ConvertDate' >> beam.Map(convertDate)
lines | 'WriteCsvFile' >> beam.io.WriteToText(args.output)