spark-streaming-kafka 相关问题


Apache Spark Structured Streaming 中 Spark UI 上的查询和阶段卡住了

我在 EMR 集群 (6.14) 上使用 Apache Spark Structured Streaming (3.1.2)。 Spark 结构化流将数据从 Apache Kafka 流式传输到 Delta Lake 表。当我打开 Spark UI 时,我看到以下内容


在结构化流 API 中跨多个集群使用共享 Kafka 主题执行 Spark 作业

我正在开发一个 Spark 项目,我需要在两个不同的集群上运行作业,两个集群都使用相同的 Kafka 主题。我希望这些作业能够有效地共享负载并平衡


Camel Kafka接收器连接器配置和依赖项

我正在尝试使用“camel-azure-storage-datalake-kafka-connector”从 Kafka 连接到 Azure ADLS Gen2 我有一个运行 Docker 的 Linux 机器,其中包含 debezium/zookeeper、debezium/kafka 和 debe...


如何在 Kubernetes multipod 部署中使用 spring kafka 处理 Kafka 容器生命周期

我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry 端点注册表。


Kafka Java Consumer Client 是单线程的吗

我们正在开始使用 Kafka, 在阅读本文时 - https://docs.confluence.io/kafka-clients/java/current/overview.html - 它似乎暗示客户端是单线程的。 * 由于这个...


使用kafka密钥的kafka s3连接器分区

如何使用 kafka msg key 作为 s3 连接器中的分区标准或 我怎样才能获得密钥并将其存储在 s3 对象中 谢谢!


Apache Spark 中的 join 和 cogroup 有什么区别

Apache Spark 中的 join 和 cogroup 有什么区别?每种方法的用例是什么?


Kafka Connect S3 Sink 添加元数据

我正在尝试将元数据添加到 kafka 的输出到 S3 存储桶中。 目前,输出只是来自 kafka 主题的消息的值。 我想用下面的东西把它包起来......


kedro ipython,如何访问spark会话

我能够在 kedro ipython 会话中加载 Spark 数据集。 首先,我按照此处所述配置了 Spark 会话。 然后我用 ipython --ext kedro.extras.exten 启动了 kedro ipython 会话...


在 Spark 的作业之间移动执行器的开销?

我正在阅读一篇有关 Spark 作业调度的论文,我对他们对 Spark 的概述感到困惑: Spark作业由一个DAG组成,其节点是作业的执行阶段。每个阶段代表...


即使部署在 kubernetes pod 上,kafka 主题仍然是不可变的吗?

我在 kubernetes pod 上部署了 kafka 主题和模式注册表,我尝试修改/更改 kafka 主题和模式注册表的清单文件,然后模式注册表的行为在


使用 kafka-go 和循环平衡器时,数据始终进入分区 0

我正在使用 kafka-go 库将消息写入 Kafka。我正在使用循环平衡器,但数据始终进入分区 0。我尝试忽略所有消息的分区字段,但是...


从 Kafka Consumer 传递数据

我想从Kafka获取数据,此方法成功获取记录但无法传递给变量。这是我的代码 公共无效 subscribeFromKafka() 抛出异常 { 列表结果=新


将 pandas 数据帧转换为 Spark 数据帧时收到错误

由于spark没有开箱即用的支持读取excel文件,所以我首先将excel文件读入pandas数据帧,然后尝试将pandas数据帧转换为spark数据帧,但我得到了...


写入 cassandra 时从 Spark 结构化流数据帧中过滤错误记录

我知道我的 Spark Scala 数据帧的第 n 行存在一些问题(假设数据类型不正确)。当我尝试使用 Spark 结构化流在 cassandra 中写入此数据帧时,它失败了......


依赖更新后构建kafka生产者失败

在我的 SpringBoot Java 项目中,我使用的是 kafka,特别是 ReactiveKafka。我正在更新依赖项,特别是这些依赖项: springboot 2.6.6 -> 3.1.5 弹簧卡夫卡 2.8.0 -> 3.0.11 反应堆-


如何在intellij中设置和运行scala-spark?

我正在尝试运行使用 Intellij 来构建用 scala 编写的 Spark 应用程序。当我执行scala程序时,出现以下错误: 线程“main”java.lang 中出现异常。


Kafka UI 无法连接到 Broker

我是容器化新手。我正在尝试设置我的本地环境,我的 java 应用程序想要连接到 Kafka。无法使用 Docker,所以决定使用 Podman。我有三个容器在同一个上运行


Spring Boot 3.1.X及以上版本的Kafka客户端连接问题

我最近将我的一项 Spring Boot 服务升级到 3.1.x,升级后我遇到了 kafka 问题。它似乎无法连接并不断向我提供以下日志。 2024-01-03T06:18...


有没有办法将AWS Cloudwatch日志输入Kafka主题

我正在努力寻找这方面的任何方向。我有一个内部系统可以处理日志以进行监控。我希望从 Cloudwatch 发送错误并在 kafka 主题上发布,其中...


Kafka:如何使用 Java API 从主题中删除记录?

我正在寻找一种从 Kafka 主题中删除(完全删除)已使用记录的方法。我知道有几种方法可以做到这一点,通过更改主题的保留时间或删除...


Glue Dynamic Frame 比普通 Spark 慢得多

在下图中,我们使用三种不同配置运行相同的胶水作业,以了解如何写入 S3: 我们使用动态帧写入S3 我们用纯spark框架写信给S...


将 Spark-Submit 的路径传递到 Python 脚本中

我想将我在 Spark-submit 命令行命令中使用的路径传递到我的 Python 脚本中,以便在写出文件时使用。 (注意:不是当前工作目录,也不是


无法将 Spark 数据帧写入 Mongo

使用 mongo-connector 版本 10.0.1 以下是我的配置 .config("spark.mongodb.write.connection.uri","mongodb://127.0.0.1:27017/") .config("spark.mongodb.write.database&


Kafka 总是有一个消费者消费一组中的主题消息

我有两个具有相同组ID的消费者服务器订阅了相同的主题。 一台 kafka 服务器仅运行一个分区。 据我所知,消息应该在这两个中随机消耗


具有手动偏移提交功能的 Kafka 消费者客户端一次只允许客户端

我目前正在使用一个Java Kafka消费者,它手动提交偏移量(enable.auto.commit = false),我发现即使我生成了多个实例,我发现这样的设置也是如此


如何仅删除已消费的消息以及如何在kafka主题中显示未消费的消息?

我们将一个项目从ActiveMQ迁移到Kafka。 过去我们向很多队列写入了太多的消息,消费完之后,ActiveMQ会自动删除消费的消息。仅未消耗


通过全局初始化脚本启用 Databricks 集群日志

我想通过全局初始化脚本为工作区中的所有集群(新的或旧的)设置集群日志传送。 我尝试通过自定义 Spark conf 添加底层 Spark 属性 - /databricks/dri...


有没有办法将图像的内容(存储在spark Dataframe中)与pyspark并行写入文件?

我有一个 Spark Dataframe,其中每一行都包含两个项目:文件名(带有扩展名,例如 .jpg)和文件的内容(以字节为单位)。 我想写一个过程...


Spark SQL 不支持 JSONPATH 通配符的任何解决方法

spark.sql("""select get_json_object('{"k":{"value":"abc"}}', '$.*.value') as j""").show() 这会导致 null,而它应该返回 'a...


如何在 Cloud Composer 2 的 KerbenetesPodOperator 中指定非默认计算类

我正在 Cloud Composer 2 中使用 KurbenetesPodOperator 创建 pod 来执行 Spark 作业。 默认情况下,当您使用


在 Spark DataFrame python 中将二进制字符串的列转换为 int

所以我有一个数据框,其中有一列,如下所示: +----------+ |some_colum| +----------+ | 10| | 00| | 00| | 10| | 10| | 00| | 10| | 00| | ...


Python KafkaTimeoutError:等待未来超时

我正在使用 Kafka 将日志发送到主题。发送消息时,我总是收到此错误 消息:“测试日志” 参数:() --- 记录错误 --- 回溯(最近一次调用最后一次): 文件“...


从spark/scala项目代码中资源文件夹中的sql文件读取查询

我在 IntelliJ 中的文件夹结构如下 src-->主-->资源-->sql-->samplequery.sql 我在文件夹 src--> main-->scala-... 中有 scala 对象文件samplequeryexecute


pyspark 检查点在本地计算机上失败

我刚刚开始在本地计算机上使用独立版本学习 pyspark。我无法让检查站工作。我把剧本归结为这个...... Spark = SparkSession.builder.appName("PyTest").master("


Databricks Spark:java.lang.OutOfMemoryError:GC 开销超出限制 i

我正在 Databricks 集群中执行 Spark 作业。我通过 Azure 数据工厂管道触发作业,它以 15 分钟的间隔执行,因此在成功执行三到四次之后...


如何使用诗歌从 test.pypi.org 安装软件包?

我想在我的项目中使用包的预发布版本(https://test.pypi.org/project/delta-spark/2.1.0rc1/)。 我正在使用诗歌来管理我的 pyproject.toml。我该怎么做呢? 换句话说...


我在安装 pyspark 时遇到错误,如何修复它?

我想安装并练习pyspark。但是在安装和进入 pyspark-shell 过程中,出现以下错误。 C:\Windows\System32>spark-shell 将默认日志级别设置为“WARN”。 至


JUNIT 测试用例-Spark JDBC

我是 Java 编程的新手。我有一个从Oracle数据库读取数据的方法。现在我需要帮助使用 JUnit 框架为以下代码编写测试用例。 数据集 df = Spark.read().


debezium 日期/时间字段值超出范围:0000-12-30T00:00:00Z

我们使用 Debezium 将数据同步到 在源表中我们有列timestamptz start_at,当值为0时start_at='0001-01-01 00:00:00.000000 +00:00',但是当我们检查kafka中的数据时,它是


将 Fastq 文件直接读取到 Pandas Dataframe 中

我正在尝试将 Fastq 文件直接读入 pandas 数据帧,类似于下面的链接: 将 FASTQ 文件读入 Spark 数据帧 我到处搜索,但找不到可行的选择。 电流...


Groovy 抛出 可能的解决方案:解析 LinkedHashMap 时出现 parseText(java.lang.String) 错误

我正在尝试检查kafka输出消息中是否存在该密钥,如果存在则进行进一步的操作。 卡夫卡主题的输出消息如下 [“随机名称_547hcg”:{ “访问_...


如何删除AWS MSK集群中的kafka状态存储

我有一个使用 AWS 上的 MSK 集群的 kafkaStreams 应用程序。 我需要清理状态存储(在我的应用程序中使用一些 KTable 后创建)。 我找不到任何方法来访问文件系统......


自消息发布或 sinse 服务器启动以来,kafka 是否计数 log.retatantion

如果我将 log.retantion 设置为 24 小时,则在 1.1.24 15:30 发布了一条消息。 然后服务器宕机了25小时,24年1月16日16:30再次启动,消息会立即删除吗...


火花计数未给出正确结果

我是 Spark 的新手,最近了解到它会在调用某个操作时执行所有转换。在搜索过程中,我找到了一个简单的代码来测试它,结果并不符合预期。 他...


如何在 PySpark 中按条件聚合相邻行进行分组

我在 Spark 数据框中进行条件分组时遇到问题 下面是完整的例子 我有一个数据框,已按用户和时间排序 活动地点用户 0 观看


如何在 AWS EMR 上配置/安装 JDBC SQLServerDriver for Spark 3.5?

我正在开发一个 PySpark ETL 管道应用程序,以便最终部署在 AWS EMR 上。数据从 Microsoft SQL Server 数据库中提取或提取。当我在本地运行代码时,我使用本地 mas...


Spark JDBC 写入 Teradata - 如何编写并行查询

我有一个大约 2000 万行和 5 列的数据帧,我想将其写入 Teradata。我面临的问题是它需要一个绝对年龄来加载,因为我们可以使用一个分区,因为表将......


在 PySpark 中执行不带 OrderBy 的窗口函数

我有一个数据框,其中数据的顺序已经正确。 现在我需要在数据帧上执行诸如超前/滞后之类的窗口函数,但是根据 Spark,orderBy 是强制性的,它不允许我喜欢 lea...


当消费者多于分区时,Kafka 消费者分区重新平衡

假设最初我们有一个包含 3 个分区的主题和一个包含 3 个消费者的消费者组,从该主题进行消费。如果我们在消费者组中再添加一个消费者,分区会重新平衡吗


© www.soinside.com 2019 - 2024. All rights reserved.