将Flowable分成2个,处理2个流,但是一个依赖于另一个?

问题描述 投票:0回答:1

我有以下情况:我需要处理以Flowable形式接收的流。流上的每个项目都有一条数据,只有流上的第一个元素包含元数据。可以处理数据流的功能需要元数据中的信息。

类似:

// Stream items look like this
class StreamItem{
   Metadata meta;
   Data data;
}

// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);

我收到了Flowable<StreamItem>。我试图做类似的事情:

Flowable<StreamItem> input = ...

ConnectableFlowable<StreamItem> multi = input.publish;

Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);

Flowable<Data> streamData = multi.map(StreamItem::getData);

multi.connect();

Single<Result> result = streamMeta.flatMap(meta ->  processStream(meta,streamData));

[之后,我只返回result.ignoreResult()(因为我们需要过程的副作用,但实际上不是对象),然后从客户端(这是入口点),我们将Completable映射到标准响应中电话。不确定最后一部分是否相关。

我也尝试过:

Flowable<Result> res = input.publish(
   flow -> {
     Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
     Flowable<Data> data = flow.map(StreamITem::getData);
     return meta.flatMap(met -> processStream(met,data)).toFlowable();
   });

然后为上述相同的res.ignoreElements()处理返回Completable

我已经能够处理元数据,也可以存出元数据并处理数据流,但是如上所述,一旦我将两者都连接起来,似乎没有任何处理可以完成。我认为可能是我正在嵌套同一流的处理?无论如何,我认为我可能误会了所有这些工作原理(我对Rx还是很陌生),因此,如果有人对如何实现这一目标有更好的想法,我很想听听!

rx-java rx-java2
1个回答
1
投票
// Stream items look like this class StreamItem { String meta; Integer data; public String getMeta() { return meta; } public Integer getData() { return data; } } // Processor looks like this interface Processor { String processStream( String meta, Integer data ); } @Test public void testFlowable() { // Set up mock input: AtomicBoolean first = new AtomicBoolean( true ); Flowable<StreamItem> input = Flowable.generate( emitter -> { StreamItem item = new StreamItem(); item.data = (int)( Math.random() * 100 ); if ( first.getAndSet( false )) { item.meta = UUID.randomUUID().toString(); } emitter.onNext( item ); } ); // Mock processor: Processor processor = ( meta, data ) -> meta + " : " + data; // Set up rx pipeline: Flowable<StreamItem> multi = input.share(); Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta ); Flowable<String> result = multi.map( StreamItem::getData ) .withLatestFrom( streamMeta.toFlowable(), ( data, meta ) -> processor.processStream( meta, data )); // Subscribe: result.take( 5 ).blockingSubscribe( System.out::println ); }

输出:

3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 72
3fba00bd-027b-4802-8b7d-674497d72052 : 47
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 93

根据反馈进行更新:

如果您确实需要数据Flowable和一个具体的元数据对象,这似乎可以解决问题:

// Stream items look like this class StreamItem { String meta; Integer data; public String getMeta() { return meta; } public Integer getData() { return data; } } // Processor looks like this interface Processor { String processStream( String meta, Flowable<Integer> data ); } @Test public void testFlowable() { // Set up mock input: AtomicBoolean first = new AtomicBoolean( true ); Flowable<StreamItem> input = Flowable.generate( emitter -> { StreamItem item = new StreamItem(); item.data = (int)( Math.random() * 100 ); if ( first.getAndSet( false )) { item.meta = UUID.randomUUID().toString(); } emitter.onNext( item ); } ); // Mock processor: Processor processor = ( meta, data ) -> { System.out.println( meta ); data.subscribe( System.out::println ); return meta; }; // Set up rx pipeline: Flowable<StreamItem> multi = input.take( 5 ).share(); Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta ); streamMeta.map( meta -> processor.processStream( meta, multi.map( StreamItem::getData ))) .subscribe(); }

输出:

3421c5f6-8554-43ce-aa69-e6cef9c1ed89
47
46
74
59
57
© www.soinside.com 2019 - 2024. All rights reserved.