kafka 请求 v3+ 序列化问题。代理无法反序列化消息

问题描述 投票:0回答:1

请帮助使用kafka协议。不知道为什么请求不起作用。 我认为这个问题出现在字符串的紧凑版本中。但不知道具体原因。

我向 Kafka 发送 FindCoordinator v3 请求并收到错误(来自 kafka 拦截器控制台):

2024-03-14T11:51:08+03:00 [2024-03-14 08:51:08,808] ERROR Exception while processing request from 10.89.0.45:9092-192.168.127.1:40345-26 (kafka.network.Processor)
2024-03-14T11:51:08+03:00 org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 10
2024-03-14T11:51:08+03:00 Caused by: java.nio.BufferUnderflowException
2024-03-14T11:51:08+03:00  at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
2024-03-14T11:51:08+03:00  at java.base/java.nio.ByteBuffer.get(Unknown Source)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:52)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.message.RequestHeaderData.read(RequestHeaderData.java:135)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.message.RequestHeaderData.<init>(RequestHeaderData.java:84)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:95)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.parseRequestHeader(SocketServer.scala:999)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1012)
2024-03-14T11:51:08+03:00  at java.base/java.util.LinkedHashMap$LinkedValues.forEach(Unknown Source)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.run(SocketServer.scala:893)
2024-03-14T11:51:08+03:00  at java.base/java.lang.Thread.run(Unknown Source)

卡夫卡版本:2.8.1

请求的二进制格式(共29个字节):

    //Message Size
    [0] = {byte} 0
    [1] = {byte} 0
    [2] = {byte} 0
    [3] = {byte} 25

    /// --- Header Start
    /// ApiKey = 10
    [4] = {byte} 0
    [5] = {byte} 10

    // Version = 3
    [6] = {byte} 0
    [7] = {byte} 3

    // CorrelationId = 0
    [8] = {byte} 0
    [9] = {byte} 0
    [10] = {byte} 0
    [11] = {byte} 0

    // 'clt' string as Nullable string
    // 'clt'.length as Int16
    [12] = {byte} 0
    [13] = {byte} 3 
    // 'clt' as utf-8
    [14] = {byte} 99
    [15] = {byte} 108
    [16] = {byte} 116

    /// --- FindCoordinator start
    // 'some_group' as CompactString
    [17] = {byte} 11  // length = 'some_group'.length + 1 as VarUInt
    [18] = {byte} 115
    [19] = {byte} 111
    [20] = {byte} 109
    [21] = {byte} 101
    [22] = {byte} 95
    [23] = {byte} 103
    [24] = {byte} 114
    [25] = {byte} 111
    [26] = {byte} 117
    [27] = {byte} 112
    // KeyType = 0 - group
    [28] = {byte} 0

所有内容均适用于 FindCoordinator v2 请求(总共 30 字节): //消息大小 [0] = {字节} 0 [1] = {字节} 0 [2] = {字节} 0 [3] = {字节} 26

/// --- Header Start
/// ApiKey = 10
[4] = {byte} 0
[5] = {byte} 10

// Version = 2
[6] = {byte} 0
[7] = {byte} 2

// CorrelationId = 0
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0

// 'clt' string as Nullable string
[12] = {byte} 0
[13] = {byte} 3
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116

/// --- FindCoordinator start
// 'some_group' as KafkaString
// size = 10 (2 bytes as Int16)
[17] = {byte} 0
[18] = {byte} 10
// utf-8 'some_group'
[19] = {byte} 115
[20] = {byte} 111
[21] = {byte} 109
[22] = {byte} 101
[23] = {byte} 95
[24] = {byte} 103
[25] = {byte} 114
[26] = {byte} 111
[27] = {byte} 117
[28] = {byte} 112
// KeyType = 0 - group
[29] = {byte} 0

版本大于或等于 3 的其他请求也存在同样的问题。

apache-kafka kafka-consumer-api
1个回答
0
投票

我找到了答案。 V3 需要 RequestHeader 和 FindCoorinator 负载中的 taggedFiels (TAG_BUFFER)。

我认为标记字段仅从版本 9 开始支持。 我的床。我应该更仔细地阅读文档。

//total message size = 28
// message size as Int32 = 24
[0] = {byte} 0
[1] = {byte} 0
[2] = {byte} 0
[3] = {byte} 24

/////------- Header v3 
// ApiKey=10 as Int16
[4] = {byte} 0
[5] = {byte} 10
// ApiVersion = 3 as Int16
[6] = {byte} 0
[7] = {byte} 3
// CorrelationId = 0 as Int32
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0
// 'clt'.length as Int16
[12] = {byte} 0
[13] = {byte} 3 
// ClientId = 'clt' as Utf8 bytes
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116
//TaggedFiels - empty (len=0 as VarUint)
[17] = {byte} 0

/////------- FindCoordinator payload, v3
// 'some_group'.length as VarUInt
[18] = {byte} 11
// 'some_group' as utf8 bytes
[19] = {byte} 115
[20] = {byte} 111
[21] = {byte} 109
[22] = {byte} 101
[23] = {byte} 95
[24] = {byte} 103
[25] = {byte} 114
[26] = {byte} 111
[27] = {byte} 117
[28] = {byte} 112
//KeyType = 0 (group) as Int8
[29] = {byte} 0
//TaggedFiels - empty (len=0 as VarUint)
[30] = {byte} 0
© www.soinside.com 2019 - 2024. All rights reserved.