我正在尝试在Spark中创建一个从Kafka获取数据的流。当我检查RDD中的记录计数时,似乎计数与Web UI不同。
我为DStream中的所有RDD执行一个函数(代码是用Python生成的:]
rdds = KafkaUtils.createStream(...)
rdds = rdds.repartition(1)
rdds.foreachRDD(doJob)
还有doJob函数,我有一个循环和一个计数器
def doJob(time, p_rdd):
if not p_rdd.isEmpty:
batch_count = 0
...
...
rdd_collected = p_rdd.collect()
for record in rdd_collected:
...
...
batch_count = batch_count + 1
log("Count: " + str(batch_count))
我的期望是batch_count应该与http:// webui.adress / my_app_id / Streaming页面->已完成的批处理部分->输入大小相同。但似乎并非如此。我应该在Web ui的哪里检查RDD记录计数,我丢失了什么?
谢谢。
我认为这是错误的。必须相同。我使用了这种逻辑,它似乎与我已处理的记录正确匹配。
您可以在UI的同一流表中的http://webui.adress/my_app_id/Streaming
下的Completed Batches -> Records
处进行检查。