我有一个 Scala Spark 应用程序,想要调用 pySpark/python (pyspark_script.py) 进行进一步处理。
有多种资源可以在 Python 中使用 Java/Scala 代码,但我正在寻找 scala->Pyspark
我探索了 Jython for Scala/Java 以包含 Python 代码,如下所示:
PythonInterpreter.initialize(System.getProperties, properties, sysArgs)
val pi = new PythonInterpreter()
pi.execfile("path/to/pyscript/mypysparkscript.py")
我看到错误:“ImportError:没有名为 pyspark 的模块”
Scala Spark 有什么方法可以使用相同的 SparkContext/Session 与 PYSpark 对话吗?
您可以使用 process 对象在 scala 中运行 shell 命令。
// Spark codes goes here .....
// Call pyspark code
import sys.process._
"python3 /path/to/python/file.py.!!
要使用相同的会话,请将以下行添加到 python 文件中。
spark = SparkSession.builder.getOrCreate()
您也可以使用 getActiveSession() 方法。
注意: 确保安装了 pyspark 模块。 您可以使用
pip3 install pyspark
命令来做到这一点。
该库解决了用Scala和Python开发的spark应用程序之间的交互问题。 当 Spark 操作需要在 Scala 中执行,然后在单次运行中在 Python 中执行时,这会有所帮助。 可以观察到对此类功能的某些需求:
这种需求可能是因为缺乏将代码从一种语言重写为另一种语言的能力。
如需快速介绍,请访问演示存储库:ScalaPySparkDemo
创建新的 Scala 项目。
将依赖添加到build.sbt
libraryDependencies ++= Seq(
"ru.mardaunt" %% "pysparkwrapper" % "0.1.0",
"org.apache.spark" %% "spark-sql" % "3.3.2"
)
准备您的 Scala Spark 应用程序。 在我们的示例中,它看起来平淡无奇:
package ru.example
import org.apache.spark.sql.SparkSession
object PySparkDemo extends App {
lazy val spark = SparkSession.builder()
.master("local[*]")
.getOrCreate()
}
准备您的 PySpark 应用程序并将其放入资源中。
创建一个类,负责准备 PySpark 应用程序的启动。 为此,请扩展抽象 PySparkApp 类。这将是 python 项目的一种包装类。
package ru.example
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import ru.mardaunt.python.PySparkApp
import ru.mardaunt.python.logger.SimpleLogger
class PySparkDemo(spark: SparkSession, logger: Logger)
extends PySparkApp(mainPyName = "pyspark_main.py", needKerberosAuth = false)(spark, logger) {
override protected val starterTool: String = "spark-submit"
}
注意,存放包装类的包名必须与资源中的python应用程序包名一致。 在我们的例子中,是:
ru.example
应用程序已准备好启动:
import ru.mardaunt.python.logger.SimpleLogger
new PySparkDemo(spark, SimpleLogger()).run()
如果您在 IDE 中本地运行应用程序,请确保计算机上安装了 Spark。
如果您想在集群上运行应用程序,则构建 JAR。 您需要确保正在构建一个胖 JAR。这是必要的,因为我们已经指定了外部依赖项:
"ru.mardaunt" %% "pysparkwrapper" % "0.1.0"
如果将工件 pysparkwrapper.jar 传递给
--jars
命令的 spark-submit
选项,则无法构建胖 JAR。
或者您可以简单地将当前存储库的所有文件从包
ru.mardaunt.python
拖放到您的项目中。
恭喜!现在您知道如何使用该库了。
override protected val additionalSparkConfList: List[String] =
List(
"--conf", "spark.app.name=MY_FAVORITE_APP",
"--conf", "spark.driver.cores=4"
)
val args = List("a", "b", "c")
new PySparkDemo(spark, SimpleLogger()).run(args)
或者重写包装类字段:
override protected val pythonArgs: List[String] = List("a", "b", "c")
needKerberosAuth = false
override protected val additionalSparkConfList: List[String] =
List(
"--conf", "spark.app.name=MY_FAVORITE_APP",
"--conf", "spark.driver.cores=4",
"--conf", "spark.pyspark.python=/your/python/loc/bin/python",
"--conf", "spark.pyspark.driver.python=/your/python/loc/bin/python"
)