Apache Spark SQL是Spark上的“SQL和结构化数据处理”工具,Spark是一种快速通用的集群计算系统。
我有 Apache Spark 3.5.1,我正在尝试启动 Spark 连接服务器。 当我运行这个命令时: ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1 我明白了...
我想计算值的部分,只有两个分区(其中 type == red 和 where type != red) 身份证 |类型 |价值 ---------------------------- 1 | 红色| ...
使用 Apache Spark 进行索引/切片以返回在 Spark.sql 查询中使用的结果
以下简单的 Spark sql 查询会产生结果 df = sql("""从所有之前选择 FlagFileType""") 在不讨论无聊的细节的情况下,我编写了一个简单的索引
任务卡在 Spark (sedona) 中的 Join -> groupby 的“获取结果”
在 Spark 3.4.1、sedona、jupyter lab 上使用 docker 镜像工作 我正在尝试从具有地理信息的大型数据集中计算船舶最多船舶点的月份。 我有 2 个数据框,关于...
使用 Scala Spark sql 库确定聚合数据集中的条件是否为真
我正在尝试聚合数据集并确定数据集中的行的条件是否为真。 假设我有一个包含这些值的数据集 客户 ID 旅行类型 行驶距离 1 车 10 1...
我想在 Apache Spark 连接中包含空值。 默认情况下,Spark 不包含带有 null 的行。 这是默认的 Spark 行为。 val 数字Df = Seq( (“123”), (“456”), (努...
我希望本质上进行旋转,而不需要最后进行聚合以保持数据帧完好无损并且不创建分组对象 举个例子: +--------+------------+--------+--...
任何人都可以解释一下我的 Spark DAG 中的 Spark 阶段中交换的含义吗?我的大部分阶段要么以交换开始,要么以交换结束。 1). WholeStageCodeGen -> 交换 2)。交换 -> WholeStageCodeG...
Spark SQL Row_number() 按排序描述分区
我已经使用 Window 在 Spark 中成功创建了 row_number() 和partitionBy(),但想按降序排序,而不是默认的升序。 这是我的工作代码: 来自 pyspark
我有一个DataFrame[Rand:double,Flag:int,Value:int]。我想用这些数据执行以下操作。 在“Flag”列中,计算出现超过 3 个连续 0 的总次数。
来自 pyspark.sql.functions 导入* 从 pyspark.sql.types 导入 * 我正在尝试将数据帧转换为 df.column.cast(ShortType()) 但当我尝试插入数据 99999 时,它正在转换为 null
使用 Scala 编写的 Spark 应用程序。 具有六大功能。每个函数将两个 Dataframe 作为输入,处理它们并发出一个结果 DF。我正在缓存每个函数的结果'...
我有一个数据框(Spark): id值 3 0 3 1 3 0 4 1 4 0 4 0 我想创建一个新的数据框: 3 0 3 1 4 1 我需要删除每个 id 1(值)之后的所有行。我...
如何解决使用 withColumn() 使用 substring() 从字符串中删除第一个字符时出现的“Column is not iterable”错误?
我正在尝试从数据帧 df_rotina_pro 中的某些列中删除第一个字符。但我收到以下错误: 列不可迭代。 代码: 变换后的_df = (
如何修改笔记本中的状态和错误代码值。 目前它显示状态成功,但什么时候会进入笔记本内部。 源中没有数据,因此笔记本退出。我需要什么
Spark 将 Parquet 写入 S3 最后一个任务需要很长时间
我正在将一个 parquet 文件从 DataFrame 写入 S3。 当我查看 Spark UI 时,我可以看到除 1 个写入阶段之外的所有任务(例如 199/200)。这最后一项任务似乎要花很长时间......
在下面的 PySpark 数据框中,我尝试将列中的值从上到下相乘(就像我使用 F.product 函数一样),但我没有逐行计算,而是具有相同的子组
我想计算每行的列子集的最大值,并将其添加为现有数据框的新列。 我设法以非常尴尬的方式做到这一点: def add_colmax(df,subset_c...
所以我有这个csv文件,它有两列:id(int),name(string)。当我通过以下代码将文件读入 pyspark 时: 架构 = StructType([ StructField("id", IntegerType(),...
我想将数据流从基于 mosquitto 的 MQTT 主题读取到我的 Spark 3.0 应用程序中。我尝试通过以下方式使用 Bahir 库: 数据集 df = SparkSession .