NATS JetStream consumer concurrency


My TypeScript application is made up of several projects, each containing a different type of microservice:

server: REST API that publishes messages to NATS so that other services can update the (Mongo) DB.
synchronizer: process that consumes NATS messages and updates the DB.

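My problem is that I can run multiple instances of the same synchronizer. In that case, I have several consumers of the same type bound to the same stream.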

I want my pull consumers to consume the same stream, but not the same messages.

Currently, my Consumer class looks like this:

import { AckPolicy, Codec, ConsumerConfig, DeliverPolicy, JetStreamClient, JetStreamManager, JsMsg, JSONCodec, ReplayPolicy, StringCodec } from "nats";

interface IEvent<T extends any> {
  subject: string;
  data: T
}

const createConsumerOptions = (serviceName: string): ConsumerConfig => {
  return {
    ack_policy: AckPolicy.Explicit,
    deliver_policy: DeliverPolicy.New,
    replay_policy: ReplayPolicy.Instant,
    ack_wait: 1000,
    flow_control: true,
    max_ack_pending: 5,
    max_waiting: 1,
    durable_name: serviceName // <- Example: "Auth-Service"
  };
};

export const initConsumer = async (js: JetStreamClient, serviceName: string, subject: string) => {
  const jsm = await js.jetstreamManager();
  if (!jsm) throw new Error("JetstreamManager Error");

  await initStream(jsm, subject);

  const options = createConsumerOptions(serviceName);

  await jsm.consumers.add(subject, options).catch((ex) => console.log(ex));
  let result = await js.consumers.get(subject, options);

  if (!result) throw new Error("Consumer Error");

  return result;
};

export const initStream = async (jsm: JetStreamManager, subject: string) => {
  const stream = await jsm.streams.find(subject).catch((ex) => console.log(ex));
  if (!stream) await jsm.streams.add({ name: subject.toString(), subjects: [subject] }).catch((ex) => console.log(ex));
};

export abstract class Consumer<T extends IEvent<any>> {
  abstract subject: T["subject"];
  abstract onMessage(data: T["data"], msg: JsMsg): Promise<any>;

  protected _client: Client;

  constructor(client: Client) {
    this._client = client;
  }

  async listen() {
    const js = await this._client.Connection?.jetstream();
    if (!js) throw new Error("Jetstream Error");

    const consumer = await initConsumer(js, this._client.ServiceName, this.subject);
    const msgs = await consumer.consume();
    for await (const msg of msgs) {
      const data = JSONCodec().decode(msg.data)

      try {
        await this.onMessage(data, msg);
        msg.ack();
      } catch (err) {
        msg.nak();
      }
    }
  }
}

Tags: typescript, microservices, nats-jetstream

1 Answer

For now I have replaced the pull consumer with a push consumer. It is deprecated, but it is the only way I have found to make this work.

In case anyone runs into the same problem, here is my example.

(If you have a better solution, feel free to suggest it.)
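
A note on units, in case someone goes back to the pull consumer: as far as I know, `ack_wait` in a `ConsumerConfig` object is given in nanoseconds, while `ackWait()` on the `consumerOpts()` builder takes milliseconds. If that holds, `ack_wait: 1000` in the question means one microsecond rather than one second, so messages could be redelivered before the handler finishes. Below is a minimal sketch of the conversion. `millisToNanos` is a hypothetical helper (the nats package exports a similar `nanos()` function), and the config is a plain object for illustration, not the library's type.

```typescript
// Assumption: ConsumerConfig.ack_wait is expressed in nanoseconds.
const NANOS_PER_MILLI = 1_000_000;

// Hypothetical helper that converts milliseconds to nanoseconds.
function millisToNanos(millis: number): number {
  return millis * NANOS_PER_MILLI;
}

// Plain object mirroring a few fields of the question's consumer config.
const pullConsumerConfig = {
  durable_name: "Auth-Service",
  ack_wait: millisToNanos(1000), // one second, expressed in nanoseconds
  max_ack_pending: 5,
};

console.log(pullConsumerConfig.ack_wait);
```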

I replaced this:

  async listen() {
    const js = await this._client.Connection?.jetstream();
    if (!js) throw new Error("Jetstream Error");

    const consumer = await initConsumer(js, this._client.ServiceName, this.subject);
    const msgs = await consumer.consume();
    for await (const msg of msgs) {
      const data = JSONCodec().decode(msg.data)

      try {
        await this.onMessage(data, msg);
        msg.ack();
      } catch (err) {
        msg.nak();
      }
    }
  }

with this:

  async listen() {
    const js = await this._client.Connection?.jetstream();
    if (!js) throw new Error("Jetstream Error");

    const options = consumerOpts()
      .ackExplicit()
      .consumerName(this._client.ServiceName)
      .deliverAll()
      .deliverGroup(this._client.ServiceName)
      .deliverTo(createInbox())
      .ackWait(1000)
      .durable(this._client.ServiceName)
      .queue(this._client.ServiceName);

    const subscription = await js.subscribe(this.subject, options); // js.subscribe IS DEPRECATED

    for await (const msg of subscription) {
      const data = parseMessage(msg);

      try {
        const details = await this.onMessage(data, msg);
        this.onAccept(msg, data, details);
      } catch (err) {
        this.onReject(msg, data, err);
      }
    }
  }
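
A possible alternative that keeps the pull consumer, offered as an untested sketch and not as a confirmed fix: have every synchronizer instance bind to the same durable consumer by name. In the question, `js.consumers.get(subject, options)` is called with a config object. My understanding is that nats.js 2.x treats an object argument as options for an ordered consumer, which is private to each instance, so each instance would receive every message. The interfaces below (`PullMsg`, `PullConsumer`, `ConsumerApi`) and the function `listenShared` are stand-ins invented for this sketch; they are assumptions about the shape of the real objects, not the library's typings.

```typescript
// Minimal structural types standing in for the nats.js objects used in the
// question. They are assumptions for this sketch.
interface PullMsg {
  json(): unknown;
  ack(): void;
  nak(): void;
}

interface PullConsumer {
  consume(): Promise<AsyncIterable<PullMsg>>;
}

interface ConsumerApi {
  // Looks up an existing durable consumer by stream name and consumer name.
  get(stream: string, name: string): Promise<PullConsumer>;
}

// Every synchronizer instance calls this with the same stream and the same
// durable name, so all instances pull from one shared consumer.
async function listenShared(
  consumers: ConsumerApi,
  stream: string,
  durableName: string,
  onMessage: (data: unknown) => Promise<void>,
): Promise<void> {
  const consumer = await consumers.get(stream, durableName);
  const msgs = await consumer.consume();
  for await (const msg of msgs) {
    try {
      await onMessage(msg.json());
      msg.ack();
    } catch {
      msg.nak(); // negative acknowledgement: ask for redelivery
    }
  }
}
```

With the real client this would presumably be called as `listenShared(js.consumers, this.subject, this._client.ServiceName, (data) => ...)`, assuming the durable consumer was created beforehand and the objects returned by the library are structurally compatible with the interfaces above.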