Flink with Kafka source and Iceberg sink not writing

Problem description

I am trying to use Flink to read data from Kafka, convert the Protobuf events to JSON strings, and write them to a table in Iceberg.

I wrote the code following the official documentation, but I must be missing something. The job connects to Kafka successfully, and I can see it producing logs and parsing events, but it never writes anything to disk. It does connect to Iceberg, though, and appears to create an empty table. What am I missing, and why is the sink never actually triggered?



public class App {


    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) throws Exception {

        // Check if the S3 endpoint IP argument is provided
        if (args.length < 1) {
            throw new IllegalArgumentException("Please provide the S3 endpoint IP as an argument");
        }
        String s3EndpointIp = args[0];

        LOG.info("JOB Env setup");

        // Set up the Flink environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // create the Nessie catalog
        tableEnv.executeSql(
                String.format(
                        "CREATE CATALOG iceberg WITH ("
                                + "'type'='iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                                + "'uri'='http://catalog:19120/api/v1',"
                                + "'authentication.type'='none',"
                                + "'ref'='main',"
                                + "'client.assume-role.region'='local',"
                                + "'warehouse' = 's3://warehouse',"
                                + "'s3.endpoint'='http://%s:9000'"
                                + ")",
                        s3EndpointIp));

        // Set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");

        // Create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

        // create the table
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.table1 ("
                        + "data STRING"
                        + ")");

        LOG.info("JOB Kafka setup");

        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource
                .<String>builder()
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                .setProperty("ssl.truststore.location", "./jks/truststore.jks")
                .setProperty("ssl.truststore.password", "strong-password")
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                                "username=\"username\" " +
                                "password=\"strong-password\";")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBootstrapServers("my-dns:my-port")
                .setGroupId("datalake-flink-java")
                .setTopics(Arrays.asList("my-kafka-topic"))
                .setDeserializer(new EventDeserializationSchema()); // missing
        KafkaSource<String> kafkaSource = kafkaSourceBuilder.build();


        DataStream<String> processedStream = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    LOG.info("Mapping value");
                    return value;
                }
            });

        FileSink<String> sink = FileSink
            .forRowFormat(new Path("db.table1"), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                    .withRolloverInterval(Duration.ofMinutes(5))
                    .withInactivityInterval(Duration.ofMinutes(2))
                    .withMaxPartSize(MemorySize.ofMebiBytes(256))
                    .build())
            .build();

        processedStream.sinkTo(sink);

        env.execute("Kafka to Iceberg job");

    }


    public static class EventDeserializationSchema implements KafkaRecordDeserializationSchema<String> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector) throws IOException {
            try {
                ResponseEnrichedEvent parsed = ResponseEnrichedEvent.parseFrom(record.value());
                LOG.info("About to deserialize Kafka record");
                String jsonString = JsonFormat.printer().print(parsed);
                collector.collect(jsonString);
                LOG.info("Successfully to deserialized Kafka record");
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Failed to deserialize Kafka record", e);
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
    
}

Console output:

datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - Mapping value
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - Successfully to deserialized Kafka record
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - About to deserialize Kafka record
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - Mapping value
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,926 INFO  App                                              [] - Successfully to deserialized Kafka record

I also see that the source is created, but the sink is not.

I tried rewriting the code so that the sink lives in a separate class and uses only SQL, but I seem to end up with a similar result.

apache-flink flink-streaming flink-sql iceberg apache-iceberg
2 Answers

0 votes

I managed to get this working by changing my approach slightly. I added checkpointing as suggested in the comments, and then, instead of using a FileSink, I wrote through tableEnv.fromDataStream(), since tableEnv is already configured to use Iceberg directly.

However, I did notice that tableEnv.fromDataStream is marked as deprecated in 1.17+, so a more future-proof solution could probably be found.

So during initialization I did:

// Set up the Flink environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        env,
        EnvironmentSettings.newInstance().inStreamingMode().build());

Then, instead of a FileSink, I did this:


// Convert DataStream to Table
org.apache.flink.table.api.Table processedTable = tableEnv.fromDataStream(processedStream, $("data"));

// Write Table to Iceberg
processedTable.executeInsert("db.table1");
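For what it's worth, the deprecation appears to apply to the Expression-based overload of fromDataStream used above, not to the plain overload. Below is a minimal sketch (not from the original answer) of the same insert without the deprecated call, assuming the same processedStream of JSON strings and that an atomic DataStream&lt;String&gt; surfaces as a single column named f0:

// Sketch only: the no-Expression overload of fromDataStream is not deprecated.
// An atomic DataStream<String> is exposed as one column named "f0";
// rename it to "data" so it lines up with db.table1.
org.apache.flink.table.api.Table processedTable =
        tableEnv.fromDataStream(processedStream).as("data");

// Iceberg commits files when checkpoints complete, so enableCheckpointing(...) is still required.
processedTable.executeInsert("db.table1");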

0 votes

Your code creates the Iceberg table correctly, but you are trying to write the data with a FileSink (which I assume is org.apache.flink.connector.file.sink.FileSink).

If your goal is to write to an Iceberg table, you should use org.apache.iceberg.flink.sink.FlinkSink instead, as described here: https://iceberg.apache.org/docs/latest/flink/#writing

For example:

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
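The FlinkSink builder expects a DataStream&lt;RowData&gt; and a TableLoader for the target table. Below is a minimal sketch of how that could be wired up for the setup in the question; the class and method names are made up for illustration, and the catalog properties are assumptions copied from the CREATE CATALOG statement above.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkSketch {

    // Sketch only: append the JSON strings to db.table1 through the Iceberg Flink sink.
    public static void attachSink(DataStream<String> processedStream) {
        // Wrap each JSON string in a one-column RowData matching the table schema (data STRING).
        DataStream<RowData> rowStream = processedStream
                .map(value -> (RowData) GenericRowData.of(StringData.fromString(value)))
                .returns(TypeInformation.of(RowData.class));

        // Load the table through the same Nessie catalog the job created with SQL
        // (property values mirror the question's setup; adjust as needed).
        Map<String, String> props = new HashMap<>();
        props.put("catalog-impl", "org.apache.iceberg.nessie.NessieCatalog");
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        props.put("uri", "http://catalog:19120/api/v1");
        props.put("warehouse", "s3://warehouse");
        CatalogLoader catalogLoader = CatalogLoader.custom(
                "iceberg", props, new Configuration(), "org.apache.iceberg.nessie.NessieCatalog");
        TableLoader tableLoader = TableLoader.fromCatalog(
                catalogLoader, TableIdentifier.of("db", "table1"));

        // Data files are committed when checkpoints complete,
        // so the job must have checkpointing enabled.
        FlinkSink.forRowData(rowStream)
                .tableLoader(tableLoader)
                .append();
    }
}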

Or, as you already discovered, you can use Flink SQL to write the data:

processedTable.executeInsert("db.table1");

Or, equivalently, with an actual SQL statement:

INSERT INTO `hive_catalog`.`db`.`table1` SELECT id, data from other_kafka_table;
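Adapting that pattern to the catalog and table from the question, one option is to register the processed stream as a temporary view and insert from it with SQL. A minimal sketch, assuming the view name kafka_events and that the stream's single column is named f0:

// Sketch only: expose the DataStream as a view, then let Flink SQL write to Iceberg.
tableEnv.createTemporaryView("kafka_events", processedStream);
tableEnv.executeSql(
        "INSERT INTO `iceberg`.`db`.`table1` SELECT f0 AS data FROM kafka_events");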