我想不断更新名为
value
的文档字段
{'_id': 'count', 'value':0}
一定数量。
我的 MongoSinkConnector 有
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
我正在使用 python 脚本向适当的主题生成消息
self._aio_producer.produce(
topic='mongo',
value=json.dumps(
{
"_id":"count",
"$inc":{"value":len(task['payload'].split(','))}
}
)
)
但是我在 Kafka Connect 独立进程上收到此错误:
Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}]
(com.mongodb.kafka.connect.sink.MongoSinkTask:244)
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={}
我尝试删除
$inc
部分,但它似乎只是一遍又一遍地替换文档而不增加值。有什么方法可以增加值还是我必须编写自己的自定义类?
一般来说,你的代码应该可以正常工作。这里相当于
mongosh
(JavaScript):
task = { payload: 'a,b,c' }
db.collection.insertOne({ _id: 'count', value: 0 })
db.collection.updateOne(
{ _id: 'count' },
{ $inc: { value: task['payload'].split(',').length } }
)
db.collection.findOne()
=> { _id: 'count', value: 3 }
看看Kafka文档,可能不支持
$inc
运算符。