Data source io.pivotal.greenplum.spark.GreenplumRelationProvider does not support streamed writing

Question · 0 votes · 2 answers

I am trying to read data from Kafka and load it into a Greenplum database using Spark. I am using the greenplum-spark connector, but I get: Data source io.pivotal.greenplum.spark.GreenplumRelationProvider does not support streamed writing. Does the Greenplum source not support streaming data? I can see "Continuous ETL pipeline (streaming)" listed on the website.

I have tried passing both "greenplum" and "io.pivotal.greenplum.spark.GreenplumRelationProvider" as the data source to .format("datasource").

val EventStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", args(0))
  .option("subscribe", args(1))
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load

val gscWriteOptionMap = Map(
  "url" -> "link for greenplum",
  "user" -> "****",
  "password" -> "****",
  "dbschema" -> "dbname"
)
val stateEventDS = EventStream
  .selectExpr("CAST(key AS String)", "*****(value)")
  .as[(String, ******)]
  .map(_._2)

val EventOutputStream = stateEventDS.writeStream
  .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
  .options(gscWriteOptionMap)
  .start()

EventOutputStream.awaitTermination()
scala apache-kafka spark-streaming greenplum
2 Answers
0 votes

Greenplum Spark Structured Streaming

This demonstrates how to use the writeStream API with GPDB over JDBC.

The following code blocks read from the rate streaming source and stream to GPDB through a JDBC-based sink, first in micro-batches and then record by record.

Batch-based streaming

import org.apache.spark.sql.streaming.Trigger

import scala.concurrent.duration._

// "myjdbc" refers to a custom JDBC-based streaming sink registered under that
// short name; it is not a data source that ships with Spark.
val sq = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("myjdbc").
  option("checkpointLocation", "/tmp/jdbc-checkpoint").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

Record-based streaming

This uses a ForeachWriter.

import org.apache.spark.sql.streaming.Trigger

import scala.concurrent.duration._

val url = "jdbc:postgresql://gsc-dev:5432/gpadmin"
val user = "gpadmin"
val pwd = "changeme"
// JDBCSink is a user-defined ForeachWriter[Row]; a sketch is given below.
val jdbcWriter = new JDBCSink(url, user, pwd)

val sq = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  foreach(jdbcWriter).
  option("checkpointLocation", "/tmp/jdbc-checkpoint").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start
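
The JDBCSink class referenced above is not part of Spark and is not shown in the answer. Below is a minimal sketch of what such a ForeachWriter could look like for the rate source's (timestamp, value) rows; the class body, the target table events, and its column names are assumptions, not code from the original answer.

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical JDBCSink: opens a JDBC connection per partition/epoch and
// inserts every row of the micro-batch into an assumed "events" table.
class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.prepareStatement(
      "INSERT INTO events (event_time, event_value) VALUES (?, ?)")
    true
  }

  override def process(row: Row): Unit = {
    statement.setTimestamp(1, row.getTimestamp(0)) // rate source: timestamp column
    statement.setLong(2, row.getLong(1))           // rate source: value column
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

With a class like this on the classpath, .foreach(jdbcWriter) in the snippet above delivers each row over plain JDBC, trading the bulk-load speed of the batch approach for per-record writes.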

1 vote

What version of GPDB / Spark are you using? You could bypass Spark altogether and use the Greenplum-Kafka connector instead.

https://gpdb.docs.pivotal.io/5170/greenplum-kafka/overview.html

In earlier versions, the Greenplum-Spark Connector exposed a Spark data source named io.pivotal.greenplum.spark.GreenplumRelationProvider for reading data from Greenplum Database into a Spark DataFrame.

In later versions, the connector exposes a Spark data source named greenplum to transfer data between Spark and Greenplum Database.

It should be something like this -

val EventOutputStream = stateEventDS.write.format("greenplum").options(gscWriteOptionMap).save()

See: https://greenplum-spark.docs.pivotal.io/160/write_to_gpdb.html
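
Note that .write ... .save() is the batch DataFrame API; calling it directly on a streaming Dataset such as stateEventDS will fail. Below is a minimal sketch of bridging the two with foreachBatch, assuming Spark 2.4+ and a connector version that registers the greenplum source; the dbtable value "events" and the checkpoint path are placeholders, not values from the question.

import org.apache.spark.sql.DataFrame

// Write each micro-batch through the connector's batch "greenplum" sink.
def writeBatchToGreenplum(batchDF: DataFrame, batchId: Long): Unit =
  batchDF.write
    .format("greenplum")
    .options(gscWriteOptionMap)
    .option("dbtable", "events") // placeholder target table
    .mode("append")
    .save()

val EventOutputStream = stateEventDS.toDF.writeStream
  .option("checkpointLocation", "/tmp/gpdb-checkpoint")
  .foreachBatch(writeBatchToGreenplum _)
  .start()

EventOutputStream.awaitTermination()

foreachBatch hands each completed micro-batch to the batch writer, which is the usual way to pair Structured Streaming with a sink that only supports batch writes.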
