RxJS 检测 observable 何时被订阅

问题描述 投票:0回答:3

我需要检测何时订阅了一个可观察量(

observedEvents
),然后订阅另一个可观察量(
triggerEvent
)。我不想手动订阅
triggerEvent
,但只能在
observedEvents
订阅时订阅一次。

这里有一些代码解释了我正在寻找的内容:

// This just emits events
let emitting = new EventEmitter();

// This is the main Observable which someone else might
// have access to
let observedEvents = Rx.Observable.merge(
  Rx.Observable.fromEvent(emitting, 'aba'),
  Rx.Observable.fromEvent(emitting, 'bob')
)

// This trigger should get a subscription if observedEvents
// has one, i.e. when I subscribe to observedEvents
// that subscription activates this trigger

// I have made an attempt at this by calling skipUntil
// this however skips one event, but I don't want that
let triggerEvent = Rx.Observable.merge(
  // these actions are things that can
  // happen when the trigger is active
  Rx.Observable.of('a').delay(200),
  Rx.Observable.of('b').delay(400),
  Rx.Observable.of('c').delay(600)
)
.skipUntil(observedEvents);

// Something else should be used to activate trigger
// I don't want to do this part manually
triggerEvent.subscribe(val => {
    console.log(`Do something fancy with ${val}`);
});

//----------------------------------------------------
// Somewhere else in the code...
//----------------------------------------------------
observedEvents.subscribe(evt => {
  console.log(`Some event: ${evt}`);
});
// At this point I want triggerEvent to become active
// because observedEvents has a subscription

setTimeout(() => {
  emitting.emit('bob', 'world');
  setTimeout(() => emitting.emit('aba', 'stackoverflow!'), 500);
}, 200);
<!DOCTYPE html>
<html>
<head>
  <script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/EventEmitter/5.1.0/EventEmitter.min.js"></script>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>JS Bin</title>
</head>
<body>

</body>
</html>

这可能吗?

我希望这能解释我在寻找什么。

当我写这篇文章时,我想一个带有 Subjects 的解决方案可能就是我需要的。我不确定,但如果可能的话,我只需要朝正确的方向推动或解决方案。

javascript ecmascript-6 observable reactivex
3个回答
13
投票

对于 rxjs > v7,请参阅 此答案


回答

果然我对使用主题的看法是正确的。关键是对象的观察者名单。这是我最终所做的:

let emitting = new EventEmitter();
let sub = new Rx.Subject();

// return this to users
let myGlobalSub = sub.merge(Rx.Observable.of(1, 2, 3));

// For internal use
let myObservers = Rx.Observable.fromEvent(emitting, 'evt');

console.log(`The number of subscribers is ${sub.observers.length}`);

// Only do something if myGlobalSub has subscribers
myObservers.subscribe(l => {
  if (sub.observers.length) { // here we check observers
    console.log(l);
  }
});

// Somewhere in the code...
emitting.emit('evt', "I don't want to see this"); // No output because no subscribers

myGlobalSub.subscribe(l => console.log(l)); // One sub

emitting.emit('evt', 'I want to see this'); // Output because of one sub

console.log(`The number of subscribers is ${sub.observers.length}`);
<!DOCTYPE html>
<html lang=en>

<head>
  <script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/EventEmitter/5.2.5/EventEmitter.min.js"></script>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>JS Bin</title>
</head>

<body>
</body>

</html>


4
投票

@smac89 的答案使用了

observers
属性,该属性已在 RxJS 7 中弃用,并计划在版本 8 中删除。

如果您只想知道 Observable 是否已被订阅,您可以使用

observed
布尔属性。

目前没有与观察者数组真正等效的东西。

let emitting = new EventEmitter();
let sub = new rxjs.Subject();

// return this to users
let myGlobalSub = rxjs.merge(sub,rxjs.of(1, 2, 3));

// For internal use
let myObservers = rxjs.fromEvent(emitting, 'evt');

console.log(`Has the subject been subscribed to ? ${sub.observed}`);

// Only do something if myGlobalSub has subscribers
myObservers.subscribe(l => {
  if (sub.observed) { // here we check observers
    console.log(l);
  }
});

// Somewhere in the code...
emitting.emit('evt', "I don't want to see this"); // No output because no subscribers

myGlobalSub.subscribe(l => console.log(l)); // One sub

emitting.emit('evt', 'I want to see this'); // Output because of one sub

console.log(`Has the subject been subscribed to ? ${sub.observed}`);
<!DOCTYPE html>
<html lang=en>

<head>
  <script src="https://unpkg.com/rxjs@%5E7/dist/bundles/rxjs.umd.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/EventEmitter/5.2.5/EventEmitter.min.js"></script>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>JS Bin</title>
</head>

<body>
</body>

</html>


0
投票

TL;如果您已经非常熟悉 rxjs,请在底部找到 DR 完整解决方案

我们可以通过编写自己的运算符函数来实现此目的,该函数使用现有的 rxjs 运算符!就我而言,我正在寻找具有

startWith
tap
结合效果的东西;一旦订阅发生就会触发一些副作用,但不用担心它会做什么或返回什么。 鉴于一些可观察的来源,我们希望执行以下操作:

const source = of(1,2,3).pipe( onSubscribe(() => /* some action */ console.log('observable subscribed')), /* your other code */ tap({ next: n => console.log('next ', n), error: e => console.log('error ', e), complete: () => console.log('observable complete') }), );

如果您查看
创建运算符的文档

,浏览所有内容(即运算符的主体),您会发现该运算符只是一个接受 something 并返回一个接受observable(您的源 observable)并返回一个 observable。 在这种情况下,我们的运算符的起点可以只是源可观察量的传递,因为我们不想实际修改它。

onSubscribe<T>() { return (obs: Observable<T>) => { return obs; }; }

接下来我们需要弄清楚如何将它与我们的回调函数结合起来。

// WRONG onSubscribe<T>(callback: Function) { return (obs: Observable<T>) => { callback(); return obs; }; }

这是有问题的。如果您不在这里订阅
source

,输出仍将显示

"observable subscribed"
。这里的问题是
callback()
将在订阅实际发生之前的设置阶段运行(不知道这是否是技术术语)。显然,我们仍然必须将其组合到可观察流本身中,但仍然会立即被调用。好吧,
of
使“事情”立即发生(立即完成但没有值,或者运行并立即完成并有值),所以我们可以
tap
我们的回调,然后将其与我们的源可观察值实际结合起来,我们可以使用
merge
onSubscribe<T>(callback: Function) {
  return (obs: Observable<T>) => {
    // null, or any value, is required in order to trigger the tap
    return merge(of(null).pipe(tap(() => callback())), obs);
  };
}

现在,如果您不订阅 
source

,则在您订阅之前不会有输出......除了输出还包括来自

null
of
。让我们通过第一个发出的值
merge
skip
来清理它,我们就完成了!
onSubscribe<T>(callback: Function) {
  return (obs: Observable<T>) => {
    return merge(of(null).pipe(tap(() => callback())), obs).pipe(skip(1));
  };
}

© www.soinside.com 2019 - 2024. All rights reserved.