连接错误:连接ECONNREFUSED 127.0.0.1:9092,如何解决kafka错误?

问题描述 投票:0回答:1

嗯,我正在尝试上传到 Kafka 服务器,但是出了点问题,我不理解错误日志,我希望我的生产者在确切的主题时生产和消费,消费和发送我仍然只是的数据想要登录,但随后我将用来搜索用户,我的代码:

kafka-生产者

export class ProducerS {
  private kafkaProducer: KafkaProducer;
  constructor() {
    // Configuração do Kafka com os endereços dos brokers
    const kafka = new Kafka({clientId: 'kafkaP', brokers: ['localhost:9092'] });
    // Criação de um produtor Kafka
    this.kafkaProducer = kafka.producer()
  }

  // Método para conectar ao broker Kafka
  async connect(): Promise<void> {
    console.log('Conectou')
    await this.kafkaProducer.connect();
  }

  // Método para desconectar do broker Kafka
  async disconnect(): Promise<void> {
    console.log('Desconectou')
    await this.kafkaProducer.disconnect();
  }

  // Método para enviar uma mensagem para um tópico específico
  async sendM(topic: string, messages: any): Promise<void> {
    // Envia a mensagem para o tópico especificado
    await this.kafkaProducer.send({
      topic,
      messages: [{ value: JSON.stringify(messages) }],
    });
  }
}

kafka-消费者:

export class PagamentoService {
    private kafka: Kafka;
    private consumer: Consumer;

    constructor(
        @InjectRepository(DetalheEntidade)
        private readonly detalheRepository: Repository<DetalheEntidade>,
    ){ 
        this.kafka = new Kafka({
            clientId: 'kafkaP',
            brokers: ['localhost:9092'],
        });
        this.consumer = this.kafka.consumer({groupId: 'pagamentos-group'});
        
    } 

    async pegarConsumer(){
        await this.consumer.connect()
        console.log('Opa amigao')
        await this.consumer.subscribe({topic: 'criar-detalhe'})
        await this.consumer.run({
            // eslint-disable-next-line @typescript-eslint/no-unused-vars
            eachMessage: async ({topic, partition, message}) => {
                const ids = await JSON.parse(message.value.toString())
                const topicS = await JSON.parse(topic.toString())
                console.log(ids)
                console.log(topicS)
            }
        })
    }

}

docker-compose(也许在这里,idk):

version: '3.5'
services:
  kafka:
    build:
      context: ./
      dockerfile: ./apps/kafka/Dockerfile
    env_file:
      - .env
    depends_on:
      - postgres
    volumes:
      - .:/usr/src/app 
      - /usr/src/app/node_modules
    command: npm run start:dev kafka 
    
  kafkaprodute:
    build: 
      context: ./
      dockerfile: ./apps/kafkaprodute/Dockerfile
    ports:
      - '4000:5000'
    env_file:
      - .env
    depends_on:
      - kafka
    volumes:
      - .:/usr/src/app 
      - /usr/src/app/node_modules
    command: npm run start:dev kafkaprodute 
    
  postgres:
    image: postgres
    env_file:
      - .env
    ports:
      - '5432:5432'
    volumes:
      - ./db/data:/var/lib/postgresql/data

  postgres_admin:
    image: dpage/pgadmin4
    depends_on:
      - postgres
    env_file:
      - .env
    ports:
      - '15432:80'

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - '2181:2181'

  kafkaServer:
    image: wurstmeister/kafka
    container_name: kafkaServer
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

我使用生产者的地方:


    const usuarioSalvo = usuarioEntidade.id
    
    const kafkaGerent = new ProducerS()
    await kafkaGerent.connect()
    await kafkaGerent.sendM(
      'criar-detalhe', {value: JSON.stringify({usuarioSalvo})}
    )
    await kafkaGerent.disconnect()

日志错误:

ERROR [ServerKafka] ERROR [Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092 {"timestamp":"2024-04-24T15:08:35.624Z","logger":"kafkajs","broker":"localhost:9092","clientId":"nestjs-consumer-server","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1605:16)"}
kafka-1           | [Nest] 29  - 04/24/2024, 3:08:35 PM   ERROR [ServerKafka] ERROR [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092 {"timestamp":"2024-04-24T15:08:35.625Z","logger":"kafkajs","retryCount":4,"retryTime":4152}

我做错了什么?

node.js docker apache-kafka nestjs
1个回答
0
投票

当您使用 docker compose 来运行 NestJS 微服务使用者和 kafka 服务器本身时,您应该使用主机

kafkaServer
而不是
localhost
,因为 docker 网络将在 DNS 解析中设置服务名称每个容器都有自己独立的
localhost

© www.soinside.com 2019 - 2024. All rights reserved.