Chaining cache() commands in pyspark?


I'm just starting to learn pyspark and am looking into how to optimize my code with caching. Does chaining cache() calls make sense? Here's what my code looks like:

token_count_dict = dict(sorted_tokens_rdd.collect())
tokens = list(token_count_dict.keys())

popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition.keys() else 7, x[1]), 1))\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], a_function(x[1], token_count_dict[x[0][1]])))\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])

print(popular_tokens.toDebugString().decode("utf-8"))

The output is:

(2) PythonRDD[149] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[148] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[147] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[146] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
    |  PythonRDD[145] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
    |  MapPartitionsRDD[144] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[143] at partitionBy at <unknown>:0 []
    +-(2) PairwiseRDD[142] at sortBy at <ipython-input-24-d694a6d94459>:5 []
       |  PythonRDD[141] at sortBy at <ipython-input-24-d694a6d94459>:5 []
       |  MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:122 []
       |  ShuffledRDD[137] at partitionBy at <unknown>:0 []
       +-(2) PairwiseRDD[136] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
          |  PythonRDD[135] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
          |  MapPartitionsRDD[134] at mapPartitions at PythonRDD.scala:122 []
          |  ShuffledRDD[133] at partitionBy at <unknown>:0 []
          +-(2) PairwiseRDD[132] at distinct at <ipython-input-24-d694a6d94459>:5 []
             |  PythonRDD[131] at distinct at <ipython-input-24-d694a6d94459>:5 []
             |  ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
             |      CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

Based on the lineage above, I can see a few branches that could benefit from caching (?). So, would the following be better Spark optimization practice?

From the research I've done, the consensus seems to be to cache() at the branches of the lineage. However, when I timed runs of both implementations with %%timeit, there was no difference.

popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .cache()\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .cache()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition.keys() else 7, x[1]), 1))\
    .cache()\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], get_rel_popularity(x[1], token_count_dict[x[0][1]])))\
    .cache()\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .cache()\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .cache()\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])

The output still seems to have a lot of branches:

(2) PythonRDD[130] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[129] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[128] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[127] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
    |  PythonRDD[126] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
    |  PythonRDD[125] at RDD at PythonRDD.scala:48 []
    |  PythonRDD[124] at RDD at PythonRDD.scala:48 []
    |  MapPartitionsRDD[123] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[122] at partitionBy at <unknown>:0 []
    +-(2) PairwiseRDD[121] at sortBy at <ipython-input-23-5914874b5d65>:5 []
       |  PythonRDD[120] at sortBy at <ipython-input-23-5914874b5d65>:5 []
       |  PythonRDD[117] at RDD at PythonRDD.scala:48 []
       |      CachedPartitions: 2; MemorySize: 7.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
       |  MapPartitionsRDD[116] at mapPartitions at PythonRDD.scala:122 []
       |  ShuffledRDD[115] at partitionBy at <unknown>:0 []
       +-(2) PairwiseRDD[114] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
          |  PythonRDD[113] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
          |  PythonRDD[112] at RDD at PythonRDD.scala:48 []
          |      CachedPartitions: 2; MemorySize: 193.2 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
          |  PythonRDD[111] at RDD at PythonRDD.scala:48 []
          |      CachedPartitions: 2; MemorySize: 188.7 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
          |  MapPartitionsRDD[110] at mapPartitions at PythonRDD.scala:122 []
          |  ShuffledRDD[109] at partitionBy at <unknown>:0 []
          +-(2) PairwiseRDD[108] at distinct at <ipython-input-23-5914874b5d65>:5 []
             |  PythonRDD[107] at distinct at <ipython-input-23-5914874b5d65>:5 []
             |  PythonRDD[106] at RDD at PythonRDD.scala:48 []
             |      CachedPartitions: 2; MemorySize: 652.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
             |      CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

Thanks for helping out a noob!

apache-spark caching pyspark rdd
1 Answer

Caching is a trade-off between saving computation and consuming storage. You can't cache everything, since that would exhaust memory and disk: memory is limited, and caching to disk means paying I/O on read-back. I would suggest caching a dataset only when it is expensive to build and is used more than once.
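To make the memory-versus-disk side of that trade-off concrete, here is a minimal sketch; sc is assumed to be an existing SparkContext, and the file path is simply reused from the question:

from pyspark import StorageLevel

lines = sc.textFile("./hw2-files-10mb.txt")

# cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): fast to read
# back, but partitions that don't fit in memory are recomputed from the
# lineage rather than spilled.
lines.cache()

# persist() lets you pick the trade-off explicitly, e.g. spill partitions
# that don't fit in memory to disk and pay read-back I/O instead of
# recomputation (use one or the other, not both, on the same RDD):
# lines.persist(StorageLevel.MEMORY_AND_DISK)

lines.count()  # an action materializes the cache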

If a dataset is only used once, I would not cache it even if it is expensive to build, because it has to be built that one time anyway before it can be used at all. This is why you see no performance benefit: you never reuse the cached data.
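That is also why the %%timeit comparison shows nothing: the first action on a cached RDD still pays the full build cost, and only subsequent actions hit the cache. A hypothetical sketch:

import time

# stand-in for any costly-to-build RDD; sc is an existing SparkContext
expensive_rdd = sc.textFile("./hw2-files-10mb.txt") \
    .flatMap(lambda line: line.split()) \
    .cache()

t0 = time.perf_counter()
expensive_rdd.count()   # 1st action: builds the RDD and fills the cache
t1 = time.perf_counter()
expensive_rdd.count()   # 2nd action: served from the cache
t2 = time.perf_counter()

print(f"first action:  {t1 - t0:.3f}s")   # full cost
print(f"second action: {t2 - t1:.3f}s")   # should be noticeably cheaper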

In your example, assuming the result is actually reused, I would cache the final result once, after all the filtering, sorting, mapping, and grouping has happened, rather than at every intermediate step; see the sketch below.
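Concretely, that would look roughly like this (a sketch, assuming popular_tokens really is consumed by more than one action):

# the same chain as in the question, minus the intermediate .cache() calls,
# with a single cache() on the final result:
popular_tokens.cache()

popular_tokens.count()   # first action: runs the full lineage, fills the cache
popular_tokens.take(5)   # later actions: served from the cache, no recomputation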
