All executors dead: MinHash LSH PySpark approxSimilarityJoin self-join on an EMR cluster

Question (votes: 0, answers: 1)

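I run into problems when calling Spark's MinHashLSH approxSimilarityJoin on a dataframe of (name_id, name) combinations.

A summary of the problem I am trying to solve:

I have a dataframe of around 30 million unique (name_id, name) combinations for company names. Some of those names refer to the same company, but are (i) misspelled, and/or (ii) include additional names. Performing fuzzy string matching for every combination is not feasible. To reduce the number of fuzzy string matching combinations, I use MinHashLSH in Spark. My intended approach is to use an approxSimilarityJoin (self-join) with a relatively large Jaccard threshold, so that I can then run a fuzzy matching algorithm on the matched combinations to further improve the disambiguation.

A summary of the steps I took: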

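  1. Used CountVectorizer to create a character count vector for every name,
  2. Used MinHashLSH and its approxSimilarityJoin with the following settings:
    • numHashTables = 100
    • threshold = 0.3 (Jaccard threshold for approxSimilarityJoin)
  3. After the approxSimilarityJoin, I remove duplicate combinations (when both (i,j) and (j,i) are matched, I remove (j,i)),
  4. After removing the duplicate combinations, I run a fuzzy string matching algorithm using the FuzzyWuzzy package to reduce the number of records and improve the disambiguation of the names,
  5. Finally, I run the connectedComponents algorithm on the remaining edges (i,j) to determine which company names belong together.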
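
To make the threshold in step 2 concrete: Spark's MinHashLSH treats each vector as the set of its non-zero indices, and the value passed to approxSimilarityJoin is a Jaccard distance (1 minus the Jaccard similarity), which is why the physical plan in this question filters on `< 0.3`. The following is only a plain-Python sketch on character sets, not the Spark implementation; `jaccard_distance` is a hypothetical helper name.

```python
def jaccard_distance(a: str, b: str) -> float:
    """Jaccard distance between the sets of characters of two strings."""
    set_a, set_b = set(a), set(b)
    union = set_a | set_b
    if not union:
        # Two empty strings: treat them as identical.
        return 0.0
    return 1.0 - len(set_a & set_b) / len(union)


# Names taken from the sample rows of `data`.
d_extended = jaccard_distance("ERGENICS", "ERGENICS POWER SYSTEMS")
d_reordered = jaccard_distance("FORD MOTOR", "MOTOR FORD")
d_unrelated = jaccard_distance("NSK", "HIVAL")
```

Because only set membership counts, a reordered name has distance 0, while a name extended with extra words can exceed a 0.3 threshold even though it contains the original name in full.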

Part of the code used:

    from fuzzywuzzy import fuzz
    from pyspark.ml.feature import MinHashLSH
    from pyspark.sql.functions import array, col, sort_array

    id_col = 'id'
    name_col = 'name'
    num_hashtables = 100
    max_jaccard = 0.3  # Jaccard distance threshold passed to approxSimilarityJoin
    fuzzy_threshold = 90
    fuzzy_method = fuzz.token_set_ratio

    # Calculate edges using minhash practices.
    # `data` is the dataframe shown further down (with the vectorized_char_lst column).
    edges = MinHashLSH(inputCol='vectorized_char_lst', outputCol='hashes', numHashTables=num_hashtables).\
        fit(data).\
        approxSimilarityJoin(data, data, max_jaccard).\
        select(col('datasetA.'+id_col).alias('src'),
               col('datasetA.clean').alias('src_name'),
               col('datasetB.'+id_col).alias('dst'),
               col('datasetB.clean').alias('dst_name')).\
        withColumn('comb', sort_array(array(*('src', 'dst')))).\
        dropDuplicates(['comb']).\
        rdd.\
        filter(lambda x: fuzzy_method(x['src_name'], x['dst_name']) >= fuzzy_threshold if x['src'] != x['dst'] else False).\
        toDF().\
        drop(*('src_name', 'dst_name', 'comb'))
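
Steps 3 and 5 of the list above can be illustrated without Spark. The sketch below is plain Python with a small union-find, not the GraphFrames connectedComponents call used in the actual job; `dedupe_pairs` and `connected_components` are hypothetical helper names and the ids are made up.

```python
def dedupe_pairs(pairs):
    """Keep one orientation of each unordered pair and drop self-matches."""
    return sorted({tuple(sorted(p)) for p in pairs if p[0] != p[1]})


def connected_components(edges):
    """Group node ids that are linked through any chain of edges."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            # Attach the larger root id to the smaller one.
            parent[max(root_a, root_b)] = min(root_a, root_b)

    groups = {}
    for node in parent:
        groups.setdefault(find(node), set()).add(node)
    return sorted(sorted(group) for group in groups.values())


matched = [(1, 2), (2, 1), (2, 3), (4, 4), (5, 6)]
edges = dedupe_pairs(matched)
components = connected_components(edges)
```

Here `(2, 1)` collapses into `(1, 2)` and the self-match `(4, 4)` is dropped, mirroring the `sort_array` plus `dropDuplicates` and the `src != dst` check in the code above.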

Explain plan of edges:

== Physical Plan ==
*(5) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
+- Exchange hashpartitioning(datasetA#232, datasetB#263, 200)
   +- *(4) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
      +- *(4) Project [datasetA#232, datasetB#263]
         +- *(4) BroadcastHashJoin [entry#233, hashValue#234], [entry#264, hashValue#265], Inner, BuildRight, (UDF(datasetA#232.vectorized_char_lst, datasetB#263.vectorized_char_lst) < 0.3)
            :- *(4) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#225) AS datasetA#232, entry#233, hashValue#234]
            :  +- *(4) Filter isnotnull(hashValue#234)
            :     +- Generate posexplode(hashes#225), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#225], false, [entry#233, hashValue#234]
            :        +- *(1) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#225]
            :           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
            :                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
            :                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
            :                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
            :                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                   +- Exchange hashpartitioning(name#11, 200)
            :                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
            :                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                            +- Exchange RoundRobinPartitioning(8)
            :                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
            :                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false], input[2, vector, true]))
               +- *(3) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#256) AS datasetB#263, entry#264, hashValue#265]
                  +- *(3) Filter isnotnull(hashValue#265)
                     +- Generate posexplode(hashes#256), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#256], false, [entry#264, hashValue#265]
                        +- *(2) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#256]
                           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
                                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
                                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
                                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
                                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
                                                   +- Exchange hashpartitioning(name#11, 200)
                                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
                                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
                                                            +- Exchange RoundRobinPartitioning(8)
                                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
                                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>

What data looks like:

+-------+--------------------+--------------------+--------------------+--------------------+
|     id|                name|               clean|            char_lst| vectorized_char_lst|
+-------+--------------------+--------------------+--------------------+--------------------+
|3633038|MURATA MACHINERY LTD|    MURATA MACHINERY|[M, U, R, A, T, A...|(33,[0,1,2,3,4,5,...|
|3632811|SOCIETE ANONYME D...|SOCIETE ANONYME D...|[S, O, C, I, E, T...|(33,[0,1,2,3,4,5,...|
|3632655|FUJIFILM CORPORATION|            FUJIFILM|[F, U, J, I, F, I...|(33,[3,10,12,13,2...|
|3633318|HEINE OPTOTECHNIK...|HEINE OPTOTECHNIK...|[H, E, I, N, E,  ...|(33,[0,1,2,3,4,5,...|
|3633523|SUNBEAM PRODUCTS INC|    SUNBEAM PRODUCTS|[S, U, N, B, E, A...|(33,[0,1,2,4,5,6,...|
|3633300|           HIVAL LTD|               HIVAL|     [H, I, V, A, L]|(33,[2,3,10,11,21...|
|3632657|             NSK LTD|                 NSK|           [N, S, K]|(33,[5,6,16],[1.0...|
|3633240|REHABILITATION IN...|REHABILITATION IN...|[R, E, H, A, B, I...|(33,[0,1,2,3,4,5,...|
|3632732|STUDIENGESELLSCHA...|STUDIENGESELLSCHA...|[S, T, U, D, I, E...|(33,[0,1,2,3,4,5,...|
|3632866|ENERGY CONVERSION...|ENERGY CONVERSION...|[E, N, E, R, G, Y...|(33,[0,1,3,5,6,7,...|
|3632895|ERGENICS POWER SY...|ERGENICS POWER SY...|[E, R, G, E, N, I...|(33,[0,1,3,4,5,6,...|
|3632897| MOLI ENERGY LIMITED|         MOLI ENERGY|[M, O, L, I,  , E...|(33,[0,1,3,5,7,8,...|
|3633275| NORDSON CORPORATION|             NORDSON|[N, O, R, D, S, O...|(33,[5,6,7,8,14],...|
|3633256|  PEROXIDCHEMIE GMBH|       PEROXIDCHEMIE|[P, E, R, O, X, I...|(33,[0,3,7,8,9,11...|
|3632695|      POWER CELL INC|          POWER CELL|[P, O, W, E, R,  ...|(33,[0,1,7,8,9,10...|
|3633037|        ERGENICS INC|            ERGENICS|[E, R, G, E, N, I...|(33,[0,3,5,6,8,9,...|
|3632878|  FORD MOTOR COMPANY|          FORD MOTOR|[F, O, R, D,  , M...|(33,[1,4,7,8,13,1...|
|3632573|    SAFT AMERICA INC|        SAFT AMERICA|[S, A, F, T,  , A...|(33,[0,1,2,3,4,6,...|
|3632852|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632698|   KRUPPKOPPERS GMBH|        KRUPPKOPPERS|[K, R, U, P, P, K...|(33,[0,6,7,8,12,1...|
|3633150|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632761|AMERICAN TELEPHON...|AMERICAN TELEPHON...|[A, M, E, R, I, C...|(33,[0,1,2,3,4,5,...|
|3632757|HITACHI KOKI COMP...|        HITACHI KOKI|[H, I, T, A, C, H...|(33,[1,2,3,4,7,9,...|
|3632836|HUGHES AIRCRAFT C...|     HUGHES AIRCRAFT|[H, U, G, H, E, S...|(33,[0,1,2,3,4,6,...|
|3633152|            SOSY INC|                SOSY|        [S, O, S, Y]|(33,[6,7,18],[2.0...|
|3633052|HAMAMATSU PHOTONI...|HAMAMATSU PHOTONI...|[H, A, M, A, M, A...|(33,[1,2,3,4,5,6,...|
|3633450|       AKZO NOBEL NV|          AKZO NOBEL|[A, K, Z, O,  , N...|(33,[0,1,2,5,7,10...|
|3632713| ELTRON RESEARCH INC|     ELTRON RESEARCH|[E, L, T, R, O, N...|(33,[0,1,2,4,5,6,...|
|3632533|NEC ELECTRONICS C...|     NEC ELECTRONICS|[N, E, C,  , E, L...|(33,[0,1,3,4,5,6,...|
|3632562| TARGETTI SANKEY SPA| TARGETTI SANKEY SPA|[T, A, R, G, E, T...|(33,[0,1,2,3,4,5,...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 30 rows

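Hardware used:

  1. Master node: m5.2xlarge, 8 vCore, 32 GiB memory, EBS-only storage, EBS storage: 128 GiB
  2. Slave nodes (10x): m5.4xlarge, 16 vCore, 64 GiB memory, EBS-only storage, EBS storage: 500 GiB
  3. Spark-submit settings used: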

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.shuffle.partitions=2000" --conf "spark.executor.cores=4" --conf "spark.executor.memory=14g" --conf "spark.driver.memory=14g" --conf "spark.driver.maxResultSize=14g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py

Task errors in the Web UI:

ExecutorLostFailure (executor 21 exited caused by one of the running tasks) Reason: Slave lost
ExecutorLostFailure (executor 31 exited unrelated to the running tasks) Reason: Container marked as failed: container_1590592506722_0001_02_000002 on host: ip-172-31-47-180.eu-central-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.

(Part of the) executor log:


20/05/27 16:29:09 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:29:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:15 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:17 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (0  time so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:33 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:29:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (1  time so far)
20/05/27 16:29:42 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:46 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:53 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:29:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (2  times so far)
20/05/27 16:30:00 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:30:05 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:10 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:15 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (3  times so far)
20/05/27 16:30:19 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:22 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:29 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:32 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (4  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:30:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (5  times so far)
20/05/27 16:30:55 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:59 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:31:03 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (6  times so far)
20/05/27 16:31:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:31:22 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:24 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (7  times so far)
20/05/27 16:31:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:41 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:31:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (8  times so far)
20/05/27 16:31:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:48 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:32:02 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (9  times so far)
20/05/27 16:32:04 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:08 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:32:19 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:20 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:21 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (10  times so far)
20/05/27 16:32:26 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (11  times so far)
20/05/27 16:32:38 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:45 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:51 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:32:56 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (12  times so far)
20/05/27 16:32:58 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:03 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:33:08 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:13 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (13  times so far)
20/05/27 16:33:15 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:20 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:26 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:31 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (14  times so far)
20/05/27 16:33:36 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:33:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:51 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (15  times so far)
20/05/27 16:33:54 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:34:03 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:04 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:08 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (16  times so far)
20/05/27 16:34:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:34:16 INFO PythonUDFRunner: Times: total = 774701, boot = 3, init = 10, finish = 774688
20/05/27 16:34:21 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (17  times so far)
20/05/27 16:34:30 INFO PythonUDFRunner: Times: total = 773372, boot = 2, init = 9, finish = 773361
20/05/27 16:34:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (18  times so far)
20/05/27 16:34:46 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (19  times so far)
20/05/27 16:35:01 INFO PythonUDFRunner: Times: total = 776905, boot = 3, init = 11, finish = 776891
20/05/27 16:35:05 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (20  times so far)
20/05/27 16:35:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (21  times so far)
20/05/27 16:35:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (22  times so far)
20/05/27 16:35:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (23  times so far)
20/05/27 16:36:10 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (24  times so far)
20/05/27 16:36:29 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:36:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:37:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:37:25 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:37:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:38:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:38:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:38:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:38:59 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:39:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:39:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:39:58 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:40:18 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:40:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:40:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:41:16 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:41:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:41:55 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:42:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:42:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:42:59 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/05/27 16:42:59 INFO DiskBlockManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1590592506722_0001/spark-73af8e3b-f428-47d4-9e13-fed4e19cc2cd
2020-05-27T16:41:16.336+0000: [GC (Allocation Failure) 2020-05-27T16:41:16.336+0000: [ParNew: 272234K->242K(305984K), 0.0094375 secs] 9076907K->8804915K(13188748K), 0.0094895 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:34.686+0000: [GC (Allocation Failure) 2020-05-27T16:41:34.686+0000: [ParNew: 272242K->257K(305984K), 0.0084179 secs] 9076915K->8804947K(13188748K), 0.0084840 secs] [Times: user=0.09 sys=0.01, real=0.01 secs] 
2020-05-27T16:41:35.145+0000: [GC (Allocation Failure) 2020-05-27T16:41:35.145+0000: [ParNew: 272257K->1382K(305984K), 0.0095541 secs] 9076947K->8806073K(13188748K), 0.0096080 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.077+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.077+0000: [ParNew: 273382K->2683K(305984K), 0.0097177 secs] 9078073K->8807392K(13188748K), 0.0097754 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.513+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.513+0000: [ParNew: 274683K->3025K(305984K), 0.0093345 secs] 9079392K->8807734K(13188748K), 0.0093892 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:05.481+0000: [GC (Allocation Failure) 2020-05-27T16:42:05.481+0000: [ParNew: 275025K->4102K(305984K), 0.0092950 secs] 9079734K->8808830K(13188748K), 0.0093464 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:18.711+0000: [GC (Allocation Failure) 2020-05-27T16:42:18.711+0000: [ParNew: 276102K->2972K(305984K), 0.0098928 secs] 9080830K->8807700K(13188748K), 0.0099510 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:36.493+0000: [GC (Allocation Failure) 2020-05-27T16:42:36.493+0000: [ParNew: 274972K->3852K(305984K), 0.0094324 secs] 9079700K->8808598K(13188748K), 0.0094897 secs] [Times: user=0.11 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:40.880+0000: [GC (Allocation Failure) 2020-05-27T16:42:40.880+0000: [ParNew: 275852K->2568K(305984K), 0.0111794 secs] 9080598K->8807882K(13188748K), 0.0112352 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
Heap
 par new generation   total 305984K, used 261139K [0x0000000440000000, 0x0000000454c00000, 0x0000000483990000)
  eden space 272000K,  95% used [0x0000000440000000, 0x000000044fc82cf8, 0x00000004509a0000)
  from space 33984K,   7% used [0x00000004509a0000, 0x0000000450c220a8, 0x0000000452ad0000)
  to   space 33984K,   0% used [0x0000000452ad0000, 0x0000000452ad0000, 0x0000000454c00000)
 concurrent mark-sweep generation total 12882764K, used 8805314K [0x0000000483990000, 0x0000000795e63000, 0x00000007c0000000)
 Metaspace       used 77726K, capacity 79553K, committed 79604K, reserved 1118208K
  class space    used 10289K, capacity 10704K, committed 10740K, reserved 1048576K

Screenshot of executors

What I have tried:

  • Changing spark.sql.shuffle.partitions
  • Changing spark.default.parallelism
  • Repartitioning the dataframe

How can I solve this issue?

Thanks in advance!

Thijs


pyspark apache-spark-sql garbage-collection amazon-emr minhash
1 answer (votes: 0)

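approxSimilarityJoin only parallelizes well across workers if the tokens fed into MinHash are sufficiently distinct. Individual character tokens appear in a large share of the records, so add an NGram transformation to your character list to make each token occur less frequently. This will greatly reduce data skew and should resolve the memory strain.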
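
The effect of n-grams on token frequency can be checked on a handful of names from the question's sample. This is a plain-Python illustration, not Spark's NGram transformer; `char_ngrams` and `max_doc_share` are hypothetical helper names.

```python
from collections import Counter


def char_ngrams(text, n):
    """All character n-grams of text, in order (n=1 gives single characters)."""
    return [text[i:i + n] for i in range(len(text) - n + 1)]


def max_doc_share(names, n):
    """Share of names that contain the single most common n-gram."""
    counts = Counter()
    for name in names:
        # Count each n-gram once per name (document frequency).
        counts.update(set(char_ngrams(name, n)))
    return max(counts.values()) / len(names)


names = [
    "MURATA MACHINERY", "FUJIFILM", "SUNBEAM PRODUCTS", "HIVAL", "NSK",
    "NORDSON", "POWER CELL", "ERGENICS", "FORD MOTOR", "SAFT AMERICA",
]
unigram_share = max_doc_share(names, 1)
trigram_share = max_doc_share(names, 3)
```

On this small sample the most common single character (R) occurs in 7 of the 10 names, whereas no trigram comes close to that share. One caveat when carrying this over to Spark: names shorter than n characters yield no n-grams at all, and MinHashLSH needs at least one non-zero entry per vector, so such rows would have to be handled separately.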

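MinHash simulates the process of creating a random permutation of your token population and, for a given sample set, selects the token that appears first in that permutation. Since you are using individual characters as tokens, suppose you pick a MinHash seed that makes the character e the first in the random permutation. In that case, every row containing the letter e will have a matching MinHash and will be shuffled to the same worker for set comparison. This causes extreme data skew and out-of-memory errors.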
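
The bucketing effect described above can be reproduced in a few lines. This is a conceptual sketch only: it uses one hand-picked ordering ("E" first, everything else alphabetical) in place of a random permutation, and it does not reproduce the hash functions Spark's MinHashLSH actually uses. All helper names are hypothetical.

```python
from collections import Counter


def char_ngrams(text, n):
    """Set of character n-grams in text (n=1 gives single characters)."""
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def permutation_rank(token):
    # Stand-in for a random permutation: "E" ranks first, the rest alphabetically.
    return (token != "E", token)


def minhash(tokens):
    """The token of the set that comes first in the permutation."""
    return min(tokens, key=permutation_rank)


def bucket_sizes(names, n):
    """Number of names per MinHash value when names are tokenized into n-grams."""
    buckets = Counter()
    for name in names:
        tokens = char_ngrams(name, n)
        if tokens:  # names shorter than n have no tokens
            buckets[minhash(tokens)] += 1
    return buckets


names = [
    "MURATA MACHINERY", "SUNBEAM PRODUCTS", "POWER CELL", "ERGENICS",
    "MOLI ENERGY", "SAFT AMERICA", "NSK", "HIVAL",
]
unigram_buckets = bucket_sizes(names, 1)
trigram_buckets = bucket_sizes(names, 3)
```

With this ordering, six of the eight sample names land in the "E" bucket when single characters are the tokens, while with trigrams every bucket holds a single name. At 30 million rows and 100 hash tables, the single-character case is what sends a large share of the rows to the same join key.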
