My requirement is to read the data along with the header from a CSV file, and create the same structure in Google Datastore using Python with Dataflow. I have tried to create sample code as below.
Below is my sample CSV:
First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"
My Python 2.7 code snippet is as follows:
import csv
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper


class CSVtoDict(beam.DoFn):
    """Converts a CSV line into a dictionary keyed by the header names."""
    def process(self, element, headers):
        element = element.encode('utf-8')
        try:
            for rec in csv.reader([element]):
                if len(rec) == len(headers):
                    data = {header.strip(): val.strip()
                            for header, val in zip(headers, rec)}
                    return [data]
                else:
                    print "bad row: {}".format(rec)
        except Exception:
            pass


class CreateEntities(beam.DoFn):
    """Creates a Datastore entity from a row dictionary."""
    def process(self, element):
        entity = entity_pb2.Entity()
        sku = int(element.pop('sku'))
        element['FirstName'] = unicode(element['FirstName'].decode('utf-8'))
        element['LastName'] = unicode(element['LastName'].decode('utf-8'))
        element['DateOfBirth'] = unicode(element['DateOfBirth'].decode('utf-8'))
        datastore_helper.add_key_path(entity.key, 'Actor', sku)
        datastore_helper.add_properties(entity, element)
        return [entity]


class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            dest='input',
            type=str,
            required=False,
            help='Input file to read. This can be a local file or a file '
                 'in a Google Storage Bucket.')


def dataflow(argv=None):
    process_options = PipelineOptions(argv).view_as(ProcessOptions)
    p = beam.Pipeline(options=process_options)
    (p
     | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
     | 'Converting from csv to dict' >> beam.ParDo(
         CSVtoDict(), ['sku', 'FirstName', 'LastName', 'DateOfBirth'])
     | 'Create entities' >> beam.ParDo(CreateEntities())
     | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
    )
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    dataflow()
I can load the entities using Dataflow, but I want to parse the CSV file (the header first, then the rows) instead of hardcoding the values in the CreateEntities class, and then write them into Datastore entities.
Basically, feed the rows of that same CSV file as input to the Dataflow job. Can anyone help?
Required output in Datastore for key Actor:

First Name    Last Name    Date of Birth
Tom           Cruise       July 3, 1962
Bruce         Willis       March 19, 1955
Morgan        Freeman      June 1, 1937
John          Wayne        May 26, 1907
Thanks, GS
Apache Beam parallelizes data processing by distributing the file being read across many workers, which means most workers will not read the header row at all.
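One common workaround is therefore to read the header row once, up front in the launcher process (before the pipeline is built), and pass the resulting list of column names into the DoFns so the row dictionaries are built dynamically rather than hardcoded. The sketch below illustrates the idea with only the standard csv module; `read_header` and `row_to_dict` are hypothetical helper names, and it is written in Python 3 syntax for brevity (the same approach works in 2.7):

```python
import csv
import io


def read_header(csv_file):
    """Return the column names from the first line of an open CSV file.

    In a Dataflow job this would run in the launcher process, before the
    pipeline is constructed, so every worker receives the same header
    list (for example as a DoFn constructor argument or a side input).
    """
    reader = csv.reader(csv_file)
    return [h.strip() for h in next(reader)]


def row_to_dict(line, headers):
    """Turn one raw CSV line (as ReadFromText emits it) into a
    {header: value} dict; returns None for malformed rows."""
    for rec in csv.reader([line]):
        if len(rec) == len(headers):
            return {h: v.strip() for h, v in zip(headers, rec)}
    return None


# Example with the sample CSV from the question:
sample = io.StringIO('First Name,Last Name,Date of Birth\n'
                     'Tom,Cruise,"July 3, 1962"\n')
headers = read_header(sample)   # ['First Name', 'Last Name', 'Date of Birth']
row = row_to_dict('Tom,Cruise,"July 3, 1962"', headers)
```

Inside the pipeline you would still need to drop the header line itself from the `ReadFromText` output (for instance with a `beam.Filter` that discards the line equal to the raw header string), and then build the Datastore entity properties by iterating over the dict keys instead of naming each field.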