Confluence Kafka Timestamp Converter 值问题

问题描述 投票:0回答:1

我使用 Apache Kafka 和 Confluence Connect v7.3.2 平台以及部署为 Sink Connector 的 MongoDB 连接器,以便将具有 2 个时间戳字段的消息流式传输到集合中。这些值以“yyyy-MM-dd T HH:mm:ss.fffff”(小数点后 5 位数字)的时间格式显示,我想稍后通过过滤时间戳来查询这些记录。

到目前为止,我一直在使用 MongoDB ISODate 数据类型来按日期范围进行过滤,并且 ISODate 数据类型支持毫秒的时间精度(小数点后 3 位数字),因此我在 Sink 连接器中使用了 SMT 时间戳转换器。这是 Sink Connector 的配置文件的样子(忽略注释行):

name=mongo-sink
topics=topic-with-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1

# Message types
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

connection.uri=mongodb://192.168.41.3:27017
database=MarketData
collection=Mycollection

# Single Message Transform (SMT) to convert "TimeStamp" string to ISODate
#transforms=TimestampToIsoDate
#transforms.TimestampToIsoDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
#transforms.TimestampToIsoDate.target.type=Timestamp
#transforms.TimestampToIsoDate.field=TimeStamp
#transforms.TimestampToIsoDate.format=yyyy-MM-dd'T'HH:mm:ss.SSSSS

# Single Message Transform (SMT) to convert "LastUpdated" string to ISODate
transforms=TimestampToIsoDate
transforms.TimestampToIsoDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampToIsoDate.target.type=Timestamp
transforms.TimestampToIsoDate.field=LastUpdated
transforms.TimestampToIsoDate.format=yyyy-MM-dd'T'HH:mm:ss.SSSSS

## Document manipulation settings
key.projection.type=none
key.projection.list=
value.projection.type=none
value.projection.list=

field.renamer.mapping=[]
field.renamer.regex=[]

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder


# Write configuration
delete.on.null.values=false
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy

max.batch.size = 0
rate.limiting.timeout=0
rate.limiting.every.n=0

# Change Data Capture handling
change.data.capture.handler=

# Topic override examples for the sourceB topic
topic.override.sourceB.collection=sourceB
topic.override.sourceB.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy

但最近我注意到这个转换器生成的“LastUpdated”值与 Kafka 主题中的值有些不同。现在在 Kafka 主题中,这是一个实时的“LastUpdated”值:

"LastUpdated":"2023-08-22T13:52:50.13426"

但在 MongoDB 中它保存为:

"lastUpdated": "2023-08-22T13:53:03.426Z"

相差 3 秒,有时相差一两分钟。我不明白为什么,但它可能与时区无关。看起来时间戳转换过程中发生了一些事情。

如有任何帮助,我们将不胜感激。

mongodb apache-kafka timestamp type-conversion mongodb-kafka-connector
1个回答
0
投票

您必须在 mongoDB 中运行以下脚本:

console.log(new Date());

并修改 Mongodb 日期时间。

© www.soinside.com 2019 - 2024. All rights reserved.