spark-redshift 相关问题


Apache Spark Structured Streaming 中 Spark UI 上的查询和阶段卡住了

我在 EMR 集群 (6.14) 上使用 Apache Spark Structured Streaming (3.1.2)。 Spark 结构化流将数据从 Apache Kafka 流式传输到 Delta Lake 表。当我打开 Spark UI 时,我看到以下内容


AWS CloudWatch Logs 未创建

我正在尝试使用 AWS Glue 运行 ETL 作业,将数据从 Redshift 获取到 S3。 当我运行爬网程序时,它成功连接到 Redshift 并获取架构信息。相关日志已创建...


Apache Spark 中的 join 和 cogroup 有什么区别

Apache Spark 中的 join 和 cogroup 有什么区别?每种方法的用例是什么?


kedro ipython,如何访问spark会话

我能够在 kedro ipython 会话中加载 Spark 数据集。 首先,我按照此处所述配置了 Spark 会话。 然后我用 ipython --ext kedro.extras.exten 启动了 kedro ipython 会话...


在 Spark 的作业之间移动执行器的开销?

我正在阅读一篇有关 Spark 作业调度的论文,我对他们对 Spark 的概述感到困惑: Spark作业由一个DAG组成,其节点是作业的执行阶段。每个阶段代表...


未找到 Redshift COPY 命令分隔符

我正在尝试将一些文本文件加载到 Redshift。它们以制表符分隔,最终行值之后除外。这会导致未找到分隔符错误。我只看到一种设置字段分隔符的方法...


在 Redshift 中,如何复制表、添加 dist 和排序键以及保留列编码?

我想在 Redshift 中复制一个表,这样我就可以保留现有的列编码,同时添加 DIST 和 SORT 键。有没有一种简单的可重复的方法来做到这一点? 以前我有


将 pandas 数据帧转换为 Spark 数据帧时收到错误

由于spark没有开箱即用的支持读取excel文件,所以我首先将excel文件读入pandas数据帧,然后尝试将pandas数据帧转换为spark数据帧,但我得到了...


写入 cassandra 时从 Spark 结构化流数据帧中过滤错误记录

我知道我的 Spark Scala 数据帧的第 n 行存在一些问题(假设数据类型不正确)。当我尝试使用 Spark 结构化流在 cassandra 中写入此数据帧时,它失败了......


Redshift 中的 JSON 字段

我想将“住房”键拆分到不同的行上。所以在这个例子中我预计有 4 行。每行内都有以下列: id(这是桌子上的钥匙),housing.id,


如何在RedShift中从数组中的每个json中提取json元素?

我有一个带有超级列扫描的表格,其值类似于 [{"A": 1}, {"A": 2}], [{"A": 3}, {"A": 4}, {“A”:5}]。 我如何用 val 制作一个列...


如何在intellij中设置和运行scala-spark?

我正在尝试运行使用 Intellij 来构建用 scala 编写的 Spark 应用程序。当我执行scala程序时,出现以下错误: 线程“main”java.lang 中出现异常。


Redshift 慢速 COPY 命令

我有一堆数据(100 列和大约 30M 行),我想从 S3 复制。复制速度非常慢 -3-4 小时。我在目标表上定义了一个 dist 键和排序键。 我想知道什么


Glue Dynamic Frame 比普通 Spark 慢得多

在下图中,我们使用三种不同配置运行相同的胶水作业,以了解如何写入 S3: 我们使用动态帧写入S3 我们用纯spark框架写信给S...


如何使用 Redshift 查询编辑器 v2 创建查询计划?

查询编辑器 v2 似乎没有 v1 中的计划按钮。我已按照建议的步骤使用 AWS EventBridge 创建计划并已成功附加所述计划...


将 Spark-Submit 的路径传递到 Python 脚本中

