我一直在尝试把管道部署到 Google Cloud Dataflow 上,到目前为止困难重重。我遇到了导入问题:我发现 ParDo 函数需要提供 requirements.txt,否则会报错说找不到所需的模块。参见 https://beam.apache.org/documentation/sdks/python-pipeline-dependency/
因此我尝试通过传入 requirements.txt 文件来解决这个问题,结果却遇到了非常难以理解的错误消息。
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from google.cloud.bigtable.row import DirectRow
import datetime
# Build the Apache Beam pipeline options from an explicit, empty flag list.
options = pipeline_options.PipelineOptions(flags=[])

# Use the default project of the current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Google Cloud region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://tunnel-insight-2-0-dev-291100/dataflow'
# Staging location: where the Dataflow pipeline and SDK binary are staged.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Temp location: temporary files / intermediate results written before the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# Directory that receives the job's output files.
output_gcs_location = f'{dataflow_gcs_location}/output'

# Streaming mode, so the data can be streamed from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# requirements.txt listing the extra packages to stage for the job.
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

ib.options.recording_duration = '1m'
...
...
pipeline_result = DataflowRunner().run_pipeline(p, options=options)
我尝试使用 `options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"` 来传入依赖文件,
结果收到如下错误:
---------------------------------------------------------------------------
CalledProcessError Traceback (most recent call last)
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
90 try:
---> 91 out = subprocess.check_output(*args, **kwargs)
92 except OSError:
/opt/conda/lib/python3.7/subprocess.py in check_output(timeout, *popenargs, **kwargs)
410 return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 411 **kwargs).stdout
412
/opt/conda/lib/python3.7/subprocess.py in run(input, capture_output, timeout, check, *popenargs, **kwargs)
511 raise CalledProcessError(retcode, process.args,
--> 512 output=stdout, stderr=stderr)
513 return CompletedProcess(process.args, retcode, stdout, stderr)
CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-12-f018e5c84d08> in <module>
----> 1 pipeline_result = DataflowRunner().run_pipeline(p, options=options)
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
491 environments.DockerEnvironment.from_container_image(
492 apiclient.get_container_image_from_options(options),
--> 493 artifacts=environments.python_sdk_dependencies(options)))
494
495 # This has to be performed before pipeline proto is constructed to make sure
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/environments.py in python_sdk_dependencies(options, tmp_dir)
624 options,
625 tmp_dir,
--> 626 skip_prestaged_dependencies=skip_prestaged_dependencies))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in create_job_resources(options, temp_dir, build_setup_args, populate_requirements_cache, skip_prestaged_dependencies)
178 populate_requirements_cache if populate_requirements_cache else
179 Stager._populate_requirements_cache)(
--> 180 setup_options.requirements_file, requirements_cache_path)
181 for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
182 resources.append((pkg, os.path.basename(pkg)))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
234 while True:
235 try:
--> 236 return fun(*args, **kwargs)
237 except Exception as exn: # pylint: disable=broad-except
238 if not retry_filter(exn):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
569 ]
570 _LOGGER.info('Executing command: %s', cmd_args)
--> 571 processes.check_output(cmd_args, stderr=processes.STDOUT)
572
573 @staticmethod
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
97 "Full traceback: {} \n Pip install failed for package: {} \
98 \n Output from execution of subprocess: {}" \
---> 99 .format(traceback.format_exc(), args[0][6], error.output))
100 else:
101 raise RuntimeError("Full trace: {}, \
RuntimeError: Full traceback: Traceback (most recent call last):
File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py", line 91, in check_output
out = subprocess.check_output(*args, **kwargs)
File "/opt/conda/lib/python3.7/subprocess.py", line 411, in check_output
**kwargs).stdout
File "/opt/conda/lib/python3.7/subprocess.py", line 512, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
Pip install failed for package: -r
Output from execution of subprocess: b'Obtaining file:///root/apache-beam-custom/packages/beam/sdks/python (from -r requirements.txt (line 3))\n Saved /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip\nCollecting absl-py==0.11.0\n Downloading absl-py-0.11.0.tar.gz (110 kB)\n Saved /tmp/dataflow-requirements-cache/absl-py-0.11.0.tar.gz\nCollecting argon2-cffi==20.1.0\n Downloading argon2-cffi-20.1.0.tar.gz (1.8 MB)\n Installing build dependencies: started\n Installing build dependencies: finished with status \'error\'\n ERROR: Command errored out with exit status 1:\n command: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\'\n cwd: None\n Complete output (85 lines):\n Collecting setuptools>=40.6.0\n Downloading setuptools-51.1.1.tar.gz (2.1 MB)\n Collecting wheel\n Downloading wheel-0.36.2.tar.gz (65 kB)\n Collecting cffi>=1.0\n Downloading cffi-1.14.4.tar.gz (471 kB)\n Collecting pycparser\n Downloading pycparser-2.20.tar.gz (161 kB)\n Skipping wheel build for setuptools, due to binaries being disabled for it.\n Skipping wheel build for wheel, due to binaries being disabled for it.\n Skipping wheel build for cffi, due to binaries being disabled for it.\n Skipping wheel build for pycparser, due to binaries being disabled for it.\n Installing collected packages: setuptools, wheel, pycparser, cffi\n Running setup.py install for setuptools: started\n Running setup.py install for setuptools: finished with status \'done\'\n Running setup.py install for wheel: started\n Running setup.py install for wheel: finished with status \'done\'\n Running setup.py install for pycparser: started\n Running setup.py install for pycparser: finished with status \'done\'\n Running setup.py install for cffi: started\n 
Running setup.py install for cffi: finished with status \'error\'\n ERROR: Command errored out with exit status 1:\n command: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi\n cwd: /tmp/pip-install-6zs5jguv/cffi/\n Complete output (56 lines):\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n running install\n running build\n running build_py\n creating build\n creating build/lib.linux-x86_64-3.7\n creating build/lib.linux-x86_64-3.7/cffi\n copying 
cffi/setuptools_ext.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/pkgconfig.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/verifier.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/vengine_gen.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/backend_ctypes.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/__init__.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/cffi_opcode.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/error.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/api.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/commontypes.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/ffiplatform.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/lock.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/cparser.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/recompiler.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/vengine_cpy.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/model.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/_cffi_include.h -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/parse_c_type.h -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/_embedding.h -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/_cffi_errors.h -> build/lib.linux-x86_64-3.7/cffi\n running build_ext\n building \'_cffi_backend\' extension\n creating build/temp.linux-x86_64-3.7\n creating build/temp.linux-x86_64-3.7/c\n gcc -pthread -B /opt/conda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -DUSE__THREAD -DHAVE_SYNC_SYNCHRONIZE -I/usr/include/ffi -I/usr/include/libffi -I/root/apache-beam-custom/include -I/opt/conda/include/python3.7m -c c/_cffi_backend.c -o build/temp.linux-x86_64-3.7/c/_cffi_backend.o\n c/_cffi_backend.c:15:10: fatal error: ffi.h: No such file or directory\n #include <ffi.h>\n ^~~~~~~\n compilation terminated.\n error: command \'gcc\' failed with exit status 1\n ----------------------------------------\n ERROR: Command errored out with 
exit status 1: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi Check the logs for full command output.\n WARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.\n You should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n ----------------------------------------\nERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\' Check the logs for full command output.\nWARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.\nYou should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n'
我做错了什么吗?
-------------- 编辑-------------------------------- ------
好的,我的管道已经可以工作了,但我的requirements.txt 文件仍然存在问题,我相信我正确传递了该文件。
我的管道代码:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from google.cloud.bigtable.row import DirectRow
import datetime
# Build the Apache Beam pipeline options from an explicit, empty flag list.
options = pipeline_options.PipelineOptions(flags=[])

# Use the default project of the current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Google Cloud region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = ''
# Staging location: where the Dataflow pipeline and SDK binary are staged.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Temp location: temporary files / intermediate results written before the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# Directory that receives the job's output files.
output_gcs_location = f'{dataflow_gcs_location}/output'

# Streaming mode, so the data can be streamed from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# requirements.txt listing the extra packages to stage for the job.
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

ib.options.recording_duration = '1m'

# Google Cloud PubSub resources for this example (left blank here).
topic = ""
subscription = ""
output_topic = ""

# Bigtable destination (left blank here).
project_id = ""
bigtable_instance = ""
bigtable_table_id = ""
class CreateRowFn(beam.DoFn):
    """DoFn that emits one fixed Bigtable ``DirectRow`` for every input element."""

    def process(self, words):
        """Build the row and return it in a single-element list.

        ``words`` is the incoming element; its value is not used.
        """
        # Imported inside the method so the names are resolved where the
        # DoFn actually executes rather than in the notebook's global scope.
        import datetime
        from google.cloud.bigtable.row import DirectRow

        row = DirectRow(row_key="phone#4c410523#20190501")
        row.set_cell(
            "stats_summary",
            b"os_build",
            b"android",
            datetime.datetime.now(),
        )
        return [row]
# Build the pipeline with the InteractiveRunner; it is submitted to Dataflow
# explicitly on the last line of this block.
p = beam.Pipeline(InteractiveRunner(),options=options)
# Read messages from the PubSub subscription configured above.
words = p | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
# Window the stream with FixedWindows(10).
# NOTE(review): windowed_words is not consumed below -- the Bigtable branch
# reads from `words` directly. Confirm whether that is intended.
windowed_words = (words | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
# Writing to BigTable: turn each element into a DirectRow, then write the rows
# to the configured project / instance / table.
test = words | beam.ParDo(CreateRowFn()) | WriteToBigTable(
project_id=project_id,
instance_id=bigtable_instance,
table_id=bigtable_table_id)
# Submit the pipeline built above through the DataflowRunner using the same options.
pipeline_result = DataflowRunner().run_pipeline(p, options=options)
正如您在 CreateRowFn 中看到的,我必须在函数内部写上以下导入语句:
from google.cloud.bigtable.row import DirectRow
import datetime
只有这样才能正常运行。
我已通过 options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt" 传入了 requirements.txt,并且在 Dataflow 控制台上也能看到它。
如果删除函数内的导入语句,process 方法中就会报错:“NameError: name 'DirectRow' is not defined”。
有什么办法可以克服这个问题吗?
我在常见问题解答(FAQ)中找到了答案。我的问题不在于如何传入 requirements.txt,而在于如何处理 NameError。
https://cloud.google.com/dataflow/docs/resources/faq
如何处理 NameError?如果您在使用 Dataflow 服务执行管道时遇到 NameError,但在本地执行(即使用 DirectRunner)时没有遇到,那么您的 DoFn 可能使用了全局命名空间中的值,而这些值在 Dataflow 工作器(worker)上不可用。
默认情况下,在 Dataflow 作业序列化期间,不会保存主会话中定义的全局导入、函数和变量。例如,如果您的 DoFn 定义在主文件中,并且引用了全局命名空间中的导入和函数,则可以将 --save_main_session 管道选项设置为 True。这样,全局命名空间的状态会被序列化(pickle),并在 Dataflow 工作器上重新加载。
请注意,如果全局命名空间中存在无法被 pickle 的对象,您将收到 pickle 错误。如果错误与 Python 发行版中本应可用的模块有关,您可以在使用该模块的位置进行局部导入来解决此问题。
例如,代替:
import re
…
def myfunc():
# use re module
改用:
def myfunc():
import re
# use re module
或者,如果您的 DoFns 跨越多个文件,您应该使用不同的方法来打包工作流程和管理依赖项。
所以结论是: 在函数中使用 import 语句是可以的
Google Dataflow 工作器(worker)上已预装这些软件包:https://cloud.google.com/dataflow/docs/concepts/sdk-worker-dependency。
您也可以在运行时通过命令行标志 --requirements_file path://requirements.txt 传入依赖文件。
我更喜欢使用 --setup_file path://setup.py 标志。setup 文件的格式如下:
import setuptools
# Identity of the package that is built and shipped with the job.
PACKAGE_NAME = 'my_package'
PACKAGE_VERSION = '0.0.1'

# Dependencies passed to setuptools as install_requires.
REQUIRED_PACKAGES = [
    'joblib==0.15.1',
    'numpy==1.18.5',
    'google',
    'google-cloud',
    'google-cloud-storage',
    'cassandra-driver==3.22.0',
]

setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    author="Mohd Faisal",
    description='Searh Rank project',
    packages=setuptools.find_packages(),
    install_requires=REQUIRED_PACKAGES,
)
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
PipelineOptions,
SetupOptions,
StandardOptions,
WorkerOptions)
from datetime import date
class Userprocess(beam.DoFn):
    """Placeholder DoFn that answers every element with the string "OK"."""

    def process(self, msg):
        """Yield ``"OK"`` once per incoming ``msg``; the element itself is ignored."""
        yield "OK"
def run(argv=None):
    """Parse the launcher flags, build the pipeline options and run the pipeline.

    Args:
        argv: Optional list of command-line arguments. When ``None``,
            argparse falls back to ``sys.argv[1:]``.

    ``--project``, ``--temp_location`` and ``--job_name`` are required and are
    copied onto the Google Cloud options; every other flag is forwarded to
    Beam's ``PipelineOptions`` unchanged.
    """
    logging.info("Parsing dataflow flags... ")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--project',
        required=True,
        help='project id staging or production ')
    parser.add_argument(
        '--temp_location',
        required=True,
        help='temp location')
    parser.add_argument(
        '--job_name',
        required=True,
        help='job name')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Build the options from the flags argparse did not consume. The original
    # called PipelineOptions() with no arguments, which re-parses sys.argv and
    # therefore silently ignored an explicitly supplied ``argv``.
    pipeline_options = PipelineOptions(pipeline_args)
    # Pickle the main session so module-level imports are available to DoFns.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    today = date.today()
    logging.info("Processing Date is " + str(today))

    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.job_name = known_args.job_name
    google_cloud_options.temp_location = known_args.temp_location

    # ``argv`` is not passed here: beam.Pipeline ignores it whenever an
    # ``options`` object is supplied, so the flags already live in the options.
    with beam.Pipeline(options=pipeline_options) as p:
        # NOTE(review): this transform is constructed but never applied to
        # ``p`` (there is no source and no ``|``), so the pipeline runs empty.
        # Attach it to a real source, e.g. ``p | <source> | beam.ParDo(...)``.
        beam.ParDo(Userprocess())
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Starting dataflow daily pipeline ")
    try:
        run()
    except Exception:
        # The bare ``except: pass`` hid every failure, including argparse's
        # SystemExit for missing required flags. Log the traceback and
        # re-raise so the launcher exits with a non-zero status.
        logging.exception("Dataflow daily pipeline failed")
        raise