Apache Flink:如何为动态表启用“ upsert模式”?

问题描述 投票:3回答:2

我已经看到一些有关动态表的“ upsert模式”,这些动态表基于Flink文档和官方Flink博客中的唯一键。但是,我没有看到有关如何在动态表上启用此模式的任何示例/文档。

示例:

  • Blog post

    通过更新模式在流上定义动态表时,我们可以在表上指定唯一键属性。在那种情况下,针对密钥属性执行更新和删除操作。下图显示了更新模式

  • Documentation

    转换为upsert流的动态表需要(可能是复合的)唯一键

所以我的问题是:

  • 如何在Flink中的动态表上指定唯一键属性?
  • 如何将动态表置于更新/更新/“替换”模式,而不是追加模式?
apache-flink flink-streaming flink-sql
2个回答
4
投票

链接的资源描述了两种不同的情况。

  • blog post讨论升序DataStream -> Table转换。
  • documentation描述逆向上插入Table -> DataStream转换。

以下讨论基于Flink 1.4.0(2018年1月)。

Upsert DataStream -> Table转换

]本机不支持通过在键上进行upsert将DataStream转换为Table,但是在路线图上。同时,您可以使用附加Table和带有用户定义的聚合函数的查询来模拟此行为。

[如果您具有跟踪用户登录的模式Table的追加Logins (user, loginTime, ip),则可以使用以下查询将其转换为键入在Table上的upsert user

SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user

LAST_VAL聚合函数是始终返回最新添加值的user-defined aggregation function

对upsert DataStream -> Table转换的本地支持将以相同的方式工作,尽管提供了更简洁的API。

Upsert Table -> DataStream转换

不支持将Table转换为高位DataStream。这也正确反映在文档中:

请注意,将动态表转换为DataStream时仅支持追加和撤回流。

我们故意选择不支持upsert Table -> DataStream转换,因为只有在关键属性已知的情况下,才能处理upsert DataStream。这些取决于查询,并不总是一目了然。开发人员有责任确保正确解释了关键属性。否则可能会导致程序出错。为避免出现问题,我们决定不提供upsert Table -> DataStream转换。

相反,用户可以将Table转换为撤消DataStream。此外,我们支持UpsertTableSink,可将upsert DataStream写入外部系统,例如数据库或键值存储。


0
投票

Flink 1.8仍然缺乏这种支持。期望将来会增加这些功能:1)LAST_VAL 2)Upsert Stream 动态表。

ps。 LAST_VAL()似乎不可能在UDTF中实现。聚合函数不提供附加的事件/过程时间上下文。阿里巴巴的Blink提供了LAST_VAL的替代实现,但是它需要另一个字段来提供订单信息,而不是直接提供事件/过程时间。这使得SQL代码很难看。 (https://help.aliyun.com/knowledge_detail/62791.html

我的LAST_VAL解决方案(例如,获取最新IP)是:

  1. concat(ts,ip)asordered_ip
  2. MAX(ordered_ip)为ordered_ip
  3. extract(ordered_ip)as ip
© www.soinside.com 2019 - 2024. All rights reserved.