Source code for larch.reactive.pointer

"""
Provides a pointer to an attribute
"""
import sys
import operator
from weakref import ref
# __pragma__ ('skip')
from .core import rcontext, pointer_attrgetter, pointer_itemgetter, pointer_resolve
__all__ = ("PointerBase", "Pointer", "PointerMap", "PointerState", "PointerExpression", "SELF",
           "ResolveError", "merge_pointers", "NOTHING")
# __pragma__ ('noskip')
# __pragma__ ('ecom')
"""?
from .pcore import rcontext, pointer_attrgetter, pointer_itemgetter, pointer_resolve
?"""
# __pragma__ ('noecom')


def _check_exception(exc, pointer_state, exception_type, to_transfrom):
    # __pragma__ ('skip')
    type_, _, traceback = sys.exc_info()
    if isinstance(exc, to_transfrom):
        raise exception_type(repr(pointer_state), exc).with_traceback(traceback)

    exc.args += (repr(pointer_state),)
    raise exc.with_traceback(traceback)
    # __pragma__ ('noskip')
    """?
    console.error("Exception in pointer", repr(exc))
    if isinstance(exc, to_transfrom):
        raise exception_type()
    raise exc
    ?"""


def _check_setter_exception(exc, pointer_state, exception_type):
    _check_exception(exc, pointer_state, exception_type,
                     (AttributeError, IndexError, KeyError, ReferenceError))


def _check_getter_exception(exc, pointer_state, exception_type):
    _check_exception(exc, pointer_state, exception_type,
                     (AttributeError, TypeError, IndexError, KeyError, ReferenceError))


class MetaNothing(type):
    def __init__(self, name, bases, dict_):
        super().__init__(name, bases, dict_)

    def __bool__(self):
        return False


class NOTHING(metaclass=MetaNothing):
    pass


_RNOTHING = ref(NOTHING)


# __pragma__ ('opov')
class ResolveError(ValueError):
    def __init__(self, path, original):
        self.path = path
        self.original = original

    def __repr__(self):
        return f"<{self.__class__.__name__} path={self.path} original={repr(self.original)}>"


class StrongRef:
    __slots__ = ("ref",)

    def __init__(self, ref):
        self.ref = ref

    def __call__(self):
        return self.ref


def create_ref(obj):
    try:
        return ref(obj)
    except TypeError:
        return StrongRef(obj)


