我有一个 Kafka 主题,每 2-3 秒生成一个条目 然后我有 PyFlink 作业,它将格式化条目并将它们发送到数据库
这是我的 Flink 环境设置
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))
env.set_parallelism(4)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
kafka_consumer = FlinkKafkaConsumer(
topics=SOURCE_TOPIC,
deserialization_schema=deserialization_schema,
properties=KAFKA_PROPERTIES
)
kafka_consumer.set_start_from_group_offsets()
ds = env.add_source(kafka_consumer, "DataFlowSource")
那么这是出现问题的部分
class FormatData(BroadcastProcessFunction):
def process_element(self, value, ctx):
# something happens on value here
time = ctx.current_processing_time() / 1000
yield metrics_stream_tag, (time, value)
yield ("some other information for another table")
现在我确定每个值之间至少有 2 秒 然而,当我查看数据库或打印流时,我看到类似这样的东西
2024-04-30 11:48:16+00 5
2024-04-30 11:48:16+00 7
2024-04-30 11:48:16+00 12
然后它会持续10-25次然后时间就会变成当前时间
2024-04-30 11:50:22+00 5
然后重复
更多背景信息
metrics_stream = ds.get_side_output(metrics_stream_tag)
# metrics_stream.add_sink(psql_metrics_sink)
metrics_stream.print()
我尝试使用
datetime.now.timestamp()
但这并没有改变任何东西
我期望的是时间间隔为 2-3 秒,在更改之前不应该卡住 2 分钟
所以我仍然不知道为什么会这样,但我知道解决方案 这一切都从输出类型开始,我将其设置为
Types.FLOAT()
当我使用 Types.DOUBLE()
或 Types.STRING()
时,这会导致这些奇怪的转换,而不是它作为魅力,所以如果有人在这里并且他们知道为什么,请告诉我