将Firebase实时数据侦听器与RxJava结合使用

问题描述 投票:12回答:5

我在我的应用程序中使用Firebase,以及RxJava。 Firebase能够在后端数据中发生更改(添加,删除,更改等)时通知您的应用。我试图将Firebase的功能与RxJava结合起来。

我正在侦听的数据称为LeisureObservable发出LeisureUpdate,其中包含Leisure和更新类型(添加,删除,移动,更改)。

这是我的方法,允许订阅此事件。

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}

首先,我想使用Observable.fromCallable()方法来创建Observable,但我想这是不可能的,因为Firebase使用回调,对吧?

我保留Observable的单个实例,以便总是有一个Observable,其中多个Subscriber可以订阅。

当每个人都取消订阅并且我需要停止收听Firebase中的事件时,问题就出现了。我还没有找到使Observable了解是否还有任何订阅。所以我一直在计算我用subscribeToLeisuresUpdates()leisureUpdatesSubscriptionsCount打了多少电话。

然后每当有人想要取消订阅时,就必须打电话

@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}

这是我发现在有订阅者时让Observable发出项目的唯一方法,但我觉得必须有一种更简单的方法,特别是当没有更多的订阅者收听可观察的内容时。

遇到类似问题或采用不同方法的人?

android firebase rx-java
5个回答
6
投票

您可以使用Observable.fromEmitter,这些内容

    return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
        @Override
        public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
            final ValueEventListener listener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    // process update
                    LeisureUpdate leisureUpdate = ...
                    leisureUpdateEmitter.onNext(leisureUpdate);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
                    mDatabaseReference.removeEventListener(this);
                }
            };
            mDatabaseReference.addValueEventListener(listener);
            leisureUpdateEmitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mDatabaseReference.removeEventListener(listener);
                }
            });
        }
    }, Emitter.BackpressureMode.BUFFER);

3
投票

最后把它放在你的Observable.create()中。

subscriber.add(Subscriptions.create(new Action0() {
                    @Override public void call() {
                        ref.removeEventListener(leisureUpdatesListener);
                    }
                }));

3
投票

我建议你检查一下下一个库作为参考(或者只是使用它):

RxJava:https://github.com/nmoskalenko/RxFirebase

RxJava 2.0:https://github.com/FrangSierra/Rx2Firebase

其中一个与RxJava一起使用,另一个与RxJava 2.0的新RC一起使用。如果你对它感兴趣,你可以看到两个here之间的差异。


0
投票

以下是将RxJava2与Firebase的CompletionListener一起使用的示例代码:

Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey();
            FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order,
                    new DatabaseReference.CompletionListener() {
                        @Override
                        public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) {
                            if (e.isDisposed()) {
                                return;
                            }
                            if (databaseError == null) {
                                e.onComplete();
                            } else {
                                e.onError(new Throwable(databaseError.getMessage()));
                            }
                        }
                    });

        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

0
投票

另一个令人惊叹的库将帮助您在rx模式下包装所有firebase实时数据库逻辑。

https://github.com/Link184/Respiration

在这里,您可以创建firebase存储库并从GeneralRepository扩展它,例如:

@RespirationRepository(dataSnapshotType = Leisure.class)
public class LeisureRepository extends GeneralRepository<Leisure>{
    protected LeisureRepository(Configuration<Leisure> repositoryConfig) {
        super(repositoryConfig);
    }

    // observable will never emit any events if there are no more subscribers
    public void killRepository() {
        if (!behaviorSubject.hasObservers()) {
            //protected fields
            behaviorSubject.onComplete();
            databaseReference.removeEventListener(valueListener);
        }
    }
}

您可以通过这种方式“杀死”您的存储库:

// LeisureRepositoryBuilder is a generated class by annotation processor, will appear after a successful gradle build
LeisureRepositoryBuilder.getInstance().killRepository();

但我认为你的情况会更好地扩展com.link184.respiration.repository.ListRepository以避免通过LeisureUpdate从java.util.Map到Leisure模型的数据映射

© www.soinside.com 2019 - 2024. All rights reserved.