我有一个问题,我的窗口聚合累积了所有结果,但不返回它,并且我的结果流为空 我怀疑这与窗口触发有关,但不知道如何
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
class MyTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, element, previous_element_timestamp):
date_str = element[0].strftime('%Y-%m-%d')
# print(element,datetime.strptime(date_str, '%Y-%m-%d').timestamp())
return datetime.strptime(date_str, '%Y-%m-%d').timestamp()
joined_data_stream = joined_data_stream.assign_timestamps_and_watermarks(
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_days(1))
.with_timestamp_assigner(MyTimestampAssigner())
)
keyed_stream = joined_data_stream.key_by(lambda x: x[1])
class AverageAggregate(functions.AggregateFunction):
def create_accumulator(self) -> [int, int]:
return 0, 0
def add(self, value, accumulator):
print('ADDING')
return accumulator[0]+value[3] , accumulator[1] + 1
def merge(self, a: [int, int], b: [int, int]) -> [int, int]:
print('MERGING')
return a[0] + b[0], a[1] + b[1]
def get_result(self, accumulator):
print('GETTING',accumulator[0] / accumulator[1])
return accumulator[0] / accumulator[1]
avg_stream = joined_data_stream.key_by(lambda x: x[1]).window(SlidingEventTimeWindows.of(Time.days(2),Time.days(1))) \
.aggregate(AverageAggregate(),accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),output_type=Types.DOUBLE())
这是我现在拥有的所有代码,将不胜感激任何给出的提示
更新 问题是我的时间戳分配,替换这个
return datetime.strptime(date_str, '%Y-%m-%d').timestamp()
有了这个
return int(datetime.strptime(date_str, '%Y-%m-%d').timestamp()*1000)
全部修复
很高兴看到您找到了问题的解决方案, 如果您仍然不知道它为什么起作用,那是因为当您调用
.timestamp()
时,它会返回以秒为单位的时间戳
例如
>>> datetime.datetime.strptime('2024-01-01', '%Y-%m-%d').timestamp()
1704060000.0
而 flink 需要 ms,这就是为什么它在乘以 1000 后运行良好