我有以下问题:
某个生产者以二进制数据(字节数组)的形式发送 Protobuf 消息。
这些二进制数据进入一个配置错误的 Kafka 集群,它将字节数组反序列化为字符串。
然后,该集群将数据序列化为字符串并将其发送到消费者。
毫无戒心的消费者期望收到一个二进制字节数组,但却得到的是 UTF-8 编码的混乱。
我尝试在 JUnit 测试中重现它。
假设我们有以下原型文件:
syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";
option java_package = "com.mycompany.proto";
option java_multiple_files = true;
package com.mycompany;
enum MessageType {
NOT_SET = 0;
TYPE_A = 1;
TYPE_B = 2;
}
message MyMessagePart {
string someValue = 1;
}
message MyMessage {
// Numeric (integer) variable
int32 myNumber = 1;
// Text value
string myText = 2;
// Enum value
MessageType mType = 3;
// Message parts
repeated MyMessagePart messagePart = 4;
// Uint32 value
google.protobuf.UInt32Value uint32Value = 5;
// Timestamp
google.protobuf.Timestamp timestamp = 6;
}
然后我编写了以下测试。
public class EncodingTest {
@Test
public void dealWithCorruptedBinaryData() throws InvalidProtocolBufferException {
// 1. Create a Protobuf message
final MyMessage msg = MyMessage.newBuilder()
.setMyNumber(42)
.setMyText("Hello")
.setMType(MessageType.TYPE_A)
.setUint32Value(UInt32Value.newBuilder()
.setValue(2067)
.build())
.addMessagePart(MyMessagePart.newBuilder()
.setSomeValue("message part value")
.build())
.build();
// 2. Convert it to bytes
final byte[] bytesSentByProducer = msg.toByteArray();
// 3. Now bytesSentByProducer enter misconfigured Kafka
// where they are deserialized using StringDeserializer
final StringDeserializer deserializer = new StringDeserializer();
final String dataReceivedInsideMisconfiguredKafka = deserializer.deserialize("inputTopic",
bytesSentByProducer);
// 4. Then, misconfigured Kafka serializes the data as String
final StringSerializer serializer = new StringSerializer();
final byte[] dataSentToConsumer = serializer.serialize("outputTopic", dataReceivedInsideMisconfiguredKafka);
// Because dataSentToConsumer have been corrupted during deserialization
// or serialization as string, conversion back to Protobuf does not work.
final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);
}
}
生产者创建一条 Protobuf 消息
msg
并将其编码为字节数组 bytesSentByProducer
。
配置错误的 Kafka 集群接收该字节数组,将其反序列化为字符串
dataReceivedInsideMisconfiguredKafka
,将其序列化为字符串 dataSentToConsumer
并将其发送给消费者。
因为 UTF-8 编码已经损坏了二进制数据,所以调用
final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);
导致异常:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:107)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1245)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1130)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1024)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readUInt32(CodedInputStream.java:954)
at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:58)
at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:14)
将字节数组转换回消息适用于未损坏的字节数组
bytesSentByProducer
(MyMessage.parseFrom(bytesSentByProducer)
)。
问题:
是否可以将
dataSentToConsumer
转换为 bytesSentByProducer
?
如果是,如果我唯一控制的部分是消费者,我该如何解决这个问题?如何撤消配置错误的 Kafka 集群内发生的 UTF-8 编码?
注意:显而易见的解决方案是正确配置 Kafka 集群。同一个消费者在另一个环境中工作得很好,那里有一个正常的 Kafka 集群,不会做任何奇怪的转换。由于官僚主义原因,这个明显且最简单的解决方案不可用。
我尝试过的
方法1
private byte[] convertToOriginalBytes(final byte[] bytesAfter) throws CharacterCodingException {
final Charset charset = StandardCharsets.UTF_8;
final CharsetDecoder decoder = charset.newDecoder();
final CharsetEncoder encoder = charset.newEncoder();
final ByteBuffer byteBuffer = ByteBuffer.wrap(bytesAfter);
final CharBuffer charBuffer = CharBuffer.allocate(bytesAfter.length);
final CoderResult result = decoder.decode(byteBuffer, charBuffer, true);
result.throwException();
final ByteBuffer reversedByteBuffer = encoder.encode(charBuffer);
final byte[] reversedBytes = new byte[reversedByteBuffer.remaining()];
reversedByteBuffer.get(reversedBytes);
return reversedBytes;
}
结果是异常。
java.nio.BufferUnderflowException
at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:272)
at com.mycompany.EncodingTest.convertToOriginalBytes(EncodingTest.java:67)
at com.mycompany.EncodingTest.dealWithCorruptedBinaryData(EncodingTest.java:54)
方法2
据我所知 UTF-8 有各种字节模式:
0xxxxxxx
用于单字节字符。110xxxxx 10xxxxxx
用于两字节字符等我假设
StringDeserializer
和/或 StringSerializer
二进制数据中的某处已被修改以符合此类 UTF-8 规则。
只要这种转换是可逆的,就可以操纵位来获取原始消息。
讨厌传递坏消息,但你想要的却是不可能的。
关键点是完整性。是否存在从一个域(此处为原始字节)到目标域(此处为 UTF_8)的完整映射,反之亦然。
换句话说:挑战如下:给定任意选择的字节序列,创建一些文本,如果您使用 UTF-8 字符集编码序列化该文本,它会生成那些确切的字节。您是否可以选择一个字节序列,使得这项工作不可能?
不幸的是,答案是是,因此微不足道地证明
bytes -> text-via-UTF_8 -> bytes
是致命,除非你非常幸运并且字节恰好不包含UTF8无法渲染的任何内容。
许多解码器将采用无效的 UTF8(因为,如果在使用 UTF8 将文本转换为字节时某些字节序列不可能出现,通常这意味着存在某些字节序列,如果通过 UTF8 转换为文本,则它们是无效的) - 并且只需尝试一下,或者将“损坏的数据”字形扔在那里,而不是出错。因此,无论谁管理 Kafka 服务器都不会出现错误。此行为(将无效的 UTF-8,因为它不是 UTF-8,变成“呃,什么?”符号)具有破坏性。
一些字符集编码确实使这成为可能。最常用的无疑是
ISO-8859-1
。这个是“完整的”——因为它只是一个简单的映射,将每个字节值(从 0 到 255)映射到某个唯一的字符。因此,你可以整天双向进行。
因此,我们进行了一些修复:
base64 几乎能适应所有情况,这就是它的设计目的。它的效率为 33%(base64 将 3 个字节变成 4 个字节;3MB 大输入变成 4MB 大输出)。将你的字节以base64编码形式交给kafka,或者让kafka来做。
是的。您正确地确定了所有这一切的关键要求,即它是可逆的。不幸的是,事实并非如此。