我有一个Single
进行了REST
呼叫。此Single最多可同时调用10次,最终导致同一事物的10个请求。我看不到有什么方法可以使Single
变热,并且在cache()
无效时尝试使用Single
。
我该如何实现?
这是我拨打电话时遵循的逻辑:
fun getUser(userID: UUID): Single<User> {
if (userCache.containsUser(userID)) {
// Just return the value already saved in the cache
return Single.create {
it.onSuccess(getUserFromCache(userID))
}
} else {
// Make rest call, add user in cache, and then return that user
return Single.fromCallable {
val user = getUserFromRest(userID).blockingGet()
userCache.addUser(user)
return@fromCallable user
}.cache()
}
}
我已经更新了先前的代码以实现缓存:
private var ongoingRequests: HashMap<UUID, Single<User>> = HashMap()
fun getUser(userID: UUID): Single<User> {
if (userCache.containsUser(userID)) {
return Single.create {
it.onSuccess(getUserFromCache(userID))
}
} else if (ongoingRequests.containsKey(userID)) {
return ongoingRequests[userID]!!
} else {
val request = Single.create<User> {
getUserFromRest(userID).subscribe(
{ user ->
userCache.addUser(user)
ongoingRequests.remove(userID)
it.onSuccess(user)
},
{
}
)
}.cache()
ongoingRequests[userID] = request
return request
}
}
您的代码中有两个问题。首先,将在您调用getUser方法而不是您进行订阅时检查用户是否存在。
在类似情况下,可以使用Single.deferred
方法返回延迟的实例。
在这种情况下,我想您不需要。如果您使用的是Single.cache,则也可以删除缓存逻辑,因为Rx会这样做。
private val shared: MutableMap<UUID, Single<User>> = mutableMapOf()
fun getUser(userID: UUID): Single<User> {
return shared.getOrPut(userID, {
return Single.fromCallable {
val user = getUserFromRest(userID).blockingGet()
userCache.addUser(user)
return@fromCallable user
}.cache()
})
}
我用Java编写代码来说明这个想法(抱歉,我不是Kotlin开发人员)
缓存长时间处理的呼叫的结果
Single<User> getUser(String userId) {
return Single.<String>create(emitter -> {
Thread.sleep(1000); // call the rest
emitter.onSuccess(new User());
}).cache();
}
当然,您可以使用地图重用结果:
Map<String, Single<User>> ongoingRequests = new HashMap<>();
Single<User> call = ongoingRequests.computeIfAbsent(userId, this::getUser);
call.subscribe()
[call
被执行一次,然后立即被缓存。