我们的项目位于 Angular 16,带有 npm 包
"ngx-mqtt": "^16.1.0"
。
我们有一个管理屏幕,它会在构造函数中自动连接到 HiveMq Mqtt 代理:
constructor(
private readonly mqttNotificationsService: MqttNotificationsService,
private readonly config: AppConfigService,
) {
if (this.config.getConfig('mqttMessagesEnabled')) {
this.mqttNotificationsService.startConnection();
}
}
但是有时这个连接会中断。也就是说,我的通知图标仅显示
0
消息 - 唯一的解决方案是 硬刷新 浏览器。刷新后,我的浏览器客户端成功重新连接到 MQTT 代理 - 并且检索到最新消息。
这里是
startConnection
和 restartMqtt
方法,以及 hubListener
函数:
startConnection(): void {
if (!this.config.getConfig('mqttMessagesEnabled')) {
return;
}
this.topicsService
.getTopics(-1, '')
.pipe(
tap((topics) => {
this.topics = topics;
}),
)
.subscribe({
next: (_) => {
this.connection = this.getBrokerConnection();
try {
this.mqttService.connect(this.connection);
this.hubListener();
} catch (error) {
console.log('mqtt.connect error', error);
}
},
});
}
restartMqtt(): void {
this.destroyConnection();
timer(1000).pipe(
tap((_) => {
this.startConnection();
}),
).subscribe();
}
destroyConnection() {
this.mqttConnected = false;
if (this.subscriptions) {
this.subscriptions.forEach((sub) => sub.unsubscribe());
}
this.subscriptions = [];
this.subscribeMultiple = [];
try {
this.mqttService?.disconnect(true);
} catch (error) {
console.log('Disconnect failed', error.toString());
}
}
// ON CONNECT!
public hubListener = () => {
this.mqttService.onConnect
.pipe(
tap((conn) => {
// see mqtt-packet interface IConnackPacket
console.log(`Mqtt OnConnect returned: ${conn.cmd}`); // 'CONNACK' = Conn Acknowledgement
this.mqttConnected = true;
this.subscribeToTopics();
}),
)
.subscribe();
this.mqttService.onClose.subscribe((_) => (this.mqttConnected = false));
this.mqttService.onError
.pipe(
tap((result) => {
console.log('Mqtt onError event fired: ', result);
}),
)
.subscribe();
this.mqttService.onOffline
.pipe(
tap((result) => {
this.mqttConnected = false;
console.log('Mqtt onOffline event fired: ', result);
}),
)
.subscribe();
this.mqttService.onMessage // Broker published a msg; cmd: 'publish'
.subscribe();
this.mqttService.onSuback // Subscribe Acknowledgement to Mqtt topic
.subscribe();
this.mqttService.onPacketsend.subscribe(); // resulting from a 'pingreq' packet to broker
};
重现有点困难。但在某些时候,我的经纪人连接似乎中断了。也就是说,即使我切换页面并返回 - 我仍然无法检索到最新消息。
谢谢你。
我最终将
hubListener
调用从 startConnection
移出到构造函数()中。这样,如果 this.mqttService.onConnect
被多次解雇(我不知道什么原因),那么至少 hubListener
不会再次被调用。
现在一切都很好,我的 mqtt 通知始终存在。
constructor(
public mqttService: MqttService,
private readonly topicsService: MqttTopicsService,
private readonly toastr: ToastrService,
private readonly configService: ConfigService,
) {
this.configService.mqttNotification$ = this.mqttReceivedSubject.asObservable();
this.configService.siteConfigMonitoringEvent$.pipe(
filter((monitoringEnabled) => monitoringEnabled),
tap((enabled) => {
this.siteConfigMonitoringEnabled = enabled;
this.restartMqtt();
}),
).subscribe();
this.hubListener(); // Now my onConnect event will be setup only once !
}