我在一个hdfs位置中有大约20K实木复合地板格式的JSON文件。我的工作是流式传输位置并读取数据帧中的所有文件,然后在另一个hdfs位置中写入相同的文件。
有人可以建议我该怎么做。我正在使用Azure Databricks平台和pyspark来完成此任务。
我不确定您是否要以“流式”方式或“批处理”方式进行操作。但是,您可以使用流功能来做到这一点,并一次触发作业。
(spark
.readStream # Read data as streaming
.schema(USER_SCHEMA) # For streaming, you must provide the input schema of data
.format("parquet")
.load(PARQUET_ORIGIN_LOCATION)
.writeStream
.format("delta")
.option("path", PARQUET_DESTINATION_LOCATION + 'data/') # Where to store the data
.option("checkpointLocation", PARQUET_DESTINATION_LOCATION + 'checkpoint/') # The check point location
.option("overwriteSchema", True) # Allows the schema to be overwritten
.queryName(QUERY_NAME) # Name of the query
.trigger(once=True) # For Batch Processing
.start()
)