我在 AWS 中安装了 Kinesis Confluence Connector 插件,以将我的 Kinesis Stream 与我的 MSK 集群连接。它正在工作,但我在 Cloudwatch 中看到的日志记录不是很有帮助:
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
如何增强我获得的日志记录?例如,查看每分钟写入集群的记录数。
我看到了文档https://docs.confluence.io/platform/current/connect/logging.html,我认为我看到了正在写入标准输出的默认值。但我不确定如何更改 Kinesis 连接器的日志级别以显示更多信息。
这是我当前的配置:
name=msk-connector-kinesis
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=3
kafka.topic=my-topic
kinesis.region=eu-central-1
kinesis.stream=kinesis_stream_eu-central-1
confluent.topic.bootstrap.servers=<server1>:9098,<server2>:9098,<server3>:9098
confluent.topic.replication.factor=3
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.producer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all
我们
OP 所需的指标
“每分钟写入集群的记录数”
可以在 Cloudwatch 中找到,作为 MSK Connect 发出的指标的一部分。出现这种情况是因为您是通过 MSK Connect 功能运行连接器(请参阅问题中 OP 的评论)。
由于这是源连接器(将数据从源推送到 MSK),因此您要查找的记录是SourceRecordPollRate
和
SourceRecordWriteRate
。这些指标的查询示例:
(摘自
AWS 大数据博客)
我认为 OP 发布的原始线索无效(尝试增加日志记录以获取连接器的生产者速率等指标),但值得一提的是,MSK Connect 将推送连接器生成的日志记录 Cloudwatch Logs、Amazon S3 或 Kinesis Firehose Stream 的严重性级别为 INFO、WARN、ERROR 和 FATAL。
根据OP提供的输入,这正在按预期工作:
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)