Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
Flink Kafka:期望类型是 PojoTypeInfo
我的客户类已经使用 maven-avro 插件创建。当我尝试运行这个程序时,我收到错误作为线程“主”java.lang.IllegalStateException 中的异常:期待类型...
我有一个 flink 表,假设 CREATE TABLE source(id int, name string) with (...) 和目标表,假设 CREATE TABLE destination(id int, unique_name string) with (...)。 unique_name 是
Flink Streaming - 如何安排数据流在 X 分钟后重新处理?
我有事件的输入数据流, 在处理它们时,我想参考一些要再次重新处理的事件 几分钟后。 有没有办法实现它? 这是一个简化的
我们可以使用 Flink REST API 和 Flink“应用程序”部署模式吗?
我首先声明了一个使用 Flink“应用程序”模式的工作,然后尝试使用 Flink REST API 在该集群上上传一个 jar。上传 jar API 返回 404 Not Found 错误。 F是真的吗...
无法通过使用 SampleClass 的字段作为参数在类型为 <SampleClass> 的 Flink 窗口流上执行“maxBy”不起作用
假设这是我的示例流: SingleOutputStreamOperator> sampleStream = previousStream .keyBy(值 -> v...
将 AsyncDataStream 和 RichAsyncFunction 与 SingleOutputStreamOperator 结合使用
我正在使用表单中的 SingleOutputStreamOperator 对象聚合键控流 stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...) 获得聚合后,我 ...
Flink RichSinkFunction 构造函数 VS open()
假设我需要使用 RichSinkFunction 实现自定义接收器,并且我需要一些变量,例如接收器中的 DBConnection。我应该在哪里初始化DBConnection?我看到的大部分文章都在...
我今天正在为我正在从事的 Flink 工作寻求一些建议(并希望是一个解决方案)。作业本身从 Kafka 主题读取 JSON 字符串并将其读入 JsonObject inst...
我想运行一个 Flink Streaming 应用程序,它适用于一次读取多次写入类比。基本上,我想从 firehose 读取,并行应用不同的过滤器以获取 reach 记录
在下沉到 Kafka 主题之前调用 DataStream 上的 forward()
环境 弗林克 = v1.14.0 设想 来自 Kafka 的源数据。 通过 HTTP 获取额外数据并广播它。 通过 CoProcessFunction 合并来自第一个和第二个数据源的数据 通过 KafkaSin 下沉到 Kafka...
想用flink cdc消费更新后的mysql数据,想sink到其他表,不知道是否保持更新顺序。例如: update1:更新表 set id=2 w...
Flink Datastream API:聚合固定时间点之间的延迟流
我有一个 JSON 格式的高频数据流,具有以下架构。 unique_row_id:字符串, 用户:字符串, session_id:字符串, session_start:日期时间, session_end:日期时间, 数量:
我怎样才能实现一个复杂的 flink 模式,从单流到多流,然后加入回单流
我的问题是你将如何根据下图实现flink streaming ... 这是步骤 我在 kafka 中获取流(流中的每个消息都有一个 id)... 我在一个过程中处理流
在数据处理集群上提交 flink 作业时获取 java.util.ServiceConfigurationError: io.grpc.NameResolverProvider: Provider io.grpc.netty.shaded.io.grpc.netty.UdsNameResolverProvide...
Apache Flink 1.14 - StreamingFileSink 没有将所有文件复制到 S3
StreamingFileSink 没有将所有文件复制到 S3。它通过复制文件正确开始,直到文件计数等于并行度,然后停止复制。我也试过 FileSink,但我得到了 s...
有点类似这个问题:testing flink jobs with MiniCluster to trigger the timer using processing time 我写了一个 Flink 作业图,并在 KeyedProcessStream 中定义了一个计时器...
我的核心逻辑如下: 从 pyflink.datastream.stream_execution_environment 导入 StreamExecutionEnvironment,RuntimeExecutionMode 从 pyflink.common 导入类型 字数统计类(
io.grpc.StatusRuntimeException:DEADLINE_EXCEEDED:截止日期在操作完成之前已过期
在 dataproc 集群中,我正在提交从 Pubsub 读取数据的 Flink 作业。 当我执行 flink 作业时,出现以下错误。 该程序完成了以下例外...
我在flink中读取一个文件为: val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path(url), classOf[GenericRecord]) env.createInput(...)
Apache flink。RocksDB后端从保存点进行懒惰加载
我们想使用Apache Flink与RocksDB后端(HDFS)进行有状态的流处理。然而,我们的应用状态(键控状态)将以TB为单位。据我所知,当我们 ...