我正在尝试创建一个GetAndFetch
方法,该方法首先从缓存中返回数据,然后从Web服务获取并返回数据,最后更新缓存。
这样的函数已经存在于akavache
中,但是,由它检索或存储的数据就像一个blob。即如果我对rss
饲料感兴趣,我只能在整个饲料水平上工作,而不是单个项目。我有兴趣创建一个返回IObservable<Item>
项目的版本。这有一个优点,即Item
返回时可以显示新的service
s,而不是等待所有的Items
s。
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
// Then call the service
IObservable<Item> fetchObs = service.GetItems(feedUrl);
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs = fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
{
return cache.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>());
});
// Then make sure cache retrieval, fetching and update is done in order
return cacheBlobObject.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(upadteObs);
}
我的方法的问题是Concat(upadteObs)
重新订阅fetchObs
并最终再次调用service.GetItems(feedUrl)
这是浪费。
你听起来像需要.Publish(share => { ... })
超载。
试试这个:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
return
service
.GetItems(feedUrl)
.Publish(fetchObs =>
{
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs =
fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
cache
.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>()));
// Then make sure cache retrieval, fetching and update is done in order
return
cacheBlobObject
.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(updateObs);
});
}
我担心Concat
电话 - 他们可能需要是Merge
。
此外,似乎你对service.GetItems
的调用仍然是获取所有项目 - 它是如何避免已经在缓存中的项目?
基于评论的替代实现:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
return
(
from hs in cache.GetObject<HashSet<Item>>(feedUrl)
let ids = new HashSet<string>(hs.Select(x => x.Id))
select
hs
.ToObservable()
.Merge(
service
.GetItems(feedUrl)
.Where(x => !ids.Contains(x.Id))
.Do(x => cache.InsertObject(feedUrl, new [] { x })))
).Merge();
}