PyFlink 窗口聚合未触发

问题描述 投票:0回答:1

我有一个问题,我的窗口聚合累积了所有结果,但不返回它,并且我的结果流为空 我怀疑这与窗口触发有关,但不知道如何

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, element, previous_element_timestamp):
        date_str = element[0].strftime('%Y-%m-%d')
        # print(element,datetime.strptime(date_str, '%Y-%m-%d').timestamp())
        return datetime.strptime(date_str, '%Y-%m-%d').timestamp()


joined_data_stream = joined_data_stream.assign_timestamps_and_watermarks(
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_days(1))
    .with_timestamp_assigner(MyTimestampAssigner())
)


keyed_stream = joined_data_stream.key_by(lambda x: x[1])


class AverageAggregate(functions.AggregateFunction):

    def create_accumulator(self) -> [int, int]:
        return 0, 0
    def add(self, value, accumulator):
        print('ADDING')
        return accumulator[0]+value[3] , accumulator[1] + 1
    def merge(self, a: [int, int], b: [int, int]) -> [int, int]:
        print('MERGING')
        return a[0] + b[0], a[1] + b[1]
    def get_result(self, accumulator):
        print('GETTING',accumulator[0] / accumulator[1])
        return accumulator[0] / accumulator[1]


avg_stream = joined_data_stream.key_by(lambda x: x[1]).window(SlidingEventTimeWindows.of(Time.days(2),Time.days(1))) \
    .aggregate(AverageAggregate(),accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),output_type=Types.DOUBLE())

这是我现在拥有的所有代码,将不胜感激任何给出的提示

更新 问题是我的时间戳分配,替换这个

return datetime.strptime(date_str, '%Y-%m-%d').timestamp()

有了这个

return int(datetime.strptime(date_str, '%Y-%m-%d').timestamp()*1000)

全部修复

python flink-streaming data-stream pyflink
1个回答
0
投票

很高兴看到您找到了问题的解决方案, 如果您仍然不知道它为什么起作用,那是因为当您调用

.timestamp()
时,它会返回以秒为单位的时间戳 例如

>>> datetime.datetime.strptime('2024-01-01', '%Y-%m-%d').timestamp()
1704060000.0

而 flink 需要 ms,这就是为什么它在乘以 1000 后运行良好

© www.soinside.com 2019 - 2024. All rights reserved.