为什么PyFlink给我过去的时间

问题描述 投票:0回答:1

我有一个 Kafka 主题,每 2-3 秒生成一个条目 然后我有 PyFlink 作业,它将格式化条目并将它们发送到数据库

这是我的 Flink 环境设置

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))
env.set_parallelism(4)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

kafka_consumer = FlinkKafkaConsumer(
    topics=SOURCE_TOPIC,
    deserialization_schema=deserialization_schema,
    properties=KAFKA_PROPERTIES
)
kafka_consumer.set_start_from_group_offsets()

ds = env.add_source(kafka_consumer, "DataFlowSource")

那么这是出现问题的部分

class FormatData(BroadcastProcessFunction):
    def process_element(self, value, ctx):
        # something happens on value here
        time = ctx.current_processing_time() / 1000
        yield metrics_stream_tag, (time, value)
        yield ("some other information for another table")

现在我确定每个值之间至少有 2 秒 然而,当我查看数据库或打印流时,我看到类似这样的东西

2024-04-30 11:48:16+00  5
2024-04-30 11:48:16+00  7
2024-04-30 11:48:16+00  12

然后它会持续10-25次然后时间就会变成当前时间

2024-04-30 11:50:22+00  5

然后重复

更多背景信息

  1. 当我打印而不是发送到数据库时,我可以看到每个条目之间至少有 2 秒,但时间值不会改变(与上面的想法相同)
  2. 这是我接收metrics_stream_tag的部分
metrics_stream = ds.get_side_output(metrics_stream_tag)
# metrics_stream.add_sink(psql_metrics_sink)
metrics_stream.print()

我尝试使用

datetime.now.timestamp()
但这并没有改变任何东西 我期望的是时间间隔为 2-3 秒,在更改之前不应该卡住 2 分钟

apache-kafka apache-flink pyflink stream-processing
1个回答
0
投票

所以我仍然不知道为什么会这样,但我知道解决方案 这一切都从输出类型开始,我将其设置为

Types.FLOAT()
当我使用
Types.DOUBLE()
Types.STRING()
时,这会导致这些奇怪的转换,而不是它作为魅力,所以如果有人在这里并且他们知道为什么,请告诉我

© www.soinside.com 2019 - 2024. All rights reserved.