如何将单个csv文件转换为用于Apache光束的多个pcollection

问题描述 投票:0回答:1

我有一个csv文件,其中前几行是ID和标签,其余几行是实际数据。用map函数共享前几对线以转换后几行与实际数据的最佳方法是什么?总体而言,我正在执行类似于this question的操作,但我不仅在顶部放置了标签,而且还添加了另一排ID。

数据看起来像这样:

-- ,id1 , id1 , id1 , id2 , id2 , id2
-- ,label,label,label,label,label,label
time1,data, data, data, data, data, data
time2,data, data, data, data, data, data

然后为每个我想将ID /时间/数据对象记录写入bigquery的唯一ID。

[基本上,我假设我需要执行一个中间管线步骤,该步骤将文件转换为多个pcollection,在此,我可以让下一步实际上根据顶部行的值转换所有文件行。如果是这种情况,最好的方法是什么?如果没有,我还能提供其他前几行的前几行到map函数的值吗?

python csv google-cloud-dataflow apache-beam
1个回答
0
投票

一种可能的解决方案是修改上一个问题中的自定义来源。否则,您可以对数据进行初始传递,以将标头保存为主要处理步骤的侧面输入:

input = p | 'Read CSV file' >> ReadFromText("input.csv")
headers = input | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
rows = input | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))

其中ParseHeadersFn检查该行是否以--开头是否有资格作为标头,如果为true,则丢弃该第一个字段,因为它是不需要的:

class ParseHeadersFn(beam.DoFn):
    """ParDo to output only the headers"""
    def process(self, element):
        if '--' in element.split(',')[0]:
          yield [','.join(element.split(',')[1:])]

然后,在ParseRowsFn中,我们可以访问headers侧输入:

class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata"""
    def process(self, element, headers):
      if 'time1' in element.split(',')[0]:
        for id in headers[0]:
          print 'ids: ' + id
        for label in headers[1]:
          print 'labels: ' + label

请注意,我假设id行将在标签一之前,但是由于Dataflow是分布式系统,因此可能不正确。最好进行更强大的检查。

如果我们的input.csv是:

--,id1,id1,id1,id2,id2,id2
--,label1,label2,label3,label1,label2,label3
time1,data1,data2,data3,data4,data5,data6
time2,data7,data8,data9,data10,data11,data12

示例输出:

ids: id1 , id1 , id1 , id2 , id2 , id2
labels: label1,label2,label3,label1,label2,label3

使用的代码:此gist中的script.py

ParseRowsFn可以用dict(zip(...))修改以获得所需的输出,但是我不确定我是否了解输出结构。您需要这样的东西吗?

id1,time1,data1,data2,data3
id1,time2,data7,data8,data9
id2,time1,data4,data5,data6
id2,time2,data10,data11,data12

如果是这种情况,我们可以使用此answer中的技巧来确定ID的更改位置并采取相应措施:

class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata"""
    def process(self, element, headers):
      # changing ids as per https://stackoverflow.com/a/28242076/6121516
      fields = element.split(',')

      if '--' not in fields[0]:
        ids = headers[0][0].split(',')
        labels = headers[1][0].split(',')
        id_changes = [i for i in range(1,len(ids)) if ids[i]!=ids[i-1]]
        id_changes.append(len(ids))

        for idx, change in enumerate(id_changes):
          row = {'timestamp': fields[0], 'id': ids[change - 1]}
          low = max(idx - 1, 0)
          row.update(dict(zip(labels[low:change], fields[low+1:change+1])))
          print row
          yield [row]

示例输出:

{'timestamp': u'time1', u'label2': u'data2', u'label3': u'data3', 'id': u'id1', u'label1': u'data1'}
{'timestamp': u'time1', u'label2': u'data5', u'label3': u'data6', 'id': u'id2', u'label1': u'data4'}
{'timestamp': u'time2', u'label2': u'data8', u'label3': u'data9', 'id': u'id1', u'label1': u'data7'}
{'timestamp': u'time2', u'label2': u'data11', u'label3': u'data12', 'id': u'id2', u'label1': u'data10'}

使用的代码:同一gist中的output.py

© www.soinside.com 2019 - 2024. All rights reserved.