我想知道如何使用 OrderedDict 实现基于大小的 LRU 缓存。我遇到困难的部分是:当我调用
__contains__
时,如何把被访问的条目移动到链表的头部。除了 __contains__
方法之外,以下实现都能正常工作;而该方法会导致无限递归。有什么办法可以实现这一点吗?
from collections import OrderedDict
class Cache(OrderedDict):
    """OrderedDict subclass that tracks a running sum of its values.

    Each stored value is added to ``val_sum``; when ``size_limit`` is set,
    entries are popped from the front until ``val_sum`` no longer exceeds it.
    Membership tests are counted as requests/hits.

    NOTE: this is the version that fails with unbounded recursion in
    ``__contains__`` on Python 2.7.
    """

    def __init__(self, *args, **kwds):
        # ``size_limit`` is a keyword-only option; it is removed from ``kwds``
        # so it is not stored as a regular mapping entry.
        self.size_limit = kwds.pop("size_limit", None)
        OrderedDict.__init__(self, *args, **kwds)
        self.val_sum = 0  # running sum of stored values
        self.hit = 0  # membership tests that found the key
        self.num_evicted = 0  # entries removed by _check_size_limit
        self.total_req = 0  # all membership tests
        self._check_size_limit()

    def __contains__(self, key):
        """Count the lookup and, on a hit, re-insert the key via move_item_to_the_top."""
        self.total_req += 1
        if OrderedDict.__contains__(self, key):
            self.hit += 1
            value = OrderedDict.__getitem__ (self,key)
            # On Python 2.7, OrderedDict.__setitem__ itself evaluates
            # ``key not in self``, which re-enters this method and recurses
            # until the interpreter's recursion limit is hit.
            self.move_item_to_the_top(key, value)
            return True
        else:
            return False

    def move_item_to_the_top(self, key, value):
        """Re-store ``key`` through the base class, bypassing the size accounting."""
        OrderedDict.__setitem__(self, key, value)

    def __setitem__(self, key, value):
        """Store ``key``, add ``value`` to the running sum, then enforce the limit."""
        OrderedDict.__setitem__(self, key, value)
        self.val_sum += value
        self._check_size_limit()

    def _check_size_limit(self):
        """Pop entries from the front while ``val_sum`` exceeds ``size_limit``."""
        if self.size_limit is not None:
            while self.val_sum > self.size_limit:
                # last=False pops from the front of the ordering.
                key, value = self.popitem(last=False)
                self.val_sum -= value
                self.num_evicted += 1

    def get_cache_size(self):
        """Return the current sum of stored values."""
        return self.val_sum

    def get_number_evicted(self):
        """Return how many entries have been evicted."""
        return self.num_evicted

    def get_hit_ratio(self):
        """Return hits divided by total requests, as a float."""
        return 1.00 * self.hit / self.total_req

    def get_total_req(self):
        """Return the number of counted requests."""
        return self.total_req

    def get_hits(self):
        """Return the number of counted hits."""
        return self.hit
这就是我的使用方式:
if __name__ == "__main__":
    # Replay a short trace of (object id, object size) pairs against a cache
    # whose values may sum to at most 10.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1,3), (2,3), (1,3), (3,4), (1,3), (5,5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        print item
        # The membership test goes through Cache.__contains__, which is where
        # hits and requests are counted.
        if cache_key not in cache:
            cache[cache_key] = int(obj_size)
    print cache
