For some reason, jdbc postgresql works fine for sinking batch data, but it does not work for streaming data on my newer stack: Spark 3.2.4, Scala 2.12.15, and Hadoop 3.3.4.
Jar files:
Everything worked perfectly in Spark 3.1.3, but I need to upgrade Spark.
For batch data, the following code works as expected:
# note: spark.jars.packages must be set once, as a comma-separated list --
# calling .set() twice with the same key keeps only the last value
conf = SparkConf()\
    .setAppName('Testing jdbc postgresql')\
    .set("spark.streaming.stopGracefullyOnShutdown", "true")\
    .set("spark.jars.packages",
         "org.apache.spark:spark-avro_2.12:3.2.4,"
         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
    .set("spark.sql.shuffle.partitions", 2)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
data2 = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",400000000),
("Jen","Mary","Brown","","F",-1)
]
schema = T.StructType([ \
T.StructField("firstname",T.StringType(),True), \
T.StructField("middlename",T.StringType(),True), \
T.StructField("lastname",T.StringType(),True), \
T.StructField("id", T.StringType(), True), \
T.StructField("gender", T.StringType(), True), \
T.StructField("salary", T.IntegerType(), True) \
])
df = spark.createDataFrame(data=data2,schema=schema)
# df.write.jdbc(url=url, table="test", mode=mode, properties=properties)
df.write.format(os.environ['FORMAT']).option("url", os.environ['URL_DB'])\
.option("dbtable",os.environ['DB_TABLE_1MIN_MAP']).option("user",os.environ['USER_DB'])\
.option("password", os.environ['PASSWORD_DB']).mode(os.environ['MODE_DB'])\
.option("driver", "org.postgresql.Driver").save()
with the following .env file:
FORMAT=jdbc
URL_DB=jdbc:postgresql://localhost:5432/testing
DB_TABLE_1MIN=c1_fraud_1min
DB_TABLE_5MIN=c1_fraud_5min
DB_TABLE_1MIN_MAP=c1_fraud_1min_map
USER_DB=postgres
PASSWORD_DB=1234
MODE_DB=append
CHECKPOINT_LOCATION_C1_1MIN=/user/ivan/chk-point-dir-1min/notify
CHECKPOINT_LOCATION_C1_5MIN=/user/ivan/chk-point-dir-5min/notify
CHECKPOINT_LOCATION_C1_1MIN_MAP=/user/ivan/chk-point-dir-1min-map/notify
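The snippets above read every setting through `os.environ`; since plain Python does not load `.env` files automatically, a minimal stdlib-only loader (an illustrative sketch — the `python-dotenv` package does the same job) could look like this:

```python
import os

def load_env(path=".env"):
    """Minimal .env loader: one KEY=VALUE pair per line, '#' starts a comment."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                # setdefault keeps values already exported in the real environment
                os.environ.setdefault(key.strip(), value.strip())
```

Call `load_env()` once before building the `SparkConf` so the `os.environ[...]` lookups resolve.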
However, for streaming data, the following code does not work with format('jdbc'). (Note that it does work with format('console'), which makes this whole problem very strange.) On Spark 3.1.3 absolutely everything works.
# note: spark.jars.packages must be set once, as a comma-separated list --
# calling .set() twice with the same key keeps only the last value
conf = SparkConf()\
    .setAppName('Testing Streaming testing 4')\
    .set("spark.streaming.stopGracefullyOnShutdown", "true")\
    .set("spark.jars.packages",
         "org.apache.spark:spark-avro_2.12:3.2.4,"
         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
    .set("spark.jars", "/home/ivan/spark/jars/postgresql-42.5.0.jar")\
    .set("spark.sql.shuffle.partitions", 2)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
with open("./schemas/JsonFormatSchema-v1.json", "r") as f:
    jsonFormatSchema = f.read()
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094") \
.option("subscribe", "bank-creditcard-one-status") \
.option("startingOffsets", "earliest") \
.load() \
.selectExpr("substring(value, 6) as avro_value") \
.select(from_avro("avro_value", jsonFormatSchema).alias("value"))
kafka_df.printSchema()
flattened_df = kafka_df.select("value.*")\
.withColumn("created_time", F.to_timestamp(F.col("created_time"),
"yyyy-MM-dd HH:mm:ss"))\
.withColumn("no_approved_credit_c1_status",
F.when(F.col('approved_credit_c1_status')==0,1).otherwise(0))
flattened_df.printSchema()
window_agg_df_approved_c1_1_min = flattened_df \
.withWatermark("created_time","5 minute") \
.groupBy(
window(F.col("created_time"), "1 minute")) \
.agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
window_agg_df_approved_c1_5_min = flattened_df \
.withWatermark("created_time","5 minute") \
.groupBy( # col("BrokerCode"),
window(F.col("created_time"), "5 minute")) \
.agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
window_df_map_approved_c1_1_min = flattened_df \
.withWatermark("created_time","5 minute")
window_agg_df_approved_c1_1_min.printSchema()
window_agg_df_approved_c1_5_min.printSchema()
output_df_approved_c1_1_min\
= window_agg_df_approved_c1_1_min.select("window.start",
"window.end",
"total_approved_c1",
"total_no_approved_c1")
output_df_approved_c1_5_min\
= window_agg_df_approved_c1_5_min.select("window.start",
"window.end",
"total_approved_c1",
"total_no_approved_c1")
def foreach_batch_function_1min(df, epoch_id):
    df.write.format(os.environ['FORMAT'])\
        .option("url", os.environ['URL_DB'])\
        .option("dbtable", os.environ['DB_TABLE_1MIN'])\
        .option("user", os.environ['USER_DB'])\
        .option("password", os.environ['PASSWORD_DB'])\
        .option("driver", "org.postgresql.Driver")\
        .mode(os.environ['MODE_DB'])\
        .save()
output_df_approved_c1_1_min\
.writeStream\
.option("checkpointLocation", os.environ['CHECKPOINT_LOCATION_C1_1MIN'])\
.foreachBatch(foreach_batch_function_1min).start()
spark.streams.awaitAnyTermination()
Note that if I use format('console') instead, everything works fine:
output_df_approved_c1_1_min.writeStream.format('console').outputMode("complete").start()
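One thing worth noting while debugging: an exception raised inside a foreachBatch callback travels back to the JVM over the Py4J callback socket, so what surfaces in the driver log can be a bare "Connection reset" rather than the real Python error. A small wrapper (a generic sketch, not specific to this job) prints the traceback on the Python side before re-raising:

```python
import traceback

def with_traceback(write_fn):
    """Wrap a foreachBatch callback so the underlying Python exception is
    printed before it propagates back through Py4J (where it may be
    reported only as a socket error)."""
    def wrapper(df, epoch_id):
        try:
            write_fn(df, epoch_id)
        except Exception:
            traceback.print_exc()  # surface the real error in the Python log
            raise
    return wrapper

# usage: .foreachBatch(with_traceback(foreach_batch_function_1min))
```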
I am attaching the logs in case they help:
711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] INFO SparkContext: Created broadcast 1 from start at NativeMethodAccessorImpl.java:0
23/11/12 10:38:27.533 stream execution thread for [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] ERROR MicroBatchExecution: Query [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] terminated with error
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy28.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: py4j.Py4JNetworkException: Error while sending a command: c
p0
call
ro133
L0
e
at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:253)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 30 more
Caused by: java.net.SocketException: Connection reset
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at py4j.ClientServerConnection.readBlockingResponse(ClientServerConnection.java:313)
at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:229)
... 31 more
23/11/12 10:38:27.615 shutdown-hook-0 INFO SparkContext: Invoking stop() from shutdown hook
23/11/12 10:38:27.620 shutdown-hook-0 INFO AbstractConnector: Stopped Spark@449eb268{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
23/11/12 10:38:27.623 shutdown-hook-0 INFO SparkUI: Stopped Spark web UI at http://192.168.100.54:4040
23/11/12 10:38:27.628 YARN application state monitor INFO YarnClientSchedulerBackend: Interrupting monitor thread
23/11/12 10:38:27.641 shutdown-hook-0 INFO YarnClientSchedulerBackend: Shutting down all executors
23/11/12 10:38:27.642 dispatcher-CoarseGrainedScheduler INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
23/11/12 10:38:27.646 shutdown-hook-0 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
23/11/12 10:38:27.657 dispatcher-event-loop-9 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/12 10:38:27.668 shutdown-hook-0 INFO MemoryStore: MemoryStore cleared
23/11/12 10:38:27.669 shutdown-hook-0 INFO BlockManager: BlockManager stopped
23/11/12 10:38:27.675 shutdown-hook-0 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/12 10:38:27.677 dispatcher-event-loop-12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/12 10:38:27.690 shutdown-hook-0 INFO SparkContext: Successfully stopped SparkContext
23/11/12 10:38:27.690 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called
23/11/12 10:38:27.691 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437/pyspark-6518207c-6aac-4439-bd7c-476e7e7637cf
23/11/12 10:38:27.693 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437
23/11/12 10:38:27.695 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-4f37d206-2576-42ad-95a9-1ead167eac4e
I would greatly appreciate any help.
The error message shows:
java.net.SocketException: Connection reset
I think the culprit is this parameter:
URL_DB=jdbc:postgresql://localhost:5432/testing
If you have a multi-node cluster and PostgreSQL runs only on the driver node, executors running on other nodes will try to reach their own localhost and fail. Instead, try providing the PostgreSQL hostname rather than localhost.
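For example, assuming the worker nodes can resolve the machine that actually hosts PostgreSQL (the hostname below is a placeholder, not something from your setup):

```
# replace db-host.example.com with the real host running PostgreSQL
URL_DB=jdbc:postgresql://db-host.example.com:5432/testing
```

Also keep in mind that a default PostgreSQL install accepts connections only from localhost; to reach it from other nodes you would additionally need to set `listen_addresses` in postgresql.conf and allow the cluster's subnet in pg_hba.conf.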