My folder contains multiple JSON files (first.json, second.json). Using Scala, I load the data from all of the JSON files into a Spark RDD/Dataset and then apply a filter to the data.
The problem here is that if we have 600 data files, we have to load all of them into the RDD/Dataset before the filter is applied.
I am looking for a solution that filters the records while reading from the folder itself, without first loading everything into Spark memory.
The filtering is done on the blockheight attribute.
JSON structure in each file:
[{"IsFee":false,"BlockDateTime":"2015-10-14T09:02:46","Address":"0xe8fdc802e721426e0422d18d371ab59a41ddaeac","BlockHeight":381859,"Type":"IN","Value":0.61609232637203584,"TransactionHash":"0xe6fc01ff633b4170e0c8f2df7db717e0608f8aaf62e6fbf65232a7009b53da4e","UserName":null,"ProjectName":null,"CreatedUser":null,"Id":0,"CreatedUserId":0,"CreatedTime":"2019-08-26T22:32:45.2686137+05:30","UpdatedUserId":0,"UpdatedTime":"2019-08-26T22:32:45.2696126+05:30"},
{"IsFee":false,"BlockDateTime":"2015-10-14T09:02:46","Address":"0x52bc44d5378309ee2abf1539bf71de1b7d7be3b5","BlockHeight":381859,"Type":"OUT","Value":-0.61609232637203584,"TransactionHash":"0xe6fc01ff633b4170e0c8f2df7db717e0608f8aaf62e6fbf65232a7009b53da4e","UserName":null,"ProjectName":null,"CreatedUser":null,"Id":0,"CreatedUserId":0,"CreatedTime":"2019-08-26T22:32:45.3141203+05:30","UpdatedUserId":0,"UpdatedTime":"2019-08-26T22:32:45.3141203+05:30"}]
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object BalanceAndTransactionDownload {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("xxx").getOrCreate()

    val currencyDataSchema = StructType(Array(
      StructField("Type", StringType, true),
      StructField("TransactionHash", StringType, true),
      StructField("BlockHeight", LongType, true),
      StructField("BlockDateTime", TimestampType, true),
      StructField("Value", DecimalType(38, 18), true),
      StructField("Address", StringType, true),
      StructField("IsFee", BooleanType, true)
    ))

    val projectAddressesFile = args(0)
    val blockJSONFilesContainer = args(1)
    val balanceFolderName = args(2)
    val downloadFolderName = args(3)
    val blockHeight = args(4).toLong // compare as a number, not as a string

    val projectAddresses = spark.read.option("multiline", "true").json(projectAddressesFile)
    val currencyDataFile = spark.read.option("multiline", "true").schema(currencyDataSchema).json(blockJSONFilesContainer) // this is where I want to filter out the data
    val filteredCurrencyData = currencyDataFile.filter(currencyDataFile("BlockHeight") <= blockHeight)

    filteredCurrencyData
      .join(projectAddresses, filteredCurrencyData("Address") === projectAddresses("address"))
      .groupBy(projectAddresses("address"))
      .agg(sum("Value").alias("Value"))
      .repartition(1)
      .write.option("header", "true").csv(balanceFolderName)

    filteredCurrencyData
      .join(projectAddresses, filteredCurrencyData("Address") === projectAddresses("address"))
      .drop(projectAddresses("address"))
      .drop(projectAddresses("CurrencyId"))
      .drop(projectAddresses("Id"))
      .repartition(1)
      .write.option("header", "true").csv(downloadFolderName)
  }
}
The files on the data store should be partitioned. Since you appear to filter by block height, you can have multiple folders such as blockheight=1, blockheight=2, etc., with the JSON files inside those folders. In that case Spark does not read all of the JSON files; it scans only the folders it needs.
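A minimal sketch of this layout, assuming the existing data can be rewritten once with `partitionBy` (the paths here are illustrative, not from the original post):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitionedBlocks {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("partition-example").getOrCreate()

    // One-time rewrite: partition the existing JSON by BlockHeight, so each
    // height lands in its own folder, e.g. .../BlockHeight=381859/part-*.json
    val raw = spark.read.option("multiline", "true").json("/data/blocks-raw")
    raw.write.partitionBy("BlockHeight").json("/data/blocks-partitioned")

    // Subsequent reads: the filter on the partition column is resolved against
    // the directory names, so only folders with BlockHeight <= the threshold
    // are listed and scanned (partition pruning).
    val filtered = spark.read
      .json("/data/blocks-partitioned")
      .filter(col("BlockHeight") <= 381859L)
  }
}
```

Note that the partition column is removed from the file contents and recovered from the directory names (`BlockHeight=.../`) via Spark's partition discovery, so the schema of the read DataFrame is unchanged.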