在 Apache Spark 中,有没有办法强制 DataFrame 在特定节点上执行?

问题描述 投票:0回答:1

首先,让我描述一下我的设置。 我有两台通过以太网连接的电脑。 PC A 同时执行主节点和工作节点功能,而 PC B 仅作为工作节点运行。 由于某些限制,我无法使用 HDFS、HBase 或数据库等分布式存储系统。因此,我需要直接从本地文件夹(/root/data)创建DataFrame, A 和 B 都有。

A 在 /root/data 中有从 1.txt 到 2000.txt 的文件, B 在/root/data 中有从2001.txt 到4000.txt 的文件。 我的任务包括使用 Pandas_udf 计算每个文本文件中字符的出现次数。 生成的 DataFrame 应包含 4000 行,其中字符数作为列值。重要的是,文件 1 到 2000.txt 必须在 A 上处理,而文件 2001 到 4000.txt 必须在 B 上处理。 下面是我的代码。

spark = SparkSession.builder.config(conf=conf).appName("wordcount").getOrCreate()

file_paths = [f"/root/data/{i}.txt" for i in range(1, 4001)]

data = [(path,) for path in file_paths]
df = spark.createDataFrame(data, ["path"])

df = df.withColumn("count", my_udf_wordCount(df['path']))

save_path = "/root/data/result"
df.write.format('com.databricks.spark.csv') \
        .mode('overwrite') \
        .option("header", "true") \
        .save(save_path)

@pandas_udf(IntegerType())
def my_udf_wordCount(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for path_series in iterator:
        for path in path_series:
            file_path = path.strip()
            with open(file_path, 'r') as file:
                text = file.read()
                words = text.split()
                word_count = len(words)
                yield pd.Series(word_count)

当然,如果我提交这样的代码, 它会导致错误,因为对 1 到 4000.txt 的处理会混合到节点 A 和 B 中。 当 4000 个请求在 A 和 B 之间混合时,我会遇到错误, (例如, /root/data/2001~4000.txt 路径之一发送到节点 A 或 1~2001.txt 路径发送到 B)。

我想要的是根据“路径”列将任务分发到节点, 它对应于 4000 个 DataFrame 中每个的文件。 具有“/root/data/2.txt”的任务应分发到节点 A,而具有“/root/data/3000.txt”的任务应发送到节点 B。 如何修改代码来实现这一点?

此外, 从 1 到 4000 的任务都应该从单个 DataFrame 执行, 并且任务必须作为包含 Node A 和 Node B 的集群提交。没有单独的选项可以为 Node A 或 Node B 提交作业。关于容错的部分可以暂时忽略。

如何将任务分发到DataFrame中“Path”对应路径下有文件的节点?

apache-spark pyspark networking distributed-computing
1个回答
0
投票

尝试使用 Scala 的 SparkContext.makeRDD() 创建一个由 (file_name, node_host) 元组组成的 RDD,并使用 RDD.mapPartitions() 函数处理文件。

# processes a single file, returns an iterator with a single (path, count) tuple.
def word_count(iterator):
    path = next(iterator)
    with open(path) as f:
       ...

# creates a partition for each row/path and assignes a preferred location to it. 
count_by_path = sc._jsc.makeRDD([("/root/data/1.txt", "host_1"), ... , ("/root/data/4000.txt", "host_2")])
        .mapPartitions(word_count)

spark.createDataFrame(count_by_path, ['path', 'count'])
    .write
    . ...

请参阅下面的资源以了解更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.