aalpy.utils.HelperFunctions

View Source
import string
from collections import defaultdict


def extend_set(list_to_extend: list, new_elements: list) -> list:
    """
    Helper function to extend a list while maintaining set property.
    Sets are stored as lists, so this function appends only the elements that are not yet present.

    Args:

        list_to_extend: list representing a set; it is extended in place
        new_elements: candidate elements (they have to be hashable) to be added to the list

    Returns:

        list of elements that were added to the set, in the order in which they were added

    """
    seen = set(list_to_extend)
    added_elements = []
    for element in new_elements:
        # Elements added during this call are tracked as well, so that duplicates
        # inside new_elements cannot break the set property.
        if element not in seen:
            seen.add(element)
            added_elements.append(element)
    list_to_extend.extend(added_elements)
    return added_elements


def all_prefixes(li):
    """
    Computes every non-empty prefix of a sequence, from the shortest to the longest.

    Args:
      li: sequence from which the prefixes are computed

    Returns:
      list of all prefixes, each of them represented as a tuple

    """
    prefixes = []
    for end in range(1, len(li) + 1):
        prefixes.append(tuple(li[:end]))
    return prefixes


def all_suffixes(li):
    """
    Computes every non-empty suffix of a sequence, from the shortest to the longest.

    Args:
      li: sequence from which the suffixes are computed

    Returns:
      list of all suffixes, each of them represented as a tuple

    """
    suffixes = []
    for start in reversed(range(len(li))):
        suffixes.append(tuple(li[start:]))
    return suffixes


def profile_function(function: callable, sort_key='cumtime'):
    """
    Runs the function under cProfile and prints the collected statistics.

    Args:
      function: callable: function to be profiled; it is called without arguments
      sort_key:  statistic by which the printed results are sorted (Default value = 'cumtime')

    Returns:
        prints the profiling results
    """
    import cProfile
    pr = cProfile.Profile()
    pr.enable()
    try:
        function()
    finally:
        # Always stop profiling, so that an exception raised by the function
        # does not leave the profiler enabled.
        pr.disable()
    pr.print_stats(sort=sort_key)


def random_string_generator(size=10, chars=string.ascii_lowercase + string.digits):
    """
    Builds a random string by repeatedly drawing a character from chars.

    Args:

      size: length of the generated string (Default value = 10)
      chars: characters to draw from (Default value = string.ascii_lowercase + string.digits)

    Returns:

        a random string of length size
    """
    import random
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)


def print_learning_info(info: dict):
    """
    Prints the statistics gathered during learning.

    Args:

        info: dictionary with the learning statistics; the 'cache_saved' entry is optional and is
            printed only if it is present

    """

    def report(template, key):
        # fill the statistic stored under key into the line template and print it
        print(template.format(info[key]))

    separator = '-----------------------------------'

    print(separator)
    print('Learning Finished.')
    report('Learning Rounds:  {}', 'learning_rounds')
    report('Number of states: {}', 'automaton_size')

    print('Time (in seconds)')
    report('  Total                : {}', 'total_time')
    report('  Learning algorithm   : {}', 'learning_time')
    report('  Conformance checking : {}', 'eq_oracle_time')

    print('Learning Algorithm')
    report(' # Membership Queries  : {}', 'queries_learning')
    if 'cache_saved' in info:
        report(' # MQ Saved by Caching : {}', 'cache_saved')
    report(' # Steps               : {}', 'steps_learning')

    print('Equivalence Query')
    report(' # Membership Queries  : {}', 'queries_eq_oracle')
    report(' # Steps               : {}', 'steps_eq_oracle')
    print(separator)


