这是测试用例:
import io.reactivex.rxjava3.core.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MainTest {
public static AtomicInteger s1Id = new AtomicInteger(1);
public static AtomicInteger s2Id = new AtomicInteger(1);
public static int s1Id() {return s1Id.getAndIncrement();}
public static int s2Id() {return s2Id.getAndIncrement();}
public static class S1 {
public int idS1;
public S1() {
idS1 = s1Id();
}
}
public static class S2 {
public int idS1;
public int idS2;
public S2(int idS1) {
this.idS1 = idS1;
idS2 = s2Id();
}
}
public static Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS);
public static void main(String args[]) {
Observable<S1> s1Observable = clock.filter(l->l<1).map(l->(new S1()));
s1Observable.subscribe(s1->System.out.println("S1: "+s1.idS1));
Observable<S2> s2Observable = s1Observable.map(s1->new S2(s1.idS1));
s2Observable.subscribe(s2->System.out.println("S2: idS1: "+s2.idS1 + " idS2: " + s2.idS2));
try{Thread.sleep(100000);}catch(Exception e){e.printStackTrace();}
}
}`
我通常得到的输出: S1:1 S2:idS1:2 idS2:1
(虽然我想我也看到了: S1:2 S2: idS1: 1 idS2: 1)
在任何情况下,由于 s1.idS1 的值被用于构建 S2,我希望这些值 idS1 是相同的。时钟是引发问题所必需的。 我没有遇到什么并发问题?
我已经将我原来的问题减少到这个最小的情况。
对不起,这是一个愚蠢的问题。除其他事项外,我忘记了热和冷可观察量之间的区别。由于 s1Observable 是冷的,它将在 s2observable 的表达式中重新运行,因此为 id 选择不同的值。 (另一方面,如果它很热,它就不会产生任何东西。)所以整个事情没有任何意义。