如何有效地将入站netty.io ByteBuf消息分发到ChannelGroup?

问题描述 投票:0回答:1

我创建了一个netty.io BootStrap,它接收来自旧服务器的流数据。服务器使用ISO-8859-1 charset发送数据。还有一个使用不同分隔符字节的内部“协议”:

private static final byte GS = 29;  // FS ASCII char 1D (group separator)
private static final byte RS = 30;  // FS ASCII char 1E (record separator)
private static final byte US = 31;  // FS ASCII char 1F (unit separator)

private static final ByteProcessor GROUP_SEPARATOR_LOCATOR = value -> value != GS;
private static final ByteProcessor RECORD_SEPARATOR_LOCATOR = value -> value != RS;
private static final ByteProcessor UNIT_SEPARATOR_LOCATOR = value -> value != US;

这些ByteProcessor实例用于拆分消息。每条消息最终都被翻译成相应的对象表示,而keyValueMapping包含来自origin消息的主要内容:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, byte[]> keyValueMapping;
    // SOME OTHER STUFF
}

随后,所有更新都将转发到所有连接的Web套接字客户端,这些客户端由单独的ServerBootStrap处理:

public void distribute(ChannelGroup recipients, Object msg) {
    Update updateMsg = (Update) msg;
    recipients.writeAndFlush(updateMsg);
}

当我激活Java Flight Recording并执行一些负载测试时,我意识到主要的分配热点是将初始入站消息中的值转换为ISO-8859-1字节数组的方法:

private byte[] translateValue(ByteBuf in) {
    byte [] result;

    if (!in.hasArray()) {
        result = new byte[in.readableBytes()];
        in.getBytes(in.readerIndex(), result);
    } else {
        result = in.array();
    }
    return result;
}

最初我没有翻译ByteBufs并将它们直接存储在Update的keyValueMapping映射中。由于ByteBuf对象维护了一些内部索引(读取器,编写器,标记等),这些内部索引没有保护 - 按照设计,我害怕简单地将这些ByteBufs包装并转发到不同的通道(参见上面的收件人channelGroup)并决定使用它而是byte []表示。

检查Java Flight Recording结果,我想知道是否有任何建议如何将未更改的入站数据分发到一组不同的通道而不过多地接近GC?从结果中学习,直接缓冲区用于给定通道,因为会创建许多新的字节数组。

为了提供更多上下文,我还添加了执行剩余消息转换的代码:

while (in.readableBytes() > 0) {
    ByteBuf keyAsByteBuf = nextToken(in, UNIT_SEPARATOR_LOCATOR);
    String key = translateKey(keyAsByteBuf);

    if (key != null) {
        ByteBuf valueAsByteBuf = nextToken(in, RECORD_SEPARATOR_LOCATOR);
        byte[] value = translateValue(valueAsByteBuf);

        if (value.length > 0) {
            mapping.put(key, value); 
        }
    }
}

private ByteBuf nextToken(ByteBuf in, ByteProcessor locator) {
    int separatorIdx = in.forEachByte(in.readerIndex(), in.readableBytes(), locator);

    if (separatorIdx >= 0) {
        ByteBuf token = in.readSlice(separatorIdx - in.readerIndex());
        in.skipBytes(1);
        return token;
    }
    return in.readSlice(in.readableBytes());
}

private String translateKey(ByteBuf in) {
    return keyTranslator.translate(in);
}
java netty bytebuffer
1个回答
1
投票

嗯......实际上,你的问题并不那么简单。我会尽快回答一下。

如果您的应用中不需要,则无需将ByteBuf翻译为byte[]。所以我假设你有下一个结构:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, ByteBuf> keyValueMapping;
}

这里的问题是你部分解析ByteBuf。所以你在这个java对象中有java object + ByteBuf

这完全没问题,你可以进一步使用那些ByteBuf's。你的班级Update应该实现ReferenceCounted接口。所以当你做recipients.writeAndFlush(updateMsg)(假设收件人是DefaultChannelGroup)时,netty DefaultChannelGroup将处理对这些缓冲区的引用。

那么会发生什么:

recipients.writeAndFlush(updateMsg)之后,DefaultChannelGroup in loop将你的updateMsg发送到channel.writeAndFlush(safeDuplicate(message))列表中的每个频道。 safeDuplicate是处理对netty ByteBuf的引用的特殊方法,因此您可以将相同的缓冲区发送到多个接收器(它实际上使用retainedDuplicate()复制缓冲区)。但是,您的对象不是ByteBuf,而是java对象。这是该方法的代码:

private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        return ((ByteBuf) message).retainedDuplicate();
    } else if (message instanceof ByteBufHolder) {
        return ((ByteBufHolder) message).retainedDuplicate();
    } else {
        return ReferenceCountUtil.retain(message);
    }
}

所以,为了正确处理ByteBuf的引用,你需要为ReferenceCounted实现ReferenceCountUtil.retain(message)。像这样的东西:

public class Update implements ReferenceCounted {
    @Override
    public final Update retain() {
        return new Update(id, updateType, makeRetainedBuffers());
    }  

    private Map makeRetainedBuffers() {
       Map newMap = new HashMap();
       for (Entry entry : keyValueMapping) {
           newMap.put(entry.key, entry.value.duplicate().retain())
       }
       return newMap;
    }
}

这只是一个伪代码。但你应该明白这个想法。您还必须在release()类中实现Update方法,并确保它始终释放它所拥有的缓冲区。并释放内部的所有缓冲区。我假设你的管道中已经有编码器用于调用Updaterelease()类。

另一个选择是实现自己的DefaultChannelGroup。在这种情况下,您不必依赖safeDuplicate方法。因此,您不需要实现ReferenceCounted,但是您仍然需要在该类中手动释放retain,release。

© www.soinside.com 2019 - 2024. All rights reserved.