从 Scala Spark 代码调用 Pyspark 脚本

问题描述 投票:0回答:2

我有一个 Scala Spark 应用程序,想要调用 pySpark/python (pyspark_script.py) 进行进一步处理。

有多种资源可以在 Python 中使用 Java/Scala 代码,但我正在寻找 scala->Pyspark

我探索了 Jython for Scala/Java 以包含 Python 代码,如下所示:

PythonInterpreter.initialize(System.getProperties, properties, sysArgs)
val pi = new PythonInterpreter()
pi.execfile("path/to/pyscript/mypysparkscript.py")

我看到错误:“ImportError:没有名为 pyspark 的模块”

Scala Spark 有什么方法可以使用相同的 SparkContext/Session 与 PYSpark 对话吗?

scala apache-spark pyspark jython
2个回答
0
投票

您可以使用 process 对象在 scala 中运行 shell 命令。

// Spark codes goes here .....
// Call pyspark code 
import sys.process._
"python3 /path/to/python/file.py.!!

要使用相同的会话,请将以下行添加到 python 文件中。

spark = SparkSession.builder.getOrCreate()

您也可以使用 getActiveSession() 方法。

注意: 确保安装了 pyspark 模块。 您可以使用

pip3 install pyspark
命令来做到这一点。


0
投票

Scala Spark 和 PySpark 的共生

该库解决了用Scala和Python开发的spark应用程序之间的交互问题。 当 Spark 操作需要在 Scala 中执行,然后在单次运行中在 Python 中执行时,这会有所帮助。 可以观察到对此类功能的某些需求:

这种需求可能是因为缺乏将代码从一种语言重写为另一种语言的能力。

使用方法:

如需快速介绍,请访问演示存储库:ScalaPySparkDemo

  • 创建新的 Scala 项目。

  • 将依赖添加到build.sbt

    libraryDependencies ++= Seq(
      "ru.mardaunt"        %% "pysparkwrapper" % "0.1.0",
      "org.apache.spark"   %% "spark-sql"      % "3.3.2"
    )
    
  • 准备您的 Scala Spark 应用程序。 在我们的示例中,它看起来平淡无奇:

    package ru.example
    
    import org.apache.spark.sql.SparkSession
    
    object PySparkDemo extends App {
    
      lazy val spark = SparkSession.builder()
                                   .master("local[*]")
                                   .getOrCreate()
    
    }
    
  • 准备您的 PySpark 应用程序并将其放入资源中。

  • 创建一个类,负责准备 PySpark 应用程序的启动。 为此,请扩展抽象 PySparkApp 类。这将是 python 项目的一种包装类。

    package ru.example
    
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    import ru.mardaunt.python.PySparkApp
    import ru.mardaunt.python.logger.SimpleLogger
    
    class PySparkDemo(spark: SparkSession, logger: Logger)
      extends PySparkApp(mainPyName = "pyspark_main.py", needKerberosAuth = false)(spark, logger) {
    
      override protected val starterTool: String = "spark-submit"
    }
    

    注意,存放包装类的包名必须与资源中的python应用程序包名一致。 在我们的例子中,是:

    ru.example

  • 应用程序已准备好启动:

    import ru.mardaunt.python.logger.SimpleLogger
    
    new PySparkDemo(spark, SimpleLogger()).run()
    

    如果您在 IDE 中本地运行应用程序,请确保计算机上安装了 Spark。

    如果您想在集群上运行应用程序,则构建 JAR。 您需要确保正在构建一个胖 JAR。这是必要的,因为我们已经指定了外部依赖项:

    "ru.mardaunt" %% "pysparkwrapper" % "0.1.0"
    

    如果将工件 pysparkwrapper.jar 传递给

    --jars
    命令的
    spark-submit
    选项,则无法构建胖 JAR。

    或者您可以简单地将当前存储库的所有文件从包

    ru.mardaunt.python
    拖放到您的项目中。

恭喜!现在您知道如何使用该库了。


常见问题解答

如何更改 PySpark 应用程序中的配置?

  • 覆盖项目的python包装子类的字段:
        override protected val additionalSparkConfList: List[String] =
          List(
            "--conf", "spark.app.name=MY_FAVORITE_APP",
            "--conf", "spark.driver.cores=4"
          )
    

如何将参数传递给 PySpark 应用程序?

  • 您可以将参数列表传递给“run”方法:
      val args = List("a", "b", "c")
      new PySparkDemo(spark, SimpleLogger()).run(args)
    
    或者重写包装类字段:
      override protected val pythonArgs: List[String] = List("a", "b", "c")
    

如何启用kerberos授权?

  • 默认启用 Kerberos 授权。但是您可以使用包装类中的标志来控制授权:
      needKerberosAuth = false
    

我需要 python 中的特定依赖项。如何在 PySpark 应用程序中使用我的依赖项?

  • 为此,您应该已经有一个安装了库的 python 环境。 然后,您可以通过将 python 解释器的路径传递给驱动程序和执行程序来配置 PySpark 应用程序:
        override protected val additionalSparkConfList: List[String] =
          List(
            "--conf", "spark.app.name=MY_FAVORITE_APP",
            "--conf", "spark.driver.cores=4",
            "--conf", "spark.pyspark.python=/your/python/loc/bin/python",
            "--conf", "spark.pyspark.driver.python=/your/python/loc/bin/python"
          )
    
© www.soinside.com 2019 - 2024. All rights reserved.