我正在尝试将 kafka 消息生成器服务注入到我的控制器中(kafka 服务在 monorepo 中的共享后端项目下可用)
共享中的kafka生产者:
@Injectable()
export default class KafkaProducerService {
constructor(private readonly kafkaService: KafkaService) {}
createMessage<K>(data: MessageWithPartition<K>): Message {
return {
key: data.key ?? crypto.randomUUID(),
timestamp: Date.now().toString(),
value: JSON.stringify(data.message),
headers: data.additionalHeaders,
partition: data.partition,
};
}
async produceMessage<K>(
topic: string,
messageWithPartition: MessageWithPartition<K>
) {
try {
console.log(messageWithPartition);
logger.log(`Sending a message to topic [${topic}]`);
return await this.kafkaService.getProducer().send({
topic,
compression: CompressionTypes.GZIP,
messages: [this.createMessage(messageWithPartition)],
});
} catch (error) {
logger.error(
`Error happened during sending Kafka message to topic [${topic}]: ${error.message}`,
'produceMessage'
);
throw error;
}
}
}
shared-be 中的 kafka 服务:
@Injectable()
export class KafkaService
implements OnModuleInit, OnModuleDestroy, IKafkaProducer
{
private kafkaClient: Kafka;
private producer: Producer;
private consumers: Consumer[] = [];
constructor(@Inject('KAFKA_CONFIG') private config: KafkaConfig) {
this.kafkaClient = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
retry: config.retry,
});
this.producer = this.kafkaClient.producer({
allowAutoTopicCreation: config.allowAutoTopicCreation || true,
});
}
async onModuleInit() {
await this.initProducer();
await this.initAndRegisterConsumers();
}
async onModuleDestroy() {
await this.producer.disconnect();
await Promise.all(this.consumers.map((consumer) => consumer.disconnect()));
}
private async initProducer() {
await this.producer.connect();
logger.log('Kafka producer is up and running');
}
private async initAndRegisterConsumers() {
if (this.config.consumers) {
for (const consumerClass of this.config.consumers) {
const consumerInstance = new consumerClass();
const consumer = this.kafkaClient.consumer({
groupId: consumerInstance.groupId,
sessionTimeout: this.config.kafkaSessionTimeoutMs,
heartbeatInterval: this.config.kafkaHeartbeatIntervalMs,
});
await consumer.connect();
await consumer.subscribe({
topic: consumerInstance.topic,
fromBeginning: consumerInstance.fromBeginning,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await consumerInstance.handle({ topic, partition, message });
},
});
this.consumers.push(consumer);
}
}
logger.log('Kafka consumers are up and running');
}
getProducer(): Producer {
return this.producer;
}
}
这是kafka.module.ts:
import { DynamicModule, Module } from '@nestjs/common';
import { KafkaConfig } from './interfaces/kafka.interface';
import { KafkaService } from './kafka.service';
import KafkaProducerService from './kafka-producer.service';
@Module({})
export class KafkaModule {
static forRoot(config: KafkaConfig): DynamicModule {
return {
module: KafkaModule,
providers: [
{
provide: 'KAFKA_CONFIG',
useValue: config,
},
KafkaService,
KafkaProducerService,
],
exports: [KafkaProducerService],
};
}
}
然后在我的项目中,我尝试通过以下方式创建自己的 kafkaModule 实例:
import { Module } from '@nestjs/common';
import { KafkaModule } from '@monorepo/platform/shared-be/kafka';
import { AdapterModule } from './adapters/adapter.module';
import ControllersModule from './controllers/controllers.module';
import {
ClientInfoConsumer,
ClientRegisterConsumer,
} from './lib/kafka/consumers/gateway.consumers';
import { PrismaModule } from './lib/prisma/prisma.module';
@Module({
imports: [
KafkaModule.forRoot({
clientId: env.clientId,
brokers: env.brokersList,
kafkaHeartbeatIntervalMs: 3000,
kafkaSessionTimeoutMs: 10_000,
retry: {
initialRetryTime: 300,
retries: 10,
},
consumers: [ClientInfoConsumer, ClientRegisterConsumer],
}),
ControllersModule,
AdapterModule,
PrismaModule,
],
providers: [KafkaModule],
exports: [KafkaModule],
})
export class AppModule {}
然后在我的控制器模块中我有:
import { Module } from '@nestjs/common';
import { KafkaModule } from '@monorepo/platform/shared-be/kafka';
import AdapterController from './controller.adapter';
@Module({
imports: [KafkaModule],
controllers: [AdapterController],
})
export default class ControllersModule {}
因此,在 AdapterController 中之后,我尝试通过执行以下操作来注入
KafkaProducerService
:
@Controller('psp-gateway')
export default class AdapterController {
constructor(
@Inject(KafkaProducerService)
private readonly kafkaProducerService: KafkaProducerService
) {
console.log('controller', kafkaProducerService);
}
我明白了
Nest can't resolve dependencies of the AdapterController (?). Please make sure that the argument KafkaProducerService at index [0] is available in the ControllersModule context.
我也尝试从控制器依赖注入中删除注入,但在这种情况下,它只会导致未定义。我错过了什么?
我的
"emitDecoratorMetadata": true
里也有
tsconfig.json
您已在动态模块中声明了
KafkaProducerService
提供程序 KafkaModule.forRoot
但您忘记将该动态模块设为全局,否则您必须再次将该动态模块导入到 ControllersModule
而不是其静态版本(没有您要注入的提供程序)
因此,将
global: true
添加到 KafkaModule.forRoot
的返回中,并从 KafkaModule
中删除导入 ControllersModule
,因为它非常无用。