I am trying to read data from two Kafka producers.
My Kafka topics are CSVStreamRetail and OrderItems.
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val spark = SparkSession
  .builder
  .appName("Spark-Stream-Example")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp")
  .getOrCreate()

// Schema of the JSON messages in the CSVStreamRetail topic
val ordersSchema = new StructType()
  .add("order_id", IntegerType)
  .add("order_date", StringType)
  .add("order_customer_id", IntegerType)
  .add("order_status", StringType)

// Schema of the JSON messages in the OrderItems topic
val orderItemsSchema = new StructType()
  .add("order_item_id", IntegerType)
  .add("order_item_order_id", IntegerType)
  .add("order_item_product_id", IntegerType)
  .add("order_item_quantity", IntegerType)
  .add("order_item_subtotal", DoubleType)
  .add("order_item_product_price", DoubleType)

import spark.implicits._

// One streaming reader per topic
val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .load()

val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "OrderItems")
  .load()

// Parse the Kafka value as JSON and keep the Kafka record timestamp
val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
  .select("orders_data.*", "timestamp")

val orderItemsDF = df2.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
  .select("order_items_data.*", "timestamp")

// Inner stream-stream join on the order id
val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id") === ordersDF("order_id"))

finalDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()
With these two topics (CSVStreamRetail and OrderItems), the output I get is an empty DataFrame.
First, please check whether data is actually arriving in your Kafka topics.
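One way to run that check is a one-off batch read of each topic. The following is only a minimal sketch: the object name `TopicCheckSketch` and the helper `batchOptions` are made up for illustration, the broker address and topic names are taken from the question, and it assumes the `spark-sql-kafka-0-10` connector is on the classpath. A batch read starts from the earliest offsets here, whereas a streaming query by default starts from the latest offsets, so messages produced before the streaming query started would not show up in it.

```scala
import org.apache.spark.sql.SparkSession

object TopicCheckSketch {
  // Hypothetical helper: Kafka source options for reading a whole topic once.
  def batchOptions(topic: String, bootstrap: String = "localhost:9092"): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> bootstrap,
      "subscribe" -> topic,
      "startingOffsets" -> "earliest", // read from the beginning of the topic
      "endingOffsets" -> "latest"      // stop at the current end of the topic
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("topic-check-sketch")
      .master("local[*]")
      .getOrCreate()

    for (topic <- Seq("CSVStreamRetail", "OrderItems")) {
      // spark.read (not readStream) gives a bounded, batch DataFrame
      val batch = spark.read.format("kafka").options(batchOptions(topic)).load()
      println(s"$topic: ${batch.count()} records")
      // Print a few raw payloads to confirm they are JSON matching the schemas
      batch.selectExpr("CAST(value AS STRING) AS value").show(5, truncate = false)
    }
    spark.stop()
  }
}
```

If both counts are non-zero but the streaming job still prints nothing, it may be worth setting `startingOffsets` to `earliest` on the streaming readers as well while testing.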
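For a stream-stream join, you should provide a watermark on at least one of the streams. I see that you want to perform an inner join, so I added a watermark of 200 seconds to each stream, and now the output DataFrame shows data.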
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val spark = SparkSession
  .builder
  .appName("Spark-Stream-Example")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp")
  .getOrCreate()

// Schema of the JSON messages in the CSVStreamRetail topic
val ordersSchema = new StructType()
  .add("order_id", IntegerType)
  .add("order_date", StringType)
  .add("order_customer_id", IntegerType)
  .add("order_status", StringType)

// Schema of the JSON messages in the OrderItems topic
val orderItemsSchema = new StructType()
  .add("order_item_id", IntegerType)
  .add("order_item_order_id", IntegerType)
  .add("order_item_product_id", IntegerType)
  .add("order_item_quantity", IntegerType)
  .add("order_item_subtotal", DoubleType)
  .add("order_item_product_price", DoubleType)

import spark.implicits._

// One streaming reader per topic
val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .load()

val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "OrderItems")
  .load()

// Parse the Kafka value as JSON, keep the Kafka record timestamp,
// and declare a 200-second watermark on it
val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
  .select("orders_data.*", "timestamp")
  .withWatermark("timestamp", "200 seconds")

val orderItemsDF = df2.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
  .select("order_items_data.*", "timestamp")
  .withWatermark("timestamp", "200 seconds")

// Inner stream-stream join on the order id
val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id") === ordersDF("order_id"))

finalDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()
Use the event timestamp for the join. Let me know if this helps.
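One possible reading of "use the event timestamp for the join" is to add an event-time range condition to the join itself, on top of the watermarks. The sketch below is an assumption-laden illustration rather than the exact code behind this answer: `EventTimeJoinSketch`, `joinWithinWindow`, `order_ts` and `item_ts` are hypothetical names, and the 200-second bound is chosen only to mirror the watermark used here. It expects the two parsed DataFrames from the code in this answer, each with a `timestamp` column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

object EventTimeJoinSketch {
  // Joins order items to orders on the order id and additionally requires
  // the two event times to be at most `maxLag` apart.
  def joinWithinWindow(orders: DataFrame,
                       items: DataFrame,
                       delay: String = "200 seconds",
                       maxLag: String = "200 seconds"): DataFrame = {
    // Rename the timestamps so both stay unambiguous after the join
    val o = orders.withColumnRenamed("timestamp", "order_ts")
      .withWatermark("order_ts", delay)
    val i = items.withColumnRenamed("timestamp", "item_ts")
      .withWatermark("item_ts", delay)

    // Key equality plus an event-time range condition
    i.join(o, expr(
      s"""order_item_order_id = order_id AND
         |item_ts >= order_ts - INTERVAL $maxLag AND
         |item_ts <= order_ts + INTERVAL $maxLag""".stripMargin))
  }
}
```

With the DataFrames from the code in this answer, it could be called as `EventTimeJoinSketch.joinWithinWindow(ordersDF, orderItemsDF)` and the result written to the console sink as before. With watermarks on both sides and a time range in the condition, Spark can work out how long each row needs to be buffered and drop old join state, instead of keeping it indefinitely.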