Realm:使用 Clean-Architecture 和 RxJava2

问题描述 投票:0回答:1

一点上下文,我尝试将一些干净的架构应用于我的一个项目,但我在存储库的(领域)磁盘实现方面遇到了问题。我有一个存储库,它根据某些条件(缓存)从不同的数据存储中提取一些数据。这是理论,当将所有这些与 UseCases 和 RxJava2 混合时,问题就来了。

首先,我从 Realm 获取对象列表,然后我手动创建它的 Observable。 但是

subscribe
(正如预期的那样)在不同的线程上执行,所以领域最终崩溃了......(第二个代码块)

这是我用来创建 Observable 的代码(来自抽象类

DiskStoreBase
):

Observable<List<T>> createListFrom(final List<T> list) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(ObservableEmitter<List<T>> emitter) throws Exception {
            if (list != null) {
                emitter.onNext(list);
                emitter.onComplete();
            } else {
                emitter.onError(new ExceptionCacheNotFound());
            }
        }
    });
}

我该如何处理这种情况?

DiskStoreForZone
的更多代码:

@Override
public Observable<List<ResponseZone>> entityList() {
    Realm realm = Realm.getDefaultInstance();
    List<ResponseZone> result = realm.where(ResponseZone.class).findAll();
    return createListFrom(result);
}

确切的崩溃:

E/REALM_JNI: jni: ThrowingException 8, Realm accessed from incorrect thread.
E/REALM_JNI: Exception has been thrown: Realm accessed from incorrect thread.
android realm rx-java clean-architecture
1个回答
14
投票

它不起作用,因为尽管使用 Rx,您的数据层不是反应性的。

Realm 本质上是一个 reactive 数据源,它的托管对象本质上也是 mutable(由 Realm 就地更新)和 thread-confined(只能在 Realm 所在的同一线程上访问打开)。

要使代码正常工作,您需要从 Realm 中复制数据。

@Override
public Single<List<ResponseZone>> entityList() {
    return Single.fromCallable(() -> {
       try(Realm realm = Realm.getDefaultInstance()) {
           return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
       }
    });
}

我冒昧地将您的

Single
表示为
Single
,考虑到它不是Observable,它不监听变化,只有1事件,这就是列表本身。所以通过
ObservableEmitter
发送它并没有真正意义,因为它不会发出事件。

因此,这就是为什么我说:你的数据层不是反应性的。你没有在听变化。您只是直接获取数据,您永远不会收到任何更改通知;尽管使用 Rx.


我用颜料画了一些图来说明我的观点。 (蓝色代表副作用)

在您的情况下,您调用一次性操作以从多个数据源(缓存、本地、远程)检索数据。一旦你获得它,你就不会监听变化;从技术上讲,如果您在一个地方和另一个地方编辑数据,唯一的更新方法是“强制缓存手动检索新数据”;为此,您必须知道您在其他地方修改了数据。为此,您需要一种方法来直接调用回调,或发送消息/事件 - 更改通知。

所以在某种程度上,你必须创建一个缓存失效通知事件。如果你听那个,解决方案可能会再次反应。除非您手动执行此操作。

------------------------------------------------ ----------------------

考虑到 Realm 已经是一个反应式数据源(类似于 SQLite 的 SQLBrite),它能够提供更改通知,您可以通过它“使缓存无效”。

事实上,如果你的本地数据源是唯一的数据源,任何来自网络的写入都是你监听的变化,那么你的“缓存”可以记为

replay(1).publish().refCount()
(为新订阅者重放最新数据,如果评估了新数据,则用新数据替换数据),即 RxReplayingShare.

使用从处理程序线程的循环程序创建的

Scheduler
,您可以在后台线程上监听 Realm 中的更改,创建一个反应式数据源,返回最新的非托管副本,您可以在线程之间传递这些副本(虽然直接映射到不可变的领域模型比copyFromRealm()
更好,如果你选择这条路线——这条路线是干净的架构)。

return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() { @Override public void subscribe(ObservableEmitter<List<ResponseZone>> emitter) throws Exception { final Realm observableRealm = Realm.getDefaultInstance(); final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync(); final RealmChangeListener<RealmResults<ResponseZone>> listener = results -> { if(!emitter.isDisposed()) { if(results.isValid() && results.isLoaded()) { emitter.onNext(observableRealm.copyFromRealm(results)); } } }; emitter.setDisposable(Disposables.fromRunnable(() -> { if(results.isValid()) { results.removeChangeListener(listener); } observableRealm.close(); })); results.addChangeListener(listener); // initial value will be handled by async query } }).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler);

其中looper调度器获得为

handlerThread = new HandlerThread("LOOPER_SCHEDULER"); handlerThread.start(); synchronized(handlerThread) { looperScheduler = AndroidSchedulers.from(handlerThread.getLooper()); }

这就是您使用 Realm 创建反应式干净架构的方式。


添加:

只有当你打算在 Realm 上实际执行 Clean Architecture 时才需要 LooperScheduler。这是因为默认情况下,Realm 鼓励您将数据对象用作域模型,并且作为一个好处,它提供了延迟加载的线程局部视图,这些视图在更新时会在适当的位置发生变化;但是 Clean Architecture 说你应该使用不可变的领域模型(独立于你的数据层)。因此,如果你想创建反应式干净的架构,当 Realm 发生变化时,你可以在后台线程上随时从 Realm 复制,那么你将需要一个循环调度程序(或在后台线程上观察,但是从刷新的 Realm 上复制

Schedulers.io()

).

对于 Realm,通常你会希望使用 RealmObjects 作为你的领域模型,并依赖惰性求值。在那种情况下,您不使用

copyFromRealm()

 并且您不将 RealmResults 映射到其他东西;但您可以将其显示为 
Flowable
LiveData
.

您可以在

here阅读相关内容。

© www.soinside.com 2019 - 2024. All rights reserved.