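I am trying to use Flink to read from Kafka, convert Protobuf events to JSON strings, and write them to a table in Iceberg.
I wrote the code following the official documentation, but I must have missed something. The job connects to Kafka successfully, and I can see it producing logs and parsing events, but it never writes anything to disk. It does connect to Iceberg, though, and appears to create an empty table. What am I missing, and why is the sink never actually triggered?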
public class App {
    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) throws Exception {
        // Check if the S3 endpoint IP argument is provided
        if (args.length < 1) {
            throw new IllegalArgumentException("Please provide the S3 endpoint IP as an argument");
        }
        String s3EndpointIp = args[0];
        LOG.info("JOB Env setup");
        // Set up the Flink environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // create the Nessie catalog
        tableEnv.executeSql(
                String.format(
                        "CREATE CATALOG iceberg WITH ("
                                + "'type'='iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                                + "'uri'='http://catalog:19120/api/v1',"
                                + "'authentication.type'='none',"
                                + "'ref'='main',"
                                + "'client.assume-role.region'='local',"
                                + "'warehouse' = 's3://warehouse',"
                                + "'s3.endpoint'='http://%s:9000'"
                                + ")",
                        s3EndpointIp));
        // Set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");
        // Create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");
        // create the table
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.table1 ("
                        + "data STRING"
                        + ")");
        LOG.info("JOB Kafka setup");
        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource
                .<String>builder()
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                .setProperty("ssl.truststore.location", "./jks/truststore.jks")
                .setProperty("ssl.truststore.password", "strong-password")
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                                "username=\"username\" " +
                                "password=\"strong-password\";")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBootstrapServers("my-dns:my-port")
                .setGroupId("datalake-flink-java")
                .setTopics(Arrays.asList("my-kafka-topic"))
                .setDeserializer(new EventDeserializationSchema()); // missing
        KafkaSource<String> kafkaSource = kafkaSourceBuilder.build();
        DataStream<String> processedStream = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        LOG.info("Mapping value");
                        return value;
                    }
                });
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("db.table1"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(2))
                                .withMaxPartSize(MemorySize.ofMebiBytes(256))
                                .build())
                .build();
        processedStream.sinkTo(sink);
        env.execute("Kafka to Iceberg job");
    }

    public static class EventDeserializationSchema implements KafkaRecordDeserializationSchema<String> {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector) throws IOException {
            try {
                ResponseEnrichedEvent parsed = ResponseEnrichedEvent.parseFrom(record.value());
                LOG.info("About to deserialize Kafka record");
                String jsonString = JsonFormat.printer().print(parsed);
                collector.collect(jsonString);
                LOG.info("Successfully to deserialized Kafka record");
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Failed to deserialize Kafka record", e);
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
}
Console output:
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - Mapping value
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - Successfully to deserialized Kafka record
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - About to deserialize Kafka record
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - Mapping value
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,926 INFO App [] - Successfully to deserialized Kafka record
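I can also see that the source is created, but no sink is created.
I tried rewriting the code so that the sink lives in a separate class and uses only SQL, but I seem to end up with a similar result.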
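I managed to get it working by changing my approach slightly. I added checkpointing as suggested in the comments, and then, instead of using a FileSink, I wrote the data through the tableEnv directly by way of tableEnv.fromDataStream(). The tableEnv is already configured to use Iceberg directly. Checkpointing matters here because, in streaming mode, the Iceberg sink commits written data only when a checkpoint completes.
However, I do see that tableEnv.fromDataStream is considered deprecated in 1.17+, so there may be a more future-proof solution.
So during initialization I did: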
// Set up the Flink environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        env,
        EnvironmentSettings.newInstance().inStreamingMode().build());
Then, instead of using the FileSink, I did this:
// Convert DataStream to Table
org.apache.flink.table.api.Table processedTable = tableEnv.fromDataStream(processedStream, $("data"));
// Write Table to Iceberg
processedTable.executeInsert("db.table1");
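Your code creates the Iceberg table correctly, but you are then trying to write the data with a FileSink (I assume this is org.apache.flink.connector.file.sink.FileSink). A FileSink pointed at new Path("db.table1") writes plain files to a filesystem path with that name; it does not go through the Iceberg catalog.
If your goal is to write into the Iceberg table, you should use org.apache.iceberg.flink.sink.FlinkSink instead, as described here: https://iceberg.apache.org/docs/latest/flink/#writing
For example: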
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
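The snippet above leaves tableLoader and input undefined. For the setup in the question, one possible way to fill them in is sketched below; treat it as a starting point under stated assumptions, not a verified implementation. The class NessieCatalogProps and its method forEndpoint are hypothetical names. The property values are copied from the CREATE CATALOG statement in the question. The runnable part only collects those properties into a map; the Flink and Iceberg wiring is shown as comments because it needs the Iceberg Flink runtime on the classpath and has not been run against this environment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: mirrors the options of the CREATE CATALOG statement in the question. */
class NessieCatalogProps {

    /** Builds the catalog properties for a given S3 endpoint IP. */
    static Map<String, String> forEndpoint(String s3EndpointIp) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        props.put("uri", "http://catalog:19120/api/v1");
        props.put("authentication.type", "none");
        props.put("ref", "main");
        props.put("client.assume-role.region", "local");
        props.put("warehouse", "s3://warehouse");
        props.put("s3.endpoint", String.format("http://%s:9000", s3EndpointIp));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = forEndpoint("127.0.0.1");
        props.forEach((key, value) -> System.out.println(key + " = " + value));

        // Assumed wiring inside the Flink job (requires iceberg-flink-runtime; not compiled here):
        //
        //   CatalogLoader catalogLoader = CatalogLoader.custom(
        //       "iceberg",                                   // catalog name
        //       props,                                       // map built above
        //       new org.apache.hadoop.conf.Configuration(),
        //       "org.apache.iceberg.nessie.NessieCatalog");  // same catalog-impl as the DDL
        //   TableLoader tableLoader = TableLoader.fromCatalog(
        //       catalogLoader, TableIdentifier.of("db", "table1"));
        //
        //   // db.table1 has a single STRING column, so each JSON string becomes a one-field row.
        //   DataStream<RowData> input = processedStream
        //       .map(json -> (RowData) GenericRowData.of(StringData.fromString(json)));
        //
        //   FlinkSink.forRowData(input).tableLoader(tableLoader).append();
    }
}
```

Checkpointing still needs to be enabled on the environment with this approach, since the Iceberg sink commits data files when a checkpoint completes.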
Alternatively, as you have already discovered, you can use Flink SQL to write the data:
processedTable.executeInsert("db.table1");
Or the same thing as an actual SQL statement:
INSERT INTO `hive_catalog`.`db`.`table1` SELECT id, data from other_kafka_table;
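The statement above uses generic names (hive_catalog, other_kafka_table, an id column). Adapted to the catalog and table from the question, it might look like the output of the sketch below. The class IcebergInsertSql, its methods, and the view name events are hypothetical. The column name f0 is an assumption based on Flink naming the single column of a DataStream of an atomic type (such as String) f0 when it is converted without an explicit schema. The commented calls show how the statement could be used with a temporary view, which may also sidestep the deprecated fromDataStream overload; that part has not been run against this setup.

```java
/** Hypothetical helper that renders an INSERT statement for the table from the question. */
class IcebergInsertSql {

    /** Wraps an identifier in backticks; identifiers containing backticks are not handled. */
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /** Renders: INSERT INTO catalog.database.table SELECT sourceColumn AS targetColumn FROM view. */
    static String insertFromView(String catalog, String database, String table,
                                 String targetColumn, String view, String sourceColumn) {
        return String.format(
                "INSERT INTO %s.%s.%s SELECT %s AS %s FROM %s",
                quote(catalog), quote(database), quote(table),
                quote(sourceColumn), quote(targetColumn), quote(view));
    }

    public static void main(String[] args) {
        String sql = insertFromView("iceberg", "db", "table1", "data", "events", "f0");
        System.out.println(sql);

        // Assumed usage inside the Flink job (not compiled here):
        //
        //   tableEnv.createTemporaryView("events", processedStream); // DataStream<String> -> column f0
        //   tableEnv.executeSql(sql);                                // submits the streaming INSERT job
    }
}
```

With the arguments shown in main, the rendered statement targets `iceberg`.`db`.`table1` and selects `f0` as `data` from the `events` view.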