I am trying to run a Beam script in Python on GCP by following this tutorial:
[https://levelup.gitconnected.com/scaling-scikit-learn-with-apache-beam-251eb6fcf75b][1]
But I keep getting the following error:
AttributeError: module 'google.cloud' has no attribute 'storage'
I have google-cloud-storage in my requirements.txt, so I'm really not sure what I'm missing here.
My full script:
import apache_beam as beam
import json
query = """
    SELECT
        year,
        plurality,
        apgar_5min,
        mother_age,
        father_age,
        gestation_weeks,
        ever_born,
        case when mother_married = true then 1 else 0 end as mother_married,
        weight_pounds as weight,
        current_timestamp as time,
        GENERATE_UUID() as guid
    FROM `bigquery-public-data.samples.natality`
    order by rand()
    limit 100
"""
class ApplyDoFn(beam.DoFn):

    def __init__(self):
        self._model = None
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl
        self._pd = pd

    def process(self, element):
        if self._model is None:
            bucket = self._storage.Client().get_bucket('bqr_dump')
            blob = bucket.get_blob('natality/sklearn-linear')
            self._model = self._pkl.loads(blob.download_as_string())

        new_x = self._pd.DataFrame.from_dict(element,
                                             orient='index').transpose().fillna(0)
        pred_weight = self._model.predict(new_x.iloc[:, 1:8])[0]
        return [{'guid': element['guid'],
                 'predicted_weight': pred_weight,
                 'time': str(element['time'])}]
# set up pipeline options
options = {'project': 'my-project-name',
           'runner': 'DataflowRunner',
           'temp_location': 'gs://bqr_dump/tmp',
           'staging_location': 'gs://bqr_dump/tmp'
           }
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
            query=query,
            use_standard_sql=True))
        | 'Apply Model' >> beam.ParDo(ApplyDoFn())
        | 'Save to BigQuery' >> beam.io.WriteToBigQuery(
            'pzn-pi-sto:beam_test.weight_preds',
            schema='guid:STRING,weight:FLOAT64,time:STRING',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
And my requirements.txt:
google-cloud==0.34.0
google-cloud-storage==1.30.0
apache-beam[GCP]==2.20.0
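Make sure you have the correct versions installed. Google-maintained modules are updated frequently, and if you simply run pip install for the required packages without pinning versions, pip installs the latest release of each one.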
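To see whether the installed versions actually match the pins, a small check along these lines may help. It is only a sketch: the helper names (`parse_pins`, `installed_version`, `find_mismatches`) are made up for illustration, it assumes Python 3.8+ for `importlib.metadata`, and it only inspects the environment it runs in, so it says nothing about what is installed on the Dataflow workers.

```python
from importlib import metadata


def parse_pins(requirements_text):
    """Return {package_name: pinned_version} for every 'name==version' line."""
    pins = {}
    for line in requirements_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if "==" not in line:
            continue  # skip blank or unpinned lines
        name, version = line.split("==", 1)
        name = name.split("[", 1)[0]  # drop extras such as [GCP]
        pins[name.strip().lower()] = version.strip()
    return pins


def installed_version(name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(name)
    except metadata.PackageNotFoundError:
        return None


def find_mismatches(pins, lookup=installed_version):
    """Return {name: (pinned, installed)} for packages that differ from the pin."""
    mismatches = {}
    for name, pinned in pins.items():
        found = lookup(name)
        if found != pinned:
            mismatches[name] = (pinned, found)
    return mismatches


# The pins from the requirements.txt in the question.
REQUIREMENTS = """\
google-cloud==0.34.0
google-cloud-storage==1.30.0
apache-beam[GCP]==2.20.0
"""

if __name__ == "__main__":
    for name, (pinned, found) in find_mismatches(parse_pins(REQUIREMENTS)).items():
        print(f"{name}: pinned {pinned}, installed {found}")
```

If this prints nothing, the local environment matches the pins; any line it does print names a package whose installed version differs from the pinned one, or is missing entirely (installed shown as None).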