如何在 spark 集群(Synapse)中运行需要外部文件的 Scala udf?

问题描述 投票:0回答:0

这里是我如何在我的 Synapse spark 集群中挂载文件

%%spark

mssparkutils.fs.mount( 
    "abfss://[email protected]/path/", 
    "/mnt", 
    Map("linkedService" -> "myLinkedService")
)

我有一个使用这个挂载来访问文件的 python UDF:

job_id = mssparkutils.env.getJobId()
file_path = f"/synfs/{job_id}/mnt/myfile.txt"

效果很好。我可以用 udf 中的那个文件做我需要的事。

但为了性能,我在 IntelliJ 中用 Scala 重写了我的 udfs,并将其打包为一个 jar,并将其添加到我的集群中。但是我无法让 jar/jvm 在这个挂载路径上找到文件。

这是我的 Scala 应用程序的简化版本

object TestUDF {

  lazy val myFile = {
    val job_id = mssparkutils.env.getJobId()
    val file = new File(f"synfs:/$job_id%s/mnt/myfile.txt")
    // do something with file
  }

  def doThingUDF(): {
    // do thing with this.myFile
  }
}

这是我在调用时遇到的错误:

引起:java.lang.AssertionError:断言失败:“/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1677385585472_0001/container_1677385585472_0001_01_000002/synfs-4-deb29f:/e78d29 4bbe-a917-f7b19c37c682/mnt/myfile.txt”不是文件

我还尝试了其他一些方法,例如在 Scala 应用程序本身内部使用 mssparkutils 来尝试创建一个装载。或者使用 mssparkutils 将文件复制到“本地”文件路径,然后尝试访问它,但我得到一个空异常。

    mssparkutils.fs.mkdirs("file:///tmp/temp")
    mssparkutils.fs.cp(f"synfs:/$job_id%s/mnt/myfile.txt", "file:///tmp/temp/myfile.txt")

通过以下方式致电:

  lazy val myFile = {
    val file = new File("file:///tmp/temp/myfile.txt")
  }

但是我得到一个空指针异常。

关于如何解决对该文件的访问的任何想法?如果直接有 abfss 等其他选项,我不一定非要使用挂载。该文件位于 adls2(Azure 数据湖存储)中。

scala apache-spark azure-synapse
© www.soinside.com 2019 - 2024. All rights reserved.