def print_observation_table(ot, table_type):
    """
    Prints the whole observation table.

    The header and the rows of the S set are printed first, followed by a '=' separator
    and the rows of the extended S set.

    Args:

        ot: observation table
        table_type: 'det', 'non-det' or 'abstracted-non-det'; for any other value (e.g. 'stoc')
            the extended S set is obtained from ot.get_extended_s()

    """
    is_det = table_type == 'det'
    if is_det:
        s_set, extended_s, e_set, cells = ot.S, ot.s_dot_a(), ot.E, ot.T
    elif table_type == 'non-det':
        s_set, extended_s, e_set = ot.S, ot.get_extended_S(), ot.E
        cells = ot.sul.pta.get_table(s_set + extended_s, e_set)
    elif table_type == 'abstracted-non-det':
        s_set, extended_s, e_set, cells = ot.S, ot.S_dot_A, ot.E, ot.T
    else:
        s_set, extended_s, e_set, cells = ot.S, ot.get_extended_s(), ot.E, ot.T

    def build_row(prefix):
        # 'det' tables hold a sequence of values per prefix, all other tables are indexed by the E set
        if is_det:
            values = [str(value) for value in cells[prefix]]
        else:
            values = [str(cells[prefix][e]) for e in e_set]
        return [str(prefix)] + values

    header = ['Prefixes / E set'] + [str(e) for e in e_set]
    upper_rows = [header] + [build_row(s) for s in s_set]
    lower_rows = [build_row(s) for s in extended_s]

    # width of each column is given by its longest cell over all rows
    widths = {}
    for cell_row in upper_rows + lower_rows:
        for position, text in enumerate(cell_row):
            widths[position] = max(widths.get(position, 0), len(text))

    def render(cell_row):
        # pad every cell to the width of its column (plus one space) and join the cells with '|'
        return "|".join(text.ljust(widths[position] + 1) for position, text in enumerate(cell_row))

    row_len = 0
    for cell_row in upper_rows:
        line = render(cell_row)
        row_len = len(line)
        print("-" * row_len)
        print(line)
    print('=' * row_len)
    for cell_row in lower_rows:
        line = render(cell_row)
        print("-" * len(line))
        print(line)
    print('-' * row_len)


def is_suffix_of(suffix, trace) -> bool:
    """
    Checks whether suffix is a suffix of trace.

    Args:
      suffix: target suffix
      trace: trace in question; it is compared with suffix by ==, so both should be sequences
        of the same type

    Returns:

        True if suffix is the suffix of trace. The empty suffix is a suffix of every trace.
    """
    start = len(trace) - len(suffix)
    if start < 0:
        return False
    # Slice from an explicit start index: trace[-len(suffix):] would return the whole
    # trace for an empty suffix, because -0 == 0.
    return trace[start:] == suffix


def get_cex_prefixes(cex, automaton_type):
    """
    Computes the prefixes of a counterexample of a stochastic automaton.

    For `mdp` the prefixes have odd lengths (1, 3, ...), for any other automaton type they have
    even lengths (0, 2, ...), starting with the empty prefix.

    Args:
        cex: counterexample
        automaton_type: `mdp` or `smm`

    Returns:

        all prefixes of the counterexample based on the `automaton_type`
    """
    if automaton_type == 'mdp':
        prefix_lengths = range(1, len(cex) + 1, 2)
    else:
        prefix_lengths = range(0, len(cex) + 1, 2)
    return [tuple(cex[:length]) for length in prefix_lengths]


def get_available_oracles_and_err_msg():
    """
    Returns the equivalence oracles supported for non-deterministic and stochastic learning,
    together with a warning message stating that only these oracles are supported.

    Returns:

        tuple consisting of the set of supported oracle classes and the warning message
    """
    from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle

    available_oracles_msg = (
        'Warning! Only Random Walk and Random Word oracles are supported for non-deterministic and '
        'stochastic learning. If you have implemented the custom oracle, set the custom_oracle '
        'flag to True. '
    )

    return {RandomWalkEqOracle, RandomWordEqOracle}, available_oracles_msg


