Source code for ooodev.utils.cache.time_cache

from __future__ import annotations
import threading
from typing import Any
from datetime import datetime, timedelta, timezone
from ooodev.events.args.cancel_event_args import CancelEventArgs
from ooodev.events.args.event_args import EventArgs
from ooodev.utils.helper.dot_dict import DotDict
from ooodev.events.partial.events_partial import EventsPartial


[docs]class TimeCache(EventsPartial): """ Time based Cache. Cached items expire after a specified time. If ``cleanup_interval`` is set, then the cache is cleaned up at regular intervals; Otherwise, the cache is only cleaned up when an item is accessed. Each time an element is accessed, the timestamp is updated. If the element has expired, it is removed from the cache. When an item expires, the event ``cache_items_expired`` is triggered. This event is called on a separate thread. for this reason it is important to make sure that the event handler is thread safe. Example: .. code-block:: python import threading from ooodev.utils.cache.time_cache import TimeCache LOCK = threading.Lock() def on_items_expired(source, event): with LOCK: keys = event.event_data.keys for key in keys: print(f"Expired: {key}") cache = TimeCache(60.0) # 60 seconds cache.subscribe_event("cache_items_expired", on_items_expired) cache["key"] = "value" value = cache["key"] """
[docs] def __init__(self, seconds: float, cleanup_interval: float = 60.0) -> None: """ Time based Cache. Args: seconds (float): Cache expiration time in seconds. cleanup_interval (float, optional): Cache cleanup interval in seconds. If set to ``0`` then the cleanup is disabled. Defaults to ``60.0``. """ EventsPartial.__init__(self) self._lock = threading.Lock() self._delta = timedelta(seconds=max(seconds, 0)) self._seconds = self._delta.total_seconds() self._timeout = max(cleanup_interval, 0) self._expiration_time = datetime.now(timezone.utc) + self._delta self._cache = {} self._timer = None self._fn_clear_expired = self.clear_expired self._fn_trigger_event = self.trigger_event self.start_timer()
[docs] def clear(self) -> None: """ Clear cache. """ self.stop_timer() self._cache.clear() self.stop_timer()
[docs] def get(self, key: Any) -> Any: """ Get value by key. Args: key (Any): Any Hashable object. Returns: Any: Value or ``None`` if not found. Note: The ``get`` method is an alias for the ``__getitem__`` method. So you can use ``cache_inst.get(key)`` or ``cache_inst[key]`` interchangeably. """ return self[key]
[docs] def put(self, key: Any, value: Any) -> None: """ Put value by key. Args: key (Any): Any Hashable object. value (Any): Any object. Note: The ``put`` method is an alias for the ``__setitem__`` method. So you can use ``cache_inst.put(key, value)`` or ``cache_inst[key] = value`` interchangeably. """ self[key] = value
[docs] def remove(self, key: Any) -> None: """ Remove key. Args: key (Any): Any Hashable object. Note: The ``remove`` method is an alias for the ``__delitem__`` method. So you can use ``cache_inst.remove(key)`` or ``del cache_inst[key]`` interchangeably. """ del self[key]
[docs] def start_timer(self) -> bool: """ Start the timer. This only applies if the cleanup interval is set. Returns: bool: ``True`` if the timer is started, ``False`` if the timer is already running. Note: Triggers the event ``time_cache_timer_started``. """ if self.seconds > 0 and self._timeout > 0 and self._timer is None: self._timer = threading.Timer(self._timeout, self._fn_clear_expired) self._timer.daemon = True # important for the timer to stop when the main thread exits self._timer.start() eargs = EventArgs(self) eargs.event_data = DotDict(timer=self._timer) self.trigger_event("time_cache_timer_started", eargs) return True return False
[docs] def stop_timer(self) -> bool: """ Stop the timer. This only applies if the cleanup interval is set. Returns: bool: ``True`` if the timer is stopped, ``False`` if the timer was not running. Note: Triggers the event ``time_cache_timer_stopped``. """ if self._timer is not None: self._timer.cancel() self._timer = None eargs = EventArgs(self) self.trigger_event("time_cache_timer_stopped", eargs) return True return False
[docs] def clear_expired(self) -> None: """ Clear expired items from the cache. Note: Triggers the event ``cache_items_expired``, on a new thread. The event args ``event_data`` is a ``DotDict`` instance that contains the ``keys`` as a list of the items that were removed. """ self.stop_timer() with self._lock: if not self._cache: self.start_timer() return seconds = self.seconds del_keys = [ key for key, (_, timestamp) in self._cache.items() if (datetime.now(timezone.utc) - timestamp).total_seconds() >= seconds ] for key in del_keys: del self._cache[key] if del_keys: eargs = EventArgs(self) eargs.event_data = DotDict(keys=del_keys) thread = threading.Thread(target=self._fn_trigger_event, args=("cache_items_expired", eargs), daemon=True) thread.start() thread.join() # wait for the thread to finish before starting timer again. self.start_timer()
# region Dunder Methods
[docs] def __getitem__(self, key: Any) -> Any: if key is None: raise TypeError("Key must not be None.") if key in self._cache: value, timestamp = self._cache[key] now_dt = datetime.now(timezone.utc) if (now_dt - timestamp).total_seconds() < self.seconds: # update timestamp self._cache[key] = (value, now_dt) return value else: del self._cache[key] # remove expired item return None
[docs] def __setitem__(self, key: Any, value: Any) -> None: """ Set value by key. Args: key (Any): Any Hashable object. value (Any): Any object. Raises: TypeError: If key or value is ``None``. Note: Triggers the event ``cache_item_adding`` before adding the item. The Event is a ``CancelEventArgs`` and can be canceled. Triggers the event ``cache_item_added`` after adding the item. The Event is a ``EventArgs``. Triggers the event ``cache_item_updating`` before updating the item. The Event is a ``CancelEventArgs`` and can be canceled. Triggers the event ``cache_item_updated`` after updating the item. The Event is a ``EventArgs``. The event args ``event_data`` is a ``DotDict`` instance that contains the ``key``, ``value`` and ``is_new`` of the item being added or updated. """ if key is None or value is None: raise TypeError("Key and value must not be None.") is_new = self[key] is None if is_new: cargs = CancelEventArgs(self) cargs.event_data = DotDict(key=key, value=value, is_new=is_new) self.trigger_event("cache_item_adding", cargs) if cargs.cancel: return eargs = EventArgs.from_args(cargs) else: cargs = CancelEventArgs(self) cargs.event_data = DotDict(key=key, value=value, is_new=is_new) self.trigger_event("cache_item_updating", cargs) if cargs.cancel: return eargs = EventArgs.from_args(cargs) self._cache[key] = (value, datetime.now(timezone.utc)) if is_new: self.trigger_event("cache_item_added", eargs) else: self.trigger_event("cache_item_updated", eargs)
[docs] def __contains__(self, key: Any) -> bool: return False if key is None else self[key] is not None
[docs] def __delitem__(self, key: Any) -> None: """ Remove key. Args: key (Any): Any Hashable object. Raises: TypeError: If key is ``None``. Note: Triggers the event ``cache_item_removing`` before removing the item. The Event is a ``CancelEventArgs`` and can be canceled. Triggers the event ``cache_item_removed`` after removing the item. The Event is a ``EventArgs``. The event args ``event_data`` is a ``DotDict`` instance that contains the key of the item being removed. """ if key is None: raise TypeError("Key must not be None.") if key in self: cargs = CancelEventArgs(self) cargs.event_data = DotDict(key=key) self.trigger_event("cache_item_removing", cargs) if cargs.cancel: return del self._cache[key] eargs = EventArgs.from_args(cargs) self.trigger_event("cache_item_removed", eargs)
def __repr__(self) -> str: return f"TimeBasedCache({self.seconds})" def __str__(self) -> str: return f"TimeBasedCache({self.seconds})"
[docs] def __len__(self) -> int: return len(self._cache)
def __del__(self): # not reliable but better than nothing self.stop_timer() # endregion Dunder Methods # region Properties @property def seconds(self) -> float: """ Gets/Sets Cache expiration time in seconds. """ return self._seconds @seconds.setter def seconds(self, value: float) -> None: self._delta = timedelta(seconds=value) self._seconds = self._delta.total_seconds() self._expiration_time = datetime.now(timezone.utc) + self._delta self.clear_expired() @property def cleanup_interval(self) -> float: """ Gets/Sets Cache cleanup interval in seconds. """ return self._timeout @cleanup_interval.setter def cleanup_interval(self, value: float) -> None: self._timeout = max(value, 0) if self._timeout > 0: self.start_timer() else: self.stop_timer()
# endregion Properties