一点上下文,我尝试将一些干净的架构应用于我的一个项目,但我在存储库的(领域)磁盘实现方面遇到了问题。我有一个存储库,它根据某些条件(缓存)从不同的数据存储中提取一些数据。这是理论,当将所有这些与 UseCases 和 RxJava2 混合时,问题就来了。
首先,我从 Realm 获取对象列表,然后我手动创建它的 Observable。 但是
subscribe
(正如预期的那样)在不同的线程上执行,所以领域最终崩溃了......(第二个代码块)
这是我用来创建 Observable 的代码(来自抽象类
DiskStoreBase
):
Observable<List<T>> createListFrom(final List<T> list) {
return Observable.create(new ObservableOnSubscribe<List<T>>() {
@Override
public void subscribe(ObservableEmitter<List<T>> emitter) throws Exception {
if (list != null) {
emitter.onNext(list);
emitter.onComplete();
} else {
emitter.onError(new ExceptionCacheNotFound());
}
}
});
}
我该如何处理这种情况?
DiskStoreForZone
的更多代码:
@Override
public Observable<List<ResponseZone>> entityList() {
Realm realm = Realm.getDefaultInstance();
List<ResponseZone> result = realm.where(ResponseZone.class).findAll();
return createListFrom(result);
}
确切的崩溃:
E/REALM_JNI: jni: ThrowingException 8, Realm accessed from incorrect thread.
E/REALM_JNI: Exception has been thrown: Realm accessed from incorrect thread.
它不起作用,因为尽管使用 Rx,您的数据层不是反应性的。
Realm 本质上是一个 reactive 数据源,它的托管对象本质上也是 mutable(由 Realm 就地更新)和 thread-confined(只能在 Realm 所在的同一线程上访问打开)。
要使代码正常工作,您需要从 Realm 中复制数据。
@Override
public Single<List<ResponseZone>> entityList() {
return Single.fromCallable(() -> {
try(Realm realm = Realm.getDefaultInstance()) {
return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
}
});
}
我冒昧地将您的
Single
表示为Single
,考虑到它不是Observable,它不监听变化,只有1事件,这就是列表本身。所以通过 ObservableEmitter
发送它并没有真正意义,因为它不会发出事件。
因此,这就是为什么我说:你的数据层不是反应性的。你没有在听变化。您只是直接获取数据,您永远不会收到任何更改通知;尽管使用 Rx.
我用颜料画了一些图来说明我的观点。 (蓝色代表副作用)
在您的情况下,您调用一次性操作以从多个数据源(缓存、本地、远程)检索数据。一旦你获得它,你就不会监听变化;从技术上讲,如果您在一个地方和另一个地方编辑数据,唯一的更新方法是“强制缓存手动检索新数据”;为此,您必须知道您在其他地方修改了数据。为此,您需要一种方法来直接调用回调,或发送消息/事件 - 更改通知。
所以在某种程度上,你必须创建一个缓存失效通知事件。如果你听那个,解决方案可能会再次反应。除非您手动执行此操作。
考虑到 Realm 已经是一个反应式数据源(类似于 SQLite 的 SQLBrite),它能够提供更改通知,您可以通过它“使缓存无效”。
事实上,如果你的本地数据源是唯一的数据源,任何来自网络的写入都是你监听的变化,那么你的“缓存”可以记为
replay(1).publish().refCount()
(为新订阅者重放最新数据,如果评估了新数据,则用新数据替换数据),即 RxReplayingShare.
Scheduler
,您可以在后台线程上监听 Realm 中的更改,创建一个反应式数据源,返回最新的非托管副本,您可以在线程之间传递这些副本(虽然直接映射到不可变的领域模型比copyFromRealm()
更好,如果你选择这条路线——这条路线是干净的架构)。
return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() {
@Override
public void subscribe(ObservableEmitter<List<ResponseZone>> emitter)
throws Exception {
final Realm observableRealm = Realm.getDefaultInstance();
final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync();
final RealmChangeListener<RealmResults<ResponseZone>> listener = results -> {
if(!emitter.isDisposed()) {
if(results.isValid() && results.isLoaded()) {
emitter.onNext(observableRealm.copyFromRealm(results));
}
}
};
emitter.setDisposable(Disposables.fromRunnable(() -> {
if(results.isValid()) {
results.removeChangeListener(listener);
}
observableRealm.close();
}));
results.addChangeListener(listener);
// initial value will be handled by async query
}
}).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler);
其中looper调度器获得为
handlerThread = new HandlerThread("LOOPER_SCHEDULER");
handlerThread.start();
synchronized(handlerThread) {
looperScheduler = AndroidSchedulers.from(handlerThread.getLooper());
}
这就是您使用 Realm 创建反应式干净架构的方式。
只有当你打算在 Realm 上实际执行 Clean Architecture 时才需要 LooperScheduler。这是因为默认情况下,Realm 鼓励您将数据对象用作域模型,并且作为一个好处,它提供了延迟加载的线程局部视图,这些视图在更新时会在适当的位置发生变化;但是 Clean Architecture 说你应该使用不可变的领域模型(独立于你的数据层)。因此,如果你想创建反应式干净的架构,当 Realm 发生变化时,你可以在后台线程上随时从 Realm 复制,那么你将需要一个循环调度程序(或在后台线程上观察,但是从刷新的 Realm 上复制
Schedulers.io()
).对于 Realm,通常你会希望使用 RealmObjects 作为你的领域模型,并依赖惰性求值。在那种情况下,您不使用
copyFromRealm()
并且您不将 RealmResults 映射到其他东西;但您可以将其显示为
Flowable
或
LiveData
.您可以在
here阅读相关内容。