Firebase + RxJava,onComplete()事件

问题描述 投票:1回答:1

没有人知道如何将Firebase与RxJava连接,因此当我从数据库加载所有数据时,它会运行arrayAdapter.notifyDataSetChanged()?我当时想用onComplete()方法编写它,但它在加载所有数据之前仍然运行

        Completable.fromCallable(new Callable<List<cards>>() {
            @Override
            public List<cards> call() throws Exception {
                newUserDb.addListenerForSingleValueEvent(new ValueEventListener() {
                    @Override
                    public void onDataChange(@NonNull DataSnapshot dataSnapshot) {
                        if (dataSnapshot.child(currentUID).child("sex").exists()) {
                            myInfo.put("sex", dataSnapshot.child(currentUID).child("sex").getValue().toString());
                        }
                        if (dataSnapshot.child(currentUID).child("dateOfBirth").exists()) {
                            int myAge = stringDateToAge(dataSnapshot.child(currentUID).child("dateOfBirth").getValue().toString());
                            myInfo.put("age", String.valueOf(myAge));
                        }

                        if (dataSnapshot.child(currentUID).child("connections").child("yes").exists()) {
                            for (DataSnapshot ds : dataSnapshot.child(currentUID).child("connections").child("yes").getChildren()) {
                                if (!dataSnapshot.child(currentUID).child("connections").child("matches").hasChild(ds.getKey())) {
                                    Log.d("rxJava", "onDataChange: " + ds.getKey());
                                    first.add(ds.getKey());
                                    getTagsPreferencesUsers(dataSnapshot.child(ds.getKey()), true);
                                }
                            }
                        }
                    }

                    @Override
                    public void onCancelled(@NonNull DatabaseError databaseError) {

                    }
                });
                return rowItems;
            }
        }).subscribeOn(Schedulers.computation())
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d("rxJava", "Test RxJAVA, onSubscribe");
                    }

                    @Override
                    public void onComplete() {
                        Log.d("rxJava", "Test RxJAVA, onComplete");
                    }

                    @Override
                    public void onError(Throwable error) {
                        Log.d("rxJava", "Test RxJAVA, onError");
                    }
                });

并且输出为

2020-06-04 23:36:28.797 29515-29515/com.example.tinderapp D/rxJava: Test RxJAVA, onSubscribe
2020-06-04 23:36:28.800 29515-29612/com.example.tinderapp D/rxJava: Test RxJAVA, onComplete
2020-06-04 23:36:29.018 29515-29515/com.example.tinderapp D/rxJava: onDataChange: a4hqGgAJBRTVJOlPp3blNDt5v7q1
2020-06-04 23:36:29.022 29515-29515/com.example.tinderapp D/rxJava: onDataChange: aA9HAOtaB7ao6vzKqqBNp0iaBev2
java android firebase firebase-realtime-database rx-java
1个回答
0
投票

我想这是预期的行为,在下面进行描述:

Completable.fromCallable

fromCallable需要一个Lambda,该Lambda将返回订阅列表。在您的情况下,数据库连接也将同时打开,这基本上是无效的,因为回调是通过非阻塞回调注册的。

subscribeOn

这可确保从给定的调度程序中调用fromCallable的subscriptionAcutal。因此,订阅线程和发射线程是分离的。

您首先获得onComplete,因为fromCallable将立即返回rowItems,并且数据库连接将保持打开状态,因为您没有删除侦听器。稍后,您将获得数据库回调日志,因为数据库连接仍处于打开状态,并且侦听器仍处于注册状态。

您实际上想做这样的事情:

    Single.create<List<Card>> { emitter ->
        // register onChange callback to database
        // callback will be called, when a value is available
        // the Single will stay open, until emitter#onSuccess is called with a collected list.
        newUserDb.addListenerForSingleValueEvent {
            // do some stuff

            emitter.onSuccess(listOf()) // return collected data from database here...
        }

        emitter.setCancellable {
            // unregister addListenerForSingleValueEvent from newUserDb here
        }
    }.subscribeOn(Schedulers.computation())
        .subscribe(
            // stuff
        )

如果您想要持续不断的更新,请将Single与Observable / Flowable交换

© www.soinside.com 2019 - 2024. All rights reserved.