JDBC PostgreSQL in Spark 3.2.4 writes batch data but not streaming data


For some reason, the JDBC PostgreSQL sink works fine for batch data, but it does not work for streaming data on my new setup with Spark 3.2.4, Scala 2.12.15, and Hadoop 3.3.4.

Jar files:

  • kafka-clients-3.3.2.jar
  • postgresql-42.5.0.jar
  • spark-sql-kafka-0-10_2.12-3.2.4.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.4.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.4.jar
  • commons-pool2-2.11.1.jar

Everything worked perfectly fine on Spark 3.1.3, but I need to upgrade Spark.

For batch data, the following code works as expected:

import os
from pyspark import SparkConf
from pyspark.sql import SparkSession, types as T

# Repeated .set("spark.jars.packages", ...) calls overwrite each other, so both packages go in one comma-separated value.
conf = SparkConf()\
    .setAppName('Testing jdbc postgresql')\
    .set("spark.streaming.stopGracefullyOnShutdown", "true")\
    .set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
    .set("spark.sql.shuffle.partitions", 2)

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()




data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",400000000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = T.StructType([ \
    T.StructField("firstname",T.StringType(),True), \
    T.StructField("middlename",T.StringType(),True), \
    T.StructField("lastname",T.StringType(),True), \
    T.StructField("id", T.StringType(), True), \
    T.StructField("gender", T.StringType(), True), \
    T.StructField("salary", T.IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)

# df.write.jdbc(url=url, table="test", mode=mode, properties=properties)
df.write.format(os.environ['FORMAT']).option("url", os.environ['URL_DB'])\
        .option("dbtable",os.environ['DB_TABLE_1MIN_MAP']).option("user",os.environ['USER_DB'])\
        .option("password", os.environ['PASSWORD_DB']).mode(os.environ['MODE_DB'])\
        .option("driver", "org.postgresql.Driver").save()

Using this .env file:

FORMAT=jdbc
URL_DB=jdbc:postgresql://localhost:5432/testing
DB_TABLE_1MIN=c1_fraud_1min
DB_TABLE_5MIN=c1_fraud_5min
DB_TABLE_1MIN_MAP=c1_fraud_1min_map
USER_DB=postgres
PASSWORD_DB=1234
MODE_DB=append
CHECKPOINT_LOCATION_C1_1MIN=/user/ivan/chk-point-dir-1min/notify
CHECKPOINT_LOCATION_C1_5MIN=/user/ivan/chk-point-dir-5min/notify
CHECKPOINT_LOCATION_C1_1MIN_MAP=/user/ivan/chk-point-dir-1min-map/notify
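
The snippets read these settings through os.environ, so the .env file has to be loaded into the process environment before the SparkConf is built. A minimal sketch, assuming the python-dotenv package (not shown in the original code):

# Hypothetical loader: assumes python-dotenv is installed and the .env file
# sits next to the driver script.
import os
from dotenv import load_dotenv

load_dotenv()  # copies the key=value pairs from ./.env into os.environ

print(os.environ['URL_DB'])  # jdbc:postgresql://localhost:5432/testing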

However, for streaming data, the following code does not work with format('jdbc'). Note that it does work with format('console'), which makes the whole issue very strange. (On Spark 3.1.3 absolutely everything works fine.)

import os
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import window
from pyspark.sql.avro.functions import from_avro

# As above, both packages are passed in a single comma-separated "spark.jars.packages" value so neither .set() call is overwritten.
conf = SparkConf()\
        .setAppName('Testing Streaming testing 4')\
        .set("spark.streaming.stopGracefullyOnShutdown", "true")\
        .set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
        .set("spark.jars", "/home/ivan/spark/jars/postgresql-42.5.0.jar")\
        .set("spark.sql.shuffle.partitions", 2)

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

jsonFormatSchema = open("./schemas/JsonFormatSchema-v1.json","r").read()  
    
# Read Avro-encoded records from Kafka; substring(value, 6) keeps the payload
# from byte 6 onwards, i.e. drops the first 5 bytes of each message value
# before from_avro parses it.
kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094") \
        .option("subscribe", "bank-creditcard-one-status") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("substring(value, 6) as avro_value") \
        .select(from_avro("avro_value", jsonFormatSchema).alias("value"))
        

kafka_df.printSchema()


flattened_df = kafka_df.select("value.*")\
    .withColumn("created_time", F.to_timestamp(F.col("created_time"), 
                                               "yyyy-MM-dd HH:mm:ss"))\
    .withColumn("no_approved_credit_c1_status", 
                F.when(F.col('approved_credit_c1_status')==0,1).otherwise(0))
    
flattened_df.printSchema()

    

window_agg_df_approved_c1_1_min = flattened_df \
    .withWatermark("created_time","5 minute") \
    .groupBy( 
     window(F.col("created_time"), "1 minute")) \
    .agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
    F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
    
window_agg_df_approved_c1_5_min = flattened_df \
    .withWatermark("created_time","5 minute") \
    .groupBy(  # col("BrokerCode"),
     window(F.col("created_time"), "5 minute")) \
    .agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
    F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
    
window_df_map_approved_c1_1_min = flattened_df \
    .withWatermark("created_time","5 minute") 
    
window_agg_df_approved_c1_1_min.printSchema()
window_agg_df_approved_c1_5_min.printSchema()


output_df_approved_c1_1_min\
    = window_agg_df_approved_c1_1_min.select("window.start", 
                                             "window.end", 
                                             "total_approved_c1",
                                             "total_no_approved_c1")
output_df_approved_c1_5_min\
    = window_agg_df_approved_c1_5_min.select("window.start", 
                                             "window.end", 
                                             "total_approved_c1",
                                             "total_no_approved_c1")




def foreach_batch_function_1min(df, epoch_id):
    df.write.format(os.environ['FORMAT'])\
        .option("url", os.environ['URL_DB'])\
        .option("dbtable", os.environ['DB_TABLE_1MIN'])\
        .option("user", os.environ['USER_DB'])\
        .option("password", os.environ['PASSWORD_DB'])\
        .option("driver", "org.postgresql.Driver")\
        .mode(os.environ['MODE_DB'])\
        .save()

output_df_approved_c1_1_min\
    .writeStream\
    .option("checkpointLocation", os.environ['CHECKPOINT_LOCATION_C1_1MIN'])\
    .foreachBatch(foreach_batch_function_1min).start()
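
Inside foreachBatch, the same sink could also be expressed with the DataFrameWriter.jdbc shortcut shown commented out in the batch snippet; a minimal equivalent sketch (same environment variables, shown only for comparison, the function name is hypothetical):

# Equivalent micro-batch sink using the df.write.jdbc shortcut instead of the
# options-based form; foreach_batch_function_1min_jdbc is a hypothetical name.
def foreach_batch_function_1min_jdbc(df, epoch_id):
    df.write.jdbc(
        url=os.environ['URL_DB'],
        table=os.environ['DB_TABLE_1MIN'],
        mode=os.environ['MODE_DB'],
        properties={
            "user": os.environ['USER_DB'],
            "password": os.environ['PASSWORD_DB'],
            "driver": "org.postgresql.Driver",
        },
    )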

spark.streams.awaitAnyTermination()

Please note that if I use format('console') instead, everything works correctly:

output_df_approved_c1_1_min.writeStream.format('console').outputMode("complete").start()

I am attaching the logs in case they help:

 711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] INFO SparkContext: Created broadcast 1 from start at NativeMethodAccessorImpl.java:0
    23/11/12 10:38:27.533 stream execution thread for [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] ERROR MicroBatchExecution: Query [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] terminated with error
    py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy28.call(Unknown Source)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
    Caused by: py4j.Py4JNetworkException: Error while sending a command: c
    p0
    call
    ro133
    L0
    e
    
        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:253)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        ... 30 more
    Caused by: java.net.SocketException: Connection reset
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
        at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
        at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
        at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
        at py4j.ClientServerConnection.readBlockingResponse(ClientServerConnection.java:313)
        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:229)
        ... 31 more
    23/11/12 10:38:27.615 shutdown-hook-0 INFO SparkContext: Invoking stop() from shutdown hook
    23/11/12 10:38:27.620 shutdown-hook-0 INFO AbstractConnector: Stopped Spark@449eb268{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
    23/11/12 10:38:27.623 shutdown-hook-0 INFO SparkUI: Stopped Spark web UI at http://192.168.100.54:4040
    23/11/12 10:38:27.628 YARN application state monitor INFO YarnClientSchedulerBackend: Interrupting monitor thread
    23/11/12 10:38:27.641 shutdown-hook-0 INFO YarnClientSchedulerBackend: Shutting down all executors
    23/11/12 10:38:27.642 dispatcher-CoarseGrainedScheduler INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
    23/11/12 10:38:27.646 shutdown-hook-0 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
    23/11/12 10:38:27.657 dispatcher-event-loop-9 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    23/11/12 10:38:27.668 shutdown-hook-0 INFO MemoryStore: MemoryStore cleared
    23/11/12 10:38:27.669 shutdown-hook-0 INFO BlockManager: BlockManager stopped
    23/11/12 10:38:27.675 shutdown-hook-0 INFO BlockManagerMaster: BlockManagerMaster stopped
    23/11/12 10:38:27.677 dispatcher-event-loop-12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    23/11/12 10:38:27.690 shutdown-hook-0 INFO SparkContext: Successfully stopped SparkContext
    23/11/12 10:38:27.690 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called
    23/11/12 10:38:27.691 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437/pyspark-6518207c-6aac-4439-bd7c-476e7e7637cf
    23/11/12 10:38:27.693 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437
    23/11/12 10:38:27.695 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-4f37d206-2576-42ad-95a9-1ead167eac4e

I would greatly appreciate any help.

pyspark spark-streaming spark-jdbc
1 Answer

The error message says:

java.net.SocketException: Connection reset

I think the culprit is this parameter:

URL_DB=jdbc:postgresql://localhost:5432/testing

If you have a multi-node cluster and PostgreSQL runs only on the driver node, executors running on other nodes will try to reach their own localhost and fail. Instead, try providing the actual PostgreSQL hostname rather than localhost.
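
For example, if PostgreSQL were reachable from every worker node under the hostname db-host (a hypothetical name), the .env entry would become:

# Hypothetical: use an address that every executor node can resolve and reach.
URL_DB=jdbc:postgresql://db-host:5432/testing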
