我有来自API的单个GET请求。通过Retrofit
和RxJava
,我得到了答复。现在,我要将响应添加到Room
数据库。但是我不想使用AsyncTask
我想使用RxJava
。我在Dao insertAll中准备了一个方法。如何使用RxJava
将列表异步添加到数据库?我的要求:
@GET("contacts")
fun getContactModel(): Single<List<Contact>>
我的insertAll
方法:
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insertAll(contact: List<Contact>?) : Completable
我使用RxJava的请求:
val disposable = CompositeDisposable()
disposable.add(contactsRepository.modelSingle()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSingleObserver<List<Contact>>() {
override fun onSuccess(t: List<Contact>) {
// t - My List from Api
}
override fun onError(e: Throwable) {
}
})
)
如果contactsRepository.modelSingle()
返回一个可观察的Single
,则可以执行
disposable.add(contactsRepository.modelSingle()
.doOnSuccess { data -> saveToDb(data) } // this will be called on Schedulers.io
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSingleObserver<List<Contact>>() {
override fun onSuccess(t: List<Contact>) {
// t - My List from Api
}
override fun onError(e: Throwable) {
}
})
)
如果是另一个Observable,则可以使用flatMap
disposable.add(contactsRepository.modelSingle()
.flatMap { data ->
saveToDb(data)
Observable.just(data)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe()
)
在Java中,这是我将首先在RxJava中使用Retrofit,并且能够如下从API检索Observable响应
在翻新界面服务中,使用以下代码
@GET("contacts")
Observable<Contacts> getContactsFromAPI
在@Dao Room Class工具中
@Insert(onConflict = OnConflictStrategy.REPLACE)
Completable insertContact(List<Contact> contact);
要从API检索可观察的数据项,请使用以下代码
RetrofitService getContactService = RetroInstance.getService();
Observable<Contact> apiData =
getContactService.getContactsFromAPI();
compositeDisposable.add(apiData
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Contact>() {
@Override
public void onNext(List<Contact> contact) {
newContact(contact)
Log.d(LOG_TAG, "The contacts are : "+ contact);
}
@Override
public void onError(Throwable e) {
Log.d(LOG_TAG, "Error: "+ e.getMessage());
}
@Override
public void onComplete() {
}
}));
将要从api的添加检索添加到Room数据库,然后创建一个称为的方法,该方法用于将数据插入到Room数据库中,并且该方法在为从api检索数据而编写的代码上被调用。
public void newContact(List<Contacts> contacts) { disposable.add(Completable.fromAction(new Action() { @Override public void run() throws Exception { rowIdInserted = contactAppDatabase.userDao().insertContact(contacts); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onComplete() { Log.d(TAG, "inserted successful"); } @Override public void onError(Throwable e) { Log.d(LOG_TAG, "tHE new error on create the new user is: " + e.getMessage()); } })); }
我认为有帮助