class PointerState:
    __slots__ = ("root", "path", "accessors")

    def __init__(self, root=_RNOTHING, path=(), accessors=()):
        # root must be a weak reference or a StrongRef
        self.root = root
        self.path = path
        self.accessors = accessors

    def __eq__(self, other):
        if other is not None and self.__class__ == other.__class__:
            return self.root == other.root and self.path == other.path
        return False

    def __ne__(self, other):
        return not (self == other)

    def __bool__(self):
        try:
            self.get()
            return False if self.root is _RNOTHING else True
        except ResolveError:
            return False

    def __len__(self):
        return len(self.accessors)

    def __hash__(self):
        return hash(self.path)

    def new_item(self, index):
        path = self.path + (index,)
        accessors = self.accessors + (pointer_itemgetter(index), )
        return self.create_state(self.root, path, accessors)

    def new_attr(self, name):
        path = self.path + (name,)
        accessors = self.accessors + (pointer_attrgetter(name), )
        return self.create_state(self.root, path, accessors)

    def create_state(self, root, path, accessors):
        return self.__class__(root, path, accessors)

    def get(self):
        return self.delegate_get(self.root())

    def delegate_get(self, root):
        try:
            return pointer_resolve(root, self.accessors)
        except Exception as e:
            _check_getter_exception(e, self, ResolveError)

    def set(self, value, root=None):
        parent, getter, setter = self.split(root)
        try:
            setter(parent, value)
        except Exception as e:
            _check_setter_exception(e, self, ResolveError)
        return value

    def split(self, root=None):
        def setitem(obj, value):
            obj[self.path[-1]] = value

        def setattr_(obj, value):
            setattr(obj, self.path[-1], value)
            return value

        if root is None:
            root = self.root()

        # __pragma__ ('tconv')
        if not self.accessors:
            def invalid_setter(obj, value):
                raise TypeError("cannot set")

            return root, (lambda i: i), invalid_setter
        # __pragma__ ('notconv')

        setter = setitem if type(self.accessors[-1]) is pointer_itemgetter else setattr_
        try:
            parent = pointer_resolve(root, self.accessors[:-1])
        except Exception as e:
            _check_getter_exception(e, self, ResolveError)

        return parent, self.accessors[-1], setter

    def is_same(self, other):
        """returns True if other and self refers to the same object"""
        me = self
        try:
            if len(me.path) < len(other.path):
                other = other.sub_state(len(other.path) - len(me.path))
            else:
                me = me.sub_state(len(me.path) - len(other.path))
        except ResolveError:  # pragma: no cover
            return False

        def tmt(a):
            return tuple(map(type, a))

        if isinstance(me.root, ref):
            if not isinstance(other.root, ref) or me.root() != other.root():
                return False
        elif me.root != other.root:
            return False

        return (me.path == other.path
                and tmt(me.accessors) == tmt(other.accessors))

    def sub_state(self, count):
        """returns a state resolved to count path elements"""
        if not count:
            return self

        access = self.accessors
        try:
            obj = pointer_resolve(self.root(), access[:count])
            return self.create_state(
                ref(obj), self.path[count:], access[count:])
        except Exception as e:  # pragma: no cover
            _check_getter_exception(e, self, ResolveError)

    def merge_to(self, state):
        return self.create_state(state.root, state.path + self.path,
                                 state.accessors + self.accessors)

    def __getstate__(self):
        return self.root(), self.path, self.accessor_to_pickle()

    def __setstate__(self, state):
        root, self.path, accessors = state
        self.root = _RNOTHING if root is NOTHING else create_ref(root)
        self.pickle_to_accessor(accessors)

    def accessor_to_pickle(self):
        mapper = {pointer_itemgetter: "i", pointer_attrgetter: "a"}
        return "".join(mapper[type(a)] for a in self.accessors)

    def pickle_to_accessor(self, state):
        mapper = {"i": pointer_itemgetter, "a": pointer_attrgetter}
        self.accessors = tuple(mapper[g](a) for g, a in zip(state, self.path))

    def __str__(self):
        try:
            return "".join(
                f"[{name}]" if type(accessor) == pointer_itemgetter else f".{name}"
                for accessor, name in zip(self.accessors, self.path))
        except Exception as e:  # pragma: no cover
            e.args += (self.path, )
            raise e

    def __repr__(self):
        return f"{repr(self.root())}{str(self)}"


class _DumyState:
    __slots__ = ("value",)

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return self.__class__ == other.__class__ and self.value == other.value

    def __repr__(self):
        return str(self.value)

    def get(self):
        return self.value

    def delegate_get(self, root):
        return self.value

    def merge_to(self, state):
        return self.value


def dumy_state(a):
    return a if isinstance(a, PointerState) else _DumyState(a)


class _ExpressionRoot:
    # root for PointerExpression
    __slots__ = ("args", )

    def __init__(self, args):
        self.args = tuple(a.__state__ if isinstance(a, Pointer) else dumy_state(a) for a in args)

    def __eq__(self, other):
        if other is not None and self.__class__ == other.__class__:
            return self.args == other.args
        return False

    def merge_args(self, state):
        return (a.merge_to(state) for a in self.args)

    def merge_to(self, state):
        return self.__class__(self.merge_args(state))

    def __getstate__(self):
        return self.args

    def __setstate__(self, state):
        self.args = state


