Avien OpenSearch Sink Connector:引发版本冲突

问题描述 投票:0回答:1

我们使用 Avien OpenSearch Kafka Sink 连接器将文档从 Kafka Topic 移动到 OpenSearch。但已经多次看到下面的版本冲突消息(20-30/10 分钟)。

[2023-12-07 04:29:27,381] WARN Encountered a version conflict when executing batch 1426884 of 25 records. Ignoring and will keep an existing record. Error was [index_name/AcAS_NB7SsuIL9t59d8zhg][[topic_name][1]] OpenSearchException[OpenSearch exception [type=version_conflict_engine_exception, reason=[64ae7de6aceb2382704dd17f]: version conflict, current version [5984159] is higher or equal to the one provided [5984159]]] (io.aiven.kafka.connect.opensearch.BulkProcessor)

注意:两个版本(旧版和新版)是相同的。

这是接收器连接器配置

{
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "producer.override.buffer.memory": "3145728",
    "behavior.on.null.values": "delete",
    "connection.password": "pass",
    "index.write.method": "upsert",
    "transforms.v2_rf.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.v2_rf.renames": "_id:oid",
    "tasks.max": "3",
    "transforms": "v1_ex,v2_rf",
    "key.ignore": "false",
    "connection.compression": "true",
    "retry.backoff.ms": "400",
    "transforms.v1_ex.field": "fullDocument",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "read.timeout.ms": "15000",
    "behavior.on.version.conflict": "warn",
    "topics": "topic_name",
    "transforms.v1_ex.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "batch.size": "100",
    "connection.username": "user",
    "max.in.flight.requests": "1",
    "key.ignore.id.strategy": "record.key",
    "schema.ignore": "true",
    "key.converter.schemas.enable": "false",
    "flush.timeout.ms": "180000",
    "name": "connector_name",
    "value.converter.schemas.enable": "false",
    "connection.url": "<connection url>"
}

是的,我们需要索引写入方法为

upsert

最初我们认为这是一个吞吐量问题,OpenSearch 无法匹配 Sink 连接器推送数据的速度。因此无法在特定时间内(

retry.backoff.ms
时间内)回复。所以,

  • 我们将传输中的消息数量从 5 条限制为 1 条,但没有帮助。
  • 尝试将
    retry.backoff.ms
    从 100 毫秒增加到 400 毫秒,但没有任何作用。

然后我们在 Sink Connector 上启用 DEBUG 日志,发现 OpenSearch 的响应在几毫秒内。下面显示了一个此类请求花费了 3 毫秒。

[2023-12-07 04:52:50,402] DEBUG http-outgoing-175 << "{"took":3,"errors":true,"items":[{"index":{"_index":"index_name","_id":"64d092db50dc39f7f0920c13","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[64d092db50dc39f7f0920c13]: version conflict, current version [6064623] is higher or equal to the one provided [6064623]","index":"index_name","shard":"0","index_uuid":"AcAS_NB7SsuIL9t59d8zhg"}}},...

然后我们认为任务可能多次读取相同的偏移量,因此我们应该能够在错误发生之前在日志中看到偏移量或冲突的文档 ID 或版本。但没有,我们找不到任何先前提交的实例。 可能版本逻辑混乱,但这似乎是偏移量。这使得相同的文档 id 不太可能出现在不同的分区中但位于相同的偏移量上,因为分区键就是文档 id 本身。

现在,我已经没有想法了,而且没有专门的论坛来讨论这个问题。关于这里可能出现什么问题的任何指示吗?

elasticsearch apache-kafka-connect opensearch amazon-opensearch
1个回答
0
投票

您可以将其添加到将版本冲突行为设置为“更新”或“索引”

"behavior.on.version.conflict": "update" 

更新 - 此选项使用新版本更新 OpenSearch 中的文档,解决版本冲突。
index - 使用此选项,冲突的文档会再次索引,创建具有不同 ID 的新文档以避免版本冲突。

如果您觉得有用,请点赞,因为它有助于扩大我的个人资料。

© www.soinside.com 2019 - 2024. All rights reserved.