我有一个带有两列的spark数据框,分别为'keyCol'列和'valCol'列。数据帧的大小巨大,将近1亿行。我想以小批量(即每分钟10000条记录)的形式向kafka主题写入/产生数据框。这个Spark作业每天将运行一次,从而创建此数据帧
如何在下面的代码中以每分钟10000个记录的小批量实现写入,或者请提出是否有更好/有效的方法来实现这一目标。
spark_df.foreachPartition(partitions ->{
Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
while (partitions) {
Row row = partitions.next();
producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//Callback code goes here
}
});
}
return;
});
您可以如下使用grouped(10000)
函数并执行睡眠线程一分钟
config.foreachPartition(f => {
f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch
roqSeq.foreach( row => {
producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
//Callback code goes here
}
})
})
Thread.sleep(60000) // Sleep for 1 minute
}
)
})