Flink 中 AggregateFunction 的 merge() 方法

问题描述 投票:0回答:2

我想知道

merge()
上的
AggregateFunction
方法何时被调用。根据我从答案herehere的理解,它仅适用于会话窗口,并且发生在可以与前一个窗口合并的每个事件上,因为会话窗口的每个事件都会创建一个新窗口。我正在使用 PyFlink,希望通过提供示例提供帮助。

让我们举一个我从 AverageAggregate 函数的文档和一些自定义代码中整理出来的示例:

class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


class AverageAggregate(AggregateFunction):
 
    def create_accumulator(self) -> Tuple[int, int]:
        return 0, 0

    def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]:
        return accumulator[0] + value[1], accumulator[1] + 1

    def get_result(self, accumulator: Tuple[int, int]) -> float:
        return accumulator[0] / accumulator[1]

    def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
        return a[0] + b[0], a[1] + b[1]


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection([
        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    ds = (
        data_stream
        .assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda x: x[0], key_type=Types.STRING())
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(3)))
        .aggregate(AverageAggregate())
    )

    # print the results
    ds.print()

    # submit for execution
    env.execute()

根据我的理解,

merge()
方法应该在第二个事件
('hi', 2)
上运行,因为它在3毫秒的窗口大小内,然后再次针对输入
('hi', 4)
等等。但是在执行代码时,
merge()
方法甚至不会触发一次。因此,如果有人可以修改上面的示例代码以显示
merge()
正在执行并解释其工作原理,将不胜感激。

apache-flink flink-streaming pyflink
2个回答
0
投票

虽然它不是直接的 PyFlink 示例,但您可以查看 DataStream API 配方:https://docs.immerok.cloud/docs/how-to-guides/development/using-session-windows/#merging- data-in-one-session-window 有关

merge()
方法的信息。

免责声明:我为 Immerok 工作


0
投票

链接无效。你能提供一个完整的例子吗?非常感谢你

© www.soinside.com 2019 - 2024. All rights reserved.