Injected service is undefined in the controller


I'm trying to inject a Kafka message producer service into my controller (the Kafka service is available in a shared backend project inside the monorepo).

The Kafka producer in the shared project:


@Injectable()
export default class KafkaProducerService {
  constructor(private readonly kafkaService: KafkaService) {}

  createMessage<K>(data: MessageWithPartition<K>): Message {
    return {
      key: data.key ?? crypto.randomUUID(),
      timestamp: Date.now().toString(),
      value: JSON.stringify(data.message),
      headers: data.additionalHeaders,
      partition: data.partition,
    };
  }

  async produceMessage<K>(
    topic: string,
    messageWithPartition: MessageWithPartition<K>
  ) {
    try {
      console.log(messageWithPartition);
      logger.log(`Sending a message to topic [${topic}]`);
      return await this.kafkaService.getProducer().send({
        topic,
        compression: CompressionTypes.GZIP,
        messages: [this.createMessage(messageWithPartition)],
      });
    } catch (error) {
      logger.error(
        `Error happened during sending Kafka message to topic [${topic}]: ${error.message}`,
        'produceMessage'
      );
      throw error;
    }
  }
}

The Kafka service in shared-be:

@Injectable()
export class KafkaService
  implements OnModuleInit, OnModuleDestroy, IKafkaProducer
{
  private kafkaClient: Kafka;
  private producer: Producer;
  private consumers: Consumer[] = [];

  constructor(@Inject('KAFKA_CONFIG') private config: KafkaConfig) {
    this.kafkaClient = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
      retry: config.retry,
    });

    this.producer = this.kafkaClient.producer({
      allowAutoTopicCreation: config.allowAutoTopicCreation || true,
    });
  }
  async onModuleInit() {
    await this.initProducer();
    await this.initAndRegisterConsumers();
  }

  async onModuleDestroy() {
    await this.producer.disconnect();
    await Promise.all(this.consumers.map((consumer) => consumer.disconnect()));
  }

  private async initProducer() {
    await this.producer.connect();
    logger.log('Kafka producer is up and running');
  }

  private async initAndRegisterConsumers() {
    if (this.config.consumers) {
      for (const consumerClass of this.config.consumers) {
        const consumerInstance = new consumerClass();
        const consumer = this.kafkaClient.consumer({
          groupId: consumerInstance.groupId,
          sessionTimeout: this.config.kafkaSessionTimeoutMs,
          heartbeatInterval: this.config.kafkaHeartbeatIntervalMs,
        });
        await consumer.connect();
        await consumer.subscribe({
          topic: consumerInstance.topic,
          fromBeginning: consumerInstance.fromBeginning,
        });
        await consumer.run({
          eachMessage: async ({ topic, partition, message }) => {
            await consumerInstance.handle({ topic, partition, message });
          },
        });
        this.consumers.push(consumer);
      }
    }
    logger.log('Kafka consumers are up and running');
  }

  getProducer(): Producer {
    return this.producer;
  }
}

And here is kafka.module.ts:

import { DynamicModule, Module } from '@nestjs/common';

import { KafkaConfig } from './interfaces/kafka.interface';
import { KafkaService } from './kafka.service';
import KafkaProducerService from './kafka-producer.service';

@Module({})
export class KafkaModule {
  static forRoot(config: KafkaConfig): DynamicModule {
    return {
      module: KafkaModule,
      providers: [
        {
          provide: 'KAFKA_CONFIG',
          useValue: config,
        },
        KafkaService,
        KafkaProducerService,
      ],
      exports: [KafkaProducerService],
    };
  }
}

Then, in my project, I try to create my own KafkaModule instance as follows:

import { Module } from '@nestjs/common';
import { KafkaModule } from '@monorepo/platform/shared-be/kafka';

import { AdapterModule } from './adapters/adapter.module';
import ControllersModule from './controllers/controllers.module';
import {
  ClientInfoConsumer,
  ClientRegisterConsumer,
} from './lib/kafka/consumers/gateway.consumers';
import { PrismaModule } from './lib/prisma/prisma.module';

@Module({
  imports: [
    KafkaModule.forRoot({
      clientId: env.clientId,
      brokers: env.brokersList,
      kafkaHeartbeatIntervalMs: 3000,
      kafkaSessionTimeoutMs: 10_000,
      retry: {
        initialRetryTime: 300,
        retries: 10,
      },
      consumers: [ClientInfoConsumer, ClientRegisterConsumer],
    }),
    ControllersModule,
    AdapterModule,
    PrismaModule,
  ],
  providers: [KafkaModule],
  exports: [KafkaModule],
})
export class AppModule {}

Then, in my controllers module, I have:


import { Module } from '@nestjs/common';
import { KafkaModule } from '@monorepo/platform/shared-be/kafka';

import AdapterController from './controller.adapter';

@Module({
  imports: [KafkaModule],
  controllers: [AdapterController],
})
export default class ControllersModule {}

Finally, in AdapterController, I try to inject KafkaProducerService like this:

@Controller('psp-gateway')
export default class AdapterController {
  constructor(
    @Inject(KafkaProducerService)
    private readonly kafkaProducerService: KafkaProducerService
  ) {
    console.log('controller', kafkaProducerService);
  }
}

I get the following error:

Nest can't resolve dependencies of the AdapterController (?). Please make sure that the argument KafkaProducerService at index [0] is available in the ControllersModule context.

I also tried removing the @Inject() from the controller's constructor parameter, but in that case the injected service is simply undefined. What am I missing?

I also have "emitDecoratorMetadata": true in my tsconfig.json.

typescript nestjs
1 Answer

You declared the KafkaProducerService provider in the dynamic module returned by KafkaModule.forRoot, but you forgot to make that dynamic module global. Otherwise you would have to import the dynamic module itself into ControllersModule again, rather than its static version (which does not contain the provider you are trying to inject).

So add global: true to the object returned by KafkaModule.forRoot, and remove the KafkaModule import from ControllersModule, since it does nothing there.
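
A minimal sketch of what that change could look like, based on the module code from the question (only the global flag is new; everything else is unchanged):

import { DynamicModule, Module } from '@nestjs/common';

import { KafkaConfig } from './interfaces/kafka.interface';
import { KafkaService } from './kafka.service';
import KafkaProducerService from './kafka-producer.service';

@Module({})
export class KafkaModule {
  static forRoot(config: KafkaConfig): DynamicModule {
    return {
      module: KafkaModule,
      // Marking the dynamic module as global makes its exported providers
      // (here KafkaProducerService) resolvable in every module without
      // importing KafkaModule again.
      global: true,
      providers: [
        {
          provide: 'KAFKA_CONFIG',
          useValue: config,
        },
        KafkaService,
        KafkaProducerService,
      ],
      exports: [KafkaProducerService],
    };
  }
}

ControllersModule then no longer needs to import KafkaModule at all:

import { Module } from '@nestjs/common';

import AdapterController from './controller.adapter';

@Module({
  controllers: [AdapterController],
})
export default class ControllersModule {}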
