How can I ingest data from two producers in Kafka and join the streams using Spark Structured Streaming?

I am trying to read data from two producers in Kafka. My Kafka topics are CSVStreamRetail and OrderItems.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val spark = SparkSession
  .builder
  .appName("Spark-Stream-Example")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp")
  .getOrCreate()

val ordersSchema = new StructType()
  .add("order_id", IntegerType)
  .add("order_date", StringType)
  .add("order_customer_id", IntegerType)
  .add("order_status", StringType)

val orderItemsSchema = new StructType()
  .add("order_item_id", IntegerType)
  .add("order_item_order_id", IntegerType)
  .add("order_item_product_id", IntegerType)
  .add("order_item_quantity", IntegerType)
  .add("order_item_subtotal", DoubleType)
  .add("order_item_product_price", DoubleType)

import spark.implicits._

val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .load()

val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "OrderItems")
  .load()

val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
  .select("orders_data.*", "timestamp")

val orderItemsDF = df2.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
  .select("order_items_data.*", "timestamp")

val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id") === ordersDF("order_id"))

finalDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()

The output I am receiving is an empty dataframe.
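For context, the from_json calls above assume that each Kafka message value is a JSON string matching the corresponding schema. A minimal sketch of a producer publishing one such order record (the payload values are made up for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// A JSON payload shaped like ordersSchema (illustrative values)
val order = """{"order_id":1,"order_date":"2020-01-01","order_customer_id":42,"order_status":"COMPLETE"}"""
producer.send(new ProducerRecord[String, String]("CSVStreamRetail", order))
producer.flush()
producer.close()

A producer for OrderItems would publish JSON shaped like orderItemsSchema in the same way.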

apache-kafka apache-spark-sql spark-structured-streaming
1 Answer

First of all, please check whether you are actually receiving data in your Kafka topics. For stream-stream joins, you must provide a watermark on at least one of the streams. Since you want to perform an inner join, I have added a 200 seconds watermark to both streams, and it now shows data in the output dataframe.
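One quick way to confirm that the topics actually contain data is a one-off batch read from Kafka (a sketch, reusing the spark session and broker address from the code below):

// Read everything currently in the topic as a batch and print it
val sanityCheck = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")

sanityCheck.show(truncate = false)

Once data is confirmed, here is the full job with the watermarks added: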

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val spark = SparkSession
  .builder
  .appName("Spark-Stream-Example")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp")
  .getOrCreate()

val ordersSchema = new StructType()
  .add("order_id", IntegerType)
  .add("order_date", StringType)
  .add("order_customer_id", IntegerType)
  .add("order_status", StringType)

val orderItemsSchema = new StructType()
  .add("order_item_id",IntegerType)
  .add("order_item_order_id",IntegerType)
  .add("order_item_product_id",IntegerType)
  .add("order_item_quantity",IntegerType)
  .add("order_item_subtotal",DoubleType)
  .add("order_item_product_price", DoubleType)

import spark.implicits._

val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .load()

val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "OrderItems")
  .load()

val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
  .select("orders_data.*", "timestamp")
  // Event-time watermark: stream-stream joins require a watermark on at least one side
  .withWatermark("timestamp", "200 seconds")

val orderItemsDF = df2.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
  .select("order_items_data.*", "timestamp")
  // Same watermark on the order-items side
  .withWatermark("timestamp", "200 seconds")

val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id") === ordersDF("order_id"))

finalDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()

Use the eventTimestamp for the join. Let me know if this helps.
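If you also want Spark to be able to discard old join state, the Structured Streaming guide additionally recommends constraining the inner join with an event-time range condition. A sketch reusing the 200-second bound (boundedDF is a name chosen here for illustration):

import org.apache.spark.sql.functions.expr

// Rows whose event times differ by more than 200 seconds can never match,
// so Spark can drop their state once the watermark passes
val boundedDF = orderItemsDF.alias("oi").join(
  ordersDF.alias("o"),
  expr("""
    oi.order_item_order_id = o.order_id AND
    oi.timestamp >= o.timestamp - interval 200 seconds AND
    oi.timestamp <= o.timestamp + interval 200 seconds
  """))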
