apache-spark-sql 相关问题

Apache Spark SQL是Spark上的“SQL和结构化数据处理”工具,Spark是一种快速通用的集群计算系统。

Spark Connect 3.5.1 无法启动连接服务器

我有 Apache Spark 3.5.1,我正在尝试启动 Spark 连接服务器。 当我运行这个命令时: ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1 我明白了...

回答 1 投票 0

spark 数据帧基于条件的列总和

我想计算值的部分,只有两个分区(其中 type == red 和 where type != red) 身份证 |类型 |价值 ---------------------------- 1 | 红色| ...

回答 2 投票 0

使用 Apache Spark 进行索引/切片以返回在 Spark.sql 查询中使用的结果

以下简单的 Spark sql 查询会产生结果 df = sql("""从所有之前选择 FlagFileType""") 在不讨论无聊的细节的情况下,我编写了一个简单的索引

回答 1 投票 0

任务卡在 Spark (sedona) 中的 Join -> groupby 的“获取结果”

在 Spark 3.4.1、sedona、jupyter lab 上使用 docker 镜像工作 我正在尝试从具有地理信息的大型数据集中计算船舶最多船舶点的月份。 我有 2 个数据框,关于...

回答 1 投票 0

使用 Scala Spark sql 库确定聚合数据集中的条件是否为真

我正在尝试聚合数据集并确定数据集中的行的条件是否为真。 假设我有一个包含这些值的数据集 客户 ID 旅行类型 行驶距离 1 车 10 1...

回答 1 投票 0

在 Apache Spark Join 中包含空值

我想在 Apache Spark 连接中包含空值。 默认情况下,Spark 不包含带有 null 的行。 这是默认的 Spark 行为。 val 数字Df = Seq( (“123”), (“456”), (努...

回答 7 投票 0

没有聚合的 pyspark 数据透视表

我希望本质上进行旋转,而不需要最后进行聚合以保持数据帧完好无损并且不创建分组对象 举个例子: +--------+------------+--------+--...

回答 1 投票 0

Spark阶段交换的意义

任何人都可以解释一下我的 Spark DAG 中的 Spark 阶段中交换的含义吗?我的大部分阶段要么以交换开始,要么以交换结束。 1). WholeStageCodeGen -> 交换 2)。交换 -> WholeStageCodeG...

回答 1 投票 0

Spark SQL Row_number() 按排序描述分区

我已经使用 Window 在 Spark 中成功创建了 row_number() 和partitionBy(),但想按降序排序,而不是默认的升序。 这是我的工作代码: 来自 pyspark

回答 6 投票 0

Pyspark 出现次数及其分布

我有一个DataFrame[Rand:double,Flag:int,Value:int]。我想用这些数据执行以下操作。 在“Flag”列中,计算出现超过 3 个连续 0 的总次数。

回答 1 投票 0

数据帧转换不会抛出溢出异常并产生 null

来自 pyspark.sql.functions 导入* 从 pyspark.sql.types 导入 * 我正在尝试将数据帧转换为 df.column.cast(ShortType()) 但当我尝试插入数据 99999 时,它正在转换为 null

回答 3 投票 0

Spark 重新计算缓存的 Dataframes

使用 Scala 编写的 Spark 应用程序。 具有六大功能。每个函数将两个 Dataframe 作为输入,处理它们并发出一个结果 DF。我正在缓存每个函数的结果'...

回答 1 投票 0

根据 Spark 数据帧 Scala 中的列值过滤行

我有一个数据框(Spark): id值 3 0 3 1 3 0 4 1 4 0 4 0 我想创建一个新的数据框: 3 0 3 1 4 1 我需要删除每个 id 1(值)之后的所有行。我...

回答 4 投票 0

如何解决使用 withColumn() 使用 substring() 从字符串中删除第一个字符时出现的“Column is not iterable”错误?

我正在尝试从数据帧 df_rotina_pro 中的某些列中删除第一个字符。但我收到以下错误: 列不可迭代。 代码: 变换后的_df = (

回答 1 投票 0

Azure databricks 作业运行运行时的自定义值

如何修改笔记本中的状态和错误代码值。 目前它显示状态成功,但什么时候会进入笔记本内部。 源中没有数据,因此笔记本退出。我需要什么

回答 1 投票 0

Spark 将 Parquet 写入 S3 最后一个任务需要很长时间

我正在将一个 parquet 文件从 DataFrame 写入 S3。 当我查看 Spark UI 时,我可以看到除 1 个写入阶段之外的所有任务(例如 199/200)。这最后一项任务似乎要花很长时间......

回答 4 投票 0

在 PySpark 中的聚合数据上滚动累积乘积

在下面的 PySpark 数据框中,我尝试将列中的值从上到下相乘(就像我使用 F.product 函数一样),但我没有逐行计算,而是具有相同的子组

回答 1 投票 0

PySpark:计算列子集的行最大值并添加到现有数据帧

我想计算每行的列子集的最大值,并将其添加为现有数据框的新列。 我设法以非常尴尬的方式做到这一点: def add_colmax(df,subset_c...

回答 2 投票 0

Spark SQL/DataFrame 中的字符串编码问题

所以我有这个csv文件,它有两列:id(int),name(string)。当我通过以下代码将文件读入 pyspark 时: 架构 = StructType([ StructField("id", IntegerType(),...

回答 2 投票 0

Spark 3.0 - 从 MQTT 流读取数据

我想将数据流从基于 mosquitto 的 MQTT 主题读取到我的 Spark 3.0 应用程序中。我尝试通过以下方式使用 Bahir 库: 数据集 df = SparkSession .

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.