我想将我在 Spark-submit 命令行命令中使用的路径传递到我的 Python 脚本中,以便在写出文件时使用。 (注意:不是当前工作目录,也不是


无法将 Spark 数据帧写入 Mongo

使用 mongo-connector 版本 10.0.1 以下是我的配置 .config("spark.mongodb.write.connection.uri","mongodb://127.0.0.1:27017/") .config("spark.mongodb.write.database&


为什么Langchain SQL数据库连接只检测到少数现有表?

我已成功连接到 Redshift 数据库(如下所示)并获取所有表名称; conn = psycopg2.connect(主机、数据库、端口、用户名、密码) cursor.execute("从 pg_ta 选择表名...


通过全局初始化脚本启用 Databricks 集群日志

我想通过全局初始化脚本为工作区中的所有集群(新的或旧的)设置集群日志传送。 我尝试通过自定义 Spark conf 添加底层 Spark 属性 - /databricks/dri...


有没有办法将图像的内容(存储在spark Dataframe中)与pyspark并行写入文件?

我有一个 Spark Dataframe,其中每一行都包含两个项目:文件名(带有扩展名,例如 .jpg)和文件的内容(以字节为单位)。 我想写一个过程...


Spark SQL 不支持 JSONPATH 通配符的任何解决方法

spark.sql("""select get_json_object('{"k":{"value":"abc"}}', '$.*.value') as j""").show() 这会导致 null,而它应该返回 'a...


在结构化流 API 中跨多个集群使用共享 Kafka 主题执行 Spark 作业

我正在开发一个 Spark 项目,我需要在两个不同的集群上运行作业,两个集群都使用相同的 Kafka 主题。我希望这些作业能够有效地共享负载并平衡


如何在 Cloud Composer 2 的 KerbenetesPodOperator 中指定非默认计算类

我正在 Cloud Composer 2 中使用 KurbenetesPodOperator 创建 pod 来执行 Spark 作业。 默认情况下,当您使用


在 Spark DataFrame python 中将二进制字符串的列转换为 int

所以我有一个数据框,其中有一列,如下所示: +----------+ |some_colum| +----------+ | 10| | 00| | 00| | 10| | 10| | 00| | 10| | 00| | ...


从spark/scala项目代码中资源文件夹中的sql文件读取查询

我在 IntelliJ 中的文件夹结构如下 src-->主-->资源-->sql-->samplequery.sql 我在文件夹 src--> main-->scala-... 中有 scala 对象文件samplequeryexecute


pyspark 检查点在本地计算机上失败

我刚刚开始在本地计算机上使用独立版本学习 pyspark。我无法让检查站工作。我把剧本归结为这个...... Spark = SparkSession.builder.appName("PyTest").master("


Databricks Spark:java.lang.OutOfMemoryError:GC 开销超出限制 i

我正在 Databricks 集群中执行 Spark 作业。我通过 Azure 数据工厂管道触发作业,它以 15 分钟的间隔执行,因此在成功执行三到四次之后...


如何使用诗歌从 test.pypi.org 安装软件包?

我想在我的项目中使用包的预发布版本(https://test.pypi.org/project/delta-spark/2.1.0rc1/)。 我正在使用诗歌来管理我的 pyproject.toml。我该怎么做呢? 换句话说...


我在安装 pyspark 时遇到错误,如何修复它?

我想安装并练习pyspark。但是在安装和进入 pyspark-shell 过程中,出现以下错误。 C:\Windows\System32>spark-shell 将默认日志级别设置为“WARN”。 至


JUNIT 测试用例-Spark JDBC

我是 Java 编程的新手。我有一个从Oracle数据库读取数据的方法。现在我需要帮助使用 JUnit 框架为以下代码编写测试用例。 数据集 df = Spark.read().


将 Fastq 文件直接读取到 Pandas Dataframe 中

我正在尝试将 Fastq 文件直接读入 pandas 数据帧,类似于下面的链接: 将 FASTQ 文件读入 Spark 数据帧 我到处搜索,但找不到可行的选择。 电流...


火花计数未给出正确结果

我是 Spark 的新手,最近了解到它会在调用某个操作时执行所有转换。在搜索过程中,我找到了一个简单的代码来测试它,结果并不符合预期。 他...


如何在 PySpark 中按条件聚合相邻行进行分组

我在 Spark 数据框中进行条件分组时遇到问题 下面是完整的例子 我有一个数据框,已按用户和时间排序 活动地点用户 0 观看


如何在 AWS EMR 上配置/安装 JDBC SQLServerDriver for Spark 3.5?

我正在开发一个 PySpark ETL 管道应用程序,以便最终部署在 AWS EMR 上。数据从 Microsoft SQL Server 数据库中提取或提取。当我在本地运行代码时,我使用本地 mas...


Spark JDBC 写入 Teradata - 如何编写并行查询

我有一个大约 2000 万行和 5 列的数据帧,我想将其写入 Teradata。我面临的问题是它需要一个绝对年龄来加载,因为我们可以使用一个分区,因为表将......


在 PySpark 中执行不带 OrderBy 的窗口函数

我有一个数据框,其中数据的顺序已经正确。 现在我需要在数据帧上执行诸如超前/滞后之类的窗口函数,但是根据 Spark,orderBy 是强制性的,它不允许我喜欢 lea...


有没有办法在不使用collect()的情况下将数据帧值收集为列表

我面临着如何在不使用收集方法的情况下有效过滤 Spark DataFrame 的挑战,这可能会导致大型数据集上的性能问题。具体来说,我需要过滤


Spark中同规格硬件上本地处理和集群处理有什么区别?

本地模式 vs 集群模式 我是一个刚刚使用EMR的新手。 我正在使用 AWS EMR。 有主节点、核心节点、任务节点。 为什么要使用多核/任务?我不能只用一个吗?难道是……


© www.soinside.com 2019 - 2024. All rights reserved.