如何反转字节数组中的UTF-8编码?

问题描述 投票:0回答:1

我有以下问题:

  1. 某个生产者以二进制数据(字节数组)的形式发送 Protobuf 消息。

  2. 这些二进制数据进入一个配置错误的 Kafka 集群,它将字节数组反序列化为字符串。

  3. 然后,该集群将数据序列化为字符串并将其发送到消费者

  4. 毫无戒心的消费者期望收到一个二进制字节数组,但却得到的是 UTF-8 编码的混乱。

我尝试在 JUnit 测试中重现它。

假设我们有以下原型文件:

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

option java_package = "com.mycompany.proto";
option java_multiple_files = true;

package com.mycompany;

enum MessageType {
    NOT_SET = 0;
    TYPE_A = 1;
    TYPE_B = 2;
}

message MyMessagePart {
    string someValue = 1;
}

message MyMessage {
  // Numeric (integer) variable
  int32 myNumber = 1;

  // Text value
  string myText = 2;

  // Enum value
  MessageType mType = 3;

  // Message parts
  repeated MyMessagePart messagePart = 4;

  // Uint32 value
  google.protobuf.UInt32Value uint32Value = 5;

  // Timestamp
  google.protobuf.Timestamp timestamp = 6;
}

然后我编写了以下测试。

public class EncodingTest {
    @Test
    public void dealWithCorruptedBinaryData() throws InvalidProtocolBufferException {
        // 1. Create a Protobuf message
        final MyMessage msg = MyMessage.newBuilder()
                .setMyNumber(42)
                .setMyText("Hello")
                .setMType(MessageType.TYPE_A)
                .setUint32Value(UInt32Value.newBuilder()
                        .setValue(2067)
                        .build())
                .addMessagePart(MyMessagePart.newBuilder()
                        .setSomeValue("message part value")
                        .build())
                .build();

        // 2. Convert it to bytes
        final byte[] bytesSentByProducer = msg.toByteArray();

        // 3. Now bytesSentByProducer enter misconfigured Kafka
        // where they are deserialized using StringDeserializer
        final StringDeserializer deserializer = new StringDeserializer();
        final String dataReceivedInsideMisconfiguredKafka = deserializer.deserialize("inputTopic",
                bytesSentByProducer);

        // 4. Then, misconfigured Kafka serializes the data as String
        final StringSerializer serializer = new StringSerializer();
        final byte[] dataSentToConsumer = serializer.serialize("outputTopic", dataReceivedInsideMisconfiguredKafka);

        // Because dataSentToConsumer have been corrupted during deserialization
        // or serialization as string, conversion back to Protobuf does not work.

        final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);

    }
}

生产者创建一条 Protobuf 消息

msg
并将其编码为字节数组
bytesSentByProducer

配置错误的 Kafka 集群接收该字节数组,将其反序列化为字符串

dataReceivedInsideMisconfiguredKafka
,将其序列化为字符串
dataSentToConsumer
并将其发送给消费者。

因为 UTF-8 编码已经损坏了二进制数据,所以调用

final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);

导致异常:

com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.

    at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:107)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1245)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1130)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1024)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readUInt32(CodedInputStream.java:954)
    at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:58)
    at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:14)

将字节数组转换回消息适用于未损坏的字节数组

bytesSentByProducer
MyMessage.parseFrom(bytesSentByProducer)
)。

问题:

  1. 是否可以将

    dataSentToConsumer
    转换为
    bytesSentByProducer

  2. 如果是,如果我唯一控制的部分是消费者,我该如何解决这个问题?如何撤消配置错误的 Kafka 集群内发生的 UTF-8 编码?

注意:显而易见的解决方案是正确配置 Kafka 集群。同一个消费者在另一个环境中工作得很好,那里有一个正常的 Kafka 集群,不会做任何奇怪的转换。由于官僚主义原因,这个明显且最简单的解决方案不可用。

我尝试过的

方法1

private byte[] convertToOriginalBytes(final byte[] bytesAfter) throws CharacterCodingException {
  final Charset charset = StandardCharsets.UTF_8;
  final CharsetDecoder decoder = charset.newDecoder();
  final CharsetEncoder encoder = charset.newEncoder();
  final ByteBuffer byteBuffer = ByteBuffer.wrap(bytesAfter);
  final CharBuffer charBuffer = CharBuffer.allocate(bytesAfter.length);
  final CoderResult result = decoder.decode(byteBuffer, charBuffer, true);
  result.throwException();
  final ByteBuffer reversedByteBuffer = encoder.encode(charBuffer);

  final byte[] reversedBytes = new byte[reversedByteBuffer.remaining()];
  reversedByteBuffer.get(reversedBytes);
  return reversedBytes;
}

结果是异常。

java.nio.BufferUnderflowException
    at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:272)
    at com.mycompany.EncodingTest.convertToOriginalBytes(EncodingTest.java:67)
    at com.mycompany.EncodingTest.dealWithCorruptedBinaryData(EncodingTest.java:54)

方法2

据我所知 UTF-8 有各种字节模式:

  1. 0xxxxxxx
    用于单字节字符。
  2. 110xxxxx 10xxxxxx
    用于两字节字符等

我假设

StringDeserializer
和/或
StringSerializer
二进制数据中的某处已被修改以符合此类 UTF-8 规则。

只要这种转换是可逆的,就可以操纵位来获取原始消息。

java encoding protocol-buffers binary-data
1个回答
0
投票

讨厌传递坏消息,但你想要的却是不可能的。

关键点是完整性。是否存在从一个域(此处为原始字节)到目标域(此处为 UTF_8)的完整映射,反之亦然。

换句话说:挑战如下:给定任意选择的字节序列,创建一些文本,如果您使用 UTF-8 字符集编码序列化该文本,它会生成那些确切的字节。您是否可以选择一个字节序列,使得这项工作不可能

不幸的是,答案是,因此微不足道地证明

bytes -> text-via-UTF_8 -> bytes
致命,除非你非常幸运并且字节恰好不包含UTF8无法渲染的任何内容。

许多解码器将采用无效的 UTF8(因为,如果在使用 UTF8 将文本转换为字节时某些字节序列不可能出现,通常这意味着存在某些字节序列,如果通过 UTF8 转换为文本,则它们是无效的) - 并且只需尝试一下,或者将“损坏的数据”字形扔在那里,而不是出错。因此,无论谁管理 Kafka 服务器都不会出现错误。此行为(将无效的 UTF-8,因为它不是 UTF-8,变成“呃,什么?”符号)具有破坏性

一些字符集编码确实使这成为可能。最常用的无疑是

ISO-8859-1
。这个是“完整的”——因为它只是一个简单的映射,将每个字节值(从 0 到 255)映射到某个唯一的字符。因此,你可以整天双向进行。 因此,我们进行了一些修复:

base64 几乎能适应所有情况,这就是它的设计目的。它的效率为 33%(base64 将 3 个字节变成 4 个字节;3MB 大输入变成 4MB 大输出)。将你的字节以base64编码形式交给kafka,或者让kafka来做。
  • 在应用字符集编码的链接中设置
  • every
  • 链(以便 everywhere 字节转换为字符,反之亦然)以使用 ISO-8859-1。这是很hacky和奇怪的,不推荐,但可能是“快速”一词的某些定义的“快速”修复。 正确修复它 - 此时,我确信您已经知道该怎么做,您只是要求更快的解决方案和/或仍然处理已经损坏的数据的方法。这就是这个答案的第一句话的来源:(
前提是这种转变是可逆的

是的。您正确地确定了所有这一切的关键要求,即它是可逆的。不幸的是,事实并非如此。

© www.soinside.com 2019 - 2024. All rights reserved.