最后两条消息不遵守 Kafka 消息顺序

问题描述 投票:0回答:2

我正在开发一个 NestJS 项目(混合应用程序),我们使用 KafkaJS 在微服务之间交换一些数据,在某些情况下,这些数据发送到其他服务的顺序非常重要,因为它无法处理第二条消息没有第一个。 问题是,我们的生产订单对象有一个属性,它是一个对象数组 (

orderedArray
),这些对象通过其属性之一 (
count
) 正确排序,它看起来像这样:

{
    "productionOrderId": '...',
    "property1": '...',
    "property2": '...',
    "orderedArray": [
        {
            "id": '...',
            "timestamp": '...',
            "count": 1
        },
        {
            "id": '...',
            "timestamp": '...',
            "count": 2
        },
        {
            "id": '...',
            "timestamp": '...',
            "count": 3
        }
    ]
}

在一项特定功能中,我们从前端收到生产订单创建请求 (REST),经过一番验证后,我们将其保存在数据库中。 之后的最后一步是将生产订单发送到另一个服务,我们使用 Kafka 来做到这一点,真正的问题是:我们将其正确保存到我们的数据库中,通过调试我们的应用程序,我发现,直到我们调用 KafkaJS 方法发送消息,它仍然按照我们保存的数字顺序,所以我 100% 确定我们以正确的顺序发送它。当我们在其他服务中收到该订单时,它的顺序不同,更具体地说,该数组的最后两项已经交换了它们的顺序,如下所示(看看

count
字段):

    "orderedArray": [
        {
            "id": '...',
            "timestamp": '...',
            "count": 1
        },
        {
            "id": '...',
            "timestamp": '...',
            "count": 3
        },
        {
            "id": '...',
            "timestamp": '...',
            "count": 2
        }
    ]

任何数量的记录都会发生这种情况,但仅限于数组的最后两项,所以:

  • 1 2 3 变成 1 3 2
  • 1 2 变成 2 1
  • 1 2 3 4 5 6 7 变为 1 2 3 4 5 7 6
  • 1 2 3 ... 99 100 变为 1 2 3 ... 100 99

这就是我们构建消息的方式(不要介意打字,一旦我解决了这个问题,我将重构它):

    // KafkaService
    this.kafkaClient.emit(message.topic, message.messages),

    // OrderService
    for (const order of productionOrders) {
      this.sendMessage(
        kafkaUtilities.buildOrderedMessage(
          'my-topic-here', Array.of(order),
          'my-key-here')
      );
    }

  // KafkaUtilities
  private createMessage(topic: string, data: Array<unknown>, key?: string): ProducerRecord {
    const content = {
      value: JSON.stringify(data),
    } as any

    if (key) {
      this.timestamp += 1000

      content.key = key
      content.timestamp = this.timestamp.toString()
    }

    return { topic, messages: content }
  }

我为我们需要发送的每条记录的时间戳添加一秒。现在,使排序工作的唯一方法是在我的 for 循环中添加一个“睡眠”函数,但它只需要在那里,即使 0.5 毫秒它也可以工作:

    function sleep(ms: number) {
      return new Promise((resolve) => {
        setTimeout(resolve, ms);
      });
    }

我们的 Kafka 配置:

   options: {
      client: {
        brokers: ['my-broker'],
        connectionTimeout: 4000,
        logLevel: logLevel[env.KAFKA_ENABLE_LOG ? 'DEBUG' : 'NOTHING'],
        sasl: getSasl(enableSecurity),
        ssl: enableSecurity,
        requestTimeout: 90000,
      },
      consumer: {
        groupId: 'my-id',
        heartbeatInterval: 3000,
        metadataMaxAge: 180000,
        sessionTimeout: 60000,
        retry: {
          initialRetryTime: 30000,
          retries: 578,
          multiplier: 2,
          maxRetryTime: 300000,
          factor: 0,
        },
      },
      producer: {
        metadataMaxAge: 180000,
      },
    },

我们花了一些时间试图找出导致问题的原因,但我们还不确定可能导致问题的原因。这是我到目前为止所尝试过的(有些东西在没有上下文的情况下可能没有多大意义):

  • maxInFlightRequests
    设置为 1
  • key
    partition
    设置为消息(对每条消息都尝试相同的操作,并对每条消息进行连续的设置)
  • 创建一个新的存储库和一个新主题(仍然有同样的问题)
  • 尝试使用我们在云环境中使用的相同 Kafka 实例

我要尝试的(并将尽快更新这个问题):

  • 降级KafkaJS
  • 使用纯 NodeJS 项目进行测试

我们的项目信息:

NestJS version
:8.2.6
KafkaJS version
:1.15.0 在我们的云环境中,我们使用事件中心事件作为 Kafka 提供程序,但它也发生在 Kafka 本地。

我猜这是一个与 KafkaJS 相关的问题,但我目前只是猜测。

node.js typescript apache-kafka nestjs kafkajs
2个回答
0
投票

也遇到了这个问题,通过

awating
Producer.send 方法解决了

    await producer.send({...})

-1
投票

Kafka 中的消息排序仅在分区内得到保证,这意味着消费者将以与同一分区中生成的消息相同的顺序消费消息(相反,如果在两个不同的分区上生成两条消息,则不能保证排序) ).

因此您可以检查的是 Kafka 主题中的分区数量。如果您有多个消息,并且每条消息的分区键并不总是相同,那么消息在消费时混乱是正常的。

如果订购在您的环境中是一项硬性要求,那么您可以:

  1. 将分区数量减少到 1。但这可能是一个“错误的想法”,因为您会失去可扩展性。 或者,如果您的业务要求是仅在同一
  2. productionOrder
  3. 内订购消息,那么您可以在生成消息时使用
    productionOrderId
    作为分区键(我假设您当前正在使用消息
    id
    作为分区)密钥来自您的代码,但我不是 KafkaJS 专家)。在这种情况下,属于同一个
    productionOrder
    的所有消息都将进入同一个分区(并以适当的顺序被消耗)。
    
        
© www.soinside.com 2019 - 2024. All rights reserved.