How do I apply a filter while reading all JSON files from a folder using Scala?

Problem description · votes: 0 · answers: 1

My folder contains multiple JSON files (first.json, second.json, ...). Using Scala, I load the data from all of the JSON files into a Spark RDD/Dataset and then apply a filter to the data.

The problem is that if we have, say, 600 files of data, all of it has to be loaded into the RDD/Dataset before the filter can be applied.

I am looking for a solution that filters the records while reading from the folder itself, rather than loading everything into Spark memory first.

The filtering is done on the BlockHeight attribute.

JSON structure in each file:

first.json:

[{"IsFee": false, "BlockDateTime": "2015-10-14T09:02:46", "Address": "0xe8fdc802e721426e0422d18d371ab59a41ddaeac", "BlockHeight": 381859, "Type": "IN", "Value": 0.61609232637203584, "TransactionHash": "0xe6fc01ff633b4170e0c8f2df7db717e0608f8aaf62e6fbf65232a7009b53da4e", "UserName": null, "ProjectName": null, "CreatedUser": null, "Id": 0, "CreatedUserId": 0, "CreatedTime": "2019-08-26T22:32:45.2686137+05:30", "UpdatedUserId": 0, "UpdatedTime": "2019-08-26T22:32:45.2696126+05:30"},
 {"IsFee": false, "BlockDateTime": "2015-10-14T09:02:46", "Address": "0x52bc44d5378309ee2abf1539bf71de1b7d7be3b5", "BlockHeight": 381859, "Type": "OUT", "Value": -0.61609232637203584, "TransactionHash": "0xe6fc01ff633b4170e0c8f2df7db717e0608f8aaf62e6fbf65232a7009b53da4e", "UserName": null, "ProjectName": null, "CreatedUser": null, "Id": 0, "CreatedUserId": 0, "CreatedTime": "2019-08-26T22:32:45.3141203+05:30", "UpdatedUserId": 0, "UpdatedTime": "2019-08-26T22:32:45.3141203+05:30"}]

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object BalanceAndTransactionDownload {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("xxx").getOrCreate()

        // Explicit schema so Spark does not have to infer it from the JSON files.
        val currencyDataSchema = StructType(Array(
            StructField("Type", StringType, true),
            StructField("TransactionHash", StringType, true),
            StructField("BlockHeight", LongType, true),
            StructField("BlockDateTime", TimestampType, true),
            StructField("Value", DecimalType(38, 18), true),
            StructField("Address", StringType, true),
            StructField("IsFee", BooleanType, true)
        ))

        val projectAddressesFile = args(0)
        val blockJSONFilesContainer = args(1)
        val balanceFolderName = args(2)
        val downloadFolderName = args(3)
        val blockHeight = args(4).toLong // compare as a number, not as a string

        val projectAddresses = spark.read.option("multiline", "true").json(projectAddressesFile)
        // This is where I want to filter out the data instead of loading every file first.
        val currencyDataFile = spark.read.option("multiline", "true").schema(currencyDataSchema).json(blockJSONFilesContainer)
        val filteredcurrencyData = currencyDataFile.filter(currencyDataFile("BlockHeight") <= blockHeight)

        // Per-address balance summary.
        filteredcurrencyData.join(projectAddresses, filteredcurrencyData("Address") === projectAddresses("address"))
            .groupBy(projectAddresses("address"))
            .agg(sum("Value").alias("Value"))
            .repartition(1)
            .write.option("header", "true").csv(balanceFolderName)

        // Full transaction download.
        filteredcurrencyData.join(projectAddresses, filteredcurrencyData("Address") === projectAddresses("address"))
            .drop(projectAddresses("address"))
            .drop(projectAddresses("CurrencyId"))
            .drop(projectAddresses("Id"))
            .repartition(1)
            .write.option("header", "true").csv(downloadFolderName)
    }
}
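
As a side note, here is a minimal sketch (with a hypothetical /data/blocks folder standing in for args(1) and the threshold hard-coded) of how to check what the read actually does: with the built-in JSON source the BlockHeight predicate is evaluated while each file is scanned, but it does not skip whole files, which the physical plan makes visible.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object InspectJsonScan {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("InspectJsonScan").getOrCreate()

        // Hypothetical folder holding first.json, second.json, ...
        val df = spark.read
            .option("multiline", "true")
            .json("/data/blocks")
            .filter(col("BlockHeight") <= 381859L)

        // The plan shows a FileScan over every file in the folder plus a Filter on
        // BlockHeight; the predicate does not prune files unless the layout is
        // partitioned by that column (see the answer below).
        df.explain(true)

        spark.stop()
    }
}
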
apache-spark-2.0
1 Answer
0 votes

The files on the data store should be partitioned. You appear to be filtering by block height, so you could have multiple folders, for example blockheight=1, blockheight=2, and so on, with the JSON files inside those folders. In that case Spark does not read all of the JSON files but only scans the folders it needs.
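
As an illustration, a minimal sketch of that layout, assuming hypothetical paths (/data/blocks for the current folder of JSON files, /data/blocks_partitioned for the rewritten copy): a one-time rewrite with partitionBy("BlockHeight") puts each block height in its own folder, and later reads that filter on that column only list and parse the matching folders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitionedBlockDataSketch {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("PartitionedBlockDataSketch").getOrCreate()

        // One-off step: rewrite the existing JSON files partitioned by BlockHeight.
        // This produces folders such as /data/blocks_partitioned/BlockHeight=381859/
        spark.read.option("multiline", "true").json("/data/blocks")
            .write.partitionBy("BlockHeight").json("/data/blocks_partitioned")

        // Later reads: the filter on the partition column lets Spark prune directories,
        // so only the BlockHeight=... folders that satisfy the predicate are read.
        val filtered = spark.read.json("/data/blocks_partitioned")
            .filter(col("BlockHeight") <= 381859L)

        filtered.show(false)
        spark.stop()
    }
}

Note that partitionBy moves BlockHeight out of the JSON documents and into the directory names, so the one-time rewrite changes the on-disk layout but not the logical contents of the Dataset.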
