我正在学习快照机制在Flink中的工作方式。
据我了解,JobManager将以固定的间隔将障碍插入每个数据源,并且每个操作员一旦从其所有数据源接收到第n个障碍,都将对其进行快照。
如果我是对的,似乎这种机制在某些情况下可能会使用越来越多的内存。
这里是一个例子:
说有两个数据源:Source 1
和Source 2
,以及一个运算符。
Source 1 -----\
------ Operator
Source 2 -----/
[Source 1
正在生成整数流:1、2、3、4、5 ...
[Source 2
正在生成字符流:a,b,c,d,e ...
[操作员执行此操作:它需要Source 1
的两个输入和Source 2
的一个输入来生成输出:1a2、3b4、5c6、7d8 ...
假设JobManager将障碍插入两个数据源,如下所示:
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...
现在开始。
[Source 1
和Source 2
的两个“ BARRIER A”进入操作员时,Flink将为操作员创建快照,其当前状态为1
和a
,因为1
和[ a
已在BARRIER A输入操作员时进入操作员。
然后,当两个“ BARRIER B”进入操作员时,操作员已完成其第一个任务:生成1a2
,Flink将创建另一个快照:NA
,b
。 NA
表示当前没有来自Source 1
的新输入。
同时,每个快照将存储到RAM,FS或RocksDB(取决于我们如何配置Flink)。
如果我是对的,我认为Flink在此示例中将生成越来越多的快照。因为Source 1
的消耗速度始终是Source 2
的速度的两倍。
我误会了吗?
有趣的思想实验。
如果只限于使用Flink API的标准部分,则无法实现一种用户功能,该功能将从源2读取的每个输入都从源1读取两个输入。例如,在实现CoProcessFunction
时,您将受Flink运行时的支配,Flink运行时将根据其内部逻辑从任一流提供事件。这两个流将彼此竞争,可能在不同的线程甚至不同的进程中运行。当流聚合时,如果未按您希望的顺序提供来自两个输入的事件,则必须将它们缓冲在Flink状态,直到准备好处理它们为止。
[这可能会导致大量缓冲需求的常见情况是,在实施事件时间联接时,其中一个流的时间戳要远远领先于其他流(例如,以外汇汇率加入金融交易,使用交易所交易时有效的汇率(如果汇率流落后)。但是这种缓冲可以在RocksDB中完成,而不必给内存施加压力。
请注意,此状态缓冲完全在您的应用程序中发生-Flink没有灵活的网络缓冲区,这些缓冲区可能在反压期间膨胀。
另一点是,快照永远不会存储在本地文件系统或RocksDB中。如果选择使用RocksDB状态后端,则每个任务管理器的活动工作状态将存储在本地RocksDB实例中,但是状态备份(快照)将存储在分布式文件系统中。
关于您所描述的情况,
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...
这不会发生。没有什么办法安排这两个源以这种方式同步-它们将比该图建议的独立得多。由于Flink在流水线级之间只有少量固定的网络缓冲,因此在执行图中出现的任何背压都将迅速传播回一个或两个源。发生这种情况时,背压源将无法将任何事件推送到管道中,直到背压缓解为止-但与此同时,另一个源可能正在继续取得进展。屏障将由两个源同时大致独立地插入到两个流中,但是如果源2经常遇到背压(例如),则看起来可能更像这样:
1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...