我已经得到了一个有点问题。
我在nestjs中使用kafka与cqrs。.
我的问题是,在写入过程中处理错误的最好方法是什么。
我向我的nestjs应用程序的web api发送请求。
我有和事件 "ObjectedCreatedEvent "是发送它的事件总线上后,创建和写入kafka(confluent.cloud)。
只是在这个过程中启动了另一个监听器?我希望kafka (confluent cloud)至少要有这样的主题。
我只需要一个ObjectValdiationFailedEvent事件,然后把它放到eventbus上。
Confluent Cloud在这里帮不上忙,因为 这是客户的问题. 正如你所知道的,Kafka接收任何被生产者序列化的数据,并简单地将数据存储到选定的分区中。在你的案例中 数据不事件离开客户端 这意味着,不管是什么原因导致了这个 错误八七 当然不是服务器端的Kafka,而是客户端的。
我的建议是设置你的框架NestJS支持的任何异常处理程序。我不是Node.js的开发者(我的背景是Java和Go),但快速查看NestJS文档显示,这个框架允许你注册能够处理异常的过滤器。比如说,你可以注册一个能够处理异常的过滤器。
import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';
@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
return throwError(exception.getError());
}
}
更多信息 此处.
所以,你可能要调查是哪一层抛出了这个 错误八七 所以你可以进行相应的处理。