我想知道
merge()
上的 AggregateFunction
方法何时被调用。根据我从答案here和here的理解,它仅适用于会话窗口,并且发生在可以与前一个窗口合并的每个事件上,因为会话窗口的每个事件都会创建一个新窗口。我正在使用 PyFlink,希望通过提供示例提供帮助。
让我们举一个我从 AverageAggregate 函数的文档和一些自定义代码中整理出来的示例:
class MyTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[1])
class AverageAggregate(AggregateFunction):
def create_accumulator(self) -> Tuple[int, int]:
return 0, 0
def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]:
return accumulator[0] + value[1], accumulator[1] + 1
def get_result(self, accumulator: Tuple[int, int]) -> float:
return accumulator[0] / accumulator[1]
def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
return a[0] + b[0], a[1] + b[1]
if __name__ == '__main__':
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# define the source
data_stream = env.from_collection([
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
# define the watermark strategy
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
ds = (
data_stream
.assign_timestamps_and_watermarks(watermark_strategy)
.key_by(lambda x: x[0], key_type=Types.STRING())
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(3)))
.aggregate(AverageAggregate())
)
# print the results
ds.print()
# submit for execution
env.execute()
根据我的理解,
merge()
方法应该在第二个事件('hi', 2)
上运行,因为它在3毫秒的窗口大小内,然后再次针对输入('hi', 4)
等等。但是在执行代码时, merge()
方法甚至不会触发一次。因此,如果有人可以修改上面的示例代码以显示 merge()
正在执行并解释其工作原理,将不胜感激。
虽然它不是直接的 PyFlink 示例,但您可以查看 DataStream API 配方:https://docs.immerok.cloud/docs/how-to-guides/development/using-session-windows/#merging- data-in-one-session-window 有关
merge()
方法的信息。
免责声明:我为 Immerok 工作
链接无效。你能提供一个完整的例子吗?非常感谢你