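Please help me with the Kafka protocol. My request does not work and I don't know why. I think the problem is in the compact string encoding, but I can't pin down the exact cause.
I send a FindCoordinator v3 request to Kafka and get this error (from the Kafka broker console):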
2024-03-14T11:51:08+03:00 [2024-03-14 08:51:08,808] ERROR Exception while processing request from 10.89.0.45:9092-192.168.127.1:40345-26 (kafka.network.Processor)
2024-03-14T11:51:08+03:00 org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 10
2024-03-14T11:51:08+03:00 Caused by: java.nio.BufferUnderflowException
2024-03-14T11:51:08+03:00 at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
2024-03-14T11:51:08+03:00 at java.base/java.nio.ByteBuffer.get(Unknown Source)
2024-03-14T11:51:08+03:00 at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
2024-03-14T11:51:08+03:00 at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:52)
2024-03-14T11:51:08+03:00 at org.apache.kafka.common.message.RequestHeaderData.read(RequestHeaderData.java:135)
2024-03-14T11:51:08+03:00 at org.apache.kafka.common.message.RequestHeaderData.<init>(RequestHeaderData.java:84)
2024-03-14T11:51:08+03:00 at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:95)
2024-03-14T11:51:08+03:00 at kafka.network.Processor.parseRequestHeader(SocketServer.scala:999)
2024-03-14T11:51:08+03:00 at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1012)
2024-03-14T11:51:08+03:00 at java.base/java.util.LinkedHashMap$LinkedValues.forEach(Unknown Source)
2024-03-14T11:51:08+03:00 at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
2024-03-14T11:51:08+03:00 at kafka.network.Processor.run(SocketServer.scala:893)
2024-03-14T11:51:08+03:00 at java.base/java.lang.Thread.run(Unknown Source)
Kafka version: 2.8.1
Binary format of the request (29 bytes total):
//Message Size
[0] = {byte} 0
[1] = {byte} 0
[2] = {byte} 0
[3] = {byte} 25
/// --- Header Start
/// ApiKey = 10
[4] = {byte} 0
[5] = {byte} 10
// Version = 3
[6] = {byte} 0
[7] = {byte} 3
// CorrelationId = 0
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0
// 'clt' string as Nullable string
// 'clt'.length as Int16
[12] = {byte} 0
[13] = {byte} 3
// 'clt' as utf-8
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116
/// --- FindCoordinator start
// 'some_group' as CompactString
[17] = {byte} 11 // length = 'some_group'.length + 1 as VarUInt
[18] = {byte} 115
[19] = {byte} 111
[20] = {byte} 109
[21] = {byte} 101
[22] = {byte} 95
[23] = {byte} 103
[24] = {byte} 114
[25] = {byte} 111
[26] = {byte} 117
[27] = {byte} 112
// KeyType = 0 - group
[28] = {byte} 0
Everything works with a FindCoordinator v2 request (30 bytes total):
//Message Size
[0] = {byte} 0
[1] = {byte} 0
[2] = {byte} 0
[3] = {byte} 26
/// --- Header Start
/// ApiKey = 10
[4] = {byte} 0
[5] = {byte} 10
// Version = 2
[6] = {byte} 0
[7] = {byte} 2
// CorrelationId = 0
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0
// 'clt' string as Nullable string
[12] = {byte} 0
[13] = {byte} 3
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116
/// --- FindCoordinator start
// 'some_group' as KafkaString
// size = 10 (2 bytes as Int16)
[17] = {byte} 0
[18] = {byte} 10
// utf-8 'some_group'
[19] = {byte} 115
[20] = {byte} 111
[21] = {byte} 109
[22] = {byte} 101
[23] = {byte} 95
[24] = {byte} 103
[25] = {byte} 114
[26] = {byte} 111
[27] = {byte} 117
[28] = {byte} 112
// KeyType = 0 - group
[29] = {byte} 0
Other requests with version 3 or higher have the same problem.
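I found the answer. V3 requires tagged fields (TAG_BUFFER) both in the RequestHeader and in the FindCoordinator payload.
I thought tagged fields were only supported starting from version 9. My bad. I should have read the documentation more carefully.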
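To see why the missing TAG_BUFFER produces a BufferUnderflowException inside readUnknownTaggedField, here is a minimal Python sketch of how a flexible request header is read. It is not the broker's code; `read_unsigned_varint` and `parse_flexible_header` are hypothetical helper names, and the sketch assumes a non-null client id. Without the tagged-fields byte, the compact-string length (11) is taken as the number of tagged fields, 's' (115) as a tag and 'o' (111) as a field size, which is far more than the bytes that remain.

```python
import struct

def read_unsigned_varint(buf, pos):
    """Decode an unsigned varint at pos; return (value, new_pos)."""
    value, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise EOFError("buffer underflow while reading varint")
        b = buf[pos]
        pos += 1
        value |= (b & 0x7F) << shift
        if not (b & 0x80):
            return value, pos
        shift += 7

def parse_flexible_header(frame):
    """Parse a request (4-byte size prefix already stripped) as a flexible header.

    Assumes the client id is not null (length >= 0).
    """
    api_key, api_version, correlation_id, client_len = struct.unpack_from(">hhih", frame, 0)
    pos = 10
    client_id = frame[pos:pos + client_len].decode("utf-8")
    pos += client_len
    # A flexible header ends with a tagged-fields section: count, then (tag, size, data)*.
    num_tagged, pos = read_unsigned_varint(frame, pos)
    for _ in range(num_tagged):
        tag, pos = read_unsigned_varint(frame, pos)
        size, pos = read_unsigned_varint(frame, pos)
        if pos + size > len(frame):
            raise EOFError(
                f"tagged field {tag} claims {size} bytes, only {len(frame) - pos} remain"
            )
        pos += size
    return api_key, api_version, correlation_id, client_id, pos

HEADER = bytes([0, 10, 0, 3, 0, 0, 0, 0, 0, 3]) + b"clt"
BODY = bytes([11]) + b"some_group" + bytes([0])

broken = HEADER + BODY                           # the request from the question
fixed = HEADER + b"\x00" + BODY + b"\x00"        # with both empty TAG_BUFFERs

try:
    parse_flexible_header(broken)
except EOFError as exc:
    print("broken request:", exc)

print("fixed request:", parse_flexible_header(fixed))
```

With the extra zero byte after the client id, the header ends at offset 14 and the payload starts with the compact string as intended.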
// total message size = 31 bytes
// message size as Int32 = 27 (the 4-byte size prefix itself is not counted)
[0] = {byte} 0
[1] = {byte} 0
[2] = {byte} 0
[3] = {byte} 27
/////------- Request header (header v2, the flexible header used with FindCoordinator v3)
// ApiKey=10 as Int16
[4] = {byte} 0
[5] = {byte} 10
// ApiVersion = 3 as Int16
[6] = {byte} 0
[7] = {byte} 3
// CorrelationId = 0 as Int32
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0
// 'clt'.length as Int16
[12] = {byte} 0
[13] = {byte} 3
// ClientId = 'clt' as Utf8 bytes
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116
//TaggedFields - empty (count = 0 as VarUInt)
[17] = {byte} 0
/////------- FindCoordinator payload, v3
// 'some_group'.length + 1 as VarUInt (compact string)
[18] = {byte} 11
// 'some_group' as utf8 bytes
[19] = {byte} 115
[20] = {byte} 111
[21] = {byte} 109
[22] = {byte} 101
[23] = {byte} 95
[24] = {byte} 103
[25] = {byte} 114
[26] = {byte} 111
[27] = {byte} 117
[28] = {byte} 112
//KeyType = 0 (group) as Int8
[29] = {byte} 0
//TaggedFields - empty (count = 0 as VarUInt)
[30] = {byte} 0
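For anyone assembling these bytes by hand, a small Python sketch of the same layout may help avoid off-by-one mistakes in the size prefix. The helper names (`unsigned_varint`, `compact_string`, `find_coordinator_v3`) are made up for illustration and are not part of any Kafka client library; the sketch assumes a non-null client id and no tagged fields to send.

```python
import struct

def unsigned_varint(n):
    """Encode a non-negative integer as an unsigned varint (7 bits per byte)."""
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)   # more bytes follow
        else:
            out.append(b)
            return bytes(out)

def compact_string(s):
    """Compact string: (byte length + 1) as unsigned varint, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return unsigned_varint(len(data) + 1) + data

# An empty tagged-fields section is a single zero byte (field count = 0).
EMPTY_TAGGED_FIELDS = unsigned_varint(0)

def find_coordinator_v3(correlation_id, client_id, key, key_type=0):
    """Build a complete FindCoordinator v3 request, including the size prefix."""
    client = client_id.encode("utf-8")
    # Header: ApiKey=10, ApiVersion=3, CorrelationId, ClientId with an Int16
    # length (not a compact string), then the header's tagged fields.
    header = (
        struct.pack(">hhih", 10, 3, correlation_id, len(client))
        + client
        + EMPTY_TAGGED_FIELDS
    )
    # Payload: Key as compact string, KeyType as Int8, then the payload's tagged fields.
    body = compact_string(key) + struct.pack(">b", key_type) + EMPTY_TAGGED_FIELDS
    payload = header + body
    # The Int32 size prefix counts everything after itself.
    return struct.pack(">i", len(payload)) + payload

frame = find_coordinator_v3(0, "clt", "some_group")
for i, b in enumerate(frame):
    print(f"[{i}] = {b}")
```

Because the size prefix is computed from the payload, it always matches the bytes that follow: for this input the frame is 31 bytes long and the prefix is 27.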