RXJS按id和进程按顺序分配每个id

问题描述 投票:0回答:1

问题:游戏:所以我有一些船可以到达许多行星。如果2艘船在新星球上同时到达,则可能导致两次更改所有权的相同过程。这个过程是异步的,只应在每个行星所有权变更时发生一次。

为了解决这个问题,我希望按行星ID分割船只流,这样每个流只能用于一个行星。现在棘手的部分是每艘船只应该在前一艘船被处理之后进行处理。

  • 船舶$
  • 按行星ID划分 行星id1:按顺序处理 行星id2:按顺序处理 ...

这里有一些代码可以显示它应该如何表现。

const ships = [
  {
    id: 1,
    planetId: 1,
  },
  {
    id: 2,
    planetId: 1,
  },
  {
    id: 3,
    planetId: 2,
  },
  // ... never finishes 
]
// the source observable never finishes 
const source$ = interval(1000).pipe(
  take(ships.length),
  map(i => ships[i]),
)

const createSubject = (ship) => {
  // Doesn't need to be a subject, but needs to emit new items after a bit of time based on some other requests.
  console.log(`>>>`, ship.id);
  const subject = new Subject();
  setTimeout(() => {
    subject.next(ship.id + ' a' + new Date());
  }, 1000);
  setTimeout(() => {
    subject.next(ship.id + ' b' + new Date());
    subject.complete();
  }, 2000);
  return subject.asObservable();
}

// The result should be the following (t, is the time in seconds, t3, is time after 3 seconds)
// t0: >>> 1
// t0: >>> 3
// t1: 1 a
// t1: 2 a
// t2: 1 b
// t2: 2 b
// t2: >>> 2 (note that the second ship didn't call the createSubject until the first finished)
// t3: 1 a
// t4: 1 2

Solution (with a lot of help from A.Winnen and some figuring out)

在这里运行:https://stackblitz.com/edit/angular-8zopfk?file=src/app/app.component.ts

  const ships = [
    {
      id: 1,
      planetId: 1,
    },
    {
      id: 2,
      planetId: 1,
    },
    {
      id: 3,
      planetId: 2,
    }
  ];
  const createSubject = (ship) => {
    console.log(ship.id + ' a')
    const subject = new Subject();
    setTimeout(() => {
      //subject.next(ship.id + ' b');
    }, 500);//
    setTimeout(() => {
      subject.next(ship.id + ' c');
      subject.complete();//
    }, 1000);
    return subject.asObservable();
  }
  let x = 0;
  interval(10).pipe(//
    take(ships.length),
    map(i => ships[i]),
    groupBy(s => s.planetId),
    mergeMap(group$ => {//
      x++
      return group$.pipe(
        tap(i => console.log('x', i, x)),
        concatMap(createSubject)
      )
    }),
  ).subscribe(res => console.log('finish', res), undefined, () => console.log("completed"))

如何在rxjs中完成?

码:

  const shipArriveAction$ = action$.pipe<AppAction>(
    ofType(ShipActions.arrive),
    groupBy(action => action.payload.ship.toPlanetId),
    mergeMap((shipByPlanet$: Observable<ShipActions.Arrive>) => {
      return shipByPlanet$.pipe(
        groupBy(action => action.payload.ship.id),
        mergeMap((planet$) => {
          return planet$.pipe(
            concatMap((action) => {
              console.log(`>>>concat`, new Date(), action);
              // this code should be called in sequence for each ship with the same planet. I don't need only the results to be in order, but also this to be called in sequence.
              const subject = new Subject();
              const pushAction: PushAction = (pushedAction) => {
                subject.next(pushedAction);
              };
              onShipArriveAction(state$.value, action, pushAction).then(() => {
                subject.complete();
              });
              return subject.asObservable();
            }),
          )
        })
      );

    )
  ;

来自A.Winnen的代码非常接近,但只适用于已完成的源观察,而不是连续的:

    const ships = [
      {
        id: 1,
        planetId: 1,
      },
      {
        id: 2,
        planetId: 1,
      },
      {
        id: 3,
        planetId: 2,
      }
    ];
    const createSubject = (ship) => {
      console.log(ship.id + ' a')
      const subject = new Subject();
      setTimeout(() => {
        subject.next(ship.id + ' b');
      }, 1000);//
      setTimeout(() => {
        subject.next(ship.id + ' c');
        subject.complete();//
      }, 2000);
      return subject.asObservable().pipe(
        finalize(null)
      );
    }

    interval(1000).pipe(
      take(ships.length),
      tap(console.log),
      map(i => ships[i]),
      groupBy(s => s.planetId),
      mergeMap(group => group.pipe(toArray())),
      mergeMap(group => from(group).pipe(
        concatMap(createSubject)
      ))
    ).subscribe(res => console.log(res), undefined, () => console.log("completed"))
rxjs
1个回答
1
投票

您可以使用groupBy和mergeMap的组合来实现您的目标。

from(ships).pipe(
  groupBy(ship => ship.planetId),
  mergeMap(planetGroup => planetGroup.pipe(
    concatMap(ship => {
      // do real processing in this step
     return of(`planetGroup: ${planetGroup.key} - processed ${ship.ship}`);
    })
  ))
).subscribe(result => console.log(result));

我做了一个简单的例子:https://stackblitz.com/edit/angular-6etaja?file=src%2Fapp%2Fapp.component.ts

编辑:更新闪电堆栈:https://stackblitz.com/edit/angular-y7znvk

© www.soinside.com 2019 - 2024. All rights reserved.