Reading from a CSV file and uploading to Google Datastore with Python

Problem description · Votes: 0 · Answers: 1

My requirement is to read the data along with the header from a CSV file and create the same structure in Google Datastore, using Python with Dataflow. I have tried writing sample code as below.

Below is my sample CSV:

First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"

My Python 2.7 code snippet is as follows:

import csv
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

class CSVtoDict(beam.DoFn):
    """Converts line into dictionary"""
    def process(self, element, headers):
        rec = ""
        element = element.encode('utf-8')
        try:
            for line in csv.reader([element]):
                rec = line

            if len(rec) == len(headers):
                data = {header.strip(): val.strip() for header, val in zip(headers, rec)}
                return [data]
            else:
                print "bad: {}".format(rec)
        except Exception:
            logging.warn("Skipping malformed line: %s", element)


class CreateEntities(beam.DoFn):
    """Creates a Datastore entity from one row dictionary"""
    def process(self, element):
        entity = entity_pb2.Entity()
        element['FirstName'] = unicode(element['FirstName'].decode('utf-8'))
        element['LastName'] = unicode(element['LastName'].decode('utf-8'))
        element['DateOfBirth'] = unicode(element['DateOfBirth'].decode('utf-8'))

        # The sample CSV has no numeric id column, so the actor's full
        # name serves as the key name here.
        key_name = u'{} {}'.format(element['FirstName'], element['LastName'])
        datastore_helper.add_key_path(entity.key, 'Actor', key_name)
        datastore_helper.add_properties(entity, element)
        return [entity]


class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--input',
                dest='input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

def dataflow(argv=None):
    process_options = PipelineOptions().view_as(ProcessOptions)
    p = beam.Pipeline(options=process_options)

    (p
     | 'Reading input file' >> beam.io.ReadFromText(process_options.input,
                                                    skip_header_lines=1)
     | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(),
                                                   ['FirstName', 'LastName', 'DateOfBirth'])
     | 'Create entities' >> beam.ParDo(CreateEntities())
     | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
    )
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    dataflow()
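
For completeness, the job is launched with the usual Dataflow options, roughly like this (the script name and bucket are placeholders):

python load_actors.py \
    --input gs://my-bucket/actors.csv \
    --project isc-am-poc \
    --runner DataflowRunner \
    --temp_location gs://my-bucket/tmp/ \
    --staging_location gs://my-bucket/staging/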

I am able to load the entities using Dataflow, but I want to parse the CSV file's header first and then the rows, instead of hardcoding the values in the class CreateEntities, and then write them into the Datastore entities.

Basically, the rows of that same CSV file should be fed as input to the Dataflow job. Can someone help?

Required output in Datastore for the Actor kind:

First Name | Last Name | Date of Birth
-----------|-----------|---------------
Tom        | Cruise    | July 3, 1962
Bruce      | Willis    | March 19, 1955
Morgan     | Freeman   | June 1, 1937
John       | Wayne     | May 26, 1907

Thanks, GS


python csv google-cloud-datastore google-cloud-dataflow
1 Answer

Apache Beam parallelizes data processing by distributing the file being read across many workers, which means most workers never read the header row at all.
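A common workaround is to read only the header line in the driver program, before the pipeline is submitted, and pass the resulting column names to the DoFns as ordinary arguments; the workers then skip the header line entirely. Below is a minimal sketch under that assumption, reusing the CSVtoDict from the question; the choice of 'First Name' as the key column and the BuildEntity/run names are illustrative, not the only way to do this.

import csv

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper


class BuildEntity(beam.DoFn):
    """Builds a Datastore entity from a header->value dict, no hardcoded columns."""
    def process(self, element, key_column):
        entity = entity_pb2.Entity()
        props = {unicode(k.decode('utf-8')): unicode(v.decode('utf-8'))
                 for k, v in element.items()}
        # Assumption: the values in key_column are unique enough to serve as
        # the entity key name; swap in a real id column if you have one.
        datastore_helper.add_key_path(entity.key, 'Actor', props[key_column])
        datastore_helper.add_properties(entity, props)
        yield entity


def run(input_path, project):
    # Read just the header line in the driver, before the job is submitted;
    # FileSystems handles both local paths and gs:// URLs.
    f = FileSystems.open(input_path)
    headers = next(csv.reader([f.readline()]))
    f.close()

    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'Read data rows' >> beam.io.ReadFromText(input_path, skip_header_lines=1)
     | 'Row to dict' >> beam.ParDo(CSVtoDict(), headers)
     | 'Build entity' >> beam.ParDo(BuildEntity(), u'First Name')
     | 'Write to Datastore' >> WriteToDatastore(project))
    p.run().wait_until_finish()

If the file path is only known at template run time (a ValueProvider), the driver cannot open it at construction time; in that case one option is to read the header inside the pipeline and feed it to the DoFns as a singleton side input (beam.pvalue.AsSingleton). For a plain, non-templated job, the driver-side read above is the simplest fix.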
