我有一个kafka连接接收器代码,下面的json作为curl命令传递来注册任务。
如果有人知道如何获取我的连接的任务 ID,请告诉我。例如,在下面的示例中,我们定义最大任务数为 3,所以我需要知道 日志的 3 个任务的名称,即我需要知道日志的哪一行属于哪个任务。
在下面的示例中,我知道我有 3 个任务 -
TestCheck-1
、TestCheck-2
和 TestCheck-3
基于 kafka 连接日志。我想知道如何获取任务名称,以便我可以在我的 kafka 连接日志行中打印它们。
{
"name": "TestCheck",
"config": {
"topics": "topic1",
"connector.class": "ApplicationSinkTask Class package",
"tasks.max": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.url": "jdbc connection url",
"driver.name": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"username": "myusername",
"password": "mypassword",
"table.name": "test_table",
"database.name": "test",
}
}
当我注册时,我会得到以下详细信息。
curl -X POST -H "Content-Type: application/json" --data @myjson.json http://service:8082/connectors
{"name":"TestCheck","config":{"topics":"topic1","connector.class":"ApplicationSinkTask Class package","tasks.max":"3","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","connector.url":"jdbc:sqlserver://datahubprod.database.windows.net:1433;","driver.name":"jdbc connection url","username":"myuser","password":"mypassword","table.name":"test_table","database.name":"test","name":"TestCheck"},"tasks":[{"connector":"TestCheck","task":0},{"connector":"TestCheck","task":1},{"connector":"TestCheck","task":2}],"type":null}
您可以根据文档更改 PatternLayout https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
我的 log4j.properties 的片段
# Unspecified loggers and loggers with additivity=true output to server.log and stdout
# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
log4j.rootLogger=INFO, stdout, kafkaAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%X{connector.context},%c)%n
神奇之处在于: [%d] %p %m (%X{connector.context},%c)%n
给我日志:
[2024-05-15 01:03:27,627] ERROR Graceful stop of task snowflake-connector-v10-4 failed. ([snowflake-connector-v10|task-4] ,org.apache.kafka.connect.runtime.Worker)
您可以使用 Kafka Connect Rest API 管理连接器。您可以在here
找到一大堆命令上面链接中给出的示例显示您可以使用命令
检索给定连接器的所有任务$ curl localhost:8083/connectors/local-file-sink/tasks
[
{
"id": {
"connector": "local-file-sink",
"task": 0
},
"config": {
"task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
"topics": "connect-test",
"file": "test.sink.txt"
}
}
]
您可以使用您选择的语言发送
curl
命令并将 json 响应导入到变量/字典中以供进一步使用,例如打印到日志。这是一个使用 python 的非常简单的示例,它将整个输出分配给一个变量。
import requests
import json
connectors = 'http://localhost:8083/connectors'
p = requests.get(connectors)
data = p.json()
如果将
data
变量解析为字典,则可以访问每个元素,即 task id
我希望这有帮助!