class _ExpressionOperandRoot(_ExpressionRoot):
    __slots__ = ("operand", "kwargs")

    def __init__(self, operand, args, kwargs):
        super().__init__(args)
        self.operand = operand
        self.kwargs = {
            k: v.__state__ if isinstance(v, Pointer) else _DumyState(v)
            for k, v in kwargs.items()}

    def __eq__(self, other):
        if super().__eq__(other):
            return self.operand == other.operand
        return False

    def __call__(self):
        args = [a.get() for a in self.args]
        kwargs = {k: v.get() for k, v in self.kwargs.items()}
        return self.apply(args, kwargs)

    def evaluate(self, root):
        # like call but with different root in argument
        args = [a.delegate_get(root) for a in self.args]
        kwargs = {k: v.delegate_get(root) for k, v in self.kwargs.items()}
        return self.apply(args, kwargs)

    def apply(self, args, kwargs):
        return self.operand(*args, **kwargs)

    def __repr__(self):
        return f"{self.operand.__name__}{self.args}{self.kwargs}"

    def merge_kwargs(self, state):
        return {k: v.merge_to(state) for k, v in self.kwargs.items()}

    def merge_to(self, state):
        return self.__class__(self.operand, self.merge_args(state),
                              self.merge_kwargs(state))

    def __getstate__(self):
        return self.operand, self.args, self.kwargs

    def __setstate__(self, state):
        try:
            self.operand, self.args, self.kwargs = state
        except ValueError:
            self.operand, self.args = state
            self.kwargs = {}


class _ExpressionOperandRootApply(_ExpressionOperandRoot):
    def apply(self, args, kwargs):
        return self.operand(args)


class _ExpressionAndRoot(_ExpressionRoot):
    def __call__(self):
        return self.apply(lambda v: v.get())

    def evaluate(self, root):
        return self.apply(lambda v: v.delegate_get(root))

    def apply(self, getter):
        try:
            values = (getter(a) for a in self.args)
            v = False
            # __pragma__ ('tconv')
            for v in values:
                if not v:
                    return False
            # __pragma__ ('notconv')
            return v
        except Exception:
            return False

    def __repr__(self):
        return f"and{self.args}"


class _ExpressionOrRoot(_ExpressionRoot):
    def __call__(self):
        return self.apply(lambda v: v.get())

    def evaluate(self, root):
        return self.apply(lambda v: v.delegate_get(root))

    def apply(self, getter):
        def eval_(v):
            try:
                v = getter(v)
            except Exception:
                return False
            return v

        values = (eval_(a) for a in self.args)
        # __pragma__ ('tconv')
        for v in values:
            if v:
                return v
        # __pragma__ ('notconv')
        return v

    def __repr__(self):
        return f"or{self.args}"


