I've been working on this code for a while. The code and most of the cluster properties I'm using on EMR are listed below. The purpose of the code is to split certain CSV files into two parts at a given line number, based on some basic iteration (a simple split is included in the code below).
I keep getting the error "Container killed by YARN for exceeding memory limits" and have followed the design guidance in the link below to address it, but I just don't understand why this causes a memory problem. I have over 22 GB of YARN overhead, and the files range from a few MB up to single-digit GB.
I have sometimes used r5a.12xlarge instances, to no avail. I really don't see any kind of memory leak in this code. It also seems very slow, managing to output only about 20 GB to S3 in 16 hours. Is this a good way to parallelize this split operation? Is there a memory leak? What gives?
https://aws.amazon.com/premiumsupport/knowledge-center/emr-spark-yarn-memory-limit/
[
  {
    "Classification": "spark",
    "Properties": {
      "spark.maximizeResourceAllocation": "true"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.yarn.executor.memoryOverheadFactor": ".2"
    }
  },
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Configurations": [],
        "Properties": {
          "PYSPARK_PYTHON": "python36"
        }
      }
    ],
    "Properties": {}
  }
]
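For context on the numbers involved: the overhead factor in the spark-defaults classification above feeds a simple formula on the YARN side. A back-of-the-envelope sketch (the function and the executor size are illustrative, not taken from the cluster config):

```python
def yarn_container_size_mb(executor_memory_mb, overhead_factor=0.1):
    # Spark-on-YARN requests executor memory plus an overhead of
    # max(overhead_factor * executor memory, 384 MiB); a process that
    # grows past the container's limit at runtime triggers the
    # "Container killed by YARN for exceeding memory limits" error.
    overhead_mb = max(int(executor_memory_mb * overhead_factor), 384)
    return executor_memory_mb + overhead_mb

# e.g. a hypothetical 110 GiB executor with the 0.2 factor set above:
# overhead alone is 22528 MB (~22 GiB), for a 135168 MB container
print(yarn_container_size_mb(110 * 1024, overhead_factor=0.2))  # 135168
```

Raising the factor only moves the limit; it does not shrink the off-heap memory the Python workers actually consume.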
import argparse
import os
from functools import partial
from io import StringIO
from typing import List, Tuple, Union

import boto3
import pandas
from pyspark import SparkContext

# S3Url is an external helper that parses s3:// URLs into bucket and key

def writetxt(txt: Union[List[str], pandas.DataFrame], path: str) -> None:
    s3 = boto3.resource('s3')
    s3path = S3Url(path)
    obj = s3.Object(s3path.bucket, s3path.key)
    if isinstance(txt, pandas.DataFrame):
        csv_buffer = StringIO()
        txt.to_csv(csv_buffer)
        obj.put(Body=csv_buffer.getvalue())
    else:
        obj.put(Body='\n'.join(txt).encode())

def main(
    x: Tuple[str, str],
    output_files: str
) -> None:
    filename, content = x
    filename = os.path.basename(S3Url(filename).key)
    content = content.splitlines()
    # Split the csv file
    columnAttributes, csvData = content[:100], content[100:]
    writetxt(csvData, os.path.join(output_files, 'data.csv', filename))
    writetxt(columnAttributes, os.path.join(output_files, 'attr.csv', filename))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Split some misshapen csv files.')
    parser.add_argument('input_files', type=str,
                        help='The location of the input files.')
    parser.add_argument('output_files', type=str,
                        help='The location to put the output files.')
    parser.add_argument('--nb_partitions', type=int, default=4)
    args = parser.parse_args()

    # creating the context
    sc = SparkContext(appName="Broadcom Preprocessing")
    # We use minPartitions because otherwise the many small files we have all
    # get put in the same partition together by default
    # (foreach returns None, so there is nothing useful to assign it to)
    sc.wholeTextFiles(args.input_files, minPartitions=args.nb_partitions) \
        .foreach(partial(main, output_files=args.output_files))
I think your memory problem comes from doing the actual data splitting in Python code. Spark processes run in the JVM, but when you call custom Python code, the relevant data has to be serialized over to a Python process (on each worker node) to be executed, which adds a lot of overhead. I believe you can accomplish what you're trying to do entirely with Spark operations, meaning the final program would run entirely inside the JVM-based Spark processes.
Try something like this:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

input_path = "..."
split_num = 100

# load filenames & contents
filesDF = spark.createDataFrame( sc.wholeTextFiles(input_path), ['filename','contents'] )

# break into individual lines & number them
# (posexplode preserves file order; a row_number() window in the same select
# would be evaluated before the explode and give every line the same number)
linesDF = filesDF.select( "filename",
    posexplode(split(col("contents"), "\n")).alias("line_number", "contents") ) \
    .withColumn("line_number", col("line_number") + lit(1))
# split into headers & body
headersDF = linesDF.where(col("line_number") == lit(1))
bodyDF = linesDF.where(col("line_number") > lit(1))
# split the body in 2 based on line number
splitLinesDF = bodyDF.withColumn("split", when(col("line_number") < lit(split_num), 0).otherwise(1))
split_0_DF = splitLinesDF.where(col("split") == lit(0)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")
split_1_DF = splitLinesDF.where(col("split") == lit(1)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")
# collapse all lines back down into a file
firstDF = split_0_DF.groupBy("filename").agg(concat_ws("\n",collect_list(col("contents"))).alias("contents"))
secondDF = split_1_DF.groupBy("filename").agg(concat_ws("\n",collect_list(col("contents"))).alias("contents"))
# pandas UDF for more memory-efficient transfer of data from Spark to Python
@pandas_udf(returnType=IntegerType())
def writeFile( filename, contents ):
    # <save to S3 here> -- must return an int Series, one value per row
    ...

# write each row to a file (.collect() is needed: select alone is lazy
# and would never execute the UDF)
firstDF.select( writeFile( col("filename"), col("contents") ) ).collect()
secondDF.select( writeFile( col("filename"), col("contents") ) ).collect()
Finally, you will need some custom Python code to save each split file to S3 (alternatively, you could just write everything in Scala/Java). Doing this through a pandas UDF is much more efficient than passing a standard Python function to .foreach(...): internally, Spark serializes the data into the Arrow format in chunks (one per partition), which is very efficient.
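As a rough illustration, the `<save to S3 here>` body of that pandas UDF might look like the sketch below. The bucket name, key layout, and helper names are assumptions, not part of the answer; to use it, wrap `write_batch` with `@pandas_udf(returnType=IntegerType())`.

```python
def split_key(output_prefix, filename):
    # Build the S3 key for one split file
    # (the 'prefix/filename' layout is an assumption)
    return output_prefix.rstrip('/') + '/' + filename

def write_batch(filename, contents):
    # Candidate body for the writeFile pandas UDF: persist each
    # (filename, contents) pair to S3, then return per-file byte counts,
    # since a scalar pandas UDF must return a Series of the declared type.
    import boto3
    import pandas as pd
    s3 = boto3.resource('s3')
    sizes = []
    for name, body in zip(filename, contents):
        data = body.encode('utf-8')
        # 'my-output-bucket' and the 'splits' prefix are placeholders
        s3.Object('my-output-bucket', split_key('splits', name)).put(Body=data)
        sizes.append(len(data))
    return pd.Series(sizes, dtype='int64')
```

The UDF is called once per Arrow batch rather than once per row, so the boto3 client is created far less often than in the .foreach(...) version.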
Also, it looks like you are trying to put the entire object to S3 in a single request. If the data is too large, that will fail. You should look into S3's streaming multipart upload capability.
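A minimal sketch of that multipart flow with boto3 follows; the bucket, key, and chunk size are illustrative (boto3's higher-level `upload_fileobj` switches to multipart automatically above a size threshold, so it is often the simpler choice):

```python
def chunk_bytes(data, chunk_size):
    # Slice the payload into parts; S3 requires every part except the
    # last to be at least 5 MiB
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def multipart_put(bucket, key, data, chunk_size=8 * 1024 * 1024):
    # Upload one object in several requests instead of a single
    # oversized PUT that can fail for large payloads
    import boto3
    s3 = boto3.client('s3')
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    parts = []
    for number, part in enumerate(chunk_bytes(data, chunk_size), start=1):
        resp = s3.upload_part(Bucket=bucket, Key=key, PartNumber=number,
                              UploadId=upload['UploadId'], Body=part)
        parts.append({'PartNumber': number, 'ETag': resp['ETag']})
    s3.complete_multipart_upload(Bucket=bucket, Key=key,
                                 UploadId=upload['UploadId'],
                                 MultipartUpload={'Parts': parts})
```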