# :coding: utf-8
# :copyright: Copyright (c) 2014 ftrack
"""Caching framework.

Defines a standardised :class:`Cache` interface for storing data against
specific keys. Key generation is also standardised using a :class:`KeyMaker`
interface.

Combining a Cache and KeyMaker allows for memoisation of function calls with
respect to the arguments used, via a :class:`Memoiser`.

As a convenience a simple :func:`memoise` decorator is included for quick
memoisation of a function using a global cache and standard key maker.

"""
from builtins import str
from builtins import object

import functools
import abc
import collections.abc
import copy
import inspect
import re
import pickle
import contextlib
import dbm as anydbm

import ftrack_api.inspection
import ftrack_api.symbol


class Cache(metaclass=abc.ABCMeta):
    """Cache interface.

    Derive from this to define concrete cache implementations. A cache is
    centered around the concept of key:value pairings where the key is unique
    across the cache.

    """

    @abc.abstractmethod
    def get(self, key):
        """Return value for *key*.

        Raise :exc:`KeyError` if *key* not found.

        """

    @abc.abstractmethod
    def set(self, key, value):
        """Set *value* for *key*."""

    @abc.abstractmethod
    def remove(self, key):
        """Remove *key* and return stored value.

        Raise :exc:`KeyError` if *key* not found.

        """

    def keys(self):
        """Return list of keys at this current time.

        .. warning::

            Actual keys may differ from those returned due to timing of
            access.

        """
        raise NotImplementedError()  # pragma: no cover

    def values(self):
        """Return values for current keys."""
        values = []
        for key in list(self.keys()):
            try:
                value = self.get(key)
            except KeyError:
                continue
            else:
                values.append(value)

        return values

    def clear(self, pattern=None):
        """Remove all keys matching *pattern*.

        *pattern* should be a regular expression string.

        If *pattern* is None then all keys will be removed.

        """
        if pattern is not None:
            pattern = re.compile(pattern)

        for key in list(self.keys()):
            if pattern is not None:
                if not pattern.search(key):
                    continue

            try:
                self.remove(key)
            except KeyError:
                pass
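

# A minimal usage sketch of the :class:`Cache` interface (illustrative only;
# it uses the concrete :class:`MemoryCache` defined below, and the 'player_*'
# keys are hypothetical):
#
#     >>> cache = MemoryCache()
#     >>> cache.set('player_name', 'jane')
#     >>> cache.set('player_score', 10)
#     >>> cache.get('player_name')
#     'jane'
#     >>> cache.clear(pattern='^player_')
#     >>> cache.keys()
#     []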


class ProxyCache(Cache):
    """Proxy another cache."""

    def __init__(self, proxied):
        """Initialise cache with *proxied* cache instance."""
        self.proxied = proxied
        super(ProxyCache, self).__init__()

    def get(self, key):
        """Return value for *key*.

        Raise :exc:`KeyError` if *key* not found.

        """
        return self.proxied.get(key)

    def set(self, key, value):
        """Set *value* for *key*."""
        return self.proxied.set(key, value)

    def remove(self, key):
        """Remove *key* and return stored value.

        Raise :exc:`KeyError` if *key* not found.

        """
        return self.proxied.remove(key)

    def keys(self):
        """Return list of keys at this current time.

        .. warning::

            Actual keys may differ from those returned due to timing of
            access.

        """
        return list(self.proxied.keys())


class LayeredCache(Cache):
    """Layered cache."""

    def __init__(self, caches):
        """Initialise cache with *caches*."""
        super(LayeredCache, self).__init__()
        self.caches = caches

    def get(self, key):
        """Return value for *key*.

        Raise :exc:`KeyError` if *key* not found.

        Attempt to retrieve from cache layers in turn, starting with the
        shallowest. If a value is retrieved, also set it in each cache layer
        above the one where it was found.

        """
        target_caches = []
        value = ftrack_api.symbol.NOT_SET

        for cache in self.caches:
            try:
                value = cache.get(key)
            except KeyError:
                target_caches.append(cache)
                continue
            else:
                break

        if value is ftrack_api.symbol.NOT_SET:
            raise KeyError(key)

        # Set value on all shallower caches that missed.
        for cache in target_caches:
            cache.set(key, value)

        return value

    def set(self, key, value):
        """Set *value* for *key*."""
        for cache in self.caches:
            cache.set(key, value)

    def remove(self, key):
        """Remove *key*.

        Raise :exc:`KeyError` if *key* not found in any layer.

        """
        removed = False
        for cache in self.caches:
            try:
                cache.remove(key)
            except KeyError:
                pass
            else:
                removed = True

        if not removed:
            raise KeyError(key)

    def keys(self):
        """Return list of keys at this current time.

        .. warning::

            Actual keys may differ from those returned due to timing of
            access.

        """
        keys = []
        for cache in self.caches:
            keys.extend(list(cache.keys()))

        return list(set(keys))
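

# Illustrative sketch of the read-through behaviour described above: a value
# found only in a deeper layer is copied into the shallower layers on
# retrieval (the cache names are hypothetical):
#
#     >>> front, back = MemoryCache(), MemoryCache()
#     >>> layered = LayeredCache([front, back])
#     >>> back.set('key', 'value')
#     >>> layered.get('key')
#     'value'
#     >>> front.get('key')  # Now back-filled into the shallower layer.
#     'value'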


class MemoryCache(Cache):
    """Memory based cache."""

    def __init__(self):
        """Initialise cache."""
        self._cache = {}
        super(MemoryCache, self).__init__()

    def get(self, key):
        """Return value for *key*.

        Raise :exc:`KeyError` if *key* not found.

        """
        return self._cache[key]

    def set(self, key, value):
        """Set *value* for *key*."""
        self._cache[key] = value

    def remove(self, key):
        """Remove *key*.

        Raise :exc:`KeyError` if *key* not found.

        """
        del self._cache[key]

    def keys(self):
        """Return list of keys at this current time.

        .. warning::

            Actual keys may differ from those returned due to timing of
            access.

        """
        return list(self._cache.keys())


class FileCache(Cache):
    """File based cache that uses the :mod:`dbm` module.

    .. note::

        No locking of the underlying file is performed.

    """

    def __init__(self, path):
        """Initialise cache at *path*."""
        self.path = path

        # Initialise cache.
        cache = anydbm.open(self.path, "c")
        cache.close()

        super(FileCache, self).__init__()

    @contextlib.contextmanager
    def _database(self):
        """Yield opened database file."""
        cache = anydbm.open(self.path, "w")
        try:
            yield cache
        finally:
            cache.close()

    def get(self, key):
        """Return value for *key*.

        Raise :exc:`KeyError` if *key* not found.

        """
        with self._database() as cache:
            return cache[key.encode("ascii")].decode("utf-8")

    def set(self, key, value):
        """Set *value* for *key*."""
        with self._database() as cache:
            cache[key.encode("ascii")] = value

    def remove(self, key):
        """Remove *key*.

        Raise :exc:`KeyError` if *key* not found.

        """
        with self._database() as cache:
            del cache[key.encode("ascii")]

    def keys(self):
        """Return list of keys at this current time.

        .. warning::

            Actual keys may differ from those returned due to timing of
            access.

        """
        with self._database() as cache:
            return [stored_key.decode("utf-8") for stored_key in cache.keys()]
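

# Hedged usage sketch: FileCache persists keys to a dbm file on disk. The
# temporary path below is illustrative only, and exact on-disk behaviour
# depends on which dbm backend the platform provides; values are stored and
# returned as utf-8 text.
#
#     >>> import os, tempfile
#     >>> path = os.path.join(tempfile.mkdtemp(), 'cache.dbm')
#     >>> file_cache = FileCache(path)
#     >>> file_cache.set('key', 'value')
#     >>> file_cache.get('key')
#     'value'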


class SerialisedCache(ProxyCache):
    """Proxied cache that stores values as serialised data."""

    def __init__(self, proxied, encode=None, decode=None):
        """Initialise cache with *encode* and *decode* callables.

        *proxied* is the underlying cache to use for storage.

        """
        self.encode = encode
        self.decode = decode
        super(SerialisedCache, self).__init__(proxied)

    def get(self, key):
        """Return value for *key*.

        Raise :exc:`KeyError` if *key* not found.

        """
        value = super(SerialisedCache, self).get(key)
        if self.decode:
            value = self.decode(value)

        return value

    def set(self, key, value):
        """Set *value* for *key*."""
        if self.encode:
            value = self.encode(value)

        super(SerialisedCache, self).set(key, value)
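

# Illustrative sketch: wrap a cache with serialising callables so structured
# values can be stored in backends that only accept strings (such as
# FileCache). Using the json module here is an example choice, not a
# requirement of the class:
#
#     >>> import json
#     >>> serialised = SerialisedCache(
#     ...     MemoryCache(), encode=json.dumps, decode=json.loads
#     ... )
#     >>> serialised.set('key', {'a': [1, 2]})
#     >>> serialised.get('key')
#     {'a': [1, 2]}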


class KeyMaker(metaclass=abc.ABCMeta):
    """Generate unique keys."""

    def __init__(self):
        """Initialise key maker."""
        super(KeyMaker, self).__init__()
        self.item_separator = ""

    def key(self, *items):
        """Return key for *items*."""
        keys = []
        for item in items:
            keys.append(self._key(item))

        return self.item_separator.join(keys)

    @abc.abstractmethod
    def _key(self, obj):
        """Return key for *obj*."""


class StringKeyMaker(KeyMaker):
    """Generate string key."""

    def _key(self, obj):
        """Return key for *obj*."""
        return str(obj)
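

# Quick sketch of string key generation: each item is stringified and the
# results concatenated using the (empty) item separator:
#
#     >>> StringKeyMaker().key('add', 1, 2)
#     'add12'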


class ObjectKeyMaker(KeyMaker):
    """Generate unique keys for objects."""

    def __init__(self):
        """Initialise key maker."""
        super(ObjectKeyMaker, self).__init__()
        self.item_separator = b"\0"
        self.mapping_identifier = b"\1"
        self.mapping_pair_separator = b"\2"
        self.iterable_identifier = b"\3"
        self.name_identifier = b"\4"

    def _key(self, item):
        """Return key for *item*."""
        return self.__key(item)

    def __key(self, item):
        """Return key for *item*.

        Returned key will be a pickle-like bytes string representing *item*.
        This allows typically non-hashable objects (such as dictionaries) to
        be used in key generation.

        If *item* is iterable then each item in it shall also be passed to
        this method to ensure correct key generation.

        Special markers are used to distinguish handling of specific cases in
        order to ensure uniqueness of key corresponds directly to *item*.

        Example::

            >>> key_maker = ObjectKeyMaker()
            >>> def add(x, y):
            ...     "Return sum of *x* and *y*."
            ...     return x + y
            ...
            >>> key_maker.key(add, (1, 2))
            b'\x04add\x00__main__\x00\x03\x80\x02K\x01.\x00\x80\x02K\x02.\x03'
            >>> key_maker.key(add, (1, 3))
            b'\x04add\x00__main__\x00\x03\x80\x02K\x01.\x00\x80\x02K\x03.\x03'

        """
        # Ensure Python 3 pickles with a protocol that Python 2 can also
        # decode.
        pickle_protocol = 2

        # TODO: Consider using a more robust and comprehensive solution such
        # as dill (https://github.com/uqfoundation/dill).
        if isinstance(item, collections.abc.Iterable):
            if isinstance(item, str):
                return pickle.dumps(item, pickle_protocol)

            if isinstance(item, collections.abc.Mapping):
                contents = self.item_separator.join(
                    [
                        self._key(key)
                        + self.mapping_pair_separator
                        + self._key(value)
                        for key, value in sorted(item.items())
                    ]
                )
                return (
                    self.mapping_identifier
                    + contents
                    + self.mapping_identifier
                )

            else:
                contents = self.item_separator.join(
                    [self._key(child) for child in item]
                )
                return (
                    self.iterable_identifier
                    + contents
                    + self.iterable_identifier
                )

        elif inspect.ismethod(item):
            return b"".join(
                (
                    self.name_identifier,
                    item.__name__.encode(),
                    self.item_separator,
                    item.__self__.__class__.__name__.encode(),
                    self.item_separator,
                    item.__module__.encode(),
                )
            )

        elif inspect.isfunction(item) or inspect.isclass(item):
            return b"".join(
                (
                    self.name_identifier,
                    item.__name__.encode(),
                    self.item_separator,
                    item.__module__.encode(),
                )
            )

        elif inspect.isbuiltin(item):
            return self.name_identifier + item.__name__.encode()

        else:
            return pickle.dumps(item, pickle_protocol)
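

# Illustrative sketch: generated keys are bytes and are stable for
# structurally equal arguments, while differing arguments yield different
# keys (the `add` function is hypothetical). Mappings are sorted before
# hashing, so key order does not matter:
#
#     >>> key_maker = ObjectKeyMaker()
#     >>> def add(x, y):
#     ...     return x + y
#     ...
#     >>> key_maker.key(add, {'x': 1, 'y': 2}) == key_maker.key(add, {'y': 2, 'x': 1})
#     True
#     >>> key_maker.key(add, {'x': 1, 'y': 2}) == key_maker.key(add, {'x': 1, 'y': 3})
#     False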


class Memoiser(object):
    """Memoise function calls using a :class:`KeyMaker` and :class:`Cache`.

    Example::

        >>> memoiser = Memoiser(MemoryCache(), ObjectKeyMaker())
        >>> def add(x, y):
        ...     "Return sum of *x* and *y*."
        ...     print('Called')
        ...     return x + y
        ...
        >>> memoiser.call(add, (1, 2), {})
        Called
        3
        >>> memoiser.call(add, (1, 2), {})
        3
        >>> memoiser.call(add, (1, 3), {})
        Called
        4

    """

    def __init__(self, cache=None, key_maker=None, return_copies=True):
        """Initialise with *cache* and *key_maker* to use.

        If *cache* is not specified a default :class:`MemoryCache` will be
        used. Similarly, if *key_maker* is not specified a default
        :class:`ObjectKeyMaker` will be used.

        If *return_copies* is True then all results returned from the cache
        will be deep copies to avoid indirect mutation of cached values.

        """
        self.cache = cache
        if self.cache is None:
            self.cache = MemoryCache()

        self.key_maker = key_maker
        if self.key_maker is None:
            self.key_maker = ObjectKeyMaker()

        self.return_copies = return_copies
        super(Memoiser, self).__init__()

    def call(self, function, args=None, kw=None):
        """Call *function* with *args* and *kw* and return result.

        If *function* was previously called with exactly the same arguments
        then return the cached result if available.

        Store result for call in cache.

        """
        if args is None:
            args = ()

        if kw is None:
            kw = {}

        # Support arguments being passed as positionals or keywords.
        arguments = inspect.getcallargs(function, *args, **kw)

        key = self.key_maker.key(function, arguments)
        try:
            value = self.cache.get(key)
        except KeyError:
            value = function(*args, **kw)
            self.cache.set(key, value)

        # If requested, deep copy value to return in order to avoid cached
        # value being inadvertently altered by the caller.
        if self.return_copies:
            value = copy.deepcopy(value)

        return value


def memoise_decorator(memoiser):
    """Decorator to memoise function calls using *memoiser*."""

    def outer(function):
        @functools.wraps(function)
        def inner(*args, **kw):
            return memoiser.call(function, args, kw)

        return inner

    return outer


#: Default memoiser.
memoiser = Memoiser()

#: Default memoise decorator using standard cache and key maker.
memoise = memoise_decorator(memoiser)
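

# Hedged usage sketch of the :func:`memoise` decorator (the `add` function is
# hypothetical). Results are stored in the global memoiser, so repeated calls
# with equal arguments skip the function body; positional and keyword forms
# of the same call hit the same cache entry because arguments are normalised
# via inspect.getcallargs:
#
#     >>> @memoise
#     ... def add(x, y):
#     ...     print('Called')
#     ...     return x + y
#     ...
#     >>> add(1, 2)
#     Called
#     3
#     >>> add(1, 2)
#     3
#     >>> add(y=2, x=1)
#     3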