我正在开发一个IOT应用程序,它要求我处理许多小的非结构化消息(这意味着它们的字段会随着时间而改变-有些字段可能会出现,而另一些可能会消失)。这些消息通常具有2到15个字段,其值属于基本数据类型(int / long,字符串,布尔值)。这些消息非常适合JSON数据格式(或msgpack)。
至关重要的是,消息必须按到达顺序进行处理(理解:它们需要由单个线程处理-无法并行处理此部分)。我有自己的逻辑来实时处理这些消息(吞吐量相对较小,最多每秒几十万条消息),但是引擎越来越需要能够通过重播邮件的历史记录。尽管起初并不是为了达到这个目的而编写的,但是如果我能够以每秒的速度向其提供历史数据,那么我的事件处理引擎(用Go编写)可以很好地每秒处理数十个(也许是几百个)百万条消息。足够的速度。
这正是问题所在。我已经在很长一段时间(数年)内存储了许多(数千亿)这些消息,目前以分隔的msgpack格式(https://github.com/msgpack/msgpack-python#streaming-unpacking)存储。在此设置和其他设置下(请参见下文),我能够基准化〜2M消息/秒的峰值解析速度(仅在2019 Macbook Pro上,仅解析),这远没有使磁盘IO饱和。
即使不谈论IO,也要执行以下操作:
import json
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
json_message = json.dumps(message)
%%timeit
json.loads(json_message)
给我一个3微秒/消息的解析时间,比30万条消息/秒略高。与ujson,rapidjson和orjson相比,而不是标准库的json
模块,我能够获得1微秒/消息(使用ujson)的峰值速度,大约是1M消息/秒。
Msgpack稍微好一些:
import msgpack
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
msgpack_message = msgpack.packb(message)
%%timeit
msgpack.unpackb(msgpack_message)
给我大约750ns /条消息(大约100ns /场)的处理时间,大约是130万条消息/秒。我最初以为C ++可能会更快。这是一个使用nlohmann/json的示例,尽管它不能与msgpack直接比较:
#include <iostream>
#include "json.hpp"
using json = nlohmann::json;
const std::string message = "{\"value\": \"hello\"}";
int main() {
auto jsonMessage = json::parse(message);
for(size_t i=0; i<1000000; ++i) {
jsonMessage = json::parse(message);
}
std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away.
};
使用clang 11.0.3(std = c ++ 17,-O3)进行编译,这在同一台Macbook上以〜1.4s的速度运行,也就是说,解析速度为〜700k消息/秒,甚至比Python示例。我知道nlohmann / json可能非常慢,并且使用simdjson的DOM API能够获得大约2M消息/秒的解析速度。
对于我的用例来说,这仍然太慢了。我愿意接受所有建议,以提高Python,C ++,Java(或任何JVM语言)或Go中潜在应用程序的消息解析速度。
注意:
我已经探索的事物:
谢谢!
我假设消息仅包含一些基本类型的命名属性(在运行时定义),并且这些基本类型例如是字符串,整数和浮点数。
为了快速实现,最好:
因此,我们首先需要设计一个简单而快速的二进制消息协议:
二进制消息包含其属性数(以1字节编码),后跟属性列表。每个属性都包含一个字符串,该字符串的前缀是其大小(以1个字节编码),然后是属性的类型(std :: variant中类型的索引,以1个字节编码)以及属性值(大小为-前缀字符串,64位整数或64位浮点数)。
每个编码的消息是一个字节流,可以容纳在一个较大的缓冲区中(分配一次并重用于多个传入消息)。>>
这里是用于解码来自原始二进制缓冲区的消息的代码:
#include <unordered_map> #include <variant> #include <climits> // Define the possible types here using AttrType = std::variant<std::string_view, int64_t, double>; // Decode the `msgData` buffer and write the decoded message into `result`. // Assume the message is not ill-formed! // msgData must not be freed or modified while the resulting map is being used. void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result) { static_assert(CHAR_BIT == 8); const size_t attrCount = msgData[0]; size_t cur = 1; result.clear(); for(size_t i=0 ; i<attrCount ; ++i) { const size_t keyLen = msgData[cur]; std::string_view key(msgData+cur+1, keyLen); cur += 1 + keyLen; const size_t attrType = msgData[cur]; cur++; // A switch could be better if there is more types if(attrType == 0) // std::string_view { const size_t valueLen = msgData[cur]; std::string_view value(msgData+cur+1, valueLen); cur += 1 + valueLen; result[key] = std::move(AttrType(value)); } else if(attrType == 1) // Native-endian 64-bit integer { int64_t value; // Required to not break the strict aliasing rule std::memcpy(&value, msgData+cur, sizeof(int64_t)); cur += sizeof(int64_t); result[key] = std::move(AttrType(value)); } else // IEEE-754 double { double value; // Required to not break the strict aliasing rule std::memcpy(&value, msgData+cur, sizeof(double)); cur += sizeof(double); result[key] = std::move(AttrType(value)); } } }
您可能也需要编写编码功能(基于相同的想法)。
这里是用法示例(基于与json相关的代码):
const char* message = "\x01\x05value\x00\x05hello"; void bench() { std::unordered_map<std::string_view, AttrType> decodedMsg; decodedMsg.reserve(16); decode(message, decodedMsg); for(size_t i=0; i<1000*1000; ++i) { decode(message, decodedMsg); } visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]); }
在我的机器(使用Intel i7-9700KF处理器)上,根据您的基准,使用nlohmann json库的代码获得了2.7M消息/秒,使用新代码获得了35.4M的消息/秒。
请注意,此代码可以快得多
。实际上,大多数时间都花在了有效的哈希和分配上。您可以通过使用更快的哈希映射实现(例如boost :: container :: flat_map或ska :: bytell_hash_map)和/或使用自定义分配器来缓解此问题。另一种方法是构建自己精心调整的哈希图实现。另一种选择是使用键-值对的向量并使用线性搜索来执行查找(这应该很快,因为您的消息应该没有很多属性,并且因为您说每条消息只需要一小部分属性)。但是,消息越大,解码速度越慢。因此,您可能需要利用并行性来更快地解码消息块。所有这些,有可能达到超过100 M条消息/秒。