Splitting malformed CSV files with a PySpark RDD on EMR: YARN "exceeding memory limits" error


I have been working on this code for a while. Below are the code I run on EMR and most of the cluster properties. The purpose of the code is to split certain CSV files into two parts at a given line number, based on some basic iteration (I have included a simple fixed split in the code below).

I keep getting the error "Container killed by YARN for exceeding memory limits" and have followed the guidance in the link below to address it, but I just don't understand why this would cause a memory problem. I have over 22 GB of YARN overhead, and the files range from a few MB to single-digit GB in size.

I have at times used r5a.12xlarge instances, to no avail. I really don't see any kind of memory leak in this code. It also seems very slow: it only writes about 20 GB to S3 in 16 hours. Is this a good way to parallelize this split operation? Is there a memory leak? What is going on?

https://aws.amazon.com/premiumsupport/knowledge-center/emr-spark-yarn-memory-limit/

[
    {
        "Classification": "spark",
        "Properties": {
            "spark.maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.yarn.executor.memoryOverheadFactor":".2"
        }
    },
    {
        "Classification": "spark-env",
        "Configurations": [
            {
                "Configurations": [],
                "Properties": {
                    "PYSPARK_PYTHON": "python36"
                },
                "Classification": "export"
            }
        ],
        "Properties": {
        }
    }
]
    import argparse
    import os
    from functools import partial
    from io import StringIO
    from typing import List, Tuple, Union

    import boto3
    import pandas
    from pyspark import SparkContext

    # S3Url is a small helper (not shown in the post) that parses an
    # s3://bucket/key URL into .bucket and .key attributes.

    def writetxt(txt: Union[List[str], pandas.DataFrame], path: str) -> None:
        # Write either a list of lines or a pandas DataFrame to a single S3 object
        s3 = boto3.resource('s3')
        s3path = S3Url(path)
        obj = s3.Object(s3path.bucket, s3path.key)
        if isinstance(txt, pandas.DataFrame):
            csv_buffer = StringIO()
            txt.to_csv(csv_buffer)
            obj.put(Body=csv_buffer.getvalue())
        else:
            obj.put(Body='\n'.join(txt).encode())

    def main(
            x: Tuple[str, str],  # one (filename, contents) pair from wholeTextFiles
            output_files: str
    ) -> None:
        filename, content = x
        filename = os.path.basename(S3Url(filename).key)
        content = content.splitlines()

        # Split the csv file: the first 100 lines are column attributes, the rest is data
        columnAttributes, csvData = content[:100], content[100:]

        writetxt(csvData, os.path.join(output_files, 'data.csv', filename))
        writetxt(columnAttributes, os.path.join(output_files, 'attr.csv', filename))


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description='Split some mishapen csv files.')
        parser.add_argument('input_files', type=str,
                            help='The location of the input files.')
        parser.add_argument('output_files', type=str,
                            help='The location to put the output files.')
        parser.add_argument('--nb_partitions', type=int, default=4)
        args = parser.parse_args()

        # creating the context
        sc = SparkContext(appName="Broadcom Preprocessing")

        # We use minPartitions because otherwise many small files get packed into
        # the same partition by default, and we have a lot of small files.
        # Note: .foreach() calls the Python function once per file; foreachPartition
        # could reduce the number of calls (see the sketch after this block).
        sc.wholeTextFiles(args.input_files, minPartitions=args.nb_partitions) \
            .foreach(partial(main, output_files=args.output_files))
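
For reference, since the comment above mentions foreachPartition while the code calls foreach, a per-partition variant is sketched below. It is only an illustration of the idea and reuses the main helper already defined in this post:

    def process_partition(partition, output_files):
        # partition is an iterator over the (filename, contents) pairs of one
        # partition, so Python is invoked once per partition rather than once per file
        for pair in partition:
            main(pair, output_files)

    sc.wholeTextFiles(args.input_files, minPartitions=args.nb_partitions) \
        .foreachPartition(partial(process_partition, output_files=args.output_files))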
1 Answer

I think your memory problem comes from doing the actual data splitting in Python code. Spark processes run in the JVM, but when you invoke custom Python code, the relevant data has to be serialized out to a Python process (on every worker node) before it can be executed, which adds a lot of overhead. I believe you can do what you are trying to do entirely with Spark operations, which means the final program would run entirely inside the JVM-based Spark processes.

Try something like this:

from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import *

input_path = "..."

split_num = 100

# load filenames & contents
filesDF = spark.createDataFrame( sc.wholeTextFiles(input_path), ['filename','contents'] )

# break into individual lines & number them
# (posexplode gives each line its position within the file, which the original
#  row_number() window ordered by "filename" alone cannot guarantee)
linesDF = filesDF.select( "filename",
                          posexplode(split(col("contents"), "\n")).alias("line_number", "contents") ) \
                 .withColumn("line_number", col("line_number") + lit(1))

# split into headers & body
headersDF = linesDF.where(col("line_number") == lit(1))
bodyDF = linesDF.where(col("line_number") > lit(1))

# split the body in two based on the split line number
splitLinesDF = bodyDF.withColumn("split", when(col("line_number") < lit(split_num), 0).otherwise(1))
split_0_DF = splitLinesDF.where(col("split") == lit(0)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")
split_1_DF = splitLinesDF.where(col("split") == lit(1)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")

# collapse all lines back down into a file
firstDF = split_0_DF.groupBy("filename").agg(concat_ws("\n",collect_list(col("contents"))).alias("contents"))
secondDF = split_1_DF.groupBy("filename").agg(concat_ws("\n",collect_list(col("contents"))).alias("contents"))

# pandas-UDF for more memory-efficient (Arrow-based) transfer of data from Spark to Python
@pandas_udf(returnType=IntegerType())
def writeFile( filename, contents ):
  # filename and contents arrive as pandas Series, one element per row;
  # the function must return a pandas Series of ints of the same length
  <save to S3 here>

# write each row to a file
# (.collect() is needed so the select is actually executed)
firstDF.select( writeFile( col("filename"), col("contents") ) ).collect()
secondDF.select( writeFile( col("filename"), col("contents") ) ).collect()

In the end you will need a bit of custom Python code to save each split file to S3 (or you could write the whole thing in Scala/Java). Doing this through a pandas UDF is much more efficient than passing a standard Python function to .foreach(...): internally, Spark serializes the data in Arrow-format batches (one per partition), which is very efficient.
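
As a rough illustration only (the original answer leaves the save step as a placeholder), the body of writeFile could look something like the sketch below; the bucket name and key prefix are made-up placeholders, and a boto3 resource is created inside the UDF for simplicity:

import boto3
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

@pandas_udf(returnType=IntegerType())
def writeFile(filename: pd.Series, contents: pd.Series) -> pd.Series:
    # Each call receives one Arrow batch as pandas Series; write every
    # (filename, contents) row to S3 and return one int per row.
    s3 = boto3.resource("s3")
    for name, body in zip(filename, contents):
        # "my-output-bucket" and the "split/" prefix are placeholders
        s3.Object("my-output-bucket", "split/" + name).put(Body=body.encode("utf-8"))
    return pd.Series([1] * len(filename))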

Also, you appear to be putting each entire object to S3 in a single request. If the data is too large, that will fail. You should look into S3's streaming (multipart) upload capability.
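
As an illustration (not part of the original answer), boto3's upload_fileobj performs a managed multipart upload automatically once the payload crosses a size threshold; the function name, bucket, and key below are made-up placeholders:

import io
import boto3
from boto3.s3.transfer import TransferConfig

s3_client = boto3.client("s3")

# Uploads larger than multipart_threshold are automatically split into parts
transfer_config = TransferConfig(multipart_threshold=64 * 1024 * 1024,
                                 multipart_chunksize=64 * 1024 * 1024)

def upload_large_text(text, bucket, key):
    # Stream the content instead of a single object.put() call
    s3_client.upload_fileobj(io.BytesIO(text.encode("utf-8")), bucket, key,
                             Config=transfer_config)

# e.g. upload_large_text("\n".join(lines), "my-output-bucket", "split/data.csv/file1.csv")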
