Source code for automaton

# -*- coding: utf-8 -*-
#
# Copyright 2015 Federico Ficarelli
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A minimal finite-state machine implementation.
"""

import re
from weakref import WeakKeyDictionary
from itertools import chain, product, filterfalse
from collections import namedtuple, Iterable

import tabulate as tabulator
import networkx as nx


###################################################
__author__ = 'Federico Ficarelli'
__copyright__ = 'Copyright (c) 2015 Federico Ficarelli'
__license__ = 'Apache License Version 2.0'
__version__ = '1.3.1'

VERSION = __version__
# bumpversion can only search for {current_version}
# so we have to parse the version here.
version_info_t = namedtuple('version_info_t', 'major minor patch')  # pylint: disable=invalid-name
VERSION_INFO = version_info_t(*(int(part) for part in re.match(r'(\d+)\.(\d+).(\d+)', __version__).groups()))
###################################################


__all__ = (
    "Event",
    "Automaton",
    "AutomatonError",
    "DefinitionError",
    "InvalidTransitionError",
    "transitiontable",
    "stategraph",
    "get_table",
)


[docs]class AutomatonError(Exception): """ Exception representing a generic error occurred in the automaton. """ pass
[docs]class DefinitionError(AutomatonError): """ Exception representing an error occurred during the automaton definition. """ pass
[docs]class InvalidTransitionError(AutomatonError): """ Exception representing an invalid transition. """ pass
EventBase = namedtuple("Event", ("source_states", "dest_state")) def unique_everseen(iterable, key=None): # pragma: no cover """List unique elements, preserving order. Remember all elements ever seen. >>> list(unique_everseen('AAAABBBCCDAABBB')) ['A', 'B', 'C', 'D'] >>> list(unique_everseen('ABBCcAD', str.lower)) ['A', 'B', 'C', 'D'] .. note:: Recipe taken from: https://docs.python.org/3.6/library/itertools.html """ seen = set() seen_add = seen.add if key is None: for element in filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element class _EventProxy(object): # pylint: disable=too-few-public-methods """ Proxy class to access :class:`~automaton.Event` attributes. Forwards every attribute access to the underlying proxied :class:`~automaton.Event`. .. important: Unlike :class:`~automaton.Event`, instances of this type are callables that trigger event transition in the proxied :class:`~automaton.Automaton` instance. """ def __init__(self, event, automaton): self.__event = event self.__automaton = automaton def __getattr__(self, name): return getattr(self.__event, name) def __call__(self): return self.__automaton.event(self.name)
[docs]class Event(EventBase): """ Class that represents a state transition. .. important:: This is a read-only data descriptor type. Parameters ---------- source_states : any The transition source state. dest_state : any The transition destination state. """ def __new__(cls, source_states, dest_state): # Fix source states: if isinstance(source_states, str) or not isinstance(source_states, Iterable): source_states = (source_states,) instance = super().__new__(cls, source_states, dest_state) instance._event_name = None # pylint: disable=protected-access instance._event_delegate = None # pylint: disable=protected-access return instance @property def name(self): """ The actual user-defined name of the event as an :class:`~automaton.Automaton` class member. If None, the Event isn't bounded to any specific :class:`~automaton.Automaton` subclass. """ return self._event_name
[docs] def edges(self, data=False): """ Provides all the single transition edges associated to this event. .. note:: Since an event can have multiple source states, in graph terms it represents a set of state-to-state edges. Parameters ---------- data : bool, optional If set, data associated to the corresponding edge will be added to the edge tuple. Defaults to `False`. Yields ------ (any, any) or (any, any, dict) All the `(source, dest)` tuples representing the graph edges associated to the event. If `data` parameter is set, each tuple will be appended with the `dict` containing edge data (by default the `'event'` key containing the event name). """ components = [self.source_states, (self.dest_state, )] if data: components.append((dict(event=self.name), )) yield from product(*components)
def _bind(self, name): """ Binds the :class:`~automaton.Event` instance to a particular event name. .. warning:: This method is intended to be called by the automaton definition mechanics. Parameters ---------- name : str The event name to be bounded. """ self._event_name = name def __get__(self, instance, owner): """ Enables the descriptor semantics on :class:`~automaton.Event` instances. """ if instance is None: return self else: if not hasattr(self, '__bound_instances'): # pylint: disable=attribute-defined-outside-init self.__bound_instances = WeakKeyDictionary() if instance not in self.__bound_instances: self.__bound_instances[instance] = _EventProxy(event=self, automaton=instance) return self.__bound_instances[instance] def __set__(self, instance, value): """ Enables the descriptor semantics on :class:`~automaton.Event` instances. Raises ------ AttributeError To enforce read-only data descriptor semantics. """ raise AttributeError("Can't set attribute")
class AutomatonMeta(type): """ The metaclass that must be applied on the ``automaton``-enabled classes' hierarchy. The :meth:`~automaton.AutomatonMeta.__new__` method builds the actual finite-state machine based on the user-provided events definition. Raises ------ DefinitionError If the states graph isn't connected (that is leads to unreachable states). """ def __new__(mcs, class_name, class_bases, class_dict): cls = super().__new__(mcs, class_name, class_bases, class_dict) events = {} states = set() for attr in dir(cls): value = getattr(cls, attr) if isinstance(value, Event): # Important: bind event to its name as a class member value._bind(attr) # pylint: disable=protected-access # Collect states states.update(value[0]) # Iterable of sources states, let's update the set states.add(value[1]) # Collect events events[attr] = value if len(states) != 0: # if we are dealing with an automaton definition class... # 1. Setup class members: cls.__states__ = states cls.__events__ = events if cls.__default_initial_state__ is not None \ and cls.__default_initial_state__ not in cls.__states__: raise DefinitionError("Default initial state '{}' unknown.".format(cls.__default_initial_state__)) if cls.__default_accepting_states__ is not None: for state in cls.__default_accepting_states__: if state not in cls.__states__: raise DefinitionError("Default accepting state '{}' unknown.".format(state)) # 2. Build graph graph = nx.MultiDiGraph() graph.add_nodes_from(states) graph.add_edges_from(chain.from_iterable(list(event.edges(data=True)) for event in events.values())) # 3. Check states graph consistency: if not nx.is_weakly_connected(graph): components = list(nx.weakly_connected_component_subgraphs(graph)) raise DefinitionError( "The state graph contains {} connected components: {}".format( len(components), ", ".join("{}".format(c.nodes()) for c in components)) ) # 4. Save cls.__graph__ = nx.freeze(graph) # 5. Docstring substitution if cls.__doc__: cls.__doc__ = cls.__doc__.format(automaton=cls) return cls def __format__(cls, fmt): return __automaton_format__(cls, fmt)
[docs]class Automaton(metaclass=AutomatonMeta): """ Base class for automaton types. In order to define an automaton, this class must be the base for the provided machine definition: >>> class MyClass(Automaton): ... transition = Event("state_a", "state_b") Parameters ---------- initial_state : any, optional The initial state for this automaton instance. Defaults to None. Note that if automaton type has no default initial state (specified via :attr:`~automaton.__default_initial_state__`), this argument must be specified. initial_event : any, optional The initial event to be fired to deduce the initial state. Defaults to `None`. Please note that `initial_state` and and `initial_event` arguments *are mutually exclusive*, specifying both of them will raise a `TypeError`. .. note:: Since the *destination state* of an event is a *single state* and due to the fact that events are class attributes (so each event name is unique), *deducing the initial state through a initial event is a well defined operation*. accepting_states : iterable, optional The accepting states for this automaton instance. Defaults to None. Note that if automaton type has default accepting states (specified via :attr:`~automaton.__default_accepting_states__`), this argument takes precedence over that. Raises ------ DefinitionError The automaton type has no default initial state while no custom initial state specified during construction *or* the specified initial state is unknown. TypeError Both `initial_state` and `initial_event` arguments have been specified while they are mutually exclusive. """ __default_initial_state__ = None """any: The default initial state for the automaton type.""" __default_accepting_states__ = None """iterable: The default accepting states for the automaton type.""" def __init__(self, initial_state=None, initial_event=None, accepting_states=None): # # 0. Initial event setup # if initial_event: if initial_state: raise TypeError("'initial_state' and 'initial_event' parameters can't be specified together") if initial_event not in self.__events__: # pylint: disable=no-member raise InvalidTransitionError("Unrecognized event '{}'.".format(initial_event)) initial_state = getattr(self, initial_event).dest_state # # 1. Initial state setup # if initial_state is None: if self.__default_initial_state__ is None: raise DefinitionError( "No default initial state defined for '{}', must be specified " "in costruction.".format(self.__class__) ) else: initial_state = self.__default_initial_state__ # Check initial state correctness if initial_state not in self.__states__: # pylint: disable=no-member raise DefinitionError("Initial state '{}' unknown.".format(initial_state)) # ...and finally set initial state. self._state = initial_state # # 2. Accepting states setup # if accepting_states is None: if self.__default_accepting_states__ is None: accepting_states = () else: accepting_states = self.__default_accepting_states__ for state in accepting_states: if state not in self.__states__: # pylint: disable=no-member raise DefinitionError("Accepting state '{}' unknown.".format(state)) # ...and finally set accepting states. self._accepting_states = frozenset(accepting_states) @property def state(self): """ Gets the current state of the automaton instance. Returns ------- any The current state. """ return self._state @property def is_accepted(self): """ Gets the current automaton's acceptance state. Returns ------- bool True if the current state is an accepting state, False otherwise. """ return self._state in self._accepting_states
[docs] def event(self, event_name): """ Signals the occurrence of an event and evolves the automaton to the destination state. Parameters ---------- do_event : any The event to be performed on the automaton. Raises ------ InvalidTransitionError The specified event is unknown. """ if event_name not in self.__events__: # pylint: disable=no-member raise InvalidTransitionError("Unrecognized event '{}'.".format(event_name)) transition = self.__events__[event_name] # pylint: disable=no-member if self._state not in transition.source_states: raise InvalidTransitionError( "The specified event '{}' is invalid in current state '{}'.".format(transition, self._state)) self._state = transition.dest_state
@classmethod def _get_cut(cls, *states, inbound=True): """ Retrieves all the events that form a cut for the specified subgraphs. Parameters ---------- states : tuple(any) The states subset. inbound : bool If set, the inbound events will be returned, outbound otherwise. Defaults to `True`. Raises ------ KeyError When an unknown state is found while iterating. Yields ------ any All the inbound or outbound events. """ unknown = set(states) - cls.__states__ # pylint: disable=no-member if unknown: raise KeyError("Unknown states: {}".format(unknown)) edges_getter = \ cls.__graph__.in_edges if inbound else cls.__graph__.out_edges # pylint: disable=no-member yield from unique_everseen( edge[2]['event'] for edge in edges_getter(states, data=True) )
[docs] @classmethod def states(cls): """ Gives the automaton state set. Yields ------ any The iterator over the state set. """ yield from cls.__states__ # pylint: disable=no-member
[docs] @classmethod def events(cls): """ Gives the automaton events set. Yields ------ any The iterator over the events set. """ yield from cls.__events__ # pylint: disable=no-member
[docs] @classmethod def in_events(cls, *states): """ Retrieves all the inbound events entering the specified states with no duplicates. Parameters ---------- states : tuple(any) The states subset. Raises ------ KeyError When an unknown state is found while iterating. Yields ------ any The events entering the specified states. """ yield from cls._get_cut(*states, inbound=True)
[docs] @classmethod def out_events(cls, *states): """ Retrieves all the outbound events leaving the specified states with no duplicates. Raises ------ KeyError When an unknown state is found while iterating. Parameters ---------- states : tuple(any) The states subset. Yields ------ any The events that exit the specified states. """ yield from cls._get_cut(*states, inbound=False)
[docs] @classmethod def get_default_initial_state(cls): """ Gives the automaton default initial state. Returns ------- any The automaton default initial state. """ return cls.__default_initial_state__
def __str__(self): return '<{}@{}>'.format(self.__class__.__name__, self.state) def __format__(self, fmt): return __automaton_format__(self, fmt)
######################################################################### # Rendering stuff, everything beyond this point is formatting for humans. def __automaton_format__(automaton, fmt): """ Support for custom format specifiers. Parameters ---------- automaton : :class:`~automaton.Automaton` The automaton to be formatted. It can be both an instance or a class. fmt : str The format specifier. Returns ------- str The string representation according to the rquested format specifier. """ cls = automaton.__class__ if isinstance(automaton, Automaton) else automaton if fmt in stategraph.SUPPORTED_FORMATS: return stategraph(cls, fmt=fmt) elif fmt in transitiontable.SUPPORTED_FORMATS: return transitiontable(cls, fmt=fmt) else: return str(automaton)
[docs]def get_table(automaton, traversal=None): """ Build the transition table of the given automaton. Parameters ---------- automaton : :class:`~automaton.Automaton` The automaton to be rendered. It can be both a class and an instance. traversal : callable(graph), optional An optional callable used to yield the edges of the graph representation of the automaton. It must accept a `networkx.MultiDiGraph` as the first positional argument and yield one edge at a time as a tuple in the form ``(source_state, destination_state)``. The default traversal sorts the states in ascending order by inbound grade (number of incoming events). Yields ------ tuple(source, dest, event) Yields one row at a time as a tuple containing the source and destination node of the edge and the name of the event associated with the edge. """ graph = automaton.__graph__ if not traversal: traversal = lambda G: sorted(G.edges(), key=lambda e: len(G.in_edges(e[0]))) # Retrieve event names since networkx traversal # functions lack data retrieval events = nx.get_edge_attributes(graph, 'event') # -> (source, dest, key==0): event # Build raw data table to be rendered for source, dest in traversal(graph): yield (source, dest, events[source, dest, 0])
[docs]def stategraph(automaton, fmt=None, traversal=None): # pylint: disable=unused-argument """ Render an automaton's state-transition graph. A simple example will be formatted as follows: >>> class TrafficLight(Automaton): ... go = Event('red', 'green') ... slowdown = Event('green', 'yellow') ... stop = Event('yellow', 'red') >>> print( plantuml(TrafficLight) ) # doctest: +SKIP :: @startuml green --> yellow : slowdown yellow --> red : stop red --> green : go @enduml Parameters ---------- automaton : :class:`~automaton.Automaton` The automaton to be rendered. It can be both a class and an instance. fmt : str, optional Specifies the output format for the graph. Currently, the only supported format is `PlantUML <http://plantuml.com/state-diagram>`_. Defaults to ``plantuml``, unknown format specifiers are ignored. traversal : callable(graph), optional An optional callable used to sort the events. It has the same meaning as the ``traversal`` parameter of `automaton.transition_table`. Returns ------- str Returns the formatted state graph. """ graph = automaton.__graph__ source_nodes = filter(lambda n: not graph.in_edges(n), graph.nodes()) sink_nodes = filter(lambda n: not graph.out_edges(n), graph.nodes()) sources = [('[*]', node) for node in source_nodes] sinks = [(node, '[*]') for node in sink_nodes] table = get_table(automaton, traversal=traversal) return """@startuml {} {} {} @enduml""".format('\n'.join([' {} --> {}'.format(*row) for row in sources]), '\n'.join([' {} --> {} : {}'.format(*row) for row in table]), '\n'.join([' {} --> {}'.format(*row) for row in sinks]))
stategraph.SUPPORTED_FORMATS = frozenset([ 'plantuml', ])
[docs]def transitiontable(automaton, header=None, fmt=None, traversal=None): """ Render an automaton's transition table in text format. The transition table has three columns: source node, destination node and the name of the event. A simple example will be formatted as follows: >>> class TrafficLight(Automaton): ... go = Event('red', 'green') ... slowdown = Event('green', 'yellow') ... stop = Event('yellow', 'red') >>> tabulate(TrafficLight) # doctest: +SKIP :: ======== ====== ======== Source Dest Event ======== ====== ======== green yellow slowdown yellow red stop red green go ======== ====== ======== Parameters ---------- automaton : :class:`~automaton.Automaton` The automaton to be rendered. It can be both a class and an instance. header : list[str, str, str] An optional list of fields to be used as table headers. Defaults to a predefined header. fmt str, optional Specifies the output format for the table. All formats supported by `tabulate <https://pypi.python.org/pypi/tabulate>`_ package are supported (e.g.: ``rst`` for reStructuredText, ``pipe`` for Markdown). Defaults to ``rst``. traversal : callable(graph), optional An optional callable used to sort the events. It has the same meaning as the ``traversal`` parameter of `automaton.transition_table`. Returns ------- str Returns the formatted transition table. """ header = header or ['Source', 'Dest', 'Event'] fmt = fmt or 'rst' table = get_table(automaton=automaton, traversal=traversal) return tabulator.tabulate(table, header, tablefmt=fmt)
transitiontable.SUPPORTED_FORMATS = frozenset([ 'plain', 'simple', 'grid', 'fancy_grid', 'pipe', 'orgtbl', 'jira', 'psql', 'rst', 'mediawiki', 'moinmoin', 'html', 'latex', 'latex_booktabs', 'textile' ])