Kafka MongoDB sink connector: how do I keep the existing ObjectId as the document _id?

Question (votes: 0, answers: 2)

I am using the Kafka sink connector for MongoDB. I want to push some JSON documents from a Kafka topic into MongoDB, but I get an error when the document contains $oid.

This is the error:

{"name":"mongodb-sink-connector","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documents\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:227)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1541)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)\n\t... 
10 more\nCaused by: java.lang.IllegalArgumentException: Invalid BSON field name $oid\n\tat org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:534)\n\tat com.mongodb.internal.connection.BsonWriterDecorator.writeName(BsonWriterDecorator.java:193)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42)\n\tat org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)\n\tat org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42)\n\tat com.mongodb.internal.connection.SplittablePayload$WriteRequestEncoder.encode(SplittablePayload.java:221)\n\tat com.mongodb.internal.connection.SplittablePayload$WriteRequestEncoder.encode(SplittablePayload.java:187)\n\tat org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)\n\tat org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)\n\tat com.mongodb.internal.connection.BsonWriterHelper.writeDocument(BsonWriterHelper.java:77)\n\tat com.mongodb.internal.connection.BsonWriterHelper.writePayload(BsonWriterHelper.java:59)\n\tat com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:162)\n\tat com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138)\n\tat com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:59)\n\tat com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:268)\n\tat com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)\n\tat com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)\n\tat 
com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)\n\tat com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:253)\n\tat com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.executeCommand(MixedBulkWriteOperation.java:431)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:251)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:76)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:185)\n\tat com.mongodb.internal.operation.OperationHelper.withReleasableConnection(OperationHelper.java:621)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:185)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:76)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:187)\n\tat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:442)\n\tat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:422)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:209)\n\t... 13 more\n"}],"type":"sink"}

This is the document I inserted into the Kafka topic:

{"_id": {"$oid": "634fd99b52281517a468f3a7"},"schema": {"type": "struct", "fields": [{"type": "int32","optional": true, "field": "id"}, {"type": "string", "optional": true, "field": "name"}, {"type": "string", "optional": true,  "field": "middel_name"}, {"type": "string", "optional": true, "field": "surname"}],"optional": false, "name": "foobar"},"payload": {"id":45,"name":"mongo","middle_name": "mmp","surname": "kafka"}}

These are the connector settings I used:

{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "migration-mongo",
    "connection.uri": "mongodb://abc:[email protected]:27018,xx.xx.xx.02:27018,xx.xx.xx.03:27018/?authSource=admin&replicaSet=dev",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "document.id.strategy.overwrite.existing": "false",
    "validate.non.null": false,
    "database": "foo",
    "collection": "product"
  }
}
mongodb apache-kafka apache-kafka-connect
2 answers
0 votes

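Kafka Connect JsonConverter payloads should have schema and payload fields at the top level, not _id. To use that envelope you need "value.converter.schemas.enable": "true". If you do set it to false, then you can remove schema and payload and put _id directly in the payload...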
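As a rough sketch of the second option (schemas disabled, no envelope), the record value from the question would shrink to a flat document like the one below. The field values are copied from the question; nothing else is assumed. Note that the stack trace above shows the $oid key being written as an ordinary field name, so with JsonConverter the {"$oid": ...} wrapper may still be rejected rather than read as an ObjectId. That part should be verified against your connector version.

```json
{
  "_id": {"$oid": "634fd99b52281517a468f3a7"},
  "id": 45,
  "name": "mongo",
  "middle_name": "mmp",
  "surname": "kafka"
}
```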

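The ID used by the Mongo client is usually tied to the Kafka record key itself, not to anything embedded in the value as you have shown, but that depends on the ID strategy.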
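For illustration only, here is a fragment of connector settings that takes the _id from the record value instead of the key. It assumes the value is a flat JSON document that already contains _id, and it assumes that sending the value as a raw JSON string (StringConverter) lets the connector parse the MongoDB Extended JSON $oid wrapper into an ObjectId. Both assumptions should be checked against the documentation for the connector version in use. These keys would be merged into the existing "config" block, replacing the current value.converter entries.

```json
{
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy"
}
```

If the ID should come from the record key instead, the connector also ships a ProvidedInKeyStrategy in the same package, which expects the key to carry the _id field.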


0 votes

Suppose we have a userId that is stored as a string in the Kafka message. How can we convert userId to an ObjectId in the MongoDB Kafka Sink Connector?
