I'm just getting started with PySpark and am looking into how to optimize my code with caching. Does chaining cache() calls make sense? Here's what my code looks like:
token_count_dict = dict(sorted_tokens_rdd.collect())
tokens = list(token_count_dict.keys())
popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition.keys() else 7, x[1]), 1))\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], a_function(x[1], token_count_dict[x[0][1]])))\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])
print(popular_tokens.toDebugString().decode("utf-8"))
The output is:
(2) PythonRDD[149] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[148] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[147] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[146] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[145] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
| MapPartitionsRDD[144] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[143] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[142] at sortBy at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[141] at sortBy at <ipython-input-24-d694a6d94459>:5 []
| MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[137] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[136] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[135] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
| MapPartitionsRDD[134] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[133] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[132] at distinct at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[131] at distinct at <ipython-input-24-d694a6d94459>:5 []
| ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
| CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []
Based on the lineage above, I can see several branches that could benefit from caching (?). So, is the following better Spark optimization practice?
From the research I've done, the consensus seems to be to cache() at branches in the lineage. When I timed runs of both implementations with %%timeit, there was no difference.
popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .cache()\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .cache()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition.keys() else 7, x[1]), 1))\
    .cache()\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], get_rel_popularity(x[1], token_count_dict[x[0][1]])))\
    .cache()\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .cache()\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .cache()\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])
The output still seems to have a lot of branches:
(2) PythonRDD[130] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[129] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[128] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[127] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[126] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[125] at RDD at PythonRDD.scala:48 []
| PythonRDD[124] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[123] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[122] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[121] at sortBy at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[120] at sortBy at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[117] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 7.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| MapPartitionsRDD[116] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[115] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[114] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[113] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[112] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 193.2 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| PythonRDD[111] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 188.7 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| MapPartitionsRDD[110] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[109] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[108] at distinct at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[107] at distinct at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[106] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 652.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
| CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []
Thanks for helping a noob!
Caching is a trade-off between saving computation and consuming storage. You can't cache everything, because that would exhaust memory and disk: memory is limited, and caching to disk involves I/O on read-back. I would suggest caching a dataset only when it is expensive to build and is used more than once.
If it's used only once, I wouldn't cache it even if it's expensive to build, since it has to be built that one time anyway in order to be used. That's why you don't see any performance benefit: you never reuse the cached data.
In your example, assuming the result is reused, I would cache the final result once, after all the filtering, sorting, mapping, and grouping has happened.