# Source code for larch.reactive.pcore

"""
This module contains the core functionality of larch.reactive.
"""
import sys
import warnings
import logging
import operator
from bisect import insort_left
from collections import deque
from functools import reduce, update_wrapper
from weakref import ref

pointer_itemgetter = operator.itemgetter
pointer_attrgetter = operator.attrgetter


# __pragma__ ('ecom')
"""?
class LocalContext:
    pass


class GreenletExit(BaseException):
    pass
?"""

# __pragma__ ('skip')
# Rebind the name to the concrete type of an attrgetter instance, so that it
# always refers to a class (original note: "for pypy").
pointer_attrgetter = type(pointer_attrgetter(""))  # for pypy
try:
    # Use gevent's exit exception and greenlet-local storage when gevent
    # can be imported.
    from gevent import GreenletExit
    from gevent.local import local as LocalContext

except ImportError:  # pragma: no cover
    # gevent is not available: provide a stand-in exception with the same
    # name and fall back to thread-local storage.
    class GreenletExit(BaseException):
        pass

    from threading import local as LocalContext

# Names exported by a star-import of this module.
__all__ = ("rcontext", "CellBase", "ReactiveState", "Subject", "Container",
           "ResetContainer", "Rule", "IterRule", "pointer_attrgetter",
           "pointer_itemgetter", "pointer_resolve", "ReactiveWarning")
# __pragma__ ('noskip')

logger = logging.getLogger("larch.reactive")


def pointer_resolve(root, accessors):
    """
    Follows a chain of accessors starting at `root`.

    Each accessor is called with the result of the previous one (the first
    one with `root`); the result of the last call is returned. With an empty
    chain `root` itself is returned.
    """
    current = root
    for accessor in accessors:
        current = accessor(current)
    return current


class DecoratorContext:
    """
    Base class for context managers that double as function decorators.

    Subclasses supply `__enter__` and `__exit__`; this class adds `call` to
    run a function inside the context and `__call__` to wrap a function so
    that every invocation runs inside the context.
    """
    __slots__ = ()

    # __pragma__("kwargs")
    def call(self, func, *args, **kwargs):
        """Runs `func(*args, **kwargs)` inside this context and returns
        its result."""
        with self:
            outcome = func(*args, **kwargs)
        # the return statement is deliberately placed after the with block
        return outcome  # transcript bug
    # __pragma__("nokwargs")

    def __call__(self, func):
        """
        Decorates `func`: the returned wrapper forwards all arguments to
        `func` through `call`. The undecorated function stays reachable as
        the wrapper's `decorated` attribute.
        """
        # __pragma__("kwargs")
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        # __pragma__("nokwargs")

        update_wrapper(wrapper, func)
        wrapper.decorated = func
        return wrapper


class AtomicDecoratorContext(DecoratorContext):
    """
    The atomic decorator context. Rules will be executed at the end of
    the context.

    Entering and leaving are delegated to `rcontext._atomic_start` and
    `rcontext._atomic_end`.
    """

    def __enter__(self):
        """Starts an atomic operation on the reactive context."""
        rcontext._atomic_start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Ends the atomic operation; exceptions are never suppressed."""
        rcontext._atomic_end()
        return False


class SilentDecoratorContext(DecoratorContext):
    """
    The silent decorator context. While it is active a change of a cell
    does not trigger any rule, because the context's notify hook is
    replaced by a no-op.
    """
    __slots__ = ("old_notify", )

    def __enter__(self):
        """Opens an atomic operation and swaps in the no-op notify hook."""
        # the atomic context ensures that silent will not influence
        # other greenlets
        rcontext._atomic_start()
        # remember the active hook so that __exit__ can put it back
        self.old_notify, rcontext.notify = (
            rcontext.notify, rcontext._dumy_notify)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restores the saved notify hook, then closes the atomic
        operation. Exceptions are never suppressed."""
        rcontext.notify = self.old_notify
        rcontext._atomic_end()
        return False


