我的打字稿应用程序由多个项目组成,这些项目都有不同类型的微服务;
server
:REST API,将消息发布到 NATS,以便其他服务更新 (mongo) DB
synchronizer
:消耗 NATS 消息并更新 DB 的进程。
我的问题是我可以运行相同同步器的多个实例。在这种情况下,我有多个相同类型的消费者绑定到同一个流。
我希望我的拉取消费者消费相同的流,但不消费相同的消息。
目前,我的 Consumer 类如下所示:
import { AckPolicy, Codec, ConsumerConfig, DeliverPolicy, JetStreamClient, JetStreamManager, JsMsg, JSONCodec, ReplayPolicy, StringCodec } from "nats";
interface IEvent<T extends any> {
subject: string;
data: T
}
const createConsumerOptions = (serviceName: string): ConsumerConfig => {
return {
ack_policy: AckPolicy.Explicit,
deliver_policy: DeliverPolicy.New,
replay_policy: ReplayPolicy.Instant,
ack_wait: 1000,
flow_control: true,
max_ack_pending: 5,
max_waiting: 1,
durable_name: serviceName // <- Example: "Auth-Service"
};
};
export const initConsumer = async (js: JetStreamClient, serviceName: string, subject: string) => {
const jsm = await js.jetstreamManager();
if (!jsm) throw new Error("JetstreamManager Error");
await initStream(jsm, subject);
const options = createConsumerOptions(serviceName);
await jsm.consumers.add(subject, options).catch((ex) => console.log(ex));
let result = await js.consumers.get(subject, options);
if (!result) throw new Error("Consumer Error");
return result;
};
export const initStream = async (jsm: JetStreamManager, subject: string) => {
const stream = await jsm.streams.find(subject).catch((ex) => console.log(ex));
if (!stream) await jsm.streams.add({ name: subject.toString(), subjects: [subject] }).catch((ex) => console.log(ex));
};
export abstract class Consumer<T extends IEvent<any>> {
abstract subject: T["subject"];
abstract onMessage(data: T["data"], msg: JsMsg): Promise<any>;
protected _client: Client;
constructor(client: Client) {
this._client = client;
}
async listen() {
const js = await this._client.Connection?.jetstream();
if (!js) throw new Error("Jetstream Error");
const consumer = await initConsumer(js, this._client.ServiceName, this.subject);
const msgs = await consumer.consume();
for await (const msg of msgs) {
const data = JSONCodec().decode(msg.data)
try {
await this.onMessage(data, msg);
msg.ack();
} catch (err) {
msg.nack();
}
}
}
}
目前用推式消费者取代了拉式消费者。 它已被弃用,但这是我发现让它工作的唯一方法。
如果有人遇到同样的问题,我将示例放在这里。
(如果您有更好的解决方案,请随时提出)
我更换了
async listen() {
const js = await this._client.Connection?.jetstream();
if (!js) throw new Error("Jetstream Error");
const consumer = await initConsumer(js, this._client.ServiceName, this.subject);
const msgs = await consumer.consume();
for await (const msg of msgs) {
const data = JSONCodec().decode(msg.data)
try {
await this.onMessage(data, msg);
msg.ack();
} catch (err) {
msg.nack();
}
}
}
与
async listen() {
const js = await this._client.Connection?.jetstream();
if (!js) throw new Error("Jetstream Error");
const options = consumerOpts()
.ackExplicit()
.consumerName(this._client.ServiceName)
.deliverAll()
.deliverGroup(this._client.ServiceName)
.deliverTo(createInbox())
.ackWait(1000)
.durable(this._client.ServiceName)
.queue(this._client.ServiceName);
const subscription = await js.subscribe(this.subject, options); // js.subscribe IS DEPRECATED
for await (const msg of subscription) {
const data = parseMessage(msg);
try {
const details = await this.onMessage(data, msg);
this.onAccept(msg, data, details);
} catch (err) {
this.onReject(msg, data, err);
}
}
}