如何使用 PySpark 计算 ADLS 中的目录大小?

问题描述 投票:0回答:6

我想计算包含子文件夹和子文件的目录(例如 XYZ)大小。 我想要所有文件和 XYZ 内所有内容的总大小。

我可以找到特定路径内的所有文件夹。但我想要所有的尺寸在一起。 我也看到了

display(dbutils.fs.ls("/mnt/datalake/.../XYZ/.../abc.parquet"))

给我 abc 文件的数据大小。 但我想要 XYZ 的完整尺寸。

python apache-spark pyspark databricks azure-databricks
6个回答
6
投票

dbutils.fs.ls
没有像
cp
mv
rm
那样的递归功能。因此,你需要迭代自己。这是一个可以为您完成任务的片段。从 Databricks Notebook 运行代码。

from dbutils import FileInfo
from typing import List

root_path = "/mnt/datalake/.../XYZ"

def discover_size(path: str, verbose: bool = True):
  def loop_path(paths: List[FileInfo], accum_size: float):
    if not paths:
      return accum_size
    else:
      head, tail = paths[0], paths[1:]
      if head.size > 0:
        if verbose:
          print(f"{head.path}: {head.size / 1e6} MB")
        accum_size += head.size / 1e6
        return loop_path(tail, accum_size)
      else:
        extended_tail = dbutils.fs.ls(head.path) + tail
        return loop_path(extended_tail, accum_size)

  return loop_path(dbutils.fs.ls(path), 0.0)

discover_size(root_path, verbose=True)  # Total size in megabytes at the end

如果该位置已安装在 dbfs 中。然后你可以使用

du -h
方法(尚未测试)。如果您在笔记本中,请使用以下命令创建一个新单元格:

%sh
du -h /mnt/datalake/.../XYZ

2
投票

@Emer 的答案很好,但很快就会得到

RecursionError: maximum recursion depth exceeded
,因为它对每个文件进行递归(如果你有 X 个文件,你将有 X 个叠瓦式递归)。

这里与仅针对文件夹的递归相同:

%python
from dbutils import FileInfo
from typing import List

def discover_size2(path: str, verbose: bool = True):
  def loop_path(path: str):
    accum_size = 0.0
    path_list = dbutils.fs.ls(path)
    if path_list:
      for path_object in path_list:
        if path_object.size > 0:
          if verbose:
            print(f"{path_object.path}: {path_object.size / 1e6} MB")
          accum_size += path_object.size / 1e6
        else:
          # Folder: recursive discovery
          accum_size += loop_path(path_object.path)
    return accum_size

  return loop_path(path)

2
投票

对于任何仍然使用 @robin loche 的方法达到递归限制的人,这是一个纯粹的迭代答案:

# from dbutils import FileInfo # Not required in databricks
# from dbruntime.dbutils import FileInfo # may work for some people

def get_size_of_path(path):
    return sum([file.size for file in get_all_files_in_path(path)])

def get_all_files_in_path(path, verbose=False):
    nodes_new = []

    nodes_new = dbutils.fs.ls(path)
    files = []

    while len(nodes_new) > 0:
        current_nodes = nodes_new
        nodes_new = []
        for node in current_nodes:
            if verbose:
                print(f"Processing {node.path}")
            children = dbutils.fs.ls(node.path)
            for child in children:
                if child.size == 0 and child.path != node.path:
                    nodes_new.append(child)
                elif child.path != node.path:
                    files.append(child)
    return files

path = "s3://some/path/"

print(f"Size of {path} in gb: {get_size_of_path(path) / 1024 / 1024 / 1024}")

1
投票

尝试使用 dbutils ls 命令,获取数据框中的文件列表,并在大小列上使用聚合函数 SUM() 进行查询:

val fsds = dbutils.fs.ls("/mnt/datalake/.../XYZ/.../abc.parquet").toDF

fsds.createOrReplaceTempView("filesList")

display(spark.sql("select COUNT(name) as NoOfRows, SUM(size) as sizeInBytes from fileListPROD"))

0
投票

喜欢埃默的答案

小补充:

如果遇见

“ModuleNotFoundError:没有名为“dbutils”的模块”

尝试这个

from dbruntime.dbutils
而不是
from dbutils
。这对我有用!

--时正


0
投票

@Emer 的修改版本对我有用:

from dbruntime.dbutils import FileInfo
from typing import List, Set

def calculate_size_mb(path: str, verbose: bool = True):
    visited_paths = set()

    def loop_path(paths: List[FileInfo], accum_size: float):
        nonlocal visited_paths

        if not paths:
            return accum_size
        else:
            head, tail = paths[0], paths[1:]
            if head.path not in visited_paths:
                visited_paths.add(head.path)
                if head.size > 0:
                    if verbose:
                        print(f"{head.path}: {head.size / 1e6} MB")
                    accum_size += head.size / 1e6
                extended_tail = dbutils.fs.ls(head.path) + tail
                return loop_path(extended_tail, accum_size)
            else:
                # If the path has already been visited, skip it
                return loop_path(tail, accum_size)

    return loop_path(dbutils.fs.ls(path), 0.0)

delta_path = "dbfs:/FileStore/sales3-10m.delta"
calculate_size_mb(delta_path, verbose=True)
© www.soinside.com 2019 - 2024. All rights reserved.