Flink SQL:插入 MongoDB 时不要具体化 NULL 字段

问题描述 投票:0回答:1
CREATE TABLE test
(
    _id STRING PRIMARY KEY NOT ENFORCED,
    a INT
) WITH (
      'connector' = 'mongodb',
      'uri' = 'mongodb://user:password@database:27017',
      'database' = 'database',
      'collection' = 'test'
      );

INSERT INTO test
VALUES ('id',
           (SELECT bar
        FROM (SELECT 1 AS foo)
        LEFT JOIN (SELECT 2 AS bar)
        ON foo = bar));

MongoDB 中期望的结果:

{ "_id" : "id"}

实际:

{ "_id" : "id", "a" : null }

是否可以仅使用 Flink SQL 不实现具有 NULL 值的字段,还是必须使用数据流 API?

我在文档中找不到任何选项来更改此行为。

sql mongodb apache-flink flink-sql sql-null
1个回答
0
投票

以下是如何使用 DataStream API 实现此功能的基本示例

DataStream<YourCustomType> stream = // your data stream source

stream.map(new MapFunction<YourCustomType, YourCustomType>() {
@Override
     public YourCustomType map(YourCustomType value) throws Exception {
         if (value.getYourField() == null) {
             value.setYourField(/* some default value or just leave it unset */);
         }
         return value;
     }
}).addSink(new MongoDBSinkFunction<>()); // your custom MongoDB sink

在此代码片段中,YourCustomType 将是一个表示您正在使用的数据结构的类,您将在映射函数内实现逻辑以根据需要修改或删除 NULL 字段。

© www.soinside.com 2019 - 2024. All rights reserved.