[docs] class PointerExpression(PointerState): """A pointer expression""" def set(self, value, root=None): raise ValueError("Cannot set PointerExpression", self) def get(self): return super().delegate_get(self.root()) def delegate_get(self, root): return super().delegate_get(self.root.evaluate(root)) def merge_to(self, state): root = self.root.merge_to(state) return self.create_state(root, self.path, self.accessors) # __pragma__ ('kwargs') @classmethod def make(cls, pointer, operand, *args, **kwargs): return pointer.__class__(cls(_ExpressionOperandRoot(operand, args, kwargs))) # __pragma__ ('nokwargs') @classmethod def make_and(cls, pointer, *args): return pointer.__class__(cls(_ExpressionAndRoot(args))) @classmethod def make_or(cls, pointer, *args): return pointer.__class__(cls(_ExpressionOrRoot(args))) # __pragma__ ('kwargs')
[docs] @classmethod def call(cls, func, *args, **kwargs): """calls func with args""" return Pointer(cls(_ExpressionOperandRoot(func, args, kwargs)))
# __pragma__ ('nokwargs') @classmethod def apply(cls, func, *args): return Pointer(cls(_ExpressionOperandRootApply(func, args, {}))) def __getstate__(self): return self.root, self.path, self.accessor_to_pickle() def __setstate__(self, state): self.root, self.path, accessors = state self.pickle_to_accessor(accessors) def __repr__(self): return f"{repr(self.root)}{self}"
class PointerBase: __slots__ = ()
[docs] class Pointer(PointerBase): """ A placeholder to an object attributes and index chain (a generalization to attrgetter/itemgetter of module operator) to construct a pointer you call >>> pointer = Pointer(obj).a[0] to get a pointer's value you call >>> value = pointer() # same as obj.a[0] to set a pointer's value you call >>> pointer(value) # same as obj.a[0] = value """ __slots__ = ("__state__", "__weakref__") def __init__(self, state_or_obj=None): if state_or_obj is None: state_or_obj = PointerState() elif isinstance(state_or_obj, Pointer): state_or_obj = state_or_obj.__state__ elif not isinstance(state_or_obj, PointerState): state_or_obj = PointerState(create_ref(state_or_obj)) self.__state__ = state_or_obj def __get__(self, holder, owner): # delegator handler if holder is None: return self return self.__state__.delegate_get(holder) def __set__(self, holder, value): # delegator handler self.__state__.set(value, holder) def __bool__(self): return bool(self.__state__) def __getitem__(self, index): return self.__class__(self.__state__.new_item(index)) def __getattr__(self, name): return self.__class__(self.__state__.new_attr(name)) def __call__(self, value=NOTHING): return self.__state__.get() if value is NOTHING else self.__state__.set(value) def __hash__(self): return hash(self.__state__) def __eq__(self, other): if other is not None and self.__class__ == other.__class__: return self.__state__.is_same(other.__state__) return False def __ne__(self, other): return not self == other def __str__(self): with rcontext.untouched: return str(self.__state__) def __len__(self): return 0 def __iter__(self): raise TypeError("cannot iter %r" % self) def __repr__(self): return f"<{self.__class__.__name__}-{repr(self.__state__)}>" def __add__(self, other): return PointerExpression.make(self, operator.add, self, other) def __radd__(self, other): return PointerExpression.make(self, operator.add, other, self) def __sub__(self, other): return PointerExpression.make(self, operator.sub, self, other) def __rsub__(self, other): return PointerExpression.make(self, operator.sub, other, self) def __truediv__(self, other): return PointerExpression.make(self, operator.truediv, self, other) def __rtruediv__(self, other): return PointerExpression.make(self, operator.truediv, other, self) def __mul__(self, other): return PointerExpression.make(self, operator.mul, self, other) def __rmul__(self, other): return PointerExpression.make(self, operator.mul, other, self) def __and__(self, other): return PointerExpression.make_and(self, self, other) def __rand__(self, other): return PointerExpression.make_and(self, other, self) def __or__(self, other): return PointerExpression.make_or(self, self, other) def __ror__(self, other): return PointerExpression.make_or(self, other, self) def __reduce_ex__(self, protocol): return self.__class__, (self.__state__,)
SELF = Pointer() """This is actually not a Pointer but a trivial delegator""" class PointerMap: """A named value container which is able to handle pointer values""" __slots__ = ("__values", ) # __pragma__ ('kwargs') def __init__(self, **kwargs): self.__values = kwargs # __pragma__ ('nokwargs') def get(self, key, default=NOTHING): value = self.__values.get(key, default) if isinstance(value, PointerBase): return value() elif value is NOTHING: raise KeyError(key) return value def set(self, key, value): """Returns True if the key was a pointer""" val = self.__values.get(key, None) if isinstance(val, PointerBase) and not isinstance(value, PointerBase): val(value) return True self.__values[key] = value return False def remove(self, key): self.__values.pop(key, None) def update(self, dictionary): self.__values.update(dictionary) def keys(self): return self.__values.keys() def raw(self, key): return self.__values.get(key) def __contains__(self, key): return key in self.__values
[docs] def merge_pointers(*pointers): """ Merges several pointers together. Args: pointers: multiple pointers to merge Returns: Pointer: a chained pointer object. Example: >>> p1 = Pointer(obj1).a.b >>> p2 = Pointer(obj2).c.d >>> p3 = merge_pointers(p1, p2) >>> p3 == Pointer(obj1).a.b.c.d """ states = (p.__state__ for p in pointers) state = next(states) for s in states: state = s.merge_to(state) return Pointer(state)