def make_input_complete(automaton, missing_transition_go_to='self_loop'):
    """
    Makes the automaton input complete/enabled by defining every missing transition.

    With 'self_loop', a missing transition loops back to the state in which the input is not defined.
    With 'sink_state', a new sink state is added to the automaton and all missing transitions
    (including the ones of the sink state itself) lead to it. If transitions have an output
    (Mealy machines, Stochastic Mealy machines and ONFSM), 'epsilon' is used as the output value.
    The sink state is not accepting in case of DFA and has the output 'sink_state' in case of
    Moore machines and MDPs. The automaton is modified in place.

    Args:

        automaton: automaton that is potentially not input complete
        missing_transition_go_to: either 'self_loop' or 'sink_state'.

    Returns:

        an input complete automaton
    """
    from aalpy.base import DeterministicAutomaton
    from aalpy.automata import Dfa, MooreState, MealyMachine, Mdp, StochasticMealyMachine, Onfsm, \
        DfaState, MealyState, MooreMachine, OnfsmState, MdpState, StochasticMealyState

    assert missing_transition_go_to in {'self_loop', 'sink_state'}

    input_al = automaton.get_input_alphabet()

    if automaton.is_input_complete():
        return automaton

    use_self_loop = missing_transition_go_to == 'self_loop'

    sink_state_type_dict = {Dfa: DfaState(state_id='sink', is_accepting=False),
                            MooreMachine: MooreState(state_id='sink', output='sink_state'),
                            MealyMachine: MealyState(state_id='sink'),
                            Onfsm: OnfsmState(state_id='sink'),
                            Mdp: MdpState(state_id='sink', output='sink_state'),
                            StochasticMealyMachine: StochasticMealyState(state_id='sink')}

    sink_state = None
    if missing_transition_go_to == 'sink_state':
        # the sink state is part of the automaton, so it is completed by the loop below as well
        sink_state = sink_state_type_dict[automaton.__class__]
        automaton.states.append(sink_state)

    # The type of the automaton does not change while it is being completed, so classify it once.
    if use_self_loop:
        is_deterministic = isinstance(automaton, DeterministicAutomaton)
    else:
        is_deterministic = isinstance(automaton, (Dfa, MooreMachine, MealyMachine))
    is_mealy = isinstance(automaton, MealyMachine)
    is_onfsm = isinstance(automaton, Onfsm)
    is_mdp = isinstance(automaton, Mdp)
    is_smm = isinstance(automaton, StochasticMealyMachine)

    for state in automaton.states:
        # destination of all transitions that are missing in this state
        target = state if use_self_loop else sink_state
        for i in input_al:
            if i in state.transitions.keys():
                continue
            if is_deterministic:
                state.transitions[i] = target
                if is_mealy:
                    state.output_fun[i] = 'epsilon'
            if is_onfsm:
                state.transitions[i].append(('epsilon', target))
            if is_mdp:
                state.transitions[i].append((target, 1.))
            if is_smm:
                state.transitions[i].append((target, 'epsilon', 1.))

    return automaton
#   def extend_set(list_to_extend: list, new_elements: list) -> list:
View Source
def extend_set(list_to_extend: list, new_elements: list) -> list:
    """
    Helper function to extend a list while maintaining set property.
    Sets are stored as lists, so this function appends only the elements that are not yet present.

    Args:

        list_to_extend: list representing a set; it is extended in place
        new_elements: candidate elements (they have to be hashable) to be added to the list

    Returns:

        list of elements that were added to the set, in the order in which they were added

    """
    seen = set(list_to_extend)
    added_elements = []
    for element in new_elements:
        # Elements added during this call are tracked as well, so that duplicates
        # inside new_elements cannot break the set property.
        if element not in seen:
            seen.add(element)
            added_elements.append(element)
    list_to_extend.extend(added_elements)
    return added_elements

Helper function to extend a list while maintaining the set property. The sets are stored as lists, so this function ensures that the set property is maintained.

Returns:

list of elements that were added to the set
#   def all_prefixes(li):
View Source
def all_prefixes(li):
    """
    Computes every non-empty prefix of a sequence, from the shortest to the longest.

    Args:
      li: sequence from which the prefixes are computed

    Returns:
      list of all prefixes, each of them represented as a tuple

    """
    prefixes = []
    for end in range(1, len(li) + 1):
        prefixes.append(tuple(li[:end]))
    return prefixes

Returns all prefixes of a list.

