CREATE TABLE test
(
_id STRING PRIMARY KEY NOT ENFORCED,
a INT
) WITH (
'connector' = 'mongodb',
'uri' = 'mongodb://user:password@database:27017',
'database' = 'database',
'collection' = 'test'
);
INSERT INTO test
VALUES ('id',
(SELECT bar
FROM (SELECT 1 AS foo)
LEFT JOIN (SELECT 2 AS bar)
ON foo = bar));
MongoDB 中期望的结果:
{ "_id" : "id"}
实际:
{ "_id" : "id", "a" : null }
是否可以仅使用 Flink SQL 不实现具有 NULL 值的字段,还是必须使用数据流 API?
我在文档中找不到任何选项来更改此行为。
以下是如何使用 DataStream API 实现此功能的基本示例
DataStream<YourCustomType> stream = // your data stream source
stream.map(new MapFunction<YourCustomType, YourCustomType>() {
@Override
public YourCustomType map(YourCustomType value) throws Exception {
if (value.getYourField() == null) {
value.setYourField(/* some default value or just leave it unset */);
}
return value;
}
}).addSink(new MongoDBSinkFunction<>()); // your custom MongoDB sink
在此代码片段中,YourCustomType 将是一个表示您正在使用的数据结构的类,您将在映射函数内实现逻辑以根据需要修改或删除 NULL 字段。