使用spark.readStream .format(“s3-sqs”)获取空值以获取SQS消息

问题描述 投票:0回答:1

我正在尝试从Amazon SQS队列中读取消息。权限正常,我可以看到记录计数 - 但所有记录都是空的。无法弄清楚为什么我得到空值。我可以在SQS队列中看到消息,并且可以从本地Python实例获取消息,它们实际上是反映模式的JSON记录(尽管不是100%确定我已正确实现了模式)。

使用“rate”格式也可以在代码中一直使用。

文档在这方面非常稀少。

任何建议,将不胜感激。

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val awsAccessKey = "blahblah"    
val awsSecretKey = "blahblahblahblah"
val awsRegion = "us-east-1"

val SQSQueue = "https://sqs.us-east1.amazonaws.com/blahblahblahblah/blahblahblahblah"

// SQS Event Structure
val sqsSchema = new StructType()
      .add(StructField("Records", ArrayType(new StructType()
      .add(StructField("eventVersion", StringType))
      .add(StructField("eventSource", StringType))
      .add(StructField("awsRegion", StringType))
      .add(StructField("eventTime", StringType))
      .add(StructField("eventName", StringType))
      .add(StructField("userIdentity",StringType))
      .add(StructField("eventName", StringType))
      .add("userIdentity", new StructType()
          .add(StructField("principalId", StringType)))
          .add("requestParameters", new StructType()
          .add(StructField("sourceIPAddress", StringType)))
     .add("responseElements", new StructType()
          .add(StructField("x-amz-request-id", StringType))
          .add(StructField("x-amz-id-2", StringType))
    )
.add("s3", new StructType()
    .add(StructField("s3SchemaVersion", StringType))
    .add(StructField("configurationId", StringType))
    .add("bucket",  new StructType()
      .add(StructField("name", StringType))
         .add("ownerIdentity", new StructType()
              .add(StructField("principalId", StringType)))
      .add(StructField("arn", StringType)))
    .add("object", new StructType()
         .add(StructField("key",StringType))
         .add(StructField("size", IntegerType))
         .add(StructField("eTag", StringType))
         .add(StructField("sequencer", StringType))
         )
     ))))

val df = spark.readStream
    .format("s3-sqs")
    //.format("rate") // this works
    .option("queueUrl", SQSQueue)
    .option("region",awsRegion)
    .option("awsAccessKey",awsAccessKey)
    .option("fileFormat", "json")
    .schema(sqsSchema)
    //.option("sqsFetchInterval", "1m")
    .load()

df.writeStream
      .queryName("sqs_records")    // this query name will be the table name
      .outputMode("append")
      .format("memory")
      .start()

val records = spark.sql("select * from sqs_records")

> records.count
    res142: Long = 4894

>%sql
    select * from sqs_records

Records
null
null
...
scala apache-spark spark-streaming amazon-sqs databricks
1个回答
0
投票

我遇到了同样的问题,您应用的架构是数据的架构,而不是发送到sqs的s3事件记录。由于代码将s3事件架构应用于您的数据,您将获得null,我确信这些架构不匹配。

© www.soinside.com 2019 - 2024. All rights reserved.