spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark SQL的“MapGroupsWithState”中的数据集用法

我有“id和Map [String,List]”数据的事件。我正在通过id对这些数据进行分组。然后我用“mapgroupswithstate”来计算一些东西。我可以在mapgroupswithstate中使用from_json()方法吗?那么,我可以......

回答 1 投票 -1

处理数据 - Spark结构化流媒体

据我所知,火花结构化流媒体是使用检查点的容错。我想读卡夫卡。所以让我说我使用检查点,然后由于某种原因我的代码崩溃/我......

回答 1 投票 0

如何使用Java从Spark中读取kafka中的流嵌套JSON

我正在尝试使用Java从spark中读取kafka中的复杂嵌套JSON数据,并且无法形成发送到kafka的数据集实际JSON文件{“sample_title”:{“txn_date”:“2019-01-10”,“.. 。

回答 1 投票 0

如何更改_spark_metadata目录的位置?

我使用Spark Structured Streaming的流式查询使用以下代码将镶木地板文件写入S3:ds.writeStream()。format(“parquet”)。outputMode(OutputMode.Append())....

回答 1 投票 3

需要trigger.Once()元数据

嗨伙计们对经验丰富的人来说简单我有一个火花工作读取路径下的文件。我想使用结构化流媒体,即使源不是真正的流,而只是一个带有...的文件夹

回答 1 投票 0

如何将kafka时间戳值包含在spark结构流中?

我正在寻找将kafka的时间戳值添加到我的Spark结构化流模式的解决方案。我从kafka中提取了value字段并制作了数据帧。我的问题是,我需要......

回答 3 投票 0

为什么启动流式查询会导致“ExitCodeException exitCode = -1073741515”?

一直试图习惯新的结构化流媒体,但是一旦我开始.writeStream查询,它就会一直让我失误。知道是什么原因引起的吗?我能找到的最近的是......

回答 3 投票 3

PySpark Structured Streaming:将Query的输出传递给API端点

我在结构化流中有以下数据帧:TimeStamp | Room | Temperature | 00:01:29 | 1 | 55 |时间00:01:34 2 | 51 | 00:01:36 | 1 | 56 | 00:02:03 | 2 | 49 | ...

回答 2 投票 1

Spark Structured Streaming groupby窗口 - 我希望第一个时间戳上的第一个间隔开始

从Spark 2.31(HDP 3.0)上使用窗口聚合的简单完整示例中,我可以看到Spark创建了与某个整数对齐的区间。例如,这里我指定60秒......

回答 1 投票 1

有没有办法从Stream2中的列'B'中减去Stream1中的列'A'?

我正在使用火花结构流(pyspark)从Kafka中读取2个流(stream1和stream)。我必须计算stream1和stream 2的偏移量之间的差异。我正在尝试一些事情......

回答 1 投票 1

重新启动Spark Structured Streaming Job会消耗数百万条Kafka消息并死掉

我们在Spark 2.3.3上运行了一个Spark Streaming应用程序。基本上,它打开了一个Kafka Stream:kafka_stream = spark \ .readStream \ .format(“kafka”)\ .option(“kafka.bootstrap.servers”,“...

回答 1 投票 7

Spark Structured Streaming左外连接返回已匹配行的外空值

我基本上使用Spark的文档中给出的示例,内置测试流,其中一个流提前3秒(最初使用kafka但遇到了同样的问题)。 ...

回答 1 投票 5

如何正确管理从Spark Streaming生成的分区镶木地板文件

我的Spark结构化流媒体作业不断生成镶木地板文件,我想在到期后删除(比方说30天后)。我存储用分区键分区的镶木地板数据...

回答 2 投票 3

如何将Kafka与Spark结构化流与MongoDB Sink集成

我正在尝试将Kafka与Spark-Structured-Streaming集成到MongoDB Sink。如果我出错了,我需要帮助纠正我的代码集成了Kafka-Spark和Spark-Mongo。现在试着......

回答 1 投票 0

Spark Structured Streaming自动将时间戳转换为本地时间

我有UTC和ISO8601的时间戳,但使用结构化流,它会自动转换为本地时间。有没有办法阻止这种转换?我想在UTC中使用它。一世'...

回答 2 投票 11

如何使用基于窗口时间的结构化Spark流式传输消息来自Kafka的消息(不是立即10分钟)

我们有批处理来使用S / Spark执行更新/插入操作。但现在我们有用例让这更实时。以下是配置和我的方法。但它没有用。 ...

回答 1 投票 0

如何在结构化流中获取DataFrame?

我想从MQTT接收JSON字符串并将它们解析为DataFrames df。我该怎么做?这是我发送到MQTT队列以便在Spark中处理的Json消息的示例:{“id”:1,“...

回答 2 投票 -1

在spark结构化流中动态更改hdfs写入路径

我有一个Spark结构流应用程序,它从kafka读取数据并将其写入hdfs。我想根据当前日期动态更改hdfs写入路径,但它似乎是结构化的......

回答 1 投票 0

如何使用完全形成的SQL与spark结构化流

Spark结构化流媒体的文档说 - 从spark 2.3开始,可用于静态DataFrame / DataSet的spark上下文的所有方法也可用于结构化流...

回答 1 投票 0

检查点目录下的子目录,用于spark结构化流

spark结构化流的检查点目录创建了四个子目录。它们各有什么用? / warehouse / test_topic / checkpointdir1 / commits / warehouse / test_topic / checkpointdir1 / ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.