flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

Flink Kafka:期望类型是 PojoTypeInfo

我的客户类已经使用 maven-avro 插件创建。当我尝试运行这个程序时,我收到错误作为线程“主”java.lang.IllegalStateException 中的异常:期待类型...

回答 3 投票 0

Flink 表获取类型信息

我有一个 flink 表,假设 CREATE TABLE source(id int, name string) with (...) 和目标表,假设 CREATE TABLE destination(id int, unique_name string) with (...)。 unique_name 是

回答 2 投票 0

Flink Streaming - 如何安排数据流在 X 分钟后重新处理?

我有事件的输入数据流, 在处理它们时,我想参考一些要再次重新处理的事件 几分钟后。 有没有办法实现它? 这是一个简化的

回答 1 投票 0

我们可以使用 Flink REST API 和 Flink“应用程序”部署模式吗?

我首先声明了一个使用 Flink“应用程序”模式的工作,然后尝试使用 Flink REST API 在该集群上上传一个 jar。上传 jar API 返回 404 Not Found 错误。 F是真的吗...

回答 1 投票 0

无法通过使用 SampleClass 的字段作为参数在类型为 <SampleClass> 的 Flink 窗口流上执行“maxBy”不起作用

假设这是我的示例流: SingleOutputStreamOperator> sampleStream = previousStream .keyBy(值 -> v...

回答 0 投票 0

将 AsyncDataStream 和 RichAsyncFunction 与 SingleOutputStreamOperator 结合使用

我正在使用表单中的 SingleOutputStreamOperator 对象聚合键控流 stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...) 获得聚合后,我 ...

回答 0 投票 0

Flink RichSinkFunction 构造函数 VS open()

假设我需要使用 RichSinkFunction 实现自定义接收器,并且我需要一些变量,例如接收器中的 DBConnection。我应该在哪里初始化DBConnection?我看到的大部分文章都在...

回答 2 投票 0

在 Flink 管道中高效处理 JSON 消息

我今天正在为我正在从事的 Flink 工作寻求一些建议(并希望是一个解决方案)。作业本身从 Kafka 主题读取 JSON 字符串并将其读入 JsonObject inst...

回答 0 投票 0

在并行 Flink DataStream 中应用多个过滤器

我想运行一个 Flink Streaming 应用程序,它适用于一次读取多次写入类比。基本上,我想从 firehose 读取,并行应用不同的过滤器以获取 reach 记录

回答 2 投票 0

在下沉到 Kafka 主题之前调用 DataStream 上的 forward()

环境 弗林克 = v1.14.0 设想 来自 Kafka 的源数据。 通过 HTTP 获取额外数据并广播它。 通过 CoProcessFunction 合并来自第一个和第二个数据源的数据 通过 KafkaSin 下沉到 Kafka...

回答 0 投票 0

如何在 flink cdc 中保持 sink 顺序

想用flink cdc消费更新后的mysql数据,想sink到其他表,不知道是否保持更新顺序。例如: update1:更新表 set id=2 w...

回答 0 投票 0

Flink Datastream API:聚合固定时间点之间的延迟流

我有一个 JSON 格式的高频数据流,具有以下架构。 unique_row_id:字符串, 用户:字符串, session_id:字符串, session_start:日期时间, session_end:日期时间, 数量:

回答 1 投票 0

我怎样才能实现一个复杂的 flink 模式,从单流到多流,然后加入回单流

我的问题是你将如何根据下图实现flink streaming ... 这是步骤 我在 kafka 中获取流(流中的每个消息都有一个 id)... 我在一个过程中处理流

回答 1 投票 0

java.util.ServiceConfigurationError: io.grpc.NameResolverProvider: Provider io.grpc.netty.shaded.io.grpc.netty.UdsNameResolverProvider 不是子类型

在数据处理集群上提交 flink 作业时获取 java.util.ServiceConfigurationError: io.grpc.NameResolverProvider: Provider io.grpc.netty.shaded.io.grpc.netty.UdsNameResolverProvide...

回答 0 投票 0

Apache Flink 1.14 - StreamingFileSink 没有将所有文件复制到 S3

StreamingFileSink 没有将所有文件复制到 S3。它通过复制文件正确开始,直到文件计数等于并行度,然后停止复制。我也试过 FileSink,但我得到了 s...

回答 0 投票 0

如何端到端地测试 flink 作业的定时器行为

有点类似这个问题:testing flink jobs with MiniCluster to trigger the timer using processing time 我写了一个 Flink 作业图,并在 KeyedProcessStream 中定义了一个计时器...

回答 0 投票 0

无法在pyflink中测试字数统计程序

我的核心逻辑如下: 从 pyflink.datastream.stream_execution_environment 导入 StreamExecutionEnvironment,RuntimeExecutionMode 从 pyflink.common 导入类型 字数统计类(

回答 0 投票 0

io.grpc.StatusRuntimeException:DEADLINE_EXCEEDED:截止日期在操作完成之前已过期

在 dataproc 集群中,我正在提交从 Pubsub 读取数据的 Flink 作业。 当我执行 flink 作业时,出现以下错误。 该程序完成了以下例外...

回答 0 投票 0

Flink文件读取堆内存异常

我在flink中读取一个文件为: val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path(url), classOf[GenericRecord]) env.createInput(...)

回答 1 投票 0

Apache flink。RocksDB后端从保存点进行懒惰加载

我们想使用Apache Flink与RocksDB后端(HDFS)进行有状态的流处理。然而,我们的应用状态(键控状态)将以TB为单位。据我所知,当我们 ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.