class UntouchedDecoratorContext(DecoratorContext):
    """
    The untouched decorator context. It is only meaningful inside rules:
    while it is active, reading a cell does not touch it, because the
    context's touch hook is replaced by a no-op.
    """
    __slots__ = ("old_touch", )

    def __enter__(self):
        """Saves the active touch hook and installs the no-op one."""
        self.old_touch, rcontext.touch = rcontext.touch, rcontext._dumy_touch

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Reinstalls the saved touch hook; exceptions are never
        suppressed."""
        rcontext.touch = self.old_touch
        return False


class TouchedDecoratorContext(DecoratorContext):
    """
    The touched decorator context. It is only meaningful inside an
    untouched context: while it is active the real touch hook is installed
    again, so cells are touched when they are read.
    """
    __slots__ = ("old_touch", )

    def __enter__(self):
        """Saves the active touch hook and installs the real one."""
        self.old_touch, rcontext.touch = rcontext.touch, rcontext._touch

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Reinstalls the saved touch hook; exceptions are never
        suppressed."""
        rcontext.touch = self.old_touch
        return False


class ReactiveWarning(UserWarning):
    """Warning category used by this module when a rule or a callback
    raises an exception."""


class ReactiveContext(LocalContext):
    """
    A ReactiveContext is the core driver for reactive behaviour.
    It manages the subject - observer relation between cells and rules, and
    cares for the execution order of rules.

    Only one ReactiveContext exists in a program that can be accessed by:

    >>> larch.reactive.rcontext
    """

    def __init__(self):
        self.atomic_count = 0  # nesting depth of atomic operations
        self.atomic_start_round = 0  # call counter at the latest atomic start
        self._call_counter = 0  # incremented by call_counter()
        self.current_observer = None  # the rule being executed, if any
        self.touch = self._dumy_touch  # active touch hook
        self.notify = self._notify  # active notify hook

        # the subject values before an atomic operation
        # (keyed by id(subject), see _notify).
        self.old_values = {}

        # all observers in the active atomic operation in their call order
        self.observers = []

        # how often is one observer in the observers list
        # (keyed by id(observer), see emit).
        self.observer_count = {}

        # sequence of all callbacks to notify the end of notification process,
        # transformed from set to list to keep the callbacks order.
        self.callbacks = deque()

        self.atomic = AtomicDecoratorContext()

    @property
    def untouched(self):
        """UntouchedDecoratorContext: a new context."""
        return UntouchedDecoratorContext()

    @property
    def touched(self):
        """TouchedDecoratorContext: a new context."""
        return TouchedDecoratorContext()

    @property
    def silent(self):
        """SilentDecoratorContext: a new context."""
        return SilentDecoratorContext()

    @property
    def rounds(self):
        """int: the value of the call counter captured when the most recent
        atomic operation was started."""
        return self.atomic_start_round

    @property
    def transaction_level(self):
        """int: count of recursive atomic operations."""
        return self.atomic_count

    @property
    def inside_rule(self):
        """The rule that is currently executed, or `None` outside of rules.

        Intended to be used as a truth value."""
        return self.current_observer

    @property
    def inside_touch(self):
        """bool: `True` if the current frame is inside a touch context."""
        return self.touch == self._touch

    def _atomic_start(self):
        """Enters one (possibly nested) atomic operation."""
        self.atomic_start_round = self._call_counter
        self.atomic_count += 1

    def _atomic_end(self):
        """
        Leaves one atomic operation. When the outermost one is left, all
        collected observers are notified and afterwards the queued callbacks
        are executed.

        Raises:
            RuntimeError: if observers are still pending after 10000 loop
                iterations (possible endless recursion).
            GreenletExit: re-raised at the very end if an observer or a
                callback raised it.
        """
        if self.atomic_count != 1:
            # still nested: only the outermost level executes the rules
            self.atomic_count -= 1
            return

        # self.atomic_count == 1
        observers = self.observers
        observer_count = self.observer_count
        greenlet_exit = False

        # execute rules
        try:
            # a "for o in observers" loop is not possible
            # observer.notify() can change new cells and modify
            # the observers list
            # the following lines correctly run the testcases
            # test_renew_cell and test_one_run in test_reactive.py
            for _ in range(10000):  # avoid an infinite loop
                # __pragma__ ('tconv')
                if not observers:
                    observer_count.clear()
                    break
                # __pragma__ ('notconv')

                # observers are sorted in reverse order
                observer = observers.pop()
                key_observer = id(observer)
                if observer_count[key_observer] <= 1:
                    del observer_count[key_observer]
                    try:
                        observer.notify()
                    except GreenletExit:
                        # remember it, finish the round, raise at the end
                        greenlet_exit = True
                        logger.exception(
                            "GreenletExit during observer notification",
                            stack_info=True)
                else:
                    # this observer occurs multiple times. Only call the
                    # latest occurence
                    observer_count[key_observer] -= 1
            else:
                # the loop was never left by "break": give up and reset
                self.callbacks.clear()
                exp = RuntimeError(
                    "possible endless recursion {0}".format(observers))
                del observers[:]
                observer_count.clear()
                raise exp
        finally:
            self.atomic_count = 0
            self.active_greenlet = None
            self.old_values.clear()

        # NOTE(review): the source this block was restored from had lost its
        # indentation; the callbacks are assumed to run after (not inside)
        # the finally clause - confirm against the upstream file.
        greenlet_exit = self._execute_callbacks(greenlet_exit)
        if greenlet_exit:
            raise GreenletExit()

    def _execute_callbacks(self, greenlet_exit):
        """
        Executes and removes all queued callbacks.

        Exceptions derived from `Exception` are reported as ReactiveWarning
        and logged; they do not stop the remaining callbacks.

        Args:
            greenlet_exit: flag carried over from the observer round.

        Returns:
            `True` if a callback raised GreenletExit, else the
            `greenlet_exit` value passed in.
        """
        callbacks = self.callbacks
        # callbacks queued from now on go to a fresh queue
        self.callbacks = deque()
        for c in callbacks:
            try:
                c()
            except Exception as e:
                # __pragma__ ('skip')
                exc_type, exc_value, tb = sys.exc_info()
                from .exception import format_full_exception_tb
                tb = format_full_exception_tb()
                msg = 'Exception while executing callback: "{0}"\n{1}'
                warnings.warn(msg.format(exc_value, tb), ReactiveWarning)
                logger.exception(
                    "Exception while executing callback %r: %r", c, e,
                    stack_info=True)
                # __pragma__ ('noskip')
                """?
                console.trace("Exception while executing callback", repr(e), repr(c), e.stack)
                ?"""
            except GreenletExit:
                greenlet_exit = True
                logger.exception(
                    "GreenletExit while executing callback %r", c,
                    stack_info=True)

        return greenlet_exit

    def rule_call(self, rule, args, kwargs):
        """
        Executes `rule` with `args` and `kwargs` as current observer and
        with the real touch hook installed; both are restored afterwards.

        GreenletExit is propagated. Every other exception is reported as
        ReactiveWarning, logged and then swallowed.
        """
        old_touch = self.touch
        old_observer = self.current_observer
        self.current_observer = rule
        self.touch = self._touch
        try:
            rule.__call__(args, kwargs)
        except GreenletExit:
            raise
        # NOTE(review): this also swallows KeyboardInterrupt and SystemExit.
        except BaseException as e:
            # __pragma__ ('skip')
            exc_type, exc_value, tb = sys.exc_info()
            from .exception import format_full_exception_tb
            tb = format_full_exception_tb()
            msg = 'Exception while calling rule "{0}({1!r}, {2!r})": "{3!r}"\n{4}'
            warnings.warn(
                msg.format(rule, args, kwargs, exc_value, tb),
                ReactiveWarning)
            logger.exception(
                "Exception while calling rule %r", e, stack_info=True)
            # __pragma__ ('noskip')
            """?
            console.trace("Exception while calling rule", repr(e), repr(rule), e.stack)
            ?"""
        finally:
            self.touch = old_touch
            self.current_observer = old_observer

    def _notify(self, subject, old_value):
        """Default notify hook: remembers the first old value of `subject`
        and schedules its observers; the subject's observers are cleared."""
        id_subject = id(subject)
        # setdefault: only the value before the first change is kept
        self.old_values.setdefault(id_subject, old_value)
        self.emit(subject.get_observers())
        subject.clear_observers()

    def _dumy_notify(self, subject, old_value):
        """No-op notify hook (used by the silent context)."""
        pass

    def emit(self, observers):
        """Inserts `observers` into the sorted list of pending observers and
        counts how often each one is pending."""
        my_observers = self.observers
        my_observer_count = self.observer_count
        for o in observers:
            insort_left(my_observers, o)
            ko = id(o)
            my_observer_count[ko] = my_observer_count.get(ko, 0) + 1

    def push_callback(self, callback):
        """Appends `callback` to the callback queue while an atomic
        operation is active; otherwise calls it immediately."""
        if self.transaction_level:
            self.callbacks.append(callback)
        else:
            callback()

    def shift_callback(self, callback):
        """Like `push_callback`, but puts `callback` at the front of the
        queue."""
        if self.transaction_level:
            self.callbacks.appendleft(callback)
        else:
            callback()

    def call_counter(self):
        """Increments the call counter and returns the new value."""
        self._call_counter += 1
        return self._call_counter

    def _touch(self, subject):
        """Real touch hook: registers the current observer at `subject`."""
        subject.add_observer(self.current_observer)
        return True

    def _dumy_touch(self, subject):
        """No-op touch hook."""
        return False
