flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

为什么 Flink 在每次调用测试工具中的 processElement 后都会重置我的状态?

我正在使用 Flink 的 KeyedOneInputStreamOperatorTestHarness 并调用 processElement 两次。 processElement 将更新状态以计算所看到的元素数量。 在这种情况下,拨打电话后

回答 1 投票 0

将相同的方法引用传递给 apache flink 中的过滤器会抛出 classcastException

以下代码抛出 ClasscastException (java.lang.ClassCastException: class java.lang.Integer无法转换为 class java.lang.String ) 最终数据流源 st1 =

回答 1 投票 0

Flink 与 Kafka Source 和 Iceberg Sink 不写

使用Flink我尝试从Kafka读取数据,将Protobuf事件转换为Json字符串并将其写入Iceberg中的表中。 我按照官方文档编写了代码,但我必须有

回答 2 投票 0

Flink Stream 处理处理部分失败并避免重新处理

我有 Flink 流处理应用程序,它从 Pulsar Topic 读取消息流,处理它们并将文件存储在 S3 中。它执行以下操作。 每隔一段时间阅读 Pulsar 主题的消息...

回答 1 投票 0

Flink SQL Streaming - 如何在记录更改不确定的情况下有效地连接表

卡卡主题(输入:table1,table2,输出:table3) Flink SQL 流作业 创建临时视图distinct_table1 AS 选择 * 从(选择*, ROW_NUMBER() OVER(按 id 分区,按change_date d 排序...

回答 1 投票 0

了解水印

我只是想表达我对 BoundedOutOfOrder Watermarks 在 FLINK 以及任何流处理框架中如何工作的理解。 事件处理顺序: 11:00 11:01 11:0...

回答 1 投票 0

在PyFlink中使用SourceFunction和SinkFunction

我是 PyFlink 的新手。我已经用Java完成了官方培训练习:https://github.com/apache/flink-training 然而,我正在进行的项目必须使用Python作为编程语言。我...

回答 2 投票 0

MetricQueryService - 某些指标将不会被报告

升级Flink v.1.16.1(从v.1.13.2)后,我看到以下日志: 2023-03-06 INFO org.apache.flink.runtime.metrics.dump.MetricQueryService [] - 某些指标将不会被报告...

回答 1 投票 0

Jobmanager重启后Flink无法从checkpoint恢复

我正在 Docker-compose 上运行 flink 集群,其中包含 1 个 jobmanager 和 1 个 taskmanager。我通过重启Jobmanager的容器来测试checkpoint的机制。但我发现状态没有恢复

回答 1 投票 0

Flink 流媒体管道未向 Kafka 提交偏移量

我们有多个Flink流应用程序。这些应用程序每 30 秒左右向 Kafka 提交一次偏移量。我们看到 Flink 根本没有向 Kafka 提交偏移量。这会导致不匹配...

回答 1 投票 0

Flink 中的容错

我们如何配置 Flink 应用程序以仅启动/重新启动崩溃的 Pod/(子)任务,而不是重新启动整个作业,即重新启动作业/管道中的所有任务/子任务,包括...

回答 2 投票 0

Apache Flink dataStream.sinkTo() 不接受 KafkaSink<String> 作为参数。正在等待Sink<String, ?, ?, ?>

我是 Apache Flink 的新手。我正在尝试从 Kafka 流式传输数据,在 Flink 上执行某些操作并将数据发布到 Kafka 中的其他某个主题。 下面是添加的依赖项 ...

回答 1 投票 0

使用flink sql join 2 source时如何读取rocksdb状态

我的sql定义为 如果 TABLE_1 不存在则创建表( 标头 VARCHAR NOT NULL, id VARCHAR 不为空, `时间戳` TIMESTAMP_LTZ(3) NULL, 类型 VARCHAR NOT ...

回答 1 投票 0

在 Flink 中为具有接口字段的类实现 TypeInformation

我有一个相当嵌套的数据类型,它通过 Kafka 主题进入 flink 中。 JSON 被反序列化为使用接口的 Java 类层次结构(与 JsonSubTypes 一起,请参阅 https://www.bae...

回答 1 投票 0

Flink 在应用程序重启/错误修复时是否考虑 Kafka 偏移?

我在 Kafka-Flink 应用程序中遇到了一个代码错误,我需要重新部署整个 Flink 应用程序。我知道检查点和保存点,但由于我的应用程序必须重新启动,所以它会...

回答 1 投票 0

Apache Flink Azure ABFS 文件接收器错误(流)- UnsupportedFileSystemException:方案“文件”没有文件系统

我们将 Apache Flink 版本 1.17.1 与 Scala 结合使用。 我们正在尝试将流数据写入 ABFS 文件系统。 请参阅 Scala 中的简单示例代码。 对象简单流{ val 环境 =

回答 1 投票 0

Apache Flink 通过 Jenkins 和 Spinnaker 提交作业时抛出异常

我们正在使用配置为独立 Kubernetes pod 的 Apache Flink 1.16.1,以便我们的应用程序之一从融合的 Kafka 主题中读取数据以进行事件关联。我们正在使用flink的Table AP...

回答 1 投票 0

Flink + RocksDB 需要很长时间才能恢复大型 s3 检查点

我正在运行一个低并行度(4 个插槽)作业,其检查点可能会变得非常大。 在示例中,我将展示检查点为 142 GB,保存在 S3 中,需要 40 分钟才能恢复...

回答 1 投票 0

为什么flink作业的maxparallelism不能在不丢失状态的情况下更新?

我刚刚读到,Flink 作业的最大并行度(由 setMaxParallelism 定义)无法在不丢失状态的情况下更改。这让我有点惊讶,不难想象一个场景......

回答 2 投票 0

如何将数据发送到 Kafka,其中特定于某个键的数据仅在 Flink 流作业中使用 KafkaSink 发送到同一分区?

我有一个要求,只有当数据具有相同的密钥时,我才希望将数据发送到同一分区。 例如: {“field1”:33,“field2”:44,“field3”:55,“唯一...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.