Python 文档中为此提供了一个现成的示例(recipe),您可以在这里看到:https://docs.python.org/3/library/collections.html#collections.OrderedDict
class LRU(OrderedDict):
    """Limit size, evicting the least recently looked-up key when full.

    Args:
        maxsize: Maximum number of entries kept. When an insertion pushes
            the length above this, the oldest entry is removed.
        *args, **kwds: Forwarded to ``OrderedDict`` as initial contents.
    """

    def __init__(self, maxsize=128, *args, **kwds):
        # maxsize must be set first: initial items are inserted through
        # __setitem__, which reads it.
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        """Return the value for ``key`` and mark it as most recently used.

        Raises:
            KeyError: If ``key`` is not present.
        """
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        """Store ``key`` as the most recent entry, evicting the oldest if full."""
        # Refresh recency *before* storing/evicting. Doing it afterwards
        # raises KeyError when the entry just inserted is itself the one
        # evicted (e.g. maxsize == 0).
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]
运行您的代码时出现以下错误:
python cache.py
(1, 3)
(2, 3)
(1, 3)
Traceback (most recent call last):
File "cache.py", line 68, in <module>
if cache_key not in cache:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
[...]
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
RuntimeError: maximum recursion depth exceeded in __instancecheck__
查看
collections.py
的第 75 行可以发现:OrderedDict.__setitem__ 会执行 if key not in self,这会调用您重写的 Cache.__contains__,而后者又再次调用 OrderedDict.__setitem__,
如此反复,最终导致无限递归。
您可以重写
Cache
类而不覆盖 __contains__
,而是使用 Cache.__getitem__
来跟踪对缓存的访问:
from collections import OrderedDict
class Cache(OrderedDict):
    """Size-bounded LRU cache built on ``OrderedDict``.

    Every stored value is treated as the size of its entry. The sum of all
    values is kept in ``val_sum``; whenever it exceeds ``size_limit`` the
    least recently used entries are evicted. Lookups through ``cache[key]``
    are counted as requests and, when the key exists, as hits.

    Args:
        *args, **kwds: Initial contents, forwarded to ``OrderedDict``.
        size_limit: Keyword-only. Maximum allowed sum of values, or ``None``
            for an unbounded cache.
    """

    def __init__(self, *args, **kwds):
        self.size_limit = kwds.pop("size_limit", None)
        # The counters must exist before OrderedDict.__init__ runs, because
        # initial items are inserted through our __setitem__.
        self.val_sum = 0
        self.hit = 0
        self.num_evicted = 0
        self.total_req = 0
        OrderedDict.__init__(self, *args, **kwds)
        self._check_size_limit()

    def move_item_to_the_top(self, key, value):
        """Mark ``key`` as most recently used without touching the counters."""
        # Delete + re-insert through the base class: re-assigning an existing
        # key alone would keep its old position.
        OrderedDict.__delitem__(self, key)
        OrderedDict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """Return the value for ``key``, counting the request and the hit.

        Raises:
            KeyError: If ``key`` is not cached (still counted as a request).
        """
        self.total_req += 1
        value = OrderedDict.__getitem__(self, key)  # KeyError propagates on a miss
        self.hit += 1
        self.move_item_to_the_top(key, value)
        return value

    def __setitem__(self, key, value):
        """Store ``key`` with size ``value`` and evict entries if over the limit."""
        # When overwriting, drop the old size first so it is not counted twice.
        if OrderedDict.__contains__(self, key):
            self.val_sum -= OrderedDict.__getitem__(self, key)
        OrderedDict.__setitem__(self, key, value)
        self.val_sum += value
        self._check_size_limit()

    def _check_size_limit(self):
        """Evict least recently used entries until the size fits the limit."""
        if self.size_limit is None:
            return
        while self.val_sum > self.size_limit and len(self) > 0:
            # Evict through the base class rather than popitem(): popitem can
            # route through the overridden __getitem__, which would count
            # every eviction as a request and a hit.
            oldest = next(iter(self))
            size = OrderedDict.__getitem__(self, oldest)
            OrderedDict.__delitem__(self, oldest)
            self.val_sum -= size
            self.num_evicted += 1

    def get_cache_size(self):
        """Return the current sum of stored sizes."""
        return self.val_sum

    def get_number_evicted(self):
        """Return how many entries have been evicted so far."""
        return self.num_evicted

    def get_hit_ratio(self):
        """Return hits / requests as a float, or 0.0 before any request."""
        if self.total_req == 0:
            return 0.0
        return 1.00 * self.hit / self.total_req

    def get_total_req(self):
        """Return the number of lookups made through ``cache[key]``."""
        return self.total_req

    def get_hits(self):
        """Return the number of lookups that found their key."""
        return self.hit
if __name__ == "__main__":
    # Replay a short trace of (object id, object size) pairs against a cache
    # limited to a total size of 10.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        obj_id, obj_size = item
        cache_key = str(obj_id)
        # Single-argument print(...) gives the same output on Python 2 and 3.
        print(item)
        try:
            # The lookup itself is the access being counted: hit or miss.
            cache[cache_key]
        except KeyError:
            cache[cache_key] = int(obj_size)
    print(cache)
您仍然可以使用
foo not in cache
,但这既不会计为命中,也不会计为未命中。如果您想统计每一次访问,请使用首选的 try/except 写法
[1] ,例如:
if __name__ == "__main__":
    # Example driver: every access goes through cache[key], so each one is
    # counted as either a hit or a miss.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        obj_id, obj_size = item
        cache_key = str(obj_id)
        # Single-argument print(...) gives the same output on Python 2 and 3.
        print(item)
        try:
            cache[cache_key]
        except KeyError:
            # Miss: admit the object, using its size as the stored value.
            cache[cache_key] = int(obj_size)
    print(cache)
[1] 当需要根据某个项目是否存在于列表或字典中来有条件地执行操作时,这是首选写法,因为它只需访问容器一次。