Source code for yamicache.yamicache

#!/usr/bin/env python
# coding: utf-8
"""
yamicache : Yet another in-memory cache module ('yami' sounds better to me than
'yaim')

This module provides a simple in-memory interface for caching results from
function calls.
"""

import collections
import contextlib
import inspect

# Imports #####################################################################
import json
import pickle
import time
from functools import wraps
from hashlib import sha224
from threading import Lock, Thread
from typing import Any, ItemsView, KeysView, Optional, ValuesView

# Globals #####################################################################
# Public names exported by this module.
__all__ = ["Cache", "nocache", "override_timeout"]


# One cache entry: the memoized return value, an optional expiration
# timestamp string, and the timestamp string of when it was stored.
CachedItem = collections.namedtuple("CachedItem", ["value", "timeout", "time_added"])

# Sentinel stored under a user-supplied key at decoration time so that a
# duplicate keyed registration can be detected before any call is made.
# Identity (``is``) comparisons against this object are used throughout.
INIT_CACHE_VALUE = CachedItem("<value not cached yet>", None, None)


class Cache(collections.abc.MutableMapping):
    """
    A class for caching and retrieving returns from function calls.

    :param bool hashing: Whether or not to hash the function inputs when
        calculating the key.  This helps keep the keys *readable*,
        especially for functions with many inputs.

    :param str key_join: The character used to join the different parts
        that make up the hash key.

    :param bool debug: When ``True``, ``Cache.counters`` will be enabled
        and cache hits will produce output on ``stdout``.

    :param str prefix: All cache keys will use this prefix.  Since the
        current implementation is instance-based, this is only helpful if
        dumping or comparing the cache to another instance.

    :param bool quiet: Don't print during ``debug`` cache hits

    :param int default_timeout: If > 0, all cached items will be
        considered stale this many seconds after they are cached.  In
        that case, the function will be run again, cached, and a new
        timeout value will be created.

    :param int gc_thread_wait: The number of seconds in between cache
        *garbage collection*.  The default, ``None``, will disable the
        garbage collection thread.  This parameter is only valid if
        ``default_timeout`` is > 0 (``ValueError`` is raised otherwise).
    """

    def __init__(
        self,
        hashing: bool = True,
        key_join: str = "|",
        debug: bool = False,
        prefix: Optional[str] = None,
        quiet: bool = False,
        default_timeout: int = 0,
        gc_thread_wait: Optional[int] = None,
    ):
        self._prefix = prefix or ""
        self._hashing = hashing
        self._key_join = key_join
        self._debug = debug
        self._quiet = quiet
        self._cache = True  # Toggled off/on by the ``nocache`` context manager
        self._data_store: dict[Any, Any] = {}
        self._default_timeout = default_timeout
        self._gc_thread_wait = gc_thread_wait
        self._gc_thread = None
        self._do_gc_thread = False
        self._gc_lock = Lock()
        self.counters: dict[Any, int] = {}  # Only enabled with ``debug``

        # When not ``None``, force all calls to use this value instead of the
        # default, or what was used during decorator creation (see
        # ``override_timeout``).
        self._override_timeout: Optional[int] = None

        if default_timeout and not isinstance(default_timeout, int):
            raise ValueError("Default timeout can only be `int`")

        # Enforce the documented contract: garbage collection is meaningless
        # without a default timeout, so reject the combination instead of
        # silently spinning a useless thread.  A falsy ``gc_thread_wait``
        # still means "disabled", exactly as before.
        if gc_thread_wait and not default_timeout:
            raise ValueError("`gc_thread_wait` requires `default_timeout` > 0")

        if self._gc_thread_wait:
            self._do_gc_thread = True
            self._gc_thread = Thread(target=self._gc)
            self._gc_thread.daemon = True
            self._gc_thread.start()

    # Default stuff to override MutableMapping ABC ############################
    def __len__(self) -> int:
        """Count *real* entries; keyed-cache placeholders are excluded."""
        return len([x for x, y in self.items() if y is not INIT_CACHE_VALUE])

    def __getitem__(self, key: Any) -> Any:
        """Only return the item if it's not the INIT value"""
        with self._gc_lock:
            if (key not in self._data_store) or (
                self._data_store[key] is INIT_CACHE_VALUE
            ):
                raise KeyError(key)

            return self._data_store[key]

    def __setitem__(self, key: Any, value: Any) -> None:
        with self._gc_lock:
            self._data_store[key] = value

    def __delitem__(self, key: Any) -> None:
        with self._gc_lock:
            del self._data_store[key]

    def __iter__(self) -> Any:
        """
        Override ``iter()``.  This can make things slow, but it's the only
        way to prevent the underlying object from changing during
        iteration.
        """
        with self._gc_lock:
            for x in self._data_store.keys():
                yield x

    # Override some of the *normal* methods to include the lock ###############
def clear(self) -> None:
    """Drop every cached entry along with the debug hit counters."""
    with self._gc_lock:
        for container in (self._data_store, self.counters):
            container.clear()
def keys(self) -> KeysView:
    """Return a view of the keys currently held in the cache."""
    self._gc_lock.acquire()
    try:
        return self._data_store.keys()
    finally:
        self._gc_lock.release()
def items(self) -> ItemsView:
    """Return a view of every ``(key, value)`` pair in the cache."""
    with self._gc_lock:
        store = self._data_store
        return store.items()
def values(self) -> ValuesView:
    """Return a view of every cached value."""
    self._gc_lock.acquire()
    try:
        return self._data_store.values()
    finally:
        self._gc_lock.release()
def pop(self, key: Any, /, *default: Any) -> Any:  # type: ignore
    """Remove ``key`` from the cache and return its value.

    Mirrors the ``dict.pop`` / ``MutableMapping.pop`` contract: an
    optional second positional argument is returned instead when
    ``key`` is absent.

    :param key: The cache key to remove.
    :raises KeyError: If ``key`` is missing and no default was given.
    """
    with self._gc_lock:
        # ``*default`` forwards zero or one fallback value to dict.pop,
        # keeping the original single-argument behavior intact.
        return self._data_store.pop(key, *default)
def popitem(self) -> Any:
    """Remove and return an arbitrary ``(key, value)`` pair.

    Only really useful during testing.
    """
    with self._gc_lock:
        entry = self._data_store.popitem()
    return entry
########################################################################### def _is_key_initialized(self, key: str) -> bool: with self._gc_lock: return self._data_store.get(key) is INIT_CACHE_VALUE def _from_timestamp(self, timestamp: str) -> float: """Convert a timestamp string to an epoch value""" return time.mktime(time.strptime(timestamp)) def _to_timestamp(self, epoch: Optional[float] = None) -> str: """Convert an epoch value to a timestamp string""" if epoch: return time.asctime(time.localtime(epoch)) return time.asctime() def _debug_print(self, *args) -> None: if self._debug: print(*args)
def dump(self) -> str:
    """Render the entire backing store as an indented JSON document."""
    formatting = {"indent": 4, "separators": (",", ": ")}
    return json.dumps(self._data_store, **formatting)
def _calculate_key(self, func, cached_key: str = "", *args, **kwargs) -> str:
    """
    Calculate the cache key based on the function, its inputs, and the
    object settings (``prefix``, ``key_join``, ``hashing``).

    :param code func: The function being cached
    :param str cached_key: The user-supplied key from a keyed cache, if
        any; when given it is returned verbatim and nothing is derived.
    :param args: Any ``*args`` used to call the function
    :param kwargs: Any ``**kwargs`` used to call the function
    :returns: ``<prefix><join>]<func name><join><formatted args>`` where
        the argument part is a sha224 hexdigest when hashing is enabled,
        otherwise the ``repr`` of the sorted argument dict.
    """
    if cached_key:
        return cached_key

    # We need to grab the default arguments.  `inspect.getfullargspec()`
    # returns the function argument names, and any defaults.  The
    # defaults are always the last args.  For example:
    # `args=['arg1', 'arg2'], defaults=(4,)` means that `arg2` has a
    # default of 4.
    spec = inspect.getfullargspec(func)

    # Load the defaults first, since they may not be in the calling
    # spec.
    key: dict = {}
    if spec.defaults:
        key = dict(zip(spec.args[-len(spec.defaults) :], spec.defaults))

    # Now load in the arguments: keyword arguments first, then the
    # positional ones (zipped against the function's parameter names so
    # the same call expressed positionally or by keyword hashes the
    # same).
    key.update(kwargs)
    key.update(dict(zip(func.__code__.co_varnames, args)))

    # This next issue is that Python may re-order the keys when we go
    # to repr them.  This will cause invalid cache misses.  We can fix
    # this by recreating a dictionary with a 'known' algorithm.
    repr_key = repr(dict(sorted(key.items())))

    return "{prefix}{name}{join}{formatted_key}".format(
        join=self._key_join,
        prefix=(self._prefix + self._key_join) if self._prefix else "",
        name=func.__name__,
        formatted_key=sha224(repr_key.encode("utf-8")).hexdigest()
        if self._hashing
        else repr_key,
    )


def _update_counter(self, key: str) -> None:
    """Keep track of cache hits; a no-op unless ``debug`` is enabled."""
    if not self._debug:
        return

    # Counters are shared state, so take the same lock the mapping
    # operations use.
    with self._gc_lock:
        if key in self.counters:
            self.counters[key] += 1
        else:
            self.counters[key] = 1


def _gc(self) -> None:
    """
    This is the garbage collection thread that periodically calls our
    ``collect()`` method.

    Sleeps in 1-second slices so that ``_do_gc_thread = False`` stops
    the loop promptly instead of after a full ``_gc_thread_wait``.
    """
    tnext = time.time() + self._gc_thread_wait

    while self._do_gc_thread:
        if time.time() > tnext:
            self.collect()
            tnext = time.time() + self._gc_thread_wait

        time.sleep(1)
def collect(self, since: Optional[float] = None) -> None:
    """
    Clear any item from the cache that has timed out.

    :param float since: Optional epoch value; when given, also remove
        any item that was *added* after this time.

    Keyed-cache placeholder entries have ``time_added=None`` and are
    never collected (passing them to ``_from_timestamp()`` would
    raise ``TypeError``).
    """
    # Snapshot the clock once so every entry is judged against the same
    # instant, and snapshot items() so deletion decisions are made on a
    # stable copy.
    now = time.time()

    remove_keys = []
    for key, item in list(self.items()):
        timed_out = bool(item.timeout) and now > self._from_timestamp(item.timeout)
        too_new = bool(
            since
            and item.time_added is not None  # skip INIT placeholders
            and self._from_timestamp(item.time_added) > since
        )
        if timed_out or too_new:
            self._debug_print("collecting : %s" % key)
            remove_keys.append(key)

    for key in remove_keys:
        # A concurrent thread may have removed the key already.
        if key in self:
            del self[key]
# Decorators ##############################################################
def clear_cache(self):
    """
    A decorator that wipes the entire cache every time the wrapped
    function is called.

    For example, let's say you have a "discovery" function that stores
    data read by other (cached) functions.  Decorating it with
    ``@c.clear_cache()`` means you never have to worry about the cache
    being stale.
    """

    def decorate(func):
        @wraps(func)
        def inner(*call_args, **call_kwargs):
            self.clear()
            return func(*call_args, **call_kwargs)

        return inner

    return decorate
def cached(self, key: str = "", timeout: Optional[int] = 0):
    """
    A decorator used to memoize the return of a function call.

    :param str key: Optional fixed cache key.  When given, the key is
        used verbatim (no key is derived from the call arguments) and a
        placeholder is registered immediately so duplicate keys are
        detected at decoration time.
    :param int timeout: Per-function timeout in seconds; ``0`` falls
        back to the instance's ``default_timeout``.
    :raises ValueError: If ``timeout`` is not an ``int``, or ``key`` is
        already registered.
    """
    if timeout and not isinstance(timeout, int):
        raise ValueError("timeout can only be `int`")
    elif key and (key in self) or self._is_key_initialized(key):
        # `key in self` will return False if the key either doesn't exist,
        # or it's set to the INIT value.  Therefore, we need to call
        # `_is_key_initialized()` to check that condition.
        raise ValueError("cache key '%s' already exists" % key)
    elif key:
        # Set the default value so we can check for collisions when the
        # next decorator is called.
        self[key] = INIT_CACHE_VALUE

    def real_decorator(function, timeout: Optional[int] = timeout):
        # Resolve the effective timeout once, at decoration time, and
        # stash it on the function so `override_timeout` can be compared
        # against it per call.
        function.__cached_timeout__ = timeout or self._default_timeout

        @wraps(function)
        def wrapper(*args, **kwargs):
            # Check the timeout here, since this is the call and not the
            # instantiation.
            if not self._cache:
                # Caching disabled via the ``nocache`` context manager.
                return function(*args, **kwargs)

            cache_key = self._calculate_key(function, key, *args, **kwargs)

            # Let `override_timeout` do its thing
            if self._override_timeout is not None:
                timeout = self._override_timeout
            else:
                timeout = function.__cached_timeout__

            try:
                if cache_key in self and (self[cache_key] is not INIT_CACHE_VALUE):
                    result = self[cache_key]
                    # A falsy stored timeout means "never expires".
                    if (not result.timeout) or (
                        result.timeout
                        and (time.time() <= self._from_timestamp(result.timeout))
                    ):
                        self._debug_print("cache hit : %s" % cache_key)
                        self._update_counter(cache_key)
                        return result.value
                    elif result.timeout and (
                        time.time() > self._from_timestamp(result.timeout)
                    ):
                        # Expired: fall through and recompute below.
                        self._debug_print("cache timeout: %s" % cache_key)

                # Miss (or expired): run the function and store the result.
                result = CachedItem(
                    value=function(*args, **kwargs),
                    timeout=self._to_timestamp(time.time() + timeout)
                    if timeout
                    else 0,
                    time_added=self._to_timestamp(),
                )

                self[cache_key] = result
                return self[cache_key].value
            except KeyError:  # pragma: nocover
                # Workaround for threading issues, as opposed to a potential
                # lock block.  A thread may have deleted this key, and
                # that's fine.  We simply need to cache it again.
                # We won't always hit this, so we disable code coverage.
                self._debug_print("KeyError %s" % cache_key)
                self._debug_print("caching %s" % cache_key)
                result = CachedItem(
                    value=function(*args, **kwargs),
                    timeout=self._to_timestamp(time.time() + timeout)
                    if timeout
                    else None,
                    time_added=self._to_timestamp(),
                )
                self[cache_key] = result
                return result.value

        return wrapper

    return real_decorator
def serialize(self, filename: str) -> None:
    """
    Write the backing store to ``filename`` via ``pickle``.

    Everything in the cache must be picklable — do not use this method
    when caching objects that cannot be pickled!
    """
    with open(filename, "wb") as handle:
        pickle.dump(self._data_store, handle, -1)
def deserialize(self, filename: str) -> None:
    """
    Replace the in-memory cache with pickled data previously written to
    ``filename`` (e.g. by :meth:`serialize`).
    """
    with open(filename, "rb") as handle:
        self._data_store = pickle.load(handle)
@contextlib.contextmanager
def override_timeout(cache_obj: Cache, timeout: int):
    """
    Context manager that forces every cached call on ``cache_obj``
    inside the block to use ``timeout`` instead of its configured
    timeout.
    """
    cache_obj._override_timeout = timeout
    try:
        yield
    finally:
        # ``None`` disables the override-timeout mechanism again.
        cache_obj._override_timeout = None
@contextlib.contextmanager
def nocache(cache_obj: Cache):
    """
    Use this context manager to temporarily disable all caching for an
    object.

    Example:

    >>> from yamicache import Cache, nocache
    >>> c = Cache()
    >>> @c.cached()
    ... def test():
    ...     return 4
    ...
    >>> with nocache(c):
    ...     test()
    ...
    4
    >>> print(c._data_store)
    {}
    >>>
    """
    cache_obj._cache = False
    try:
        yield
    finally:
        cache_obj._cache = True