spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

为什么我无法使用pyspark连接到kafka? Kafka_2.12-2.3.0和Spark_2.4.4或2.3.0或2.3.4]]

我无法使用下面的python代码从spark_2.4.4结构化流连接到kafka_2.12-2.3.0。我的scala版本是2.11.12,而pyspark.sql import SparkSession的OpenJDK是1.8.0_222 ...

回答 2 投票 0

火花结构化流:导入图像文件以创建简单的ML应用程序

我想构建一个结构化流应用程序,其目的是从URL检索图像并构建一个非常简单的ML模型,该模型将根据...的内容进行分类。

回答 1 投票 0

为什么我无法使用pyspark连接到kafka?

我无法使用下面的python代码从spark_2.4.4结构化流连接到kafka_2.12-2.3.0。我的scala版本是2.11.12,而pyspark.sql import SparkSession的OpenJDK是1.8.0_222 ...

回答 1 投票 0

如何在结构化流中解析JSON记录?

我正在开发一个Spark结构化的流应用程序,并且我试图解析以下格式的JSON。 {“名称”:“ xyz”,“年龄”:29,“详细信息”:[“城市”:“孟买”,“国家”:“印度”]} {“名称”:“ abc”,“年龄”: 25,“ ...

回答 1 投票 1

如何计算一天从Kafka主题获取的邮件数量?

我正在从Kafka主题中获取数据,并以Deltalake(parquet)格式存储它们。我希望找到特定日期获取的邮件数量。我的思考过程:我想阅读...

回答 2 投票 2

JSON解析在Spark结构化流中返回null

我正在开发一个Spark结构化的流应用程序,并且我试图解析以下格式的JSON。 {“名称”:“ xyz”,“年龄”:29,“详细信息”:[“城市”:“孟买”,“国家”:“印度”]} {“名称”:“ abc”,“年龄”: 25,“ ...

回答 1 投票 0

Spark结构化流-将数据存储到Azure datalake gen1中时出错

我陷入了一个问题,目前正在尝试找出解决方案。此问题与将流数据输出存储到Azure Datalake有关。以下是我在...

回答 1 投票 0

火花结构化流-状态流处理中具有窗口操作的事件处理

我不熟悉Spark结构化流处理,目前正在研究一个用例,其中结构化流应用程序将从Azure IoT中心-事件中心获取事件(例如,每隔20个……)>

回答 1 投票 0

使用Spark结构化流的累积计数

我想使用移动窗口来计算过去1小时内数据框列中的值的累计计数。我可以使用rangeBetween使用pyspark(非流式处理)窗口函数获得预期的输出,...

回答 1 投票 0

将GenericRecord转换为DF

我在流式传输中有一个要求,我必须将GenericRecord转换为DatFrame,以便可以使用EXPLODE和DF中可用的其他功能。所以首先,我看如何转换...

回答 1 投票 0

如何丰富流查询的数据并将结果写入Elasticsearch?

对于给定的数据集(originalData),我需要映射值,然后准备一个新数据集,结合来自elasticsearch的搜索结果。数据集 orignalData = spark .readStream()...

回答 2 投票 1

如何丰富流查询的数据并将结果写入Elasticsearch?

对于给定的数据集(originalData),我需要映射值,然后准备一个新数据集,结合来自elasticsearch的搜索结果。数据集 orignalData = spark .readStream()...

回答 1 投票 0

清除来自kafka的结构化火花抵消

当我测试时,我的代码如下。 .format(“ kafka”).option(“ kafka.bootstrap.servers”,“ ...”).option(“ subscribe”,“ ...”).option(“ startingOffsets”,“最早”)// .option(“ ...

回答 1 投票 0

可能在每个微型批次中查找Cassandra

我们正在使用结构化流,并尝试对源数据进行一些简化。如果id col在20天内重复,我们需要在最早的活动时间进行更新。 20天可能会有10-15亿行。我们...

回答 1 投票 0

点火连接因Spark结构化流而失败

我正在尝试将ignite集成到我的Spark结构化流应用程序中。以下是我为ignite包含的依赖项-ignite-core,-ignite-spark,-ignite-spring,-cache-api Spark ...

回答 1 投票 0


带有Apache Bahir的pyspark MQTT结构化流式传输

我正在使用spark 2.4,并且已经像这样运行pyspark:./bin/pyspark --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2 pyspark成功运行。 (但是当我运行spark-sql-streaming -...

回答 1 投票 1

Delta Lake Create Table具有类似的结构

我在“ / mnt / events-bronze”位置有一个铜级三角洲湖泊表(events_bronze),数据从kafka传输到该表。现在,我希望能够从此表中流式传输并使用“ ...

回答 1 投票 0

无法找到LoginModule类:org.apache.kafka.common.security.plain.PlainLoginModule

环境:Spark 2.3.0,Scala 2.11.12,Kafka(无论最新版本是什么),我有一个安全的Kafka系统,我正在尝试将我的Spark Streaming Consumer连接到该系统。以下是我的build.sbt文件:...

回答 1 投票 0

具有结构化流协议的Apache Kafka

我正在尝试使用结构化流媒体来编写(protobuf的)Kafka使用者。我们称protobuf为A,它应该在Scala中反序列化为字节数组(Array [Byte])。我尝试了所有...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.