因此,当我尝试使用DataflowRunner运行数据流并包含如下所示的requirements.txt时>]
google-cloud-storage==1.28.1 pandas==1.0.3 smart-open==2.0.0
每次在此行失败
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://..../beamapp-.../numpy-1.18.2.zip... Traceback (most recent call last): File "Database.py", line 107, in <module> run() File "Database.py", line 101, in run | 'Write CSV' >> beam.ParDo(WriteCSVFIle(options.output_bucket, pandora_options.output_folder)) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__ self.run().wait_until_finish() File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run self._options).run(False) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run return self.runner.run_pipeline(self, self._options) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline self.dataflow_client.create_job(self.job), self) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper return fun(*args, **kwargs) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job self.create_job_description(job) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description resources = self._stage_resources(job.options) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources staging_location=google_cloud_options.staging_location) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources pkg, FileSystems.join(staging_location, os.path.basename(pkg))) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact local_path_to_artifact, artifact_name) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper return fun(*args, **kwargs) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy self.stage_file(to_folder, to_name, f, total_size=total_size) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file response = self._storage_client.objects.Insert(request, upload=upload) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert upload=upload, upload_config=upload_config) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod http_request, client=self.client) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload return self.StreamInChunks() File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks additional_headers=additional_headers) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia response = send_func(self.stream.tell()) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk start, additional_headers=additional_headers) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk return self.__SendMediaRequest(request, end) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest retries=self.num_retries, check_response_func=CheckResponse) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest max_retry_wait, total_wait_sec)) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections raise retry_args.exc File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest check_response_func=check_response_func) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry redirections=redirections, connection_type=connection_type) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request redirections, connection_type) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request redirections, connection_type) File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request cachekey, File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request content, httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.
这是我正在运行的命令
python Database.py --runner DataflowRunner --project XXX --staging_location gs://.../staging --temp_location gs://.../temp --template_location gs://.../Template --requirements_file requirements.txt
如果删除了--requirements_file requirements.txt,它会完成,但是当我尝试运行该作业时,它会失败,因为找不到包。
这是我的dataflow-requirements-cache文件夹。在清理之前,我有多个文件具有不同的版本,例如
botocore-1.16.16.tar.gz botocore-1.16.17.tar.gz botocore-1.16.18.tar.gz
清理后看起来像这样,(尝试上传numpy时仍然失败)
numpy-1.18.4.zip
urllib3-1.25.9.tar.gz
smart_open-2.0.0.tar.gz
six-1.15.0.tar.gz
setuptools-47.1.0.zip
s3transfer-0.3.3.tar.gz
rsa-4.0.tar.gz
requests-2.23.0.tar.gz
pytz-2020.1.tar.gz
python-dateutil-2.8.1.tar.gz
pyasn1-modules-0.2.8.tar.gz
pyasn1-0.4.8.tar.gz
protobuf-3.12.2.tar.gz
pandas-1.0.3.tar.gz
jmespath-0.10.0.tar.gz
idna-2.9.tar.gz
googleapis-common-protos-1.51.0.tar.gz
google-resumable-media-0.5.0.tar.gz
google-cloud-storage-1.28.1.tar.gz
google-cloud-core-1.3.0.tar.gz
google-auth-1.15.0.tar.gz
google-api-core-1.17.0.tar.gz
docutils-0.15.2.tar.gz
chardet-3.0.4.tar.gz
certifi-2020.4.5.1.tar.gz
cachetools-4.1.0.tar.gz
botocore-1.16.18.tar.gz
boto3-1.13.18.tar.gz
boto-2.49.0.tar.gz
因此,当我尝试使用DataflowRunner运行数据流并包含require.txt时,如下所示:google-cloud-storage == 1.28.1 pandas == 1.0.3 smart-open == 2.0.0每次失败在此...
好,所以无论我尝试什么,都无法使其与需求文件一起使用。所以我尝试了安装文件。所以现在命令看起来像这样