在 Spark 上运行 python Apache Beam 管道

问题描述 投票:0回答:3

我在这里尝试 apache beam(带有 python sdk),因此我创建了一个简单的管道,并尝试将其部署在 Spark 集群上。

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

op = PipelineOptions([
        "--runner=DirectRunner"
    ]
)


with beam.Pipeline(options=op) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x+1) | beam.Map(print)

该管道与 DirectRunner 配合良好。因此,要在 Spark 上部署相同的代码(因为可移植性是 Beam 中的一个关键概念)...

首先,我编辑了

PipelineOptions
,如此处所述:

op = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK"
    ]
)

job_endpoint
是我使用以下命令运行的 beam Spark 作业服务器 的 docker 容器的 url:

docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://SPARK_URL:SPARK_PORT

这应该可以很好地工作,但作业在 Spark 上失败并出现以下错误:

20/10/31 14:35:58 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.

java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297

此外,我在

beam_spark_job_server
日志中有此警告:

WARN org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a new Spark Context.

知道问题出在哪里吗?有没有其他方法可以在 Spark 上运行 python Beam Pipelines 而无需通过容器化服务?

python apache-spark apache-beam
3个回答
2
投票

发生这种情况的原因可能是作业服务器中包含的 Spark 客户端版本与您向其提交作业的 Spark 版本之间存在版本不匹配。


0
投票

希望现在回答这个问题还不算太晚。是的,这是由于版本不匹配造成的。我测试过,它只适用于 Spark 2,不适用于版本 3。如果您需要在 Kubernetes 上运行的示例,可以参考 https://github.com/cometta/python-apache-beam-spark 。如果它对您有用,可以帮助我为存储库“加星标”。请随意在存储库上创建问题,我会调查它。


0
投票

我花了一分钟才意识到有一个独立的 docker 镜像可与 Spark v3 一起使用:

https://hub.docker.com/r/apache/beam_spark3_job_server

我在最初尝试将 https://hub.docker.com/r/apache/beam_spark_job_server 与 Spark v3 一起使用时遇到了此错误。使用适当的 docker 镜像为我解决了这个问题。

© www.soinside.com 2019 - 2024. All rights reserved.