如何使用 UpdateOneTimestamps writemodel 策略增加字段

问题描述 投票:0回答:1

我想不断更新名为

value
的文档字段
{'_id': 'count', 'value':0}
一定数量。

我的 MongoSinkConnector 有

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

我正在使用 python 脚本向适当的主题生成消息

        self._aio_producer.produce(
            topic='mongo',
            value=json.dumps(
                {
                    "_id":"count",
                    "$inc":{"value":len(task['payload'].split(','))}
                 }
            )
        )

但是我在 Kafka Connect 独立进程上收到此错误:

Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}] 
(com.mongodb.kafka.connect.sink.MongoSinkTask:244) 
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={} 

我尝试删除

$inc
部分,但它似乎只是一遍又一遍地替换文档而不增加值。有什么方法可以增加值还是我必须编写自己的自定义类?

mongodb apache-kafka-connect mongodb-kafka-connector
1个回答
0
投票

一般来说,你的代码应该可以正常工作。这里相当于

mongosh
(JavaScript):

task = { payload: 'a,b,c' }
db.collection.insertOne({ _id: 'count', value: 0 })

db.collection.updateOne(
   { _id: 'count' },
   { $inc: { value: task['payload'].split(',').length } }
)

db.collection.findOne()
=> { _id: 'count', value: 3 }

看看Kafka文档,可能不支持

$inc
运算符。

© www.soinside.com 2019 - 2024. All rights reserved.