My data lives in folders and subfolders inside Azure Data Lake. Each piece of data has a file name, and in ADLS we can see its modification time. I now want to work out the total storage size of the data stored within a specific time range. How do I access the files in that time range and compute the total data size?
The code below should work once you supply the start and end timestamps of the range whose size you want to measure -
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Set up Spark session
spark = SparkSession.builder.appName("FileCount").getOrCreate()
# ADLS Gen2 storage account details
account_name = "<account-name"
container_name = "<container_name>"
relative_path = "ADL_STG_NEW/attrep_change*"
# Define the start and end timestamps
start_timestamp = datetime.strptime("2023-11-16 00:00:00", "%Y-%m-%d %H:%M:%S")
end_timestamp = datetime.strptime("2023-11-17 00:00:00", "%Y-%m-%d %H:%M:%S")
# Convert timestamps to milliseconds for comparison
start_timestamp_ms = int(start_timestamp.timestamp()) * 1000
end_timestamp_ms = int(end_timestamp.timestamp()) * 1000
print(start_timestamp_ms)
print(end_timestamp_ms)
# ADLS Gen2 path
adls_base_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{relative_path}"
# Yield the immediate sub-directories under ls_path
def get_dir_content(ls_path):
    for dir_path in dbutils.fs.ls(ls_path):
        if dir_path.isDir() and ls_path != dir_path.path:
            yield dir_path.path

# Per-directory file count and total size within the time window
for adls_path in get_dir_content(adls_base_path):
    file_list = dbutils.fs.ls(adls_path)
    # Keep only the files modified inside the window
    filtered_files = [
        (file.name, file.modificationTime, file.size) for file in file_list
        if start_timestamp_ms <= file.modificationTime <= end_timestamp_ms
    ]
    file_count = len(filtered_files)
    total_size = sum(file_info[2] for file_info in filtered_files)
    total_size = total_size / (1024 * 1024)  # bytes -> megabytes
    print(f"{adls_path},{total_size},{file_count}")
# Summary for the files sitting directly under the base path
file_list = dbutils.fs.ls(adls_base_path)
filtered_files = [
    (file.name, file.modificationTime, file.size) for file in file_list
    if start_timestamp_ms <= file.modificationTime <= end_timestamp_ms
]
file_count = len(filtered_files)
print(f"Number of files created between {start_timestamp} and {end_timestamp}: {file_count}")
total_size = sum(file_info[2] for file_info in filtered_files)
total_size = total_size / (1024 * 1024)  # bytes -> megabytes
print(f"{relative_path},{total_size},{file_count}")
print(f"Total size of files: {total_size} MegaBytes")