Aggregated data not sent to Kafka topic with Flink (PyFlink)

Question

I am working on a simple data aggregation example. It just groups records by ID and calculates the average over a 1-second tumbling window.

  1. Generate the source data with the flink-faker connector and the Table API,
  2. convert it into a DataStream while assigning timestamps and watermarks,
  3. apply a process window function to perform the aggregation, and
  4. sink the results to a Kafka topic.

Everything works up to step 3, but the records never arrive at the Kafka topic. Could it be related to the watermark assignment?

Could you tell me how to fix it? A screenshot of the job status and the source scripts are attached below.

PyFlink application

import os
import time
from typing import Iterable, Tuple

from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import DataStream
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.window import TumblingEventTimeWindows, Time
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaSink,
    KafkaRecordSerializationSchema,
    DeliveryGuarantee,
)
from pyflink.datastream.formats.json import JsonRowSerializationSchema

from models import SensorReading

RUNTIME_ENV = os.getenv("RUNTIME_ENV", "local")
BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:29092")


class AggregateProcessWindowFunction(ProcessWindowFunction):
    def process(
        self, key: str, context: ProcessWindowFunction.Context, elements: Iterable[Tuple[int, int]]
    ) -> Iterable[Row]:
        id, count, temperature = SensorReading.process_elements(elements)
        yield Row(id=id, timestamp=context.window().end, temperature=round(temperature / count, 2))


def define_workflow(source_stream: DataStream):
    sensor_stream = (
        source_stream.key_by(lambda e: e[0])
        .window(TumblingEventTimeWindows.of(Time.seconds(1)))
        .process(AggregateProcessWindowFunction())
    )
    return sensor_stream


if __name__ == "__main__":
    """
    ## local execution
    python src/c01_sensor_reading.py

    ## cluster execution
    docker exec jobmanager /opt/flink/bin/flink run \
        --python /tmp/src/c01_sensor_reading.py \
        --pyFiles file:///tmp/src/models.py \
        -d
    """

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    if RUNTIME_ENV == "local":
        CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
        jar_files = ["flink-faker-0.5.3.jar", "flink-sql-connector-kafka-1.17.1.jar"]
        jar_paths = tuple(
            [f"file://{os.path.join(CURRENT_DIR, 'jars', name)}" for name in jar_files]
        )
        print(jar_paths)
        env.add_jars(*jar_paths)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql(
        """
        CREATE TABLE sensor_source (
            `id`      INT,
            `rand`    INT
        )
        WITH (
            'connector' = 'faker',
            'rows-per-second' = '10',
            'fields.id.expression' = '#{number.numberBetween ''0'',''20''}',
            'fields.rand.expression' = '#{number.numberBetween ''0'',''100''}'
        );
        """
    )

    class DefaultTimestampAssigner(TimestampAssigner):
        def extract_timestamp(self, value, record_timestamp):
            return int(time.time_ns() / 1000000)

    source_stream = t_env.to_append_stream(
        t_env.from_path("sensor_source"), Types.TUPLE([Types.INT(), Types.INT()])
    ).assign_timestamps_and_watermarks(
        WatermarkStrategy.no_watermarks().with_timestamp_assigner(DefaultTimestampAssigner())
    )

    sensor_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(BOOTSTRAP_SERVERS)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("sensor-reading")
            .set_key_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(SensorReading.get_key_type())
                .build()
            )
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(SensorReading.get_value_type())
                .build()
            )
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    # define_workflow(source_stream).print() # it works!
    define_workflow(source_stream).sink_to(sensor_sink).name("sensor_sink").uid("sensor_sink")

    env.execute()

Data model

import dataclasses
from typing import Iterable, Tuple

from pyflink.common.typeinfo import Types


@dataclasses.dataclass
class SensorReading:
    id: str
    timestamp: int
    temperature: float

    @staticmethod
    def process_elements(elements: Iterable[Tuple[int, int]]):
        id, count, temperature = None, 0, 0
        for e in elements:
            next_id = f"sensor_{e[0]}"
            if id is not None:
                assert id == next_id
            id = next_id
            count += 1
            temperature += e[1] / 100 * 20
        return id, count, temperature

    @staticmethod
    def get_key_type():
        return Types.ROW_NAMED(
            field_names=["id"],
            field_types=[Types.STRING()],
        )

    @staticmethod
    def get_value_type():
        return Types.ROW_NAMED(
            field_names=["id", "timestamp", "temperature"],
            field_types=[Types.STRING(), Types.BIG_INT(), Types.DOUBLE()],
        )
apache-kafka apache-flink pyflink
1 Answer

You are using event-time windows, which require watermarks. You have to specify a real WatermarkStrategy instead of

WatermarkStrategy.no_watermarks()

Since the events appear to be ordered by time, you can use

WatermarkStrategy.for_monotonous_timestamps()

instead.
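
As a minimal sketch (assuming the rest of the job stays exactly as posted, including the DefaultTimestampAssigner), the stream definition would become:

source_stream = t_env.to_append_stream(
    t_env.from_path("sensor_source"), Types.TUPLE([Types.INT(), Types.INT()])
).assign_timestamps_and_watermarks(
    # emit watermarks that follow the monotonically increasing wall-clock timestamps
    WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
        DefaultTimestampAssigner()
    )
)

With no_watermarks() the 1-second event-time windows never see a watermark that passes their end time, so they never fire and nothing is handed to the Kafka sink.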
