我正在寻找一种方法来监视 RxJS 订阅以检测应用程序中的内存泄漏。该应用程序在浏览器中运行。
到目前为止,我的方法是包装原型方法
Observable.subscribe
和 Subscription.unsubscribe
但这不起作用。计数器不准确。我猜想,在某些情况下 unsubscribe
会被调用多次。计数器显示负数。
您是否知道有任何库、黑客或方法可以在不更改生产代码中的每个订阅的情况下监控整个 RxJS 订阅应用程序?打字稿转换器也是一种选择。
我的第一个方法是这里:
let subscriptionCounter = 0
Object.defineProperty(Observable.prototype, '__subscribe', { value: Observable.prototype.subscribe })
Object.defineProperty(Observable.prototype, 'subscribe', { value: function <T> (observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription {
subscriptionCounter++
console.log(subscriptionCounter, 'subscribe');
return (this as Observable<T>)['__subscribe'](observerOrNext)
}})
Object.defineProperty(Subscription.prototype, '__unsubscribe', { value: Subscription.prototype.unsubscribe })
Object.defineProperty(Subscription.prototype, 'unsubscribe', { value: function() {
subscriptionCounter--
console.log(subscriptionCounter, 'unsubscribe');
(this as Subscription)['__unsubscribe']()
}})
我认为它应该与以下内容一起使用:
let subscriptionCounter = 0;
Object.defineProperty(Observable.prototype, '__subscribe', {
value: Observable.prototype.subscribe,
});
Object.defineProperty(Observable.prototype, 'subscribe', {
value: function <T>(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void)
): Subscription {
subscriptionCounter++;
console.log(subscriptionCounter, 'subscribe');
const originalSubscribe = (
this['__subscribe'] as Observable<T>['subscribe']
).bind(this);
if (!observerOrNext) {
return originalSubscribe(observerOrNext);
}
if (typeof observerOrNext === 'function') {
return originalSubscribe({
next: (v) => {
observerOrNext(v);
},
complete: () => {
subscriptionCounter--;
console.log(subscriptionCounter, 'complete');
},
error: () => {
subscriptionCounter--;
console.log(subscriptionCounter, 'error');
},
});
} else {
return originalSubscribe({
next: (v) => {
observerOrNext.next?.(v);
},
complete: () => {
subscriptionCounter--;
console.log(subscriptionCounter, 'complete');
observerOrNext.complete?.();
},
error: (e) => {
subscriptionCounter--;
console.log(subscriptionCounter, 'error');
observerOrNext.error?.(e);
},
});
}
},
});
这可确保您考虑到
complete
,同时也考虑到 error
。
我制作了一个 Stackblitz 来演示这个,我制作了一个包含 3 个操作员的小链示例:
rx(
from(['value 1', `value 2`,`value 3`]),
map((name) => `Hello, ${name}!`),
repeat(2)
).subscribe(console.log);
这给出了以下输出:
1 subscribe
2 subscribe
3 subscribe
Hello, value 1!
Hello, value 2!
Hello, value 3!
2 complete
1 complete
2 subscribe
3 subscribe
Hello, value 1!
Hello, value 2!
Hello, value 3!
2 complete
1 complete
0 complete
如您所见,我们从 0 开始,第一个订阅的计数为 1,而最后,我们得到的总计数为 0,正如预期的那样。
这里值得注意的是日志第 7-10 行之间,即记录的前 3 个值之后。因为
from
在这个阶段已经发送了所有内容,所以它调用complete,这将产生级联效果并关闭整个流。因此,首先 from
调用 complete
(因此总计为 2
),然后调用 map
(总计为 1
),然后我们到达 retry
,它重新打开上面的流,因此不必总共 0
,我们再次开始上述 2 个订阅,一直到总共 3 个,一旦完成,我们就做 3
、2
、1
、0
正如预期的那样。