spark streaming:从kafka读取CSV字符串,写入镶木地板

问题描述 投票:0回答:1

从Kafka(写到镶木地板)有很多在线阅读json的例子 - 但我无法弄清楚如何将模式应用于kafka的CSV字符串。

流数据:

customer_1945,cusaccid_995,27999941    
customer_1459,cusaccid_1102,27999942

架构:

schema = StructType() \
.add("customer_id",StringType()) \
.add("customer_acct_id",StringType()) \
.add("serv_acct_id",StringType())

阅读流:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
  .option("subscribe", "test") \
  .load()

我用它来做JSON:

interval=df \
  .select(from_json(col("value").cast("string"), schema).alias("json")) \
  .select("json.*")

在使用指定的模式将其写入镶木地板之前:

query=interval     \
  .writeStream  \
  .format("parquet") \
  .option("checkpointLocation", "/user/whatever/checkpoint24") \
  .start("/user/ehatever/interval24")

因为我不能将from_json()用于CSV - 我不知道如何将模式应用于数据帧,以便我可以使用类似的writeStream()命令。

python csv apache-spark apache-kafka spark-structured-streaming
1个回答
1
投票

这就是我做到的。没有from_json,提取csv字符串:

interval=df.select(col("value").cast("string")) .alias("csv").select("csv.*")

然后将其拆分为列。这可以使用上面的相同声明写成镶木地板文件

interval2=interval \
      .selectExpr("split(value,',')[0] as customer_id" \
                 ,"split(value,',')[1] as customer_acct_id" \
                 ,"split(value,',')[2] as serv_acct_id" \
                 ,"split(value,',')[3] as installed_service_id" \
                 ,"split(value,',')[4] as meter_id" \
                 ,"split(value,',')[5] as channel_number" \
                 ... etc
                 )
© www.soinside.com 2019 - 2024. All rights reserved.