rcontext = ReactiveContext()


class Observer:
    """
    Base class of everything that can be notified by the reactive context.

    Observers are ordered by `level` and `last_call`; the comparison is
    inverted on purpose (see `__lt__`).
    """
    level = 0
    __slots__ = ("last_call", )

    def notify(self, initial=False):
        """Called when the observer has to run; a no-op in the base class."""
        pass  # pragma: no cover

    def __lt__(self, other):
        # observers are sorted in reverse order!
        return (self.level > other.level
                or self.level == other.level
                and self.last_call > other.last_call)

    def __eq__(self, other):
        return self.level == other.level and self.last_call == other.last_call


class IterRule(Observer):
    """A Rule represents a `rule` instance method of a reactive class"""
    __slots__ = ("level", "method", "holder", "__weakref__")

    def __init__(self, holder, method, level, last_call):
        # holder is called to obtain the owning object (and compared with
        # None), i.e. it behaves like a weak reference.
        self.holder = holder
        self.method = method
        self.level = level
        self.last_call = last_call

    def notify(self, args=(), kwargs=None):
        """
        Stamps the rule with a fresh call counter value and executes it
        through `rcontext.rule_call`.

        Args:
            args: positional arguments for the rule method.
            kwargs: keyword arguments for the rule method; `None` (the
                default) means no keyword arguments.
        """
        if kwargs is None:
            # a fresh dict per call instead of a shared mutable default
            kwargs = {}
        self.last_call = rcontext.call_counter()
        rcontext.rule_call(self, args, kwargs)

    def __repr__(self):
        if self.holder() is None:
            return (f"<Rule obsolete {self.method.__name__} "
                    f"({self.level}, {self.last_call})>")

        return (f"<Rule {self.method.__name__}({self.level}, "
                f"{self.last_call}) of {self.holder()!r}>")

    def __call__(self, args, kwargs):
        """
        Calls the rule method on the holder (if it still exists) and drives
        the returned iterator: the first step runs immediately, every
        further step is queued with `rcontext.push_callback`.
        """
        holder = self.holder()
        if holder is not None:
            iterator = self.method(holder, *args, **kwargs)

            def call_iterator():
                try:
                    next(iterator)
                    rcontext.push_callback(call_iterator)
                except StopIteration:
                    pass
                except RuntimeError as e:
                    # a StopIteration raised inside a generator surfaces as
                    # this RuntimeError; treat it like a normal end
                    if e.args != ('generator raised StopIteration',):
                        raise

            call_iterator()  # __: skip
            """?
            if iterator:
                call_iterator()
            ?"""


