我正在开发一个 NestJS 项目(混合应用程序),我们使用 KafkaJS 在微服务之间交换一些数据,在某些情况下,这些数据发送到其他服务的顺序非常重要,因为它无法处理第二条消息没有第一个。 问题是,我们的生产订单对象有一个属性,它是一个对象数组 (
orderedArray
),这些对象通过其属性之一 (count
) 正确排序,它看起来像这样:
{
"productionOrderId": '...',
"property1": '...',
"property2": '...',
"orderedArray": [
{
"id": '...',
"timestamp": '...',
"count": 1
},
{
"id": '...',
"timestamp": '...',
"count": 2
},
{
"id": '...',
"timestamp": '...',
"count": 3
}
]
}
在一项特定功能中,我们从前端收到生产订单创建请求 (REST),经过一番验证后,我们将其保存在数据库中。 之后的最后一步是将生产订单发送到另一个服务,我们使用 Kafka 来做到这一点,真正的问题是:我们将其正确保存到我们的数据库中,通过调试我们的应用程序,我发现,直到我们调用 KafkaJS 方法发送消息,它仍然按照我们保存的数字顺序,所以我 100% 确定我们以正确的顺序发送它。当我们在其他服务中收到该订单时,它的顺序不同,更具体地说,该数组的最后两项已经交换了它们的顺序,如下所示(看看
count
字段):
"orderedArray": [
{
"id": '...',
"timestamp": '...',
"count": 1
},
{
"id": '...',
"timestamp": '...',
"count": 3
},
{
"id": '...',
"timestamp": '...',
"count": 2
}
]
任何数量的记录都会发生这种情况,但仅限于数组的最后两项,所以:
这就是我们构建消息的方式(不要介意打字,一旦我解决了这个问题,我将重构它):
// KafkaService
this.kafkaClient.emit(message.topic, message.messages),
// OrderService
for (const order of productionOrders) {
this.sendMessage(
kafkaUtilities.buildOrderedMessage(
'my-topic-here', Array.of(order),
'my-key-here')
);
}
// KafkaUtilities
private createMessage(topic: string, data: Array<unknown>, key?: string): ProducerRecord {
const content = {
value: JSON.stringify(data),
} as any
if (key) {
this.timestamp += 1000
content.key = key
content.timestamp = this.timestamp.toString()
}
return { topic, messages: content }
}
我为我们需要发送的每条记录的时间戳添加一秒。现在,使排序工作的唯一方法是在我的 for 循环中添加一个“睡眠”函数,但它只需要在那里,即使 0.5 毫秒它也可以工作:
function sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
我们的 Kafka 配置:
options: {
client: {
brokers: ['my-broker'],
connectionTimeout: 4000,
logLevel: logLevel[env.KAFKA_ENABLE_LOG ? 'DEBUG' : 'NOTHING'],
sasl: getSasl(enableSecurity),
ssl: enableSecurity,
requestTimeout: 90000,
},
consumer: {
groupId: 'my-id',
heartbeatInterval: 3000,
metadataMaxAge: 180000,
sessionTimeout: 60000,
retry: {
initialRetryTime: 30000,
retries: 578,
multiplier: 2,
maxRetryTime: 300000,
factor: 0,
},
},
producer: {
metadataMaxAge: 180000,
},
},
我们花了一些时间试图找出导致问题的原因,但我们还不确定可能导致问题的原因。这是我到目前为止所尝试过的(有些东西在没有上下文的情况下可能没有多大意义):
maxInFlightRequests
设置为 1key
和 partition
设置为消息(对每条消息都尝试相同的操作,并对每条消息进行连续的设置)我要尝试的(并将尽快更新这个问题):
我们的项目信息:
NestJS version
:8.2.6
KafkaJS version
:1.15.0
在我们的云环境中,我们使用事件中心事件作为 Kafka 提供程序,但它也发生在 Kafka 本地。
我猜这是一个与 KafkaJS 相关的问题,但我目前只是猜测。
也遇到了这个问题,通过
awating
Producer.send 方法解决了
await producer.send({...})
Kafka 中的消息排序仅在分区内得到保证,这意味着消费者将以与同一分区中生成的消息相同的顺序消费消息(相反,如果在两个不同的分区上生成两条消息,则不能保证排序) ).
因此您可以检查的是 Kafka 主题中的分区数量。如果您有多个消息,并且每条消息的分区键并不总是相同,那么消息在消费时混乱是正常的。
如果订购在您的环境中是一项硬性要求,那么您可以:
productionOrder
productionOrderId
作为分区键(我假设您当前正在使用消息 id
作为分区)密钥来自您的代码,但我不是 KafkaJS 专家)。在这种情况下,属于同一个 productionOrder
的所有消息都将进入同一个分区(并以适当的顺序被消耗)。