Structured Streaming with multiple sinks for a single source


We are building a Spark Structured Streaming application that has a single source and multiple sinks (Kafka and HDFS). We are on a Spark 2.3.1 cluster, so we cannot use foreachBatch on DataStreamWriter. I therefore went with the "multiple stream writers" approach, as advocated here and in several other places. Unfortunately, only one of the streams processes all the batches, while the other stays dormant. What am I missing here?

    import org.apache.spark.sql.{Dataset, SparkSession}
    import scala.util.Random

    val spark = SparkSession
      .builder
      .appName("Spark-Structured-Streaming")
      .enableHiveSupport()
      .getOrCreate()

    // Needed for .as[String], flatMap and map on a Dataset
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split each line into words and append a random digit to every word
    val words: Dataset[String] = lines.as[String]
      .flatMap(_.split(" "))
      .map(_.concat(Random.nextInt(10).toString))

    /** THIS DOESN'T STREAM TO HDFS (it works only when the Kafka query below is commented out) **/
    words.writeStream
      .format("parquet")
      .option("path", "path/to/destination/dir")
      .option("checkpointLocation", "some_location_1")
      .start()

    /** THIS WORKS **/
    words.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "updates")
      .option("checkpointLocation", "some_location_2")
      .start()

    spark.streams.awaitAnyTermination()

I only see data being published to Kafka; nothing is written to HDFS. The dormant stream comes to life only when I comment out the Kafka publishing stream.
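For comparison, this is roughly what the foreachBatch route mentioned above could look like on a cluster running Spark 2.4 or later, where `DataStreamWriter.foreachBatch` is available. It is only a sketch under that assumption and does not apply to 2.3.1. The object name `ForeachBatchSketch`, the function name `writeToBothSinks` and the checkpoint path `some_location_3` are made up for illustration; the sink options are copied from the question.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object ForeachBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Spark-Structured-Streaming")
      .getOrCreate()
    import spark.implicits._

    val words: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split(" "))

    // Declared as a Scala function value so that the Scala overload of
    // foreachBatch is picked without ambiguity.
    val writeToBothSinks: (Dataset[String], Long) => Unit = (batch, batchId) => {
      batch.persist() // cache the micro-batch so it is not recomputed per sink

      // Sink 1: Parquet files (path taken from the question)
      batch.write.mode("append").parquet("path/to/destination/dir")

      // Sink 2: Kafka batch write; the Dataset[String] column is already named "value"
      batch.write
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .save()

      batch.unpersist()
    }

    words.writeStream
      .foreachBatch(writeToBothSinks)
      .option("checkpointLocation", "some_location_3") // hypothetical path
      .start()
      .awaitTermination()
  }
}
```

With this shape there is a single streaming query, so the source is read once per micro-batch and both writes see the same data. The two writes are not atomic with respect to each other, so a failure between them can leave one sink ahead of the other until the batch is retried.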

Update: a similar, easily reproducible script that uses simple sinks

    import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
    import scala.util.Random

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder
        .appName("Spark-Structured-Streaming")
        .master("local[4]")
        .getOrCreate()

      val lines = spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()

      import spark.implicits._

      // Split each line into words and append a random digit to every word
      val words: Dataset[String] = lines.as[String]
        .flatMap(_.split(" "))
        .map(_.concat(Random.nextInt(10).toString))

      // First query: console sink
      words.writeStream.queryName("query1").format("console").start()

      // Second query: foreach sink that simply prints every value
      words.writeStream.queryName("query2").foreach(new ForeachWriter[String] {
        override def open(partitionId: Long, version: Long): Boolean = true

        override def process(value: String): Unit = {
          println(value)
        }

        override def close(errorOrNull: Throwable): Unit = {}
      })
        .start()

      spark.streams.awaitAnyTermination()
    }
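One way to narrow this down might be to run the same two-query layout against a source other than the socket test source. The sketch below swaps in Spark's built-in `rate` test source, which generates rows on its own. As I understand it, each streaming query reads its source independently, so the idea is to see whether both queries make progress when the input does not come from a single socket connection. The object name `TwoQueriesRateSource` is made up for illustration, and both queries use the console sink to keep the script short.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object TwoQueriesRateSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Spark-Structured-Streaming")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    // The rate source emits rows with a `timestamp` and a `value` (Long) column;
    // only `value` is kept, cast to a string to mirror Dataset[String] above.
    val words: Dataset[String] = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select($"value".cast("string"))
      .as[String]

    // Two independent queries on the same streaming Dataset
    words.writeStream.queryName("query1").format("console").start()
    words.writeStream.queryName("query2").format("console").start()

    spark.streams.awaitAnyTermination()
  }
}
```

If both queries print batches with this source, that would point at how the socket input is delivered rather than at the use of multiple writers. If only one query receives data here as well, the cause lies elsewhere.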

Log (input: A B C)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 20/05/30 12:17:37 INFO SparkContext: Running Spark version 2.3.1 20/05/30 12:17:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 20/05/30 12:17:38 INFO SparkContext: Submitted application: Spark-Structured-Streaming 20/05/30 12:17:38 INFO SecurityManager: Changing view acls to: z001v7w 20/05/30 12:17:38 INFO SecurityManager: Changing modify acls to: z001v7w 20/05/30 12:17:38 INFO SecurityManager: Changing view acls groups to: 20/05/30 12:17:38 INFO SecurityManager: Changing modify acls groups to: 20/05/30 12:17:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(z001v7w); groups with view permissions: Set(); users with modify permissions: Set(z001v7w); groups with modify permissions: Set() 20/05/30 12:17:38 INFO Utils: Successfully started service 'sparkDriver' on port 49262. 20/05/30 12:17:39 INFO SparkEnv: Registering MapOutputTracker 20/05/30 12:17:39 INFO SparkEnv: Registering BlockManagerMaster 20/05/30 12:17:39 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 20/05/30 12:17:39 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 20/05/30 12:17:39 INFO DiskBlockManager: Created local directory at /private/var/folders/rr/vxr2k38j6_qcl3xmbf2g_25933gq0_/T/blockmgr-80dcbdff-eb1c-49e5-a33b-47a897735d31 20/05/30 12:17:39 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB 20/05/30 12:17:39 INFO SparkEnv: Registering OutputCommitCoordinator 20/05/30 12:17:39 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
20/05/30 12:17:39 INFO SparkUI: Bound SparkUI to 127.0.0.1, and started at http://localhost:4040 20/05/30 12:17:39 INFO Executor: Starting executor ID driver on host localhost 20/05/30 12:17:39 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49263. 20/05/30 12:17:39 INFO NettyBlockTransferService: Server created on localhost:49263 20/05/30 12:17:39 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 20/05/30 12:17:39 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, localhost, 49263, None) 20/05/30 12:17:39 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49263 with 2004.6 MB RAM, BlockManagerId(driver, localhost, 49263, None) 20/05/30 12:17:39 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, localhost, 49263, None) 20/05/30 12:17:39 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, localhost, 49263, None) 20/05/30 12:17:40 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/****/spark-warehouse/'). 20/05/30 12:17:40 INFO SharedState: Warehouse path is 'file:/****/spark-warehouse/'. 20/05/30 12:17:40 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 20/05/30 12:17:40 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery. 20/05/30 12:17:43 INFO MicroBatchExecution: Starting query1 [id = a5714e40-553f-4d21-9115-9c0d9ffe950b, runId = 8d206486-36d8-4319-ad3d-fa5cb00a79ca]. Use file:///private/var/folders/rr/vxr2k38j6_qcl3xmbf2g_25933gq0_/T/temporary-5f818bb9-b347-432f-8c43-07fccec0e9e8 to store the query checkpoint. 20/05/30 12:17:43 WARN ClosureCleaner: Expected a closure; got com.tgt.sign.POCTestMain$$anon$1 20/05/30 12:17:43 INFO MicroBatchExecution: Starting new streaming query. 
20/05/30 12:17:43 INFO MicroBatchExecution: Streaming query made progress: { "id" : "a5714e40-553f-4d21-9115-9c0d9ffe950b", "runId" : "8d206486-36d8-4319-ad3d-fa5cb00a79ca", "name" : "query1", "timestamp" : "2020-05-30T17:17:43.915Z", "batchId" : 0, "numInputRows" : 0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 0, "triggerExecution" : 10 }, "stateOperators" : [ ], "sources" : [ { "description" : "TextSocketSource[host: localhost, port: 9999]", "startOffset" : null, "endOffset" : null, "numInputRows" : 0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@5e761dda" } } 20/05/30 12:17:43 INFO MicroBatchExecution: Starting query2 [id = 776c6082-920a-4406-913f-c1b2eda5f12d, runId = 7d066ebb-fe7b-454c-98c5-2ed05a8c0bda]. Use file:///private/var/folders/rr/vxr2k38j6_qcl3xmbf2g_25933gq0_/T/temporary-7a3485d8-9973-43a7-8a35-4a5dd0745719 to store the query checkpoint. 20/05/30 12:17:43 INFO MicroBatchExecution: Starting new streaming query. 20/05/30 12:17:43 INFO MicroBatchExecution: Streaming query made progress: { "id" : "776c6082-920a-4406-913f-c1b2eda5f12d", "runId" : "7d066ebb-fe7b-454c-98c5-2ed05a8c0bda", "name" : "query2", "timestamp" : "2020-05-30T17:17:43.991Z", "batchId" : 0, "numInputRows" : 0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 0, "triggerExecution" : 1 }, "stateOperators" : [ ], "sources" : [ { "description" : "TextSocketSource[host: localhost, port: 9999]", "startOffset" : null, "endOffset" : null, "numInputRows" : 0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "ForeachSink" } } 20/05/30 12:17:49 INFO MicroBatchExecution: Committed offsets for batch 0. 
Metadata OffsetSeqMetadata(0,1590859069597,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider)) 20/05/30 12:17:50 INFO CodeGenerator: Code generated in 474.062005 ms 20/05/30 12:17:51 INFO CodeGenerator: Code generated in 58.908865 ms 20/05/30 12:17:51 INFO WriteToDataSourceV2Exec: Start processing data source writer: org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6ccc266. The input RDD has 4 partitions. 20/05/30 12:17:51 INFO SparkContext: Starting job: start at POCTestMain.scala:147 20/05/30 12:17:51 INFO DAGScheduler: Got job 0 (start at POCTestMain.scala:147) with 4 output partitions 20/05/30 12:17:51 INFO DAGScheduler: Final stage: ResultStage 0 (start at POCTestMain.scala:147) 20/05/30 12:17:51 INFO DAGScheduler: Parents of final stage: List() 20/05/30 12:17:51 INFO DAGScheduler: Missing parents: List() 20/05/30 12:17:51 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at start at POCTestMain.scala:147), which has no missing parents 20/05/30 12:17:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 12.0 KB, free 2004.6 MB) 20/05/30 12:17:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.3 KB, free 2004.6 MB) 20/05/30 12:17:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49263 (size: 5.3 KB, free: 2004.6 MB) 20/05/30 12:17:51 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039 20/05/30 12:17:51 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at start at POCTestMain.scala:147) (first 15 tasks are for partitions Vector(0, 1, 2, 3)) 20/05/30 12:17:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks 20/05/30 12:17:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7851 bytes) 
20/05/30 12:17:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7851 bytes) 20/05/30 12:17:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7851 bytes) 20/05/30 12:17:51 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 7995 bytes) 20/05/30 12:17:51 INFO Executor: Running task 3.0 in stage 0.0 (TID 3) 20/05/30 12:17:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 20/05/30 12:17:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 20/05/30 12:17:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2) 20/05/30 12:17:51 INFO CodeGenerator: Code generated in 25.371124 ms 20/05/30 12:17:51 INFO CodeGenerator: Code generated in 12.870672 ms 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 0 is committing. 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 2 is committing. 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 1 is committing. 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 0 committed. 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 1 committed. 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 2 committed. 20/05/30 12:17:51 INFO CodeGenerator: Code generated in 14.157876 ms 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 3 is committing. 20/05/30 12:17:51 INFO DataWritingSparkTask: Writer for partition 3 committed. 20/05/30 12:17:51 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1240 bytes result sent to driver 20/05/30 12:17:51 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1240 bytes result sent to driver 20/05/30 12:17:51 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1240 bytes result sent to driver 20/05/30 12:17:51 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 
2297 bytes result sent to driver
20/05/30 12:17:51 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 227 ms on localhost (executor driver) (1/4)
20/05/30 12:17:51 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 256 ms on localhost (executor driver) (2/4)
20/05/30 12:17:51 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 233 ms on localhost (executor driver) (3/4)
20/05/30 12:17:51 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 232 ms on localhost (executor driver) (4/4)
20/05/30 12:17:51 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/05/30 12:17:51 INFO DAGScheduler: ResultStage 0 (start at POCTestMain.scala:147) finished in 0.557 s
20/05/30 12:17:51 INFO DAGScheduler: Job 0 finished: start at POCTestMain.scala:147, took 0.616436 s
20/05/30 12:17:51 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6ccc266 is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|   A8|
|   B9|
|   C4|
+-----+

apache-spark hadoop apache-spark-sql spark-streaming spark-structured-streaming
1 Answer
0 votes
I ran into a similar problem and changed my code to