# __pragma__ ('skip')
class Rule(IterRule):
    """A rule whose method is called as a plain function, not iterated."""

    def __call__(self, args, kwargs):
        holder = self.holder()
        if holder is not None:
            try:
                self.method(holder, *args, **kwargs)
            except StopIteration:
                pass
# __pragma__ ('noskip')


class Subject:
    """
    An observer's subject
    """
    __slots__ = ("_observers", "_last_counter")

    def __init__(self):
        # The observers that will be called when the subject's value
        # changes: a dict mapping id(observer) to a weak reference to the
        # observer.
        self._observers = {}

        # The variable is used to check if a observer still depends on this
        # subject:
        # if observer.last_call > self._last_counter
        # ==> the observer does not depend anymore.
        # (see TestReactive.test_decouple of test_reactive.py)
        self._last_counter = 0

    def touch(self):
        """Reports a read access of this subject to the reactive context."""
        rcontext.touch(self)

    def changed(self, old_value=None):
        """Notifies the reactive context about a change, inside an atomic
        operation. Does nothing if the subject has no observers."""
        # __pragma__ ('tconv')
        if self._observers:
            rcontext.atomic.call(rcontext.notify, self, old_value)
        # __pragma__ ('notconv')

    def take(self, other):
        """takes over the attributes of another subject"""
        self._observers.update(other._observers)
        self._last_counter = max(self._last_counter, other._last_counter)

    def get_observers(self):
        """Returns the subject's observers"""
        return self.filter_observers(self._observers.values())

    def filter_observers(self, observers):
        """Resolves the weak references in `observers` and returns the
        observers that are still alive and were not called after the last
        registration at this subject."""
        last_counter = self._last_counter
        observers = (o() for o in observers)
        return [o for o in observers
                if o is not None and o.last_call <= last_counter]

    def add_observer(self, observer):
        """Registers `observer` (ignored if it is falsy, e.g. `None`)."""
        if observer:
            self._last_counter = max(observer.last_call, self._last_counter)
            self._observers[id(observer)] = ref(observer)

    def clear_observers(self):
        """Forgets all registered observers."""
        self._observers = {}


