监控 RxJS 订阅应用程序范围

问题描述 投票:0回答:1

我正在寻找一种方法来监视 RxJS 订阅以检测应用程序中的内存泄漏。该应用程序在浏览器中运行。

到目前为止,我的方法是包装原型方法

Observable.subscribe
Subscription.unsubscribe
但这不起作用。计数器不准确。我猜想,在某些情况下
unsubscribe
会被调用多次。计数器显示负数。

您是否知道有任何库、黑客或方法可以在不更改生产代码中的每个订阅的情况下监控整个 RxJS 订阅应用程序?打字稿转换器也是一种选择。

我的第一个方法是这里:


  let subscriptionCounter = 0

  Object.defineProperty(Observable.prototype, '__subscribe', { value: Observable.prototype.subscribe })

  Object.defineProperty(Observable.prototype, 'subscribe', { value: function <T> (observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription {
    subscriptionCounter++
    console.log(subscriptionCounter, 'subscribe');
    return (this as Observable<T>)['__subscribe'](observerOrNext)
  }})

  Object.defineProperty(Subscription.prototype, '__unsubscribe', { value: Subscription.prototype.unsubscribe })

  Object.defineProperty(Subscription.prototype, 'unsubscribe', { value: function() {
    subscriptionCounter--
    console.log(subscriptionCounter, 'unsubscribe');
    (this as Subscription)['__unsubscribe']()
  }})
typescript rxjs memory-leaks monitoring
1个回答
0
投票

我认为它应该与以下内容一起使用:

let subscriptionCounter = 0;

Object.defineProperty(Observable.prototype, '__subscribe', {
  value: Observable.prototype.subscribe,
});

Object.defineProperty(Observable.prototype, 'subscribe', {
  value: function <T>(
    observerOrNext?: Partial<Observer<T>> | ((value: T) => void)
  ): Subscription {
    subscriptionCounter++;
    console.log(subscriptionCounter, 'subscribe');

    const originalSubscribe = (
      this['__subscribe'] as Observable<T>['subscribe']
    ).bind(this);

    if (!observerOrNext) {
      return originalSubscribe(observerOrNext);
    }

    if (typeof observerOrNext === 'function') {
      return originalSubscribe({
        next: (v) => {
          observerOrNext(v);
        },
        complete: () => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'complete');
        },
        error: () => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'error');
        },
      });
    } else {
      return originalSubscribe({
        next: (v) => {
          observerOrNext.next?.(v);
        },
        complete: () => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'complete');
          observerOrNext.complete?.();
        },
        error: (e) => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'error');
          observerOrNext.error?.(e);
        },
      });
    }
  },
});

这可确保您考虑到

complete
,同时也考虑到
error

我制作了一个 Stackblitz 来演示这个,我制作了一个包含 3 个操作员的小链示例:

rx(
  from(['value 1', `value 2`,`value 3`]),
  map((name) => `Hello, ${name}!`),
  repeat(2)
).subscribe(console.log);

这给出了以下输出:

1 subscribe
2 subscribe
3 subscribe
Hello, value 1!
Hello, value 2!
Hello, value 3!
2 complete
1 complete
2 subscribe
3 subscribe
Hello, value 1!
Hello, value 2!
Hello, value 3!
2 complete
1 complete
0 complete

如您所见,我们从 0 开始,第一个订阅的计数为 1,而最后,我们得到的总计数为 0,正如预期的那样。

这里值得注意的是日志第 7-10 行之间,即记录的前 3 个值之后。因为

from
在这个阶段已经发送了所有内容,所以它调用complete,这将产生级联效果并关闭整个流。因此,首先
from
调用
complete
(因此总计为
2
),然后调用
map
(总计为
1
),然后我们到达
retry
,它重新打开上面的流,因此不必总共
0
,我们再次开始上述 2 个订阅,一直到总共 3 个,一旦完成,我们就做
3
2
1
0
正如预期的那样。

© www.soinside.com 2019 - 2024. All rights reserved.