Spark: show and collect return different results

Question (votes: 0, answers: 1)

I have a DataFrame computed from a Kafka source. The problem is that show and collect on this DF return different results. It looks as if collect cached the result the first time it ran, so it keeps returning the same values. Can someone help?

scala> order_online_total.collect
res113: Array[org.apache.spark.sql.Row] = Array([295,1583477534])

scala> order_online_total.show
20/03/06 15:28:04 WARN [dispatcher-event-loop-3] TaskSetManager: Stage 692 contains a task of very large size (316 KB). The maximum recommended task size is 100 KB.
+------------+----------+
|total_orders|   updated|
+------------+----------+
|        1260|1583479684|
+------------+----------+
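
For reference, this kind of drift is reproducible without Kafka: each action on a DataFrame re-executes its logical plan, unix_timestamp() is resolved against the wall clock of each execution, and the memory sink behind order_append is rewritten on every trigger in complete mode, so the table contents can also change between the two actions. A minimal sketch, assuming nothing beyond a plain local SparkSession (all names here are illustrative, not part of the original job):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.unix_timestamp

// Illustrative local session, just to show per-action re-execution.
val spark = SparkSession.builder().master("local[*]").appName("repro").getOrCreate()
import spark.implicits._

val df = Seq(1).toDF("x").select(unix_timestamp().as("updated"))

df.collect()       // executes the plan now; "updated" = time of this action
Thread.sleep(5000)
df.show()          // executes the plan again; "updated" is ~5 seconds later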

Code:

val ds_order = spark.readStream.format("kafka").option("kafka.bootstrap.servers","xxx").option("subscribe","xxx").load()
// Strip the surrounding brackets from the payload, extract the CDC fields from the JSON,
// and maintain a running aggregation in an in-memory table named "order_append".
ds_order.selectExpr("cast(value as string) as value").map(x => x.toString.substring(1, x.toString.size - 1))(Encoders.STRING)
          .selectExpr("get_json_object(value,'$.data[0].order_id') as order_id","get_json_object(value,'$.data[0].the_date') as the_date","get_json_object(value,'$.data[0].order_code') as order_code","get_json_object(value,'$.data[0].order_type') as order_type","get_json_object(value,'$.data[0].status') as status","get_json_object(value,'$.data[0].customer_id') as customer_id","get_json_object(value,'$.data[0].recipient_id') as recipient_id","get_json_object(value,'$.data[0].ship_entity_id') as ship_entity_id","get_json_object(value,'$.ts') as ts","get_json_object(value,'$.type') as type")
          .where(s"type in ('INSERT','UPDATE') and ts between $starttime and $endtime and the_date between '$starttimestr' and '$endtimestr'")
          .groupBy("order_id","order_code","order_type","status","customer_id","recipient_id","ship_entity_id","ts").count
          .writeStream.trigger(Trigger.ProcessingTime(s"${interval} seconds")).queryName("order_append")
          .format("memory").outputMode("complete").start()
// Keep only the most recent record per order_id.
spark.sql("""select tb4.order_id,tb4.order_code,tb4.status,tb4.order_type,tb4.customer_id,tb4.recipient_id,tb4.ship_entity_id from
                          (select tb3.order_id,tb3.order_code,tb3.status,tb3.order_type,tb3.customer_id,tb3.recipient_id,tb3.ship_entity_id,row_number() over (partition by tb3.order_id order by tb3.ts desc) as rank
                          from order_append tb3) tb4
                          where tb4.rank = 1""").registerTempTable("order_static")
val order_online_total = spark.sql("""select case when count(1)>0 then count(1) else 0 end as total_orders,unix_timestamp() as updated from order_static where order_type='online'""")
order_online_total.show
order_online_total.collect
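
As a sketch only: if both actions should report the same snapshot, one option, assuming it is acceptable to materialize the result, is to cache the frame so later actions read the materialized rows instead of re-querying the still-updating memory sink:

// Sketch: pin one snapshot so show and collect agree.
val snapshot = order_online_total.cache()
snapshot.collect()   // first action materializes the cache
snapshot.show()      // reads the cached rows, not a fresh query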
Tags: scala, apache-spark
1 Answer

0 votes

@ernest_k

val ds_order = spark.readStream.format("kafka").option("kafka.bootstrap.servers","xxx").option("subscribe","xxx").load()
// Strip the surrounding brackets from the payload, extract the CDC fields from the JSON,
// and maintain a running aggregation in an in-memory table named "order_append".
ds_order.selectExpr("cast(value as string) as value").map(x => x.toString.substring(1, x.toString.size - 1))(Encoders.STRING)
          .selectExpr("get_json_object(value,'$.data[0].order_id') as order_id","get_json_object(value,'$.data[0].the_date') as the_date","get_json_object(value,'$.data[0].order_code') as order_code","get_json_object(value,'$.data[0].order_type') as order_type","get_json_object(value,'$.data[0].status') as status","get_json_object(value,'$.data[0].customer_id') as customer_id","get_json_object(value,'$.data[0].recipient_id') as recipient_id","get_json_object(value,'$.data[0].ship_entity_id') as ship_entity_id","get_json_object(value,'$.ts') as ts","get_json_object(value,'$.type') as type")
          .where(s"type in ('INSERT','UPDATE') and ts between $starttime and $endtime and the_date between '$starttimestr' and '$endtimestr'")
          .groupBy("order_id","order_code","order_type","status","customer_id","recipient_id","ship_entity_id","ts").count
          .writeStream.trigger(Trigger.ProcessingTime(s"${interval} seconds")).queryName("order_append")
          .format("memory").outputMode("complete").start()
// Keep only the most recent record per order_id.
spark.sql("""select tb4.order_id,tb4.order_code,tb4.status,tb4.order_type,tb4.customer_id,tb4.recipient_id,tb4.ship_entity_id from
                          (select tb3.order_id,tb3.order_code,tb3.status,tb3.order_type,tb3.customer_id,tb3.recipient_id,tb3.ship_entity_id,row_number() over (partition by tb3.order_id order by tb3.ts desc) as rank
                          from order_append tb3) tb4
                          where tb4.rank = 1""").registerTempTable("order_static")
val order_online_total = spark.sql("""select case when count(1)>0 then count(1) else 0 end as total_orders,unix_timestamp() as updated from order_static where order_type='online'""")
order_online_total.show
order_online_total.collect
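
One way to confirm that the in-memory table keeps advancing between the two actions is to inspect the running query; a rough sketch:

// Sketch: print the latest progress of the named streaming query,
// which reports the batch that currently backs "order_append".
spark.streams.active.find(_.name == "order_append").foreach(q => println(q.lastProgress))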