def _cell_to_container(reactive):
    """Creates one container per reactive cell of `reactive`."""
    return [c.create_container(reactive)
            for c in reactive.__reactive_cells__.values()]


class _NOVAL:
    """Sentinel for "key not present" in `_state_from_dict`."""
    pass


def _state_from_dict(state, reactive, dict_):
    """Writes the values of `dict_` (keyed by cell name) directly into the
    containers of `state`; cells missing in `dict_` are left unchanged."""
    for c in reactive.__reactive_cells__.values():
        val = dict_.get(c.name, _NOVAL)
        if val is not _NOVAL:
            state[c.index]._value = val


def _state_as_dict(state, reactive):
    """Returns the raw container values of `state` keyed by cell name."""
    result = {}
    for c in reactive.__reactive_cells__.values():
        result[c.name] = state[c.index]._value
    return result


# __pragma__ ('skip')
class ReactiveState(list):
    """The state of a reactive objects. Contains all cell container."""

    def __init__(self, reactive):
        self[:] = _cell_to_container(reactive)

    def from_dict(self, reactive, dict_):
        _state_from_dict(self, reactive, dict_)

    def as_dict(self, reactive):
        return _state_as_dict(self, reactive)
# __pragma__ ('noskip')


"""?
__pragma__("js", "{}", '''
var ReactiveState_ = function(reactive) {
    this.push.apply(this, _cell_to_container(reactive));
    return this
}

ReactiveState_.prototype = Object.create(Array.prototype);
ReactiveState_.prototype.from_dict = function(reactive, dict_) {
    _state_from_dict(this, reactive, dict_);
}
ReactiveState_.prototype.as_dict = function(reactive) {
    return _state_as_dict(this, reactive);
}

export var ReactiveState = function(reactive) {
    return new ReactiveState_(reactive);
}

ReactiveState_.prototype.__class__ = ReactiveState;
ReactiveState.__name__ = 'ReactiveState';
ReactiveState.__bases__ = [list];
''')
?"""
class Container(Subject):
    """A cell container that stores a cell's value."""
    __slots__ = ("_value", )

    def __init__(self, value):
        super().__init__()
        self._value = value
        # see ccore.pyx Container.__dealloc__
        # Drops an entry that may be stored in rcontext.old_values under
        # this object's id.
        rcontext.old_values.pop(id(self), None)

    def set_value(self, value):
        """
        Sets the containers value.

        The value is always stored. Observers are notified only if the new
        value differs from the old one in value or in type.

        Returns:
            `True` if a change was reported, else `False`.
        """
        old_value = self._value
        self._value = value
        # __pragma__ ("opov")
        if old_value != value or type(old_value) != type(value):
            # __pragma__ ("noopov")
            self.changed(old_value)
            return True
        return False

    def get_value(self):
        """Returns the container's value"""
        # report the read access before handing out the value
        self.touch()
        return self._value

    def __repr__(self):
        return "<{0}({1:x}): {2!r}>".format(
            self.__class__.__name__, id(self), self._value)
class ResetContainer(Container):
    """
    A container that falls back to its initial value after a change has
    been processed.
    """

    def __init__(self, value):
        super().__init__(value)
        self._reset_value = value  # the value restored by set_value

    def set_value(self, value):
        """
        Stores `value`. If it differs from the previous value in value or in
        type, a change is reported (with the reset value as old value) and a
        callback that restores the reset value is put at the front of the
        reactive context's callback queue.

        Returns:
            `True` if a change was reported, else `False`.
        """
        old_value = self._value
        self._value = value
        if old_value != value or type(old_value) != type(value):
            def reset():
                self._value = self._reset_value

            # NOTE(review): the source this block was restored from had lost
            # its indentation; shift_callback is assumed to be inside the
            # atomic block so that reset is queued and not run immediately.
            with rcontext.atomic:
                self.changed(self._reset_value)
                rcontext.shift_callback(reset)
            return True
        return False


class CellBase:
    """A descriptor to assign cell containers to objects."""
    __slots__ = ("name", "index")
    __container__ = Container  # factory

    def __init__(self):
        self.name = None
        self.index = -1

    def __init_cell__(self, name, index):
        """Assigns the cell's name and its index in the reactive state."""
        self.name = name
        self.index = index

    def __get__(self, holder, owner):
        # class access returns the descriptor itself
        if holder is None:
            return self
        return holder.__reactive_state__[self.index].get_value()

    def __set__(self, holder, value):
        holder.__reactive_state__[self.index].set_value(value)

    def get_container(self, holder):
        """Returns the container of this cell for `holder`."""
        return holder.__reactive_state__[self.index]

    def set_container(self, holder, value):
        """Replaces the container of this cell for `holder` by `value`."""
        # if we replace the container we have to keep the rules
        value.take(self.get_container(holder))
        holder.__reactive_state__[self.index] = value


# __pragma__ ('noecom')