MongoIO Apache Beam GCP 数据流与 Mongo Upsert 管道示例

问题描述 投票:0回答:2

我正在寻找一个示例来实现 Apache Beam GCP 数据流管道,以使用 upsert 操作更新 Mongo DB 中的数据,即如果值存在,则应更新该值,如果不存在,则应插入。

语法如下:

pipeline.apply(...)
.apply(MongoDbIO.write()
.withUri("mongodb://localhost:27017")
.withDatabase("my-database")
.withCollection("my-collection")
.withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
      .withUpdateFields(UpdateField.fieldUpdate("$set", "source-field1", "dest-field1"),
                        UpdateField.fieldUpdate("$set","source-field2", "dest-field2"),
                       //pushes entire input doc to the dest field
                         UpdateField.fullUpdate("$push", "dest-field3") ))); 

下面是我的管道代码,我目前正在准备如下所示的集合后插入文档

{"_id":{"$oid":"619632693261e80017c44145"},"vin":"SATESTCAVA74621","timestamp":"2021-11-18T10:48:59.889Z","key":"EV_CHARGE_NOW_SETTING","value":"DEFAULT"}

现在我想更新 'value' 和 'timestamp' 如果存在 'vin' 和 'key' 组合,如果 'vin' 和 'key' 组合存在不存在,然后使用 upsert 插入新文档。

PCollection<PubsubMessage> pubsubMessagePCollection= pubsubMessagePCollectionMap.get(topic);
            pubsubMessagePCollection.apply("Convert pubsub to kv,k=vin", ParDo.of(new ConvertPubsubToKVFn()))
                .apply("group by vin key",GroupByKey.<String,String>create())
                .apply("filter data for alerts, status and vehicle data", ParDo.of(new filterMessages()))
                .apply("converting message to document type", ParDo.of(
                    new ConvertMessageToDocumentTypeFn(list_of_keys_str, collection, options.getMongoDBHostName(),options.getMongoDBDatabaseName())).withSideInputs(list_of_keys_str))
                .apply(MongoDbIO.write()
                    .withUri(options.getMongoDBHostName())
                    .withDatabase(options.getMongoDBDatabaseName())
                    .withCollection(collection));

现在如果我想使用下面的代码行:

.withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
      .withUpdateFields(UpdateField.fieldUpdate("$set", "source-field1", "dest-field1"),
                        UpdateField.fieldUpdate("$set","source-field2", "dest-field2"),
                       //pushes entire input doc to the dest field
                         UpdateField.fullUpdate("$push", "dest-field3") )));

我的 key1“source-field1”、“dest-field1”“source-field2”、“dest-field2”“dest-field3” 是什么?

我对这个价值观感到困惑。请帮忙!

下面的代码我正在尝试更新

MongoDbIO.write()
.withUri(options.getMongoDBHostName())
.withDatabase(options.getMongoDBDatabaseName())
.withCollection(collection)
.withUpdateConfiguration(UpdateConfiguration.create()
                            .withIsUpsert(true)
                            .withUpdateKey("vin")
                            .withUpdateKey("key")
                            .withUpdateFields(UpdateField.fieldUpdate("$set", "vin", "vin"),
                                              UpdateField.fieldUpdate("$set", "key", "key"),
                                              UpdateField.fieldUpdate("$set", "timestamp", "timestamp"),
                                              UpdateField.fieldUpdate("$set", "value", "value")))

使用上面的代码我的文档没有更新,而是添加了 id = vin ,它应该根据 vin 和密钥匹配的现有记录进行更新,如果插入它应该插入自动生成的 _id 值。

请建议在这里做什么?

mongodb google-cloud-dataflow dataflow upsert apache-beam-io
2个回答
1
投票

upsert配置是从这里读取的,您可以使用withIsUpsert(true)进行配置。

在原始语法中,添加额外的行以启用更新插入。

pipeline.apply(...)
  .apply(MongoDbIO.write()
    .withUri("mongodb://localhost:27017")
    .withDatabase("my-database")
    .withCollection("my-collection")
    .withUpdateConfiguration(
      UpdateConfiguration.create()
        .withIsUpsert(true)
        .withUpdateKey("key1")
        .withUpdateFields(
          UpdateField.fieldUpdate("$set", "source-field1", "dest-field1"),
          UpdateField.fieldUpdate("$set","source-field2", "dest-field2"),
          //pushes entire input doc to the dest field
          UpdateField.fullUpdate("$push", "dest-field3")))); 

0
投票

但是使用这种方法,当插入文档时,它实际上并没有维护 bson 文档中给定的字段序列顺序,而是维护 mongodb 文档中按排序顺序排列的字段。谁能帮助我如何在插入后将给定的字段顺序保留到 mongodb 文档中?

© www.soinside.com 2019 - 2024. All rights reserved.