如何在火花JDBC应用程序选项“DBTABLE”指定子查询,而在Greenplum的从表中读取数据? [重复]

问题描述 投票:-3回答:1

这个问题已经在这里有一个答案:

我试图从Greenplum的表数据读入的使用星火HDFS。我给子查询中选择阅读Greenplum的表如下。

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"

println("ExecQuery: " + execQuery)

val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", conUrl)
                     .option("dbtable", execQuery)
                     .option("user", devUsrName).option("password", devPwd)
                     .option("partitionColumn","id")
                     .option("lowerBound", 165512)
                     .option("upperBound", 11521481695656862L)
                     .option("numPartitions",300).load()

当我运行的代码,我看到以下异常:

Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF$1(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)

异常显示:public为dbname和子查询(使用ExecQuery)作为tablename

我试图给EXEC查询为:

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"

要么

val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"

他们都不是工作。我使用的jar:Greenplum的-spark_2.11-1.4.0.jar来读取数据的Greenplum。下面是火花提交我试着使用:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar

我从greenplumn文档提到的说明写的代码:https://greenplum-spark.docs.pivotal.io/100/read_from_gpdb.html

我无法确定我在这里犯了这个错误。任何人都可以让我知道我该怎么解决这个问题?

scala apache-spark greenplum
1个回答
0
投票

选择与子查询来代替dbtable是内置的JDBC数据源的一个特征。然而Greenplum的星火连接器似乎并没有提供这样的能力。

具体地,源是由dbschemadbtable确定了the latter one should be(重点煤矿):

在Greenplum数据表的名称。从Greenplum的数据库中读取时,该表必须驻留在dbschema选项的值识别的Greenplum的数据库模式。

这说明你得到的异常。

在你共享的代码,同时没有证据表明你确实需要这样的功能。既然你不应用任何特定数据库的逻辑进程可能会简单地改写为

import org.apache.spark.sql.functions.{col, lit}

val allColumns: Seq[String] = ???

val dataDF = spark.read.format("greenplum")
  .option("url", conUrl)
  .option("dbtable", "xx_lines")
  .option("dbschema", "dbanscience")
  .option("partitionColumn", "id")
  .option("user", devUsrName)
  .option("password", devPwd)
  .load()
  .where("year = 2017 and month=12")
  .select(allColumns map col:_*)
  .withColumn(flagCol, lit(0))

请注意,您使用其他选项(upperBoundlowerBoundnumPartitionsare neither supported也不需要。

根据官方文档:

Greenplum的数据库跨段存储表数据。使用Greenplum的电火花连接器来加载Greenplum数据表火花应用识别一个特定的表列作为分配柱。的连接器使用的数据值在此列中的每个Greenplum数据段的一个或多个火花的分区分配特定的表中的数据行。

所以当你看到分配机制是内置的JDBC源完全不同。

连接器还提供了额外的partitionsPerSegment option which sets

每Greenplum数据段星火分区的数量。可选,默认值是1分。

© www.soinside.com 2019 - 2024. All rights reserved.