如何使用Spark Java将Spark Dataframe写入Kafka Producer时控制记录数

问题描述 投票:0回答:1

我有一个带有两列的spark数据框,分别为'keyCol'列和'valCol'列。数据帧的大小巨大,将近1亿行。我想以小批量(即每分钟10000条记录)的形式向kafka主题写入/产生数据框。这个Spark作业每天将运行一次,从而创建此数据帧

如何在下面的代码中以每分钟10000个记录的小批量实现写入,或者请提出是否有更好/有效的方法来实现这一目标。

spark_df.foreachPartition(partitions ->{
            Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
            while (partitions) {
                Row row =  partitions.next();
                producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                       //Callback code goes here
                    }
                });
            }
            return;
        });
dataframe apache-spark apache-kafka spark-streaming kafka-producer-api
1个回答
0
投票

您可以如下使用grouped(10000)函数并执行睡眠线程一分钟

config.foreachPartition(f => {
      f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch

        roqSeq.foreach( row => {
          producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
            def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
              //Callback code goes here
            }
          })
        })
          Thread.sleep(60000) // Sleep for 1 minute
        }
      )
    })
© www.soinside.com 2019 - 2024. All rights reserved.