尽管抛出错误仍提交偏移量

问题描述 投票:0回答:1

我有一个混合应用程序,使用

connectMicroservice
连接 Kafka 微服务

  const application = await NestFactory.create(appModule)

  application.connectMicroservice(
    { transport: Transport.KAFKA, { client: { clientId: 'id', brokers: ['broker:123'] } } },
    { inheritAppConfig: true }
  )

  await application.startAllMicroservices()
  await application.listen(3000)

在业务逻辑中的某个地方,我抛出了一个仅用于日志记录的无效消息的错误。

throw new RpcException('')

我使用全局异常处理程序进行日志记录。 (但是在没有自定义异常处理程序的情况下会出现此问题)。

@Catch()
export class GlobalExceptionFilter implements RpcExceptionFilter {
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    const data = host.switchToRpc().getData()
    console.log(data)
    return throwError(() => exception.getError())
  }
}

我想提交此失败的消息来处理以下消息,但无论我传递给微服务的选项如何,该消息都永远不会提交,并且应用程序会重试处理此无效消息,这会阻止所有其他消息。我尝试了不同的

client
配置但无济于事

retry: { retries: 1 }

我看到以下错误;

Error: ERROR [Runner] Error when calling eachMessage

我为run尝试了不同的

option
配置,但似乎没有任何效果。我还尝试为
eachMessage
定义自定义函数,但它没有被触发。

我正在考虑通过定义客户端来手动提交偏移量,但是nestjs文档似乎缺少一点,因为我很难让

ClientsModule.register
application.connectMicroservice
一起工作。

我遇到有关已在定义的端口上运行的应用程序的问题,或者该应用程序似乎没有侦听该主题...

node.js apache-kafka error-handling nestjs
1个回答
0
投票

检查消息负载。

我收到“错误:ERROR [Runner] 调用eachMessage 时出错..”

但是,就我而言,我正在使用 Kafka 桌面客户端作为生产者来调试消费者。

该错误是由我从生产者发送的 JSON 负载中的语法错误引起的(额外的逗号)。修复 JSON 有效负载后,一切都按预期运行。

© www.soinside.com 2019 - 2024. All rights reserved.