Spark 2.2.0 - 如何将 DataFrame 写入/读取 DynamoDB

问题描述 投票:0回答:3

我希望我的 Spark 应用程序从 DynamoDB 读取表,执行操作,然后将结果写入 DynamoDB。

将表读入 DataFrame

现在,我可以将表从 DynamoDB 作为

hadoopRDD
读取到 Spark 中,并将其转换为 DataFrame。但是,我必须使用正则表达式从
AttributeValue
中提取值。有更好/更优雅的方式吗?在 AWS API 中找不到任何内容。

package main.scala.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

object Tester {

  // {S: 298905396168806365,} 
  def extractValue : (String => String) = (aws:String) => {
    val pat_value = "\\s(.*),".r

    val matcher = pat_value.findFirstMatchIn(aws)
                matcher match {
                case Some(number) => number.group(1).toString
                case None => ""
        }
  }


   def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val sparkContext = spark.sparkContext

      import spark.implicits._

      // UDF to extract Value from AttributeValue 
      val col_extractValue = udf(extractValue)

  // Configure connection to DynamoDB
  var jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
      jobConf_add.set("dynamodb.input.tableName", "MyTable")
      jobConf_add.set("dynamodb.output.tableName", "MyTable")
      jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
      jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")


      // org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
      var hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

      // Convert HadoopRDD to RDD
      val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
      }

      // Convert RDD to DataFrame and extract Values from AttributeValue
      val df_add = rdd_add.toDF()
                  .withColumn("PIN", col_extractValue($"_1"))
                  .withColumn("Address", col_extractValue($"_2"))
                  .select("PIN","Address")
   }
}

将 DataFrame 写入 DynamoDB

stackoverflow 和其他地方的许多答案仅指向 博客文章emr-dynamodb-hadoop github。这些资源都没有实际演示如何写入 DynamoDB。

我尝试将我的

DataFrame
转换为
RDD[Row]
,但没有成功。

df_add.rdd.saveAsHadoopDataset(jobConf_add)

将此 DataFrame 写入 DynamoDB 的步骤是什么? (如果你告诉我如何控制

overwrite
vs
putItem
,我会得到奖励积分;)

注意:

df_add
与 DynamoDB 中的
MyTable
具有相同的架构。

编辑:我遵循这个答案的建议,它指向这篇关于使用 Spark SQL for ETL

// Format table to DynamoDB format
  val output_rdd =  df_add.as[(String,String)].rdd.map(a => {
    var ddbMap = new HashMap[String, AttributeValue]()

    // Field PIN
    var PINValue = new AttributeValue() // New AttributeValue
    PINValue.setS(a._1)                 // Set value of Attribute as String. First element of tuple
    ddbMap.put("PIN", PINValue)         // Add to HashMap

    // Field Address
    var AddValue = new AttributeValue() // New AttributeValue
    AddValue.setS(a._2)                 // Set value of Attribute as String
    ddbMap.put("Address", AddValue)     // Add to HashMap

    var item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
  })             

  output_rdd.saveAsHadoopDataset(jobConf_add) 

但是,尽管遵循了文档,现在我还是得到了

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text
......你有什么建议吗?

编辑 2:更仔细地阅读这篇关于 Using Spark SQL for ETL:

的文章

获得 DataFrame 后,执行转换以获得与 DynamoDB 自定义输出格式知道如何编写的类型相匹配的 RDD。自定义输出格式需要一个包含 Text 和

DynamoDBItemWritable
类型的元组。

考虑到这一点,下面的代码正是 AWS 博客文章所建议的,除了我将

output_df
转换为 rdd,否则
saveAsHadoopDataset
不起作用。现在,我得到了
Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
。我已经束手无策了!

      // Format table to DynamoDB format
  val output_df =  df_add.map(a => {
    var ddbMap = new HashMap[String, AttributeValue]()

    // Field PIN
    var PINValue = new AttributeValue() // New AttributeValue
    PINValue.setS(a.get(0).toString())                 // Set value of Attribute as String
    ddbMap.put("PIN", PINValue)         // Add to HashMap

    // Field Address
    var AddValue = new AttributeValue() // New AttributeValue
    AddValue.setS(a.get(1).toString())                 // Set value of Attribute as String
    ddbMap.put("Address", AddValue)     // Add to HashMap

    var item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
  })             

  output_df.rdd.saveAsHadoopDataset(jobConf_add)   
scala apache-spark amazon-dynamodb amazon-emr
3个回答
7
投票

我跟踪了“Using Spark SQL for ETL”链接,发现了相同的“非法循环引用”异常。 该异常的解决方案非常简单(但我花了 2 天时间才弄清楚),如下所示。关键点是在数据框的 RDD 上使用映射函数,而不是数据框本身。

val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")


val df_ddb =  spark.read.option("header","true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes

var ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
    val ddbMap = new HashMap[String, AttributeValue]()

    for (i <- 0 to schema_ddb.length - 1) {
        val value = a.get(i)
        if (value != null) {
            val att = new AttributeValue()
            att.setS(value.toString)
            ddbMap.put(schema_ddb(i)._1, att)
        }
    }

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
}
)

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)

0
投票

希望聚会还不算太晚,节省别人的时间和精力。我一直在使用 emr-dynamodb-connector 来解决 Spark -> dynamodb 的问题(主要是依赖项冲突问题,例如缺少类或方法),我花了几个小时才发现我需要包含

aws-java-sdk-dynamodb
包由于其他 aws 包的版本限制,在我的 pom.xml 中。 AWS 发布了一个关于它的doc,但我发现它非常肤浅(步骤 3 缺少一些变量声明,而步骤 2 在我的情况下是不必要的)并且它没有向您展示如何选择与 Spark 和 Hadoop 兼容的版本)。


-2
投票

这是一个更简单的工作示例。

例如,使用 Hadoop RDD 从 Kinesis Stream 写入 DynamoDB:-

https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/part11/kinesis/consumer/KinesisSaveAsHadoopDataSet/TransactionConsumerDstreamToDynamoDBHadoopDataSet.scala

用于使用 Hadoop RDD 和不使用正则表达式的 Spark SQL 从 DynamoDB 读取数据。

val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
    //ddbConf.set("dynamodb.output.tableName", "student")
    ddbConf.set("dynamodb.input.tableName", "student")
    ddbConf.set("dynamodb.throughput.write.percent", "1.5")
    ddbConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
    ddbConf.set("dynamodb.regionid", "us-east-1")
    ddbConf.set("dynamodb.servicename", "dynamodb")
    ddbConf.set("dynamodb.throughput.read", "1")
    ddbConf.set("dynamodb.throughput.read.percent", "1")
    ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    //ddbConf.set("dynamodb.awsAccessKeyId", credentials.getAWSAccessKeyId)
    //ddbConf.set("dynamodb.awsSecretAccessKey", credentials.getAWSSecretKey)


val data = spark.sparkContext.hadoopRDD(ddbConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

val simple2: RDD[(String)] = data.map { case (text, dbwritable) => (dbwritable.toString)}

spark.read.json(simple2).registerTempTable("gooddata")

spark.sql("select replace(replace(split(cast(address as string),',')[0],']',''),'[','') as housenumber from gooddata").show(false)

© www.soinside.com 2019 - 2024. All rights reserved.