Q: Dataset flatMap produces an empty Dataset in Spark Java [duplicate]


I want to remove the duplicate values in a Dataset, turning the first table below into the second.

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+

Expected result:

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|                  a |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|      item_guid_list|                  b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  c |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  d |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  e |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  f |       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+

However, when I use flatMap, the result is:

++
||
++
||
||
||
||
||

My code is:

            StructType structType = new StructType();
            structType.add("e_key", DataTypes.StringType);
            structType.add("f_timestamp_day", DataTypes.StringType);
            structType.add("key", DataTypes.StringType);
            structType.add("value", DataTypes.StringType);
            structType.add("f_country", DataTypes.StringType);
            structType.add("f_os", DataTypes.StringType);
            structType.add("received_date", DataTypes.StringType);


            Dataset<Row> drop_duplicate_feature = 
            explode_events.flatMap(
                (FlatMapFunction<Row, Row>)row->{
                    List<Row> list = new ArrayList<Row>();
                    String value = row.getString(3);
                    String[] array_of_value = value.split("\\^");
                    array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
                    for(int index = 0; index < array_of_value.length; index++){
                        list.add(
                            RowFactory.create(row.get(0),row.get(1),row.get(2),array_of_value[index],row.get(4),row.get(5),row.get(6))
                        );
                    }
                    return list.iterator();
                }
                , RowEncoder.apply(structType)
            );

I use flatMap to generate the distinct rows and collect them in a list.
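
The split-and-deduplicate step inside the lambda can be tried on its own, without Spark. The following is only a minimal sketch: the class and method names (`DistinctTokens`, `distinctTokens`) are hypothetical, and it swaps `HashSet` for `LinkedHashSet` so that tokens keep their first-seen order, which `HashSet` does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class DistinctTokens {
    // Hypothetical helper: splits a caret-delimited string and drops repeated
    // tokens. LinkedHashSet keeps the order in which tokens first appear.
    // The trim() is an assumption, in case the value carries stray whitespace.
    public static List<String> distinctTokens(String value) {
        String[] parts = value.trim().split("\\^");
        return new ArrayList<>(new LinkedHashSet<>(Arrays.asList(parts)));
    }

    public static void main(String[] args) {
        System.out.println(distinctTokens("a^a^a^b"));     // prints [a, b]
        System.out.println(distinctTokens("c^c^d^e^f^f")); // prints [c, d, e, f]
    }
}
```

Each element of the returned list would then become one output row for the `value` column.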

Why doesn't RowEncoder.apply() work?
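
One likely cause, offered as a sketch rather than a confirmed diagnosis: Spark's `StructType` is immutable, and `add` returns a new `StructType` instead of modifying the receiver. If the return values are discarded, the schema passed to `RowEncoder.apply` still has zero fields, which would be consistent with the column-less `++` / `||` output. The plain-Java stand-in below only mimics that behaviour; `Schema` and `ImmutableSchemaDemo` are hypothetical names, not Spark classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ImmutableSchemaDemo {
    // Hypothetical stand-in for an immutable schema type (not Spark's StructType).
    static final class Schema {
        private final List<String> fields;

        Schema() { this(Collections.<String>emptyList()); }
        private Schema(List<String> fields) { this.fields = fields; }

        // Returns a NEW Schema with the extra field; the receiver is unchanged.
        Schema add(String name) {
            List<String> copy = new ArrayList<>(fields);
            copy.add(name);
            return new Schema(Collections.unmodifiableList(copy));
        }

        int size() { return fields.size(); }
    }

    // Mirrors the pattern in the question: the results of add() are thrown away.
    public static int discardedSize() {
        Schema schema = new Schema();
        schema.add("e_key");
        schema.add("value");
        return schema.size();
    }

    // Chaining (or reassigning) keeps the schema that add() returns.
    public static int chainedSize() {
        Schema schema = new Schema().add("e_key").add("value");
        return schema.size();
    }

    public static void main(String[] args) {
        System.out.println(discardedSize()); // prints 0
        System.out.println(chainedSize());   // prints 2
    }
}
```

Applied to the code above, the analogous change would be to reassign on every call, for example `structType = structType.add("e_key", DataTypes.StringType);`, or to chain the `.add(...)` calls on `new StructType()`.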

apache-spark apache-spark-sql spark-java flatmap