flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

Flink Operator 设计建议

我有一些数据需要处理和聚合,最好避免数据倾斜,但我不确定如何设计能够做到这一点的拓扑。 例如,我的数据如下所示: 结构对象...

回答 0 投票 0

Flink直接消费过滤

我有一个 Flink 消费者,它使用一个 protobuf 结构,只需要几个字段就可以发送给其他运营商。目前,我正在使用一个简单的自定义 deser 模式,它只返回 pb 和...

回答 0 投票 0

Flink RocksDB 自定义选项工厂配置错误禁用块缓存

我正在运行 Flink 1.15.2 并尝试在 RocksDB 中定义一个自定义选项工厂以禁用块缓存。 按照此博客文章中的示例:https://shopify.engineering/optimizing-a...

回答 1 投票 0

Flink 不要用 EventTimeWindows 关闭窗口

为什么这段代码不给出任何东西? 如果我更改为 TumblingProcessingTimeWindows - 一切正常。 我没有在文档中找到我必须添加的其他内容?触发器?驱逐者? 允许延迟...

回答 2 投票 0

微型集群上的 Flink 指标

我希望将石墨指标添加到我的 flink 微型集群中,但我很困惑如何开始这个,因为我能找到的大部分文档都是关于通过配置向集群添加指标的......

回答 0 投票 0

EventTimeWindows 的 Flink 最低要求

使用 EventTimeWindows 的最低要求是什么? 为什么这段代码不给出任何东西? 如果我更改为 TumblingProcessingTimeWindows - 一切正常。 我没有在文档中找到什么

回答 0 投票 0

Flink KafkaConsumer / KafkaSource with AZ Awareness

我想创建一个具有 KafkaSource 的 Flink 流应用程序。 KafkaSource 应该连接到与创建连接的任务管理器位于同一 AZ 中的代理。 我怎样才能达到...

回答 0 投票 0

如何从程序中停止 flink 流作业

我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09 和 FlinkKafkaConsumer09 从同一 kafka 主题读取数据...

回答 5 投票 0

Flink StreamingFileSink - ParquetAvroWriters

我正在使用 Flink - 流式文件接收器来写入传入数据 S3 存储桶。我的代码与 forRowFormat 选项完美配合。 现在我正在尝试设置 forBulkFormat 选项以在 parquet 中写入数据 ...

回答 2 投票 0

Flink,如何避免聚合窗口末尾的尖峰

我有一个 flink 作业 (1.14),每隔几个小时计算一次窗口聚合。我发现所有事件都有相同的窗口范围,因此数百万个聚合结果都在 ...

回答 0 投票 0

PyFlink 模块 java.base 不会对未命名模块“打开 java.lang”

我想运行 Flink 文档中的简单示例。 开始后我得到异常: 无法使字段 private final byte[] java.lang.String.value 可访问: 模块 java.base 没有“

回答 0 投票 0

在 apache flink 中,我们应该更新每个收集还是每个输入的状态?

想象一个案例,输入是一个文件名,我们想使用 flink RichFlatMapFunction 更新文件的状态和输出行(每个文件包含 10k 行)。我想知道在哪里...

回答 1 投票 0

如何使用 big keyed 在 Flink Apache 上工作?

在我的流应用程序中,它每秒将接收 70k 条数据记录。每条记录都有一个密钥 (FQDN)。 示例记录即将到来的数据: { “unixTime”:1680064946, "FQDN" : "...

回答 0 投票 0

在作业中动态更改 Flink 作业主题

用例是——一个从不断变化的主题列表中读取、处理它们并写入不同的 Kafka 主题的管道。 所以目前有 150 个 Kafka 主题,但这个列表可以改变。 有

回答 1 投票 0

Flink Kafka:期望类型是 PojoTypeInfo

我的客户类已经使用 maven-avro 插件创建。当我尝试运行这个程序时,我收到错误作为线程“主”java.lang.IllegalStateException 中的异常:期待类型...

回答 3 投票 0

Flink 表获取类型信息

我有一个 flink 表,假设 CREATE TABLE source(id int, name string) with (...) 和目标表,假设 CREATE TABLE destination(id int, unique_name string) with (...)。 unique_name 是

回答 2 投票 0

Flink Streaming - 如何安排数据流在 X 分钟后重新处理?

我有事件的输入数据流, 在处理它们时,我想参考一些要再次重新处理的事件 几分钟后。 有没有办法实现它? 这是一个简化的

回答 1 投票 0

我们可以使用 Flink REST API 和 Flink“应用程序”部署模式吗?

我首先声明了一个使用 Flink“应用程序”模式的工作,然后尝试使用 Flink REST API 在该集群上上传一个 jar。上传 jar API 返回 404 Not Found 错误。 F是真的吗...

回答 1 投票 0

无法通过使用 SampleClass 的字段作为参数在类型为 <SampleClass> 的 Flink 窗口流上执行“maxBy”不起作用

假设这是我的示例流: SingleOutputStreamOperator> sampleStream = previousStream .keyBy(值 -> v...

回答 0 投票 0

将 AsyncDataStream 和 RichAsyncFunction 与 SingleOutputStreamOperator 结合使用

我正在使用表单中的 SingleOutputStreamOperator 对象聚合键控流 stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...) 获得聚合后,我 ...

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.