Unable to pass requirements.txt to Dataflow

Problem description · Votes: 0 · Answers: 2

I have been trying to deploy a pipeline on Google Cloud Dataflow, and so far it has been quite a challenge. I am running into import issues: I realized that ParDo functions need the requirements.txt to be present, and if it is not, they report that the required modules cannot be found. https://beam.apache.org/documentation/sdks/python-pipeline-dependency/

So I tried to fix the problem by passing in a requirements.txt file, only to be met with a very hard-to-understand error message.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

from google.cloud.bigtable.row import DirectRow
import datetime

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://tunnel-insight-2-0-dev-291100/dataflow'

# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the requirements.txt file
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location

ib.options.recording_duration = '1m'

...
...

pipeline_result = DataflowRunner().run_pipeline(p, options=options)

I tried to pass the requirements with options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt".

I got this error:

---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     90     try:
---> 91       out = subprocess.check_output(*args, **kwargs)
     92     except OSError:

/opt/conda/lib/python3.7/subprocess.py in check_output(timeout, *popenargs, **kwargs)
    410     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 411                **kwargs).stdout
    412 

/opt/conda/lib/python3.7/subprocess.py in run(input, capture_output, timeout, check, *popenargs, **kwargs)
    511             raise CalledProcessError(retcode, process.args,
--> 512                                      output=stdout, stderr=stderr)
    513     return CompletedProcess(process.args, retcode, stdout, stderr)

CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-12-f018e5c84d08> in <module>
----> 1 pipeline_result = DataflowRunner().run_pipeline(p, options=options)

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
    491           environments.DockerEnvironment.from_container_image(
    492               apiclient.get_container_image_from_options(options),
--> 493               artifacts=environments.python_sdk_dependencies(options)))
    494 
    495     # This has to be performed before pipeline proto is constructed to make sure

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/environments.py in python_sdk_dependencies(options, tmp_dir)
    624           options,
    625           tmp_dir,
--> 626           skip_prestaged_dependencies=skip_prestaged_dependencies))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in create_job_resources(options, temp_dir, build_setup_args, populate_requirements_cache, skip_prestaged_dependencies)
    178             populate_requirements_cache if populate_requirements_cache else
    179             Stager._populate_requirements_cache)(
--> 180                 setup_options.requirements_file, requirements_cache_path)
    181         for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
    182           resources.append((pkg, os.path.basename(pkg)))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
    234       while True:
    235         try:
--> 236           return fun(*args, **kwargs)
    237         except Exception as exn:  # pylint: disable=broad-except
    238           if not retry_filter(exn):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
    569     ]
    570     _LOGGER.info('Executing command: %s', cmd_args)
--> 571     processes.check_output(cmd_args, stderr=processes.STDOUT)
    572 
    573   @staticmethod

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     97           "Full traceback: {} \n Pip install failed for package: {} \
     98           \n Output from execution of subprocess: {}" \
---> 99           .format(traceback.format_exc(), args[0][6], error.output))
    100       else:
    101         raise RuntimeError("Full trace: {}, \

RuntimeError: Full traceback: Traceback (most recent call last):
  File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py", line 91, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/opt/conda/lib/python3.7/subprocess.py", line 411, in check_output
    **kwargs).stdout
  File "/opt/conda/lib/python3.7/subprocess.py", line 512, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
 
 Pip install failed for package: -r           
 Output from execution of subprocess: b'Obtaining file:///root/apache-beam-custom/packages/beam/sdks/python (from -r requirements.txt (line 3))\n  Saved /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip\nCollecting absl-py==0.11.0\n  Downloading absl-py-0.11.0.tar.gz (110 kB)\n  Saved /tmp/dataflow-requirements-cache/absl-py-0.11.0.tar.gz\nCollecting argon2-cffi==20.1.0\n  Downloading argon2-cffi-20.1.0.tar.gz (1.8 MB)\n  Installing build dependencies: started\n  Installing build dependencies: finished with status \'error\'\n  ERROR: Command errored out with exit status 1:\n   command: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\'\n       cwd: None\n  Complete output (85 lines):\n  Collecting setuptools>=40.6.0\n    Downloading setuptools-51.1.1.tar.gz (2.1 MB)\n  Collecting wheel\n    Downloading wheel-0.36.2.tar.gz (65 kB)\n  Collecting cffi>=1.0\n    Downloading cffi-1.14.4.tar.gz (471 kB)\n  Collecting pycparser\n    Downloading pycparser-2.20.tar.gz (161 kB)\n  Skipping wheel build for setuptools, due to binaries being disabled for it.\n  Skipping wheel build for wheel, due to binaries being disabled for it.\n  Skipping wheel build for cffi, due to binaries being disabled for it.\n  Skipping wheel build for pycparser, due to binaries being disabled for it.\n  Installing collected packages: setuptools, wheel, pycparser, cffi\n      Running setup.py install for setuptools: started\n      Running setup.py install for setuptools: finished with status \'done\'\n      Running setup.py install for wheel: started\n      Running setup.py install for wheel: finished with status \'done\'\n      Running setup.py install for pycparser: started\n      Running setup.py install for pycparser: finished with status \'done\'\n      Running setup.py install for cffi: started\n      Running setup.py install for cffi: finished with status \'error\'\n      ERROR: Command errored out with exit status 1:\n       command: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi\n           cwd: /tmp/pip-install-6zs5jguv/cffi/\n      Complete output (56 lines):\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      
No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      running install\n      running build\n      running build_py\n      creating build\n      creating build/lib.linux-x86_64-3.7\n      creating build/lib.linux-x86_64-3.7/cffi\n      copying cffi/setuptools_ext.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/pkgconfig.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/verifier.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/vengine_gen.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/backend_ctypes.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/__init__.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/cffi_opcode.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/error.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/api.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/commontypes.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/ffiplatform.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/lock.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/cparser.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/recompiler.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/vengine_cpy.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/model.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/_cffi_include.h -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/parse_c_type.h -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/_embedding.h -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/_cffi_errors.h -> build/lib.linux-x86_64-3.7/cffi\n      running build_ext\n      building \'_cffi_backend\' extension\n      creating build/temp.linux-x86_64-3.7\n      creating build/temp.linux-x86_64-3.7/c\n      gcc -pthread -B /opt/conda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -DUSE__THREAD -DHAVE_SYNC_SYNCHRONIZE -I/usr/include/ffi -I/usr/include/libffi -I/root/apache-beam-custom/include -I/opt/conda/include/python3.7m -c c/_cffi_backend.c -o build/temp.linux-x86_64-3.7/c/_cffi_backend.o\n      c/_cffi_backend.c:15:10: fatal error: ffi.h: No such file or directory\n       #include <ffi.h>\n                ^~~~~~~\n      compilation terminated.\n      error: command \'gcc\' failed with exit status 1\n      ----------------------------------------\n  ERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi Check the logs for full command output.\n  WARNING: You are using pip version 20.1.1; 
however, version 20.3.3 is available.\n  You should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n  ----------------------------------------\nERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\' Check the logs for full command output.\nWARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.\nYou should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n'

Am I doing something wrong?

-------------- EDIT --------------

OK, I have got my pipeline to work, but I still have an issue with my requirements.txt file, which I believe I am passing in correctly.

My pipeline code:

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

from google.cloud.bigtable.row import DirectRow
import datetime

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = ''

# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the requirements.txt file
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location

ib.options.recording_duration = '1m'

# The Google Cloud PubSub topic for this example.
topic = ""
subscription = ""
output_topic = ""

# Info
project_id = ""
bigtable_instance = ""
bigtable_table_id = ""

class CreateRowFn(beam.DoFn):
    def process(self,words):
        from google.cloud.bigtable.row import DirectRow
        import datetime
        direct_row = DirectRow(row_key="phone#4c410523#20190501")
        direct_row.set_cell(
            "stats_summary",
            b"os_build",
            b"android",
            datetime.datetime.now())
        return [direct_row]

p = beam.Pipeline(InteractiveRunner(),options=options)
words = p | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
windowed_words = (words | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
# Writing to BigTable
test = words | beam.ParDo(CreateRowFn()) | WriteToBigTable(
    project_id=project_id,
            instance_id=bigtable_instance,
            table_id=bigtable_table_id)

pipeline_result = DataflowRunner().run_pipeline(p, options=options)

As you can see in CreateRowFn, I need the imports "from google.cloud.bigtable.row import DirectRow" and "import datetime" inside the function body; only then does it work.

I have passed requirements.txt via options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt", and I can see it on the Dataflow console.

If I remove the import statements, I get "NameError in process: name 'DirectRow' is not defined".

Is there any way to get around this?

google-cloud-dataflow apache-beam
2 Answers
1 vote

I found the answer in the FAQ. My mistake was not in how I pass in requirements.txt, but in how to handle NameErrors.

https://cloud.google.com/dataflow/docs/resources/faq

How do I handle NameErrors? If you are getting a NameError when you execute your pipeline using the Dataflow service but not when you execute it locally (i.e. using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Dataflow worker.

By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded onto the Dataflow workers.

Notice that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error is about a module that should be available in the Python distribution, you can solve this by importing the module locally, where it is used.

For example, instead of:

import re
…
def myfunc():
  # use re module

use:

def myfunc():
  import re
  # use re module

Alternatively, if your DoFns span multiple files, you should use a different way to package your workflow and manage your dependencies.

So the conclusion is: using import statements inside the function is fine.
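
The FAQ's other suggestion, --save_main_session, can also be set programmatically instead of keeping the imports inside the DoFn. A minimal sketch, assuming the same options object and pipeline_options import from the pipeline code above:

# Pickle the state of the main session so that module-level imports
# (e.g. DirectRow, datetime) are available on the Dataflow workers.
options.view_as(pipeline_options.SetupOptions).save_main_session = True

As the FAQ warns, this raises a pickling error if the global namespace contains objects that cannot be pickled.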


1 vote

Google Dataflow workers already have these packages installed: https://cloud.google.com/dataflow/docs/concepts/sdk-worker-dependency

  1. If you are running it from Cloud Composer, you need to add the new packages to the PYPI PACKAGES section of the Composer environment.

  2. You can also pass --requirements_file path://requirements.txt as a flag on the command when you run it.

  3. I prefer to use the --setup_file path://setup.py flag. The setup file has the following format:

       import setuptools
       
       REQUIRED_PACKAGES = [
           'joblib==0.15.1',
           'numpy==1.18.5',
           'google',
           'google-cloud',
           'google-cloud-storage',
           'cassandra-driver==3.22.0'
       ]
       PACKAGE_NAME = 'my_package'
       PACKAGE_VERSION = '0.0.1'
       setuptools.setup(
           name=PACKAGE_NAME,
           version=PACKAGE_VERSION,
           description='Search Rank project',
           install_requires=REQUIRED_PACKAGES,
           author="Mohd Faisal",
           packages=setuptools.find_packages()
       )
  4. The Dataflow script uses the following format:
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions,
                                                  SetupOptions,
                                                  StandardOptions,
                                                  WorkerOptions)
from datetime import date


class Userprocess(beam.DoFn):

    def process(self, msg):
        yield "OK"


def run(argv=None):
    logging.info("Parsing dataflow flags... ")
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--project',
        required=True,
        help=(
            'project id staging or production '))
    parser.add_argument(
        '--temp_location',
        required=True,
        help=(
            'temp location'))
    parser.add_argument(
        '--job_name',
        required=True,
        help=(
            'job name'))

    known_args, pipeline_args = parser.parse_known_args(argv)
    today = date.today()

    logging.info("Processing Date is " + str(today))
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.job_name = known_args.job_name
    google_cloud_options.temp_location = known_args.temp_location
    # pipeline_options.view_as(StandardOptions).runner = known_args.runner

    with beam.Pipeline(argv=pipeline_args, options=pipeline_options) as p:
        # Apply the DoFn to a placeholder input PCollection so the pipeline has a source.
        _ = (p
             | "Create" >> beam.Create(["example message"])
             | "Process" >> beam.ParDo(Userprocess()))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Starting dataflow daily pipeline ")
    try:
        run()
    except Exception:
        logging.exception("Dataflow daily pipeline failed")


  5. Try running the script locally first to find errors (see the sketch below).
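
For step 5, one way to run the same script locally is to force Beam's DirectRunner. A minimal sketch, reusing the Userprocess DoFn defined above; the local_options name and the placeholder input are illustrative only:

import apache_beam as beam
from apache_beam.options.pipeline_options import (PipelineOptions,
                                                  SetupOptions,
                                                  StandardOptions)

# Build a separate options object and force local execution for debugging.
local_options = PipelineOptions()
local_options.view_as(StandardOptions).runner = 'DirectRunner'
# Programmatic equivalent of the --setup_file flag (used when submitting to Dataflow).
local_options.view_as(SetupOptions).setup_file = './setup.py'

with beam.Pipeline(options=local_options) as p:
    _ = (p
         | "Create" >> beam.Create(["example message"])
         | "Process" >> beam.ParDo(Userprocess()))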