pyspark-sql 相关问题

将此标记用于与PySpark中的SQL模块相关的问题。

有没有办法从Stream2中的列'B'中减去Stream1中的列'A'?

我正在使用火花结构流(pyspark)从Kafka中读取2个流(stream1和stream)。我必须计算stream1和stream 2的偏移量之间的差异。我正在尝试一些事情......

回答 1 投票 1

To_Date函数始终返回null

我有一个字符串格式的列,我通过:session.sql(“从dba选择milestoneactualdate”)此列包含像“20190101”这样的数据。我想通过以下方式将此字符串转换为日期:session.sql(“...

回答 1 投票 0

from_json Pyspark SQL函数:找不到键的默认值?

我像往常一样使用from_json Pyspark SQL函数,例如:>>>从pyspark.sql.functions导入pyspark.sql.types作为t >>>导入from_json >>> df = sc.parallelize(['{“a” :1}” ...

回答 1 投票 1

传递列表中的列名称

我有一个列名列表,每次都有所不同。列名存储在列表中。所以,我需要从列表中传递列名(在下面的示例中将其id和programid)传递给when ...

回答 1 投票 2

Pyspark:使用configParser读取HDFS上的属性文件

我正在使用ConfigParser来读取传递给我的pyspark程序的键值。当我从hadoop集群的边缘节点执行时,代码工作正常,配置文件在本地目录中...

回答 1 投票 0

ImportError:没有名为Window的模块,但是从导入工作

Window类的pyspark文档如下所示:class pyspark.sql.Window但如果执行代码:import pyspark.sql.Window会出现以下错误:ImportError:No module ...

回答 1 投票 0

Spark SQL在资源空闲时执行缓慢

我有一个Spark SQL,用于执行<10分钟,现在在群集迁移后3小时运行,需要深入了解它实际执行的操作。我是新来的,请不要介意我是不是......

回答 1 投票 2

如何使用基于窗口时间的结构化Spark流式传输消息来自Kafka的消息(不是立即10分钟)

我们有批处理来使用S / Spark执行更新/插入操作。但现在我们有用例让这更实时。以下是配置和我的方法。但它没有用。 ...

回答 1 投票 0

如何从pyspark中的数据框中选择一系列行

我有一个10609行的数据帧,我希望一次将100行转换为JSON并将它们发送回Web服务。我试过使用SQL的LIMIT子句,比如temptable = spark.sql(“select ...

回答 1 投票 0

使用字符位置向后搜索以查找子字符串的位置

你好,我是Pyspark的新人。我想在Pyspark做类似的事情。 Oracle:INSTR('Corporate Floor','或', - 3,2)结果:2 [该函数从最后一个字符向后计数到第三个...

回答 1 投票 1

如何只读取文件列而不是行分隔符?

基本上我的输入文件是从Abinitio DML文件处理的。但我想使用Pyspark读取该文件。在Abinitio中,它们是使用列数划分的行。即使行分隔符'\ n'......

回答 1 投票 0

如何在Python代码的函数中获取今天的datepart?

我需要在Python中运行一个函数来检查今天的日期和月份,以及另一个指示符/标志,以确定是仅运行每月记录的代码还是运行两者的代码......

回答 1 投票 0

如何在使用字典时使用replace()方法替换列值?

在使用replace方法替换df中的列的值时,我们如何使用字典来执行相同的操作。我遇到了语法问题。 person = spark.createDataFrame([(0,“...

回答 1 投票 0

使用ANALYZE TABLE命令的Spark与Hive差异 -

在Hive表上从Spark运行的ANALYZE TABLE命令不会提供与从Hive发出的相同命令相同的性能改进。例如,我已将数据框插入空...

回答 1 投票 0

如何在Spark SQL中的多个列上进行数据透视?

我需要在pyspark数据帧中转动多个列。样本数据帧,>>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),( 101,1,23,10),(101,2,45,13),(101,3,67,14),(...

回答 3 投票 14

如何修复:pyspark.sql.utils.IllegalArgumentException:列功能的类型不正确?

我是pyspark的新手并试图在简单的代码下运行。 #创建LabeledPoint的RDD bcData = MLUtils.loadLibSVMFile(sc,“breast-cancer.txt”)#将其转换为DataFrame bcDataFrame = ss ....

回答 1 投票 0

有没有办法将生成的groupby流加入到kafka-spark-structured流的原始流中?

我正在阅读Kafka主题的流。我在事件时间执行窗口组操作。现在,我想将这个结果流从groupBy加回到原始流。 #我在读 ...

回答 1 投票 1

无法从PySpark sql获取地图对象

我在PySpark中很新 - 我有一个问题。我已经构建了一个代码来读取镶木地板文件,通过SQL查询查询它 - 当我使用udf(udf获取列表对象并返回字典对象)时,...

回答 1 投票 1

如何通过Spark SQL连接BigQuery?

我有一个简单的python代码,其中包括使用具有我的凭据的JSON文件连接bigQuery。 data = pd.read_gbq(SampleQuery,project_id ='XXXXXXXX',private_key ='filename.json')这里......

回答 1 投票 1

AWS Glue上的Spark SQL:pyspark.sql.utils.AnalysisException

我在AWS Glue脚本中使用Spark SQL来转换S3中的一些数据。这是脚本逻辑数据格式CSV编程语言:Python 1)使用Glue的目录将数据从S3中拉入胶水中......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.