Error publishing data to Pub/Sub from a Dataproc Spark job: No functional channel service provider found


I am running a Spark Scala job on a GCP Dataproc cluster. After processing the data I need to publish messages to a Pub/Sub topic, but I get the error below.

No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty or grpc-netty-shaded artifact

Everything works fine through the Spark processing; the error occurs only when I publish the messages to Pub/Sub. See the code below.

Try {

  val topicName = TopicName.of(projectName, pubSubTopicName)

  // Scope the credentials to the Pub/Sub API
  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")

  val googleCredentials = GoogleCredentials
    .fromStream(getClass.getResourceAsStream("file path"))
    .createScoped(scope)

  // Batch by element count, request size, and publish delay
  val batchingSettings = BatchingSettings
    .newBuilder()
    .setElementCountThreshold(elementCountThreshold)
    .setRequestByteThreshold(requestByteThreshold)
    .setDelayThreshold(delayDuration)
    .build()

  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )

  val publishedData: MutableList[String] = MutableList()

  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )

    val messageIdFuture = publisher.publish(pubSubMessage)

    // get() blocks until the message ID for this message is returned
    publishedData += messageIdFuture.get
  }
}

def getPublisher(
    topicName: TopicName,
    batchingSettings: BatchingSettings,
    googleCredentials: GoogleCredentials
): Publisher = {

  Publisher
    .newBuilder(topicName)
    .setCredentialsProvider(
      FixedCredentialsProvider.create(googleCredentials)
    )
    .setBatchingSettings(batchingSettings)
    .build()
}

def getPubSubMessage(data: ByteString): PubsubMessage = {

  PubsubMessage
    .newBuilder()
    .setData(data)
    .build()
}
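
As an aside on the code itself: with batching enabled, a publisher is normally flushed and shut down after the publish loop. A minimal sketch, assuming the publisher instance built above (this shutdown step is not in the original code):

// Sketch only: drain pending batches and release gRPC channel resources.
import java.util.concurrent.TimeUnit

publisher.shutdown()                             // stop accepting messages, flush pending batches
publisher.awaitTermination(1, TimeUnit.MINUTES)  // wait for outstanding publishes to complete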

Since the error mentions the channel, I tried the following change to the publisher, but got the same error:

Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()

I also tried adding the following dependencies in sbt, but I still get the same error:

"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"

All three suggested artifacts are on the classpath, yet the error persists.

Any help would be appreciated. Thanks in advance.

scala apache-spark google-cloud-pubsub google-cloud-dataproc
1 Answer

The problem turned out to be the fat-jar assembly once the Pub/Sub library is involved. gRPC locates its transport channel through Java's ServiceLoader mechanism, which reads registration files under META-INF/services; if the assembly's merge strategy discards or clobbers those files (a blanket MergeStrategy.discard for META-INF does exactly this), no functional channel service provider can be found at runtime.
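
One way to confirm the diagnosis is to ask gRPC directly which transport it can see on the classpath. A small diagnostic sketch (not part of the original answer), using io.grpc's public ManagedChannelProvider API:

// Prints the channel provider gRPC resolves through the ServiceLoader;
// throws ProviderNotFoundException when the registration file was lost during assembly.
import io.grpc.ManagedChannelProvider

object CheckGrpcProvider extends App {
  println(ManagedChannelProvider.provider())
}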

Here are the required changes in build.sbt:

  • Add only the grpc-netty dependency:
"io.grpc" % "grpc-netty" % "1.49.2"
  • Shade (repackage) some of the com.google packages, which otherwise cause problems at runtime when the publisher is created:
assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("com.google.common.**" -> "repackaged.com.google.common.@1")
    .inAll,
  ShadeRule
    .rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1")
    .inAll,
)
  • Also, change the merge strategy used when assembling the jar, making sure the META-INF/services entries are merged rather than discarded:
assemblyMergeStrategy in assembly := {
  case x if Assembly.isConfigFile(x) =>
    MergeStrategy.concat
  case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
    MergeStrategy.rename
  case PathList("META-INF", xs @ _*) =>
    (xs map { _.toLowerCase }) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
        MergeStrategy.discard
      case "plexus" :: xs =>
        MergeStrategy.discard
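      // Keep services files merged line-by-line: this preserves gRPC's
      // ManagedChannelProvider registration from grpc-netty in the fat jar.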
      case "services" :: xs =>
        MergeStrategy.filterDistinctLines
      case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
        MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.first
    }
  case _ => MergeStrategy.first
}
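
For context, the registration this merge strategy must keep intact is the ServiceLoader file shipped inside the grpc-netty jar (name and contents per recent grpc-java versions; they may vary slightly by release):

META-INF/services/io.grpc.ManagedChannelProvider
  io.grpc.netty.NettyChannelProvider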

With these changes, the runtime error no longer occurs.