Args: li: list from which to compute all prefixes

Returns: list of all prefixes

#   def all_suffixes(li):
View Source
def all_suffixes(li):
    """
    Computes every non-empty suffix of a sequence, from the shortest to the longest.

    Args:
      li: sequence from which the suffixes are computed

    Returns:
      list of all suffixes, each of them represented as a tuple

    """
    suffixes = []
    for start in reversed(range(len(li))):
        suffixes.append(tuple(li[start:]))
    return suffixes

Returns all suffixes of a list.

Args: li: list from which to compute all suffixes

Returns: list of all suffixes

#   def profile_function(function: callable, sort_key='cumtime'):
View Source
def profile_function(function: callable, sort_key='cumtime'):
    """
    Runs the function under cProfile and prints the collected statistics.

    Args:
      function: callable: function to be profiled; it is called without arguments
      sort_key:  statistic by which the printed results are sorted (Default value = 'cumtime')

    Returns:
        prints the profiling results
    """
    import cProfile
    pr = cProfile.Profile()
    pr.enable()
    try:
        function()
    finally:
        # Always stop profiling, so that an exception raised by the function
        # does not leave the profiler enabled.
        pr.disable()
    pr.print_stats(sort=sort_key)

Args: function: callable to be profiled; sort_key: statistic by which the profiling results are sorted (Default value = 'cumtime')

Returns: prints the profiling results

#   def random_string_generator(size=10, chars='abcdefghijklmnopqrstuvwxyz0123456789'):
View Source
def random_string_generator(size=10, chars=string.ascii_lowercase + string.digits):
    """
    Builds a random string by repeatedly drawing a character from chars.

    Args:

      size: length of the generated string (Default value = 10)
      chars: characters to draw from (Default value = string.ascii_lowercase + string.digits)

    Returns:

        a random string of length size
    """
    import random
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)

Args:

size: (Default value = 10) chars: (Default value = string.ascii_lowercase + string.digits)

Returns:

a random string of length size
#   def is_suffix_of(suffix, trace) -> bool:
View Source
def is_suffix_of(suffix, trace) -> bool:
    """
    Checks whether suffix is a suffix of trace.

    Args:
      suffix: target suffix
      trace: trace in question; it is compared with suffix by ==, so both should be sequences
        of the same type

    Returns:

        True if suffix is the suffix of trace. The empty suffix is a suffix of every trace.
    """
    start = len(trace) - len(suffix)
    if start < 0:
        return False
    # Slice from an explicit start index: trace[-len(suffix):] would return the whole
    # trace for an empty suffix, because -0 == 0.
    return trace[start:] == suffix

Args: suffix: target suffix; trace: trace in question

Returns:

True if suffix is the suffix of trace.
#   def get_cex_prefixes(cex, automaton_type):
View Source
def get_cex_prefixes(cex, automaton_type):
    """
    Computes the prefixes of a counterexample of a stochastic automaton.

    For `mdp` the prefixes have odd lengths (1, 3, ...), for any other automaton type they have
    even lengths (0, 2, ...), starting with the empty prefix.

    Args:
        cex: counterexample
        automaton_type: `mdp` or `smm`

    Returns:

        all prefixes of the counterexample based on the `automaton_type`
    """
    if automaton_type == 'mdp':
        prefix_lengths = range(1, len(cex) + 1, 2)
    else:
        prefix_lengths = range(0, len(cex) + 1, 2)
    return [tuple(cex[:length]) for length in prefix_lengths]

Returns the prefixes of a counterexample of a stochastic automaton.

Args: cex: counterexample automaton_type: mdp or smm

Returns:

all prefixes of the counterexample based on the `automaton_type`
#   def get_available_oracles_and_err_msg():
View Source
def get_available_oracles_and_err_msg():
    """
    Returns the equivalence oracles supported for non-deterministic and stochastic learning,
    together with a warning message stating that only these oracles are supported.

    Returns:

        tuple consisting of the set of supported oracle classes and the warning message
    """
    from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle

    available_oracles_msg = (
        'Warning! Only Random Walk and Random Word oracles are supported for non-deterministic and '
        'stochastic learning. If you have implemented the custom oracle, set the custom_oracle '
        'flag to True. '
    )

    return {RandomWalkEqOracle, RandomWordEqOracle}, available_oracles_msg
#   def make_input_complete(automaton, missing_transition_go_to='self_loop'):
View Source
def make_input_complete(automaton, missing_transition_go_to='self_loop'):
    """
    Makes the automaton input complete/enabled by defining every missing transition.

    With 'self_loop', a missing transition loops back to the state in which the input is not defined.
    With 'sink_state', a new sink state is added to the automaton and all missing transitions
    (including the ones of the sink state itself) lead to it. If transitions have an output
    (Mealy machines, Stochastic Mealy machines and ONFSM), 'epsilon' is used as the output value.
    The sink state is not accepting in case of DFA and has the output 'sink_state' in case of
    Moore machines and MDPs. The automaton is modified in place.

    Args:

        automaton: automaton that is potentially not input complete
        missing_transition_go_to: either 'self_loop' or 'sink_state'.

    Returns:

        an input complete automaton
    """
    from aalpy.base import DeterministicAutomaton
    from aalpy.automata import Dfa, MooreState, MealyMachine, Mdp, StochasticMealyMachine, Onfsm, \
        DfaState, MealyState, MooreMachine, OnfsmState, MdpState, StochasticMealyState

    assert missing_transition_go_to in {'self_loop', 'sink_state'}

    input_al = automaton.get_input_alphabet()

    if automaton.is_input_complete():
        return automaton

    use_self_loop = missing_transition_go_to == 'self_loop'

    sink_state_type_dict = {Dfa: DfaState(state_id='sink', is_accepting=False),
                            MooreMachine: MooreState(state_id='sink', output='sink_state'),
                            MealyMachine: MealyState(state_id='sink'),
                            Onfsm: OnfsmState(state_id='sink'),
                            Mdp: MdpState(state_id='sink', output='sink_state'),
                            StochasticMealyMachine: StochasticMealyState(state_id='sink')}

    sink_state = None
    if missing_transition_go_to == 'sink_state':
        # the sink state is part of the automaton, so it is completed by the loop below as well
        sink_state = sink_state_type_dict[automaton.__class__]
        automaton.states.append(sink_state)

    # The type of the automaton does not change while it is being completed, so classify it once.
    if use_self_loop:
        is_deterministic = isinstance(automaton, DeterministicAutomaton)
    else:
        is_deterministic = isinstance(automaton, (Dfa, MooreMachine, MealyMachine))
    is_mealy = isinstance(automaton, MealyMachine)
    is_onfsm = isinstance(automaton, Onfsm)
    is_mdp = isinstance(automaton, Mdp)
    is_smm = isinstance(automaton, StochasticMealyMachine)

    for state in automaton.states:
        # destination of all transitions that are missing in this state
        target = state if use_self_loop else sink_state
        for i in input_al:
            if i in state.transitions.keys():
                continue
            if is_deterministic:
                state.transitions[i] = target
                if is_mealy:
                    state.output_fun[i] = 'epsilon'
            if is_onfsm:
                state.transitions[i].append(('epsilon', target))
            if is_mdp:
                state.transitions[i].append((target, 1.))
            if is_smm:
                state.transitions[i].append((target, 'epsilon', 1.))

    return automaton

Makes the automaton input complete/enabled. If an input is not defined in a state, it will lead to a self loop. In case of Mealy Machines, Stochastic Mealy machines and ONFSM 'epsilon' is used as output. Self loop simply loops the transition back to the state which was not input complete, whereas 'sink_state' leads all transitions to a newly added sink state. If transitions have output (Mealy machines and their derivatives), 'epsilon' is used as an output value. If a state has an output value, it is either False (in case of DFA) or 'sink_state' in case of Moore machines and their derivatives.

Args:

automaton: automaton that is potentially not input complete
missing_transition_go_to: either 'self_loop' or 'sink_state'.

Returns:

an input complete automaton