Module kvalchemy.client

Home to the KVAlchemy client.

Expand source code
"""
Home to the KVAlchemy client.
"""
import contextlib
import logging
from typing import Any, Callable, Iterable, Union

import backoff
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, scoped_session, sessionmaker

from kvalchemy.models import KEY_MAX_LENGTH, TAG_MAX_LENGTH, Base, KVStore, ValueMixIn
from kvalchemy.proxy import Proxy
from kvalchemy.time import ExpirationType, to_expire
from kvalchemy.values import ENOVAL

log = logging.getLogger(__name__)


retry_integrity_errors = backoff.on_exception(
    backoff.constant, IntegrityError, interval=0.1, max_time=30
)

DEFAULT_TAG = "__default__"


class KVAlchemy:
    """
    Client for working with the key-value store.
    """

    def __init__(self, url: str, create_models: bool = True) -> None:
        """
        Initializes the KVAlchemy client.

        Takes in the sqlalchemy url to connect to the database, along
        with an option to ensure the necessary db models are created.
        """
        self.url = url
        self._engine = create_engine(url, pool_pre_ping=True)

        if create_models:
            Base.metadata.create_all(self._engine)

        self._session_factory = sessionmaker(bind=self._engine)
        self._session = scoped_session(self._session_factory)

    def __iter__(self) -> Iterable[str]:
        """
        Returns an iterable of all non-expired keys in the store.
        """
        with self.session(commit=False) as session:
            query = (
                session.query(KVStore)
                .filter(KVStore.non_expired_filter())
                .order_by(KVStore.key.asc(), KVStore.tag.asc())
            )
            for key in query.all():
                yield key

    def __len__(self) -> int:
        """
        Returns the number of non-expired key-value pairs in the store.
        """
        count = 0
        for _ in self:
            count += 1
        return count

    @contextlib.contextmanager
    def session(
        self, commit: bool = True, delete_expired: bool = True
    ) -> Iterable[Session]:
        """
        Contextmanager to obtain a temp session with the underlying database.

        If commit is True, the session will be committed after the block.
        If delete_expired is True, any expired keys will be deleted before exiting the block.
            Note that delete_expired only applies if commit is True.
        """
        with self._session() as session:
            yield session

            if commit:
                # End the user's session via commit.
                session.commit()

                if delete_expired:
                    # Now we're starting a new transaction to delete expired keys.
                    session.query(KVStore).filter(
                        ~KVStore.non_expired_filter()
                    ).delete()

                    # commit the delete transaction
                    session.commit()

    @retry_integrity_errors
    def get(
        self,
        key: str,
        default: Any = ENOVAL,
        tag: str = DEFAULT_TAG,
        return_expiration: bool = False,
    ) -> Any:
        """
        Retrieves the value for the given key and tag.

        If the key/tag combo is not found (or expired), and a default is provided, the
        default value is returned. If no default is provided, a KeyError is raised.

        If return_expiration is True, will return the (value, (expiration datetime or None)) as a tuple
        """
        with self.session() as session:
            query = (
                session.query(KVStore)
                .filter(KVStore.non_expired_filter())
                .filter_by(key=key, tag=tag)
            )
            result = query.one_or_none()

            if result is None:
                result = ValueMixIn(default)

            if result.value is ENOVAL:
                raise KeyError(f"key: {key}, tag: {tag}")

            if return_expiration:
                return result.value, getattr(result, "expire", None)

            return result.value

    @retry_integrity_errors
    def set(
        self,
        key: str,
        value: Any,
        tag: str = DEFAULT_TAG,
        expire: ExpirationType = None,
    ) -> None:
        """
        Sets the given key/tag combo to the value provided.

        If expire is provided, it must be something that can be processed by
        the to_expire function in kvalchemy.time.
        """
        with self.session() as session:
            session.merge(
                KVStore(key=key, value=value, tag=tag, expire=to_expire(expire))
            )

    @retry_integrity_errors
    def delete(self, key: str, tag: str = DEFAULT_TAG) -> None:
        """
        Deletes the given key/tag combo from the store.
        """
        with self.session() as session:
            query = (
                session.query(KVStore)
                .filter(KVStore.non_expired_filter())
                .filter_by(key=key, tag=tag)
            )
            result = query.one_or_none()

            if result is not None:
                session.delete(result)

    def pop(self, key: str, default: Any = ENOVAL, tag: str = DEFAULT_TAG) -> None:
        """
        Pops the given key/tag combo from the store.

        If the key/tag combo is not found (or expired), and a default is provided, the
        default value is returned. If no default is provided, a KeyError is raised.
        """
        sentinel = object()
        value = self.get(key, sentinel, tag)

        if value is sentinel:
            if default is ENOVAL:
                raise KeyError(f"key: {key}, tag: {tag}")
            else:
                value = default

        self.delete(key, tag)

        return value

    @retry_integrity_errors
    def clear(self) -> None:
        """
        Clears all key-value pairs from the store.
        """
        with self.session() as session:
            session.query(KVStore).delete()

    def get_proxy(
        self, key: str, default: Any = ENOVAL, tag: str = DEFAULT_TAG
    ) -> Proxy:
        """
        Returns a Proxy object for the given key, tag, default.
        """
        return Proxy(self, key, default, tag)

    @retry_integrity_errors
    def delete_tag(self, tag: str = DEFAULT_TAG) -> int:
        """
        Deletes all keys under a given tag. Defaults to the default tag.

        Returns the number of keys deleted.
        """
        with self.session() as session:
            return session.query(KVStore).filter(KVStore.tag == tag).delete()

    def memoize(
        self,
        expire: ExpirationType = None,
        expire_if: Union[bool, Callable] = False,
        skip_saving_to_cache_if: Union[bool, Callable] = False,
    ):
        """
        A decorator to memoize the results of a function into the key-value store.

        expire allows us to specify a ttl to expire this memoization on.

        expire_if allows a bool or callable, if callable it will be called (and should return a bool).
            If it is True, we'll expire the cache and allow the func to be called.
            If it is False, the cache will be hit per the existing ttl.

        skip_saving_to_cache_if allows a bool or callable, if callable it will be called with a single param of the value about to be returned
            (and should return a bool). This is only checked if the underlying function is otherwised called.
            If it is True, we will not save this value as memoized (so next call with hit the function again).
            If it is False, we will save the value to the cache as per normal.
        """
        if callable(expire):
            # we've been called like:
            # @memoize
            # without () at the end
            func = expire
            expire = None
        else:
            func = None

        def inner(func):
            # Warning: we're truncating to fit the tag
            tag = f"memoize.{func.__module__}_{func.__qualname__}_{expire!s}"[
                :TAG_MAX_LENGTH
            ]

            def wrapper(*args, **kwargs):
                # Warning: we're truncating to fit the key
                key = f"{args!s}_{kwargs!s}"[:KEY_MAX_LENGTH]

                # if you overwrite inner.expire_if then the callable would only get evaluated once
                # ... so don't do that.
                expire_if = inner.expire_if

                if callable(expire_if):
                    expire_if = expire_if()

                try_cache = not bool(expire_if)

                NO_RESULT = object()
                result = NO_RESULT
                if try_cache:
                    try:
                        result = self.get(key, tag=tag)
                    except KeyError:
                        pass
                else:
                    log.debug(f"expire_if is forcing us to ignore cache: {key}")

                if result == NO_RESULT:
                    result = func(*args, **kwargs)

                    skip_saving_to_cache = False
                    if callable(skip_saving_to_cache_if):
                        if skip_saving_to_cache_if(result):
                            skip_saving_to_cache = True
                    elif skip_saving_to_cache_if:
                        skip_saving_to_cache = skip_saving_to_cache_if

                    if skip_saving_to_cache:
                        log.debug(
                            f"skip_saving_to_cache_if is forcing us to not save to cache the value for key: {key}"
                        )
                    else:
                        self.set(key, result, tag=tag, expire=expire)

                return result

            wrapper.cache_clear = lambda: self.delete_tag(tag)
            return wrapper

        inner.expire_if = expire_if

        if func:
            return inner(func)
        else:
            return inner

Classes

class KVAlchemy (url: str, create_models: bool = True)

Client for working with the key-value store.

Initializes the KVAlchemy client.

Takes in the sqlalchemy url to connect to the database, along with an option to ensure the necessary db models are created.

Expand source code
class KVAlchemy:
    """
    Client for working with the key-value store.
    """

    def __init__(self, url: str, create_models: bool = True) -> None:
        """
        Initializes the KVAlchemy client.

        Takes in the sqlalchemy url to connect to the database, along
        with an option to ensure the necessary db models are created.
        """
        self.url = url
        self._engine = create_engine(url, pool_pre_ping=True)

        if create_models:
            Base.metadata.create_all(self._engine)

        self._session_factory = sessionmaker(bind=self._engine)
        self._session = scoped_session(self._session_factory)

    def __iter__(self) -> Iterable[str]:
        """
        Returns an iterable of all non-expired keys in the store.
        """
        with self.session(commit=False) as session:
            query = (
                session.query(KVStore)
                .filter(KVStore.non_expired_filter())
                .order_by(KVStore.key.asc(), KVStore.tag.asc())
            )
            for key in query.all():
                yield key

    def __len__(self) -> int:
        """
        Returns the number of non-expired key-value pairs in the store.
        """
        count = 0
        for _ in self:
            count += 1
        return count

    @contextlib.contextmanager
    def session(
        self, commit: bool = True, delete_expired: bool = True
    ) -> Iterable[Session]:
        """
        Contextmanager to obtain a temp session with the underlying database.

        If commit is True, the session will be committed after the block.
        If delete_expired is True, any expired keys will be deleted before exiting the block.
            Note that delete_expired only applies if commit is True.
        """
        with self._session() as session:
            yield session

            if commit:
                # End the user's session via commit.
                session.commit()

                if delete_expired:
                    # Now we're starting a new transaction to delete expired keys.
                    session.query(KVStore).filter(
                        ~KVStore.non_expired_filter()
                    ).delete()

                    # commit the delete transaction
                    session.commit()

    @retry_integrity_errors
    def get(
        self,
        key: str,
        default: Any = ENOVAL,
        tag: str = DEFAULT_TAG,
        return_expiration: bool = False,
    ) -> Any:
        """
        Retrieves the value for the given key and tag.

        If the key/tag combo is not found (or expired), and a default is provided, the
        default value is returned. If no default is provided, a KeyError is raised.

        If return_expiration is True, will return the (value, (expiration datetime or None)) as a tuple
        """
        with self.session() as session:
            query = (
                session.query(KVStore)
                .filter(KVStore.non_expired_filter())
                .filter_by(key=key, tag=tag)
            )
            result = query.one_or_none()

            if result is None:
                result = ValueMixIn(default)

            if result.value is ENOVAL:
                raise KeyError(f"key: {key}, tag: {tag}")

            if return_expiration:
                return result.value, getattr(result, "expire", None)

            return result.value

    @retry_integrity_errors
    def set(
        self,
        key: str,
        value: Any,
        tag: str = DEFAULT_TAG,
        expire: ExpirationType = None,
    ) -> None:
        """
        Sets the given key/tag combo to the value provided.

        If expire is provided, it must be something that can be processed by
        the to_expire function in kvalchemy.time.
        """
        with self.session() as session:
            session.merge(
                KVStore(key=key, value=value, tag=tag, expire=to_expire(expire))
            )

    @retry_integrity_errors
    def delete(self, key: str, tag: str = DEFAULT_TAG) -> None:
        """
        Deletes the given key/tag combo from the store.
        """
        with self.session() as session:
            query = (
                session.query(KVStore)
                .filter(KVStore.non_expired_filter())
                .filter_by(key=key, tag=tag)
            )
            result = query.one_or_none()

            if result is not None:
                session.delete(result)

    def pop(self, key: str, default: Any = ENOVAL, tag: str = DEFAULT_TAG) -> None:
        """
        Pops the given key/tag combo from the store.

        If the key/tag combo is not found (or expired), and a default is provided, the
        default value is returned. If no default is provided, a KeyError is raised.
        """
        sentinel = object()
        value = self.get(key, sentinel, tag)

        if value is sentinel:
            if default is ENOVAL:
                raise KeyError(f"key: {key}, tag: {tag}")
            else:
                value = default

        self.delete(key, tag)

        return value

    @retry_integrity_errors
    def clear(self) -> None:
        """
        Clears all key-value pairs from the store.
        """
        with self.session() as session:
            session.query(KVStore).delete()

    def get_proxy(
        self, key: str, default: Any = ENOVAL, tag: str = DEFAULT_TAG
    ) -> Proxy:
        """
        Returns a Proxy object for the given key, tag, default.
        """
        return Proxy(self, key, default, tag)

    @retry_integrity_errors
    def delete_tag(self, tag: str = DEFAULT_TAG) -> int:
        """
        Deletes all keys under a given tag. Defaults to the default tag.

        Returns the number of keys deleted.
        """
        with self.session() as session:
            return session.query(KVStore).filter(KVStore.tag == tag).delete()

    def memoize(
        self,
        expire: ExpirationType = None,
        expire_if: Union[bool, Callable] = False,
        skip_saving_to_cache_if: Union[bool, Callable] = False,
    ):
        """
        A decorator to memoize the results of a function into the key-value store.

        expire allows us to specify a ttl to expire this memoization on.

        expire_if allows a bool or callable, if callable it will be called (and should return a bool).
            If it is True, we'll expire the cache and allow the func to be called.
            If it is False, the cache will be hit per the existing ttl.

        skip_saving_to_cache_if allows a bool or callable, if callable it will be called with a single param of the value about to be returned
            (and should return a bool). This is only checked if the underlying function is otherwised called.
            If it is True, we will not save this value as memoized (so next call with hit the function again).
            If it is False, we will save the value to the cache as per normal.
        """
        if callable(expire):
            # we've been called like:
            # @memoize
            # without () at the end
            func = expire
            expire = None
        else:
            func = None

        def inner(func):
            # Warning: we're truncating to fit the tag
            tag = f"memoize.{func.__module__}_{func.__qualname__}_{expire!s}"[
                :TAG_MAX_LENGTH
            ]

            def wrapper(*args, **kwargs):
                # Warning: we're truncating to fit the key
                key = f"{args!s}_{kwargs!s}"[:KEY_MAX_LENGTH]

                # if you overwrite inner.expire_if then the callable would only get evaluated once
                # ... so don't do that.
                expire_if = inner.expire_if

                if callable(expire_if):
                    expire_if = expire_if()

                try_cache = not bool(expire_if)

                NO_RESULT = object()
                result = NO_RESULT
                if try_cache:
                    try:
                        result = self.get(key, tag=tag)
                    except KeyError:
                        pass
                else:
                    log.debug(f"expire_if is forcing us to ignore cache: {key}")

                if result == NO_RESULT:
                    result = func(*args, **kwargs)

                    skip_saving_to_cache = False
                    if callable(skip_saving_to_cache_if):
                        if skip_saving_to_cache_if(result):
                            skip_saving_to_cache = True
                    elif skip_saving_to_cache_if:
                        skip_saving_to_cache = skip_saving_to_cache_if

                    if skip_saving_to_cache:
                        log.debug(
                            f"skip_saving_to_cache_if is forcing us to not save to cache the value for key: {key}"
                        )
                    else:
                        self.set(key, result, tag=tag, expire=expire)

                return result

            wrapper.cache_clear = lambda: self.delete_tag(tag)
            return wrapper

        inner.expire_if = expire_if

        if func:
            return inner(func)
        else:
            return inner

Methods

def clear(self) ‑> None

Clears all key-value pairs from the store.

Expand source code
@retry_integrity_errors
def clear(self) -> None:
    """
    Clears all key-value pairs from the store.
    """
    with self.session() as session:
        session.query(KVStore).delete()
def delete(self, key: str, tag: str = '__default__') ‑> None

Deletes the given key/tag combo from the store.

Expand source code
@retry_integrity_errors
def delete(self, key: str, tag: str = DEFAULT_TAG) -> None:
    """
    Deletes the given key/tag combo from the store.
    """
    with self.session() as session:
        query = (
            session.query(KVStore)
            .filter(KVStore.non_expired_filter())
            .filter_by(key=key, tag=tag)
        )
        result = query.one_or_none()

        if result is not None:
            session.delete(result)
def delete_tag(self, tag: str = '__default__') ‑> int

Deletes all keys under a given tag. Defaults to the default tag.

Returns the number of keys deleted.

Expand source code
@retry_integrity_errors
def delete_tag(self, tag: str = DEFAULT_TAG) -> int:
    """
    Deletes all keys under a given tag. Defaults to the default tag.

    Returns the number of keys deleted.
    """
    with self.session() as session:
        return session.query(KVStore).filter(KVStore.tag == tag).delete()
def get(self, key: str, default: Any = <object object>, tag: str = '__default__', return_expiration: bool = False) ‑> Any

Retrieves the value for the given key and tag.

If the key/tag combo is not found (or expired), and a default is provided, the default value is returned. If no default is provided, a KeyError is raised.

If return_expiration is True, will return the (value, (expiration datetime or None)) as a tuple

Expand source code
@retry_integrity_errors
def get(
    self,
    key: str,
    default: Any = ENOVAL,
    tag: str = DEFAULT_TAG,
    return_expiration: bool = False,
) -> Any:
    """
    Retrieves the value for the given key and tag.

    If the key/tag combo is not found (or expired), and a default is provided, the
    default value is returned. If no default is provided, a KeyError is raised.

    If return_expiration is True, will return the (value, (expiration datetime or None)) as a tuple
    """
    with self.session() as session:
        query = (
            session.query(KVStore)
            .filter(KVStore.non_expired_filter())
            .filter_by(key=key, tag=tag)
        )
        result = query.one_or_none()

        if result is None:
            result = ValueMixIn(default)

        if result.value is ENOVAL:
            raise KeyError(f"key: {key}, tag: {tag}")

        if return_expiration:
            return result.value, getattr(result, "expire", None)

        return result.value
def get_proxy(self, key: str, default: Any = <object object>, tag: str = '__default__') ‑> Proxy

Returns a Proxy object for the given key, tag, default.

Expand source code
def get_proxy(
    self, key: str, default: Any = ENOVAL, tag: str = DEFAULT_TAG
) -> Proxy:
    """
    Returns a Proxy object for the given key, tag, default.
    """
    return Proxy(self, key, default, tag)
def memoize(self, expire: Union[ForwardRef(None), int, float, datetime.timedelta, datetime.datetime] = None, expire_if: Union[bool, Callable] = False, skip_saving_to_cache_if: Union[bool, Callable] = False)

A decorator to memoize the results of a function into the key-value store.

expire allows us to specify a ttl to expire this memoization on.

expire_if allows a bool or callable, if callable it will be called (and should return a bool). If it is True, we'll expire the cache and allow the func to be called. If it is False, the cache will be hit per the existing ttl.

skip_saving_to_cache_if allows a bool or callable, if callable it will be called with a single param of the value about to be returned (and should return a bool). This is only checked if the underlying function is otherwised called. If it is True, we will not save this value as memoized (so next call with hit the function again). If it is False, we will save the value to the cache as per normal.

Expand source code
def memoize(
    self,
    expire: ExpirationType = None,
    expire_if: Union[bool, Callable] = False,
    skip_saving_to_cache_if: Union[bool, Callable] = False,
):
    """
    A decorator to memoize the results of a function into the key-value store.

    expire allows us to specify a ttl to expire this memoization on.

    expire_if allows a bool or callable, if callable it will be called (and should return a bool).
        If it is True, we'll expire the cache and allow the func to be called.
        If it is False, the cache will be hit per the existing ttl.

    skip_saving_to_cache_if allows a bool or callable, if callable it will be called with a single param of the value about to be returned
        (and should return a bool). This is only checked if the underlying function is otherwised called.
        If it is True, we will not save this value as memoized (so next call with hit the function again).
        If it is False, we will save the value to the cache as per normal.
    """
    if callable(expire):
        # we've been called like:
        # @memoize
        # without () at the end
        func = expire
        expire = None
    else:
        func = None

    def inner(func):
        # Warning: we're truncating to fit the tag
        tag = f"memoize.{func.__module__}_{func.__qualname__}_{expire!s}"[
            :TAG_MAX_LENGTH
        ]

        def wrapper(*args, **kwargs):
            # Warning: we're truncating to fit the key
            key = f"{args!s}_{kwargs!s}"[:KEY_MAX_LENGTH]

            # if you overwrite inner.expire_if then the callable would only get evaluated once
            # ... so don't do that.
            expire_if = inner.expire_if

            if callable(expire_if):
                expire_if = expire_if()

            try_cache = not bool(expire_if)

            NO_RESULT = object()
            result = NO_RESULT
            if try_cache:
                try:
                    result = self.get(key, tag=tag)
                except KeyError:
                    pass
            else:
                log.debug(f"expire_if is forcing us to ignore cache: {key}")

            if result == NO_RESULT:
                result = func(*args, **kwargs)

                skip_saving_to_cache = False
                if callable(skip_saving_to_cache_if):
                    if skip_saving_to_cache_if(result):
                        skip_saving_to_cache = True
                elif skip_saving_to_cache_if:
                    skip_saving_to_cache = skip_saving_to_cache_if

                if skip_saving_to_cache:
                    log.debug(
                        f"skip_saving_to_cache_if is forcing us to not save to cache the value for key: {key}"
                    )
                else:
                    self.set(key, result, tag=tag, expire=expire)

            return result

        wrapper.cache_clear = lambda: self.delete_tag(tag)
        return wrapper

    inner.expire_if = expire_if

    if func:
        return inner(func)
    else:
        return inner
def pop(self, key: str, default: Any = <object object>, tag: str = '__default__') ‑> None

Pops the given key/tag combo from the store.

If the key/tag combo is not found (or expired), and a default is provided, the default value is returned. If no default is provided, a KeyError is raised.

Expand source code
def pop(self, key: str, default: Any = ENOVAL, tag: str = DEFAULT_TAG) -> None:
    """
    Pops the given key/tag combo from the store.

    If the key/tag combo is not found (or expired), and a default is provided, the
    default value is returned. If no default is provided, a KeyError is raised.
    """
    sentinel = object()
    value = self.get(key, sentinel, tag)

    if value is sentinel:
        if default is ENOVAL:
            raise KeyError(f"key: {key}, tag: {tag}")
        else:
            value = default

    self.delete(key, tag)

    return value
def session(self, commit: bool = True, delete_expired: bool = True) ‑> Iterable[sqlalchemy.orm.session.Session]

Contextmanager to obtain a temp session with the underlying database.

If commit is True, the session will be committed after the block. If delete_expired is True, any expired keys will be deleted before exiting the block. Note that delete_expired only applies if commit is True.

Expand source code
@contextlib.contextmanager
def session(
    self, commit: bool = True, delete_expired: bool = True
) -> Iterable[Session]:
    """
    Contextmanager to obtain a temp session with the underlying database.

    If commit is True, the session will be committed after the block.
    If delete_expired is True, any expired keys will be deleted before exiting the block.
        Note that delete_expired only applies if commit is True.
    """
    with self._session() as session:
        yield session

        if commit:
            # End the user's session via commit.
            session.commit()

            if delete_expired:
                # Now we're starting a new transaction to delete expired keys.
                session.query(KVStore).filter(
                    ~KVStore.non_expired_filter()
                ).delete()

                # commit the delete transaction
                session.commit()
def set(self, key: str, value: Any, tag: str = '__default__', expire: Union[ForwardRef(None), int, float, datetime.timedelta, datetime.datetime] = None) ‑> None

Sets the given key/tag combo to the value provided.

If expire is provided, it must be something that can be processed by the to_expire function in kvalchemy.time.

Expand source code
@retry_integrity_errors
def set(
    self,
    key: str,
    value: Any,
    tag: str = DEFAULT_TAG,
    expire: ExpirationType = None,
) -> None:
    """
    Sets the given key/tag combo to the value provided.

    If expire is provided, it must be something that can be processed by
    the to_expire function in kvalchemy.time.
    """
    with self.session() as session:
        session.merge(
            KVStore(key=key, value=value, tag=tag, expire=to_expire(expire))
        )