我有一个元组(tuples_list)的列表。我想在这里进行地图操作,但地图操作的一部分,需要从一个相当大的矩阵(矩阵)的信息。没有写入矩阵,只是读取。这是一个SciPy的压缩稀疏行矩阵(csr_matrix)。
因此,地图功能可能看起来是这样的:
def map_function(list_element, matrix):
info = get_element_specific_info_from_matrix(list_element, matrix)
new_element = get_new_element(info)
return new_element
下面是我的代码在做的高度概括:
from pyspark import SparkContext
sc = SparkContext("local", "Process Name")
matrix = ...
tuples_list = ...
...
tuples_list = sc.parallelize(tuples_list)
results_list = tuples_list.map(lambda tup: map_function(tup, matrix))
results_list = results_list.collect() # error happens here
问题是,我一直运行到堆/内存的问题,我怀疑这是因为星火司机正在该矩阵是传递给其工人的副本:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readBroadcastFromFile.
: java.lang.OutOfMemoryError: Java heap space
我不能在矩阵的子集通,很遗憾。没有设想可以什么数据元素从矩阵需要。
我想知道些什么:
谢谢!
总之你不能。但是,为了回答你的问题,一步一步
我如何验证副本是否被提出,矩阵的?
其实有多个副本,在这两个序列化和反序列化形式。由于您使用PySpark串行版本存在,在某些时候,在两个JVM(这就是你的代码没有)和Python的一面。
如果复印后,我怎么能告诉火花,不制作副本?
你不能。 Spark是分布式处理系统及其设计的选择不作共享存储系统上丝毫意义。客人的做法,引起主管固有的重复 - - 工人的建筑特别有被主机间接造成的。最后是在PySpark,其中每个工人使用它自己的过程中引入的进一步孤立。
等等,这还不是全部 - local
模式是一个测试工具,而不是一个生产就绪引擎(更不用说local
is not even parallel)。
还有一些小窗户,可以减少重复 - 通过文件系统发布的数据,并使用内存映射的数据结构,但实际上,只要选择一个工具,它是正确的工作,并能充分利用的资源(非统一内存尤其是修正访问)。 Spark是不是其中之一。