spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

pyspark 流式写入 kafka 不起作用

我想将这个简单的pyspark数据帧发送到kafka,我做了所有事情,但我总是收到错误。我用简单的 python 生产者脚本尝试它,它可以工作,并且 pyspark 读取流可以工作我的...

回答 1 投票 0

Python中Apache Spark的输出Dstream

我正在尝试将用于构建实时数据管道的技术,并且在将内容导出到文件时遇到了一些问题。 我已经设置了一个本地 kafka 集群,并且点头......

回答 1 投票 0

使用 Scala 在 Spark 流应用程序中编写优化 UDF 的最佳方式是什么?

我正在使用 Spark Streaming 应用程序,我需要使用来自一个 Kafka 主题的数据并需要推送到另一个 Kafka 主题。 我创建了一个 UDF 函数来执行一些业务逻辑

回答 1 投票 0

如何使用 Spark Dataframe 将列行转换为字符串变量

我需要将单列行转换为字符串变量,以便在从数据库表加载时在 where 条件中使用,而不是从表中加载整个数据。 像下面这样的示例数据框...

回答 1 投票 0

降低 Spark Streaming Info 日志的频率

当我部署 Spark Streaming 作业并通过 Spark2-submit 运行它时,我每秒都会收到这样的日志消息: 21/02/24 21:45:27 INFO yarn.Client: application_1612163470992_382017 ...

回答 1 投票 0

Databricks Spark Streaming 停止且没有任何错误消息

向 Lake 写入以下消息后,数据流式传输停止: 流媒体停止 输出[18]: 当我执行单元格时

回答 1 投票 0

DeltaFileNotFoundException:在目录 DataBricks 中找不到文件

我想请求您的帮助。 我一直在使用 DataBricks。 我们开发了一些脚本,它们正在流媒体中工作。 假设我们有两个作业正在运行和编写...

回答 1 投票 0

NoSuchMethodError:org.apache.kafka.clients.consumer.KafkaConsumer.poll

当我使用 SparkRDD API 从 Kafka 读取数据时遇到 NoSuchMethodError。当有第一个可用记录并且存在异常时,堆栈跟踪看起来像这样

回答 1 投票 0

kubernetes 上的 Spark 如何在发生故障/重新启动时恢复驱动程序

我有一个 Spark Streaming 作业,目前在 Kubernetes 中以集群模式运行。驱动程序和执行程序运行良好,我对它们没有任何问题。 然而,当思考失败时,如何才能...

回答 1 投票 0

如何在没有 json Schema 的情况下使用 Spark Stream 从 Kafka 获取 Dataframe?

我是火花新手 我正在尝试使用 Spark Stream 阅读 kafka 主题。 从 Kafka 流式传输的数据的“value”字段是一个 json 字符串。 我想将此“值”字段转换为数据框 和

回答 1 投票 0

Spark Structured Streaming dropDuplicates 在重新启动应用程序后无法按预期工作

我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...

回答 2 投票 0

如何在没有 json Schema 的情况下使用 SparkStream 从 Kafka 获取 Dataframe

我是火花新手 我正在尝试使用 Spark Stream 阅读 kafka 主题。 从 Kafka 流式传输的数据的“value”字段是一个 json 字符串。 我想将此“值”字段转换为数据框 和

回答 1 投票 0

Spark 结构化流 dropDuplicates 未按预期工作

我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...

回答 1 投票 0

将函数的使用者转换为 Stream JavaRdd

消费者applyMapping后如何处理数据流?还是按1000个一包加工? 私有JavaRDD丰富WithExternalData(JavaRDD>rdd){...

回答 1 投票 0

使用 Spark Dataframe 将列行获取到字符串变量中

我正在尝试将列行转换为字符串变量。 输入数据框。 部门名称 员工姓名 发展 阿斯特丽德 发展 弗雷哈 发展 威尔玛 销售量 马哈 销售量 爱丽丝 人员 约翰 人员 沼泽

回答 1 投票 0

具有 Spark 结构化流的动态过滤器

我正在开发 Spark Streaming 项目,目标是创建一个简单的应用程序,以便在数据流满足条件时通知用户(例如,当股票价格 > x 时发送通知)。 df =...

回答 1 投票 0

jdbc postgres 在 Spark 3.2.4 中批量写入但不流式传输

出于某种原因,jdbc postgresql 对于接收批处理数据效果很好,但它不适用于我的新版本的 Spark 3.2.4、Scala 2.12.15 和 hadoop 3.3.4 的流数据。 罐子文件: 卡夫卡-客户-3....

回答 1 投票 0

Spark 流 - withWatermark() 具有重复行为

我正在从 Kafka 读取数据(最早开始偏移)并在控制台上写入数据进行一些测试。水印持续时间为 10 秒。遵循 Spark 文档 - https://spark.apache.org/

回答 1 投票 0

Spark Streaming 输出模式仅处理新消息

我使用 nats-spark-connector ( https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced ) 连接到 NATS Jetstream 并使用 Spark 使用消息和进程Java代码。是...

回答 1 投票 0

需要在pysaprk流作业中执行聚合操作

我每隔 1 分钟就将多个传感器的数据流接收到数据块中。如果传感器“ABC”和“DEF”可用于每个 pyspark 流,则需要创建新的传感器名称“PQRS”

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.