aalpy.learning_algs.stochastic_passive.Alergia
View Source
import time
from collections import defaultdict
from bisect import insort

from aalpy.automata import MarkovChain, MdpState, Mdp, McState, MooreMachine, MooreState, StochasticMealyState, \
    StochasticMealyMachine
from aalpy.learning_algs.stochastic_passive.CompatibilityChecker import HoeffdingCompatibility
from aalpy.learning_algs.stochastic_passive.FPTA import create_fpta

# Maps an automaton type name to its (state class, automaton class) pair.
state_automaton_map = {'mc': (McState, MarkovChain), 'mdp': (MdpState, Mdp),
                       'moore': (MooreState, MooreMachine), 'smm': (StochasticMealyState, StochasticMealyMachine)}


class Alergia:
    """
    Passive learner that builds a frequency prefix tree from data and merges compatible nodes into a model.

    Supported values of automaton_type are the keys of state_automaton_map: 'mc', 'mdp', 'moore' and 'smm'.
    """

    def __init__(self, data, automaton_type, eps=0.005, compatibility_checker=None, optimize_for='accuracy',
                 print_info=False):
        """
        Store the configuration and build the prefix trees from the data.

        Args:
            data: sequences the prefix tree is built from (passed to create_fpta)
            automaton_type: key of state_automaton_map selecting the kind of model to learn
            eps: value given to the default HoeffdingCompatibility checker, in (0, 2], or 'auto' to
                compute it as 10 / (sum over all sequences of len(sequence) - 1)
            compatibility_checker: object with a check_difference(a, b) method; when not given,
                HoeffdingCompatibility(eps) is used
            optimize_for: 'memory' or 'accuracy' (passed to create_fpta)
            print_info: if True, timing and size information is printed
        """
        assert eps == 'auto' or 0 < eps <= 2
        assert optimize_for in {'memory', 'accuracy'}

        self.automaton_type = automaton_type
        self.print_info = print_info
        self.optimize_for = optimize_for

        if eps == 'auto':
            # len - 1 to ignore the initial output of every sequence
            total_steps = sum(len(trace) - 1 for trace in data)
            eps = 10 / total_steps

        if compatibility_checker:
            self.diff_checker = compatibility_checker
        else:
            self.diff_checker = HoeffdingCompatibility(eps)

        construction_started = time.time()
        self.immutableTreeRoot, self.mutableTreeRoot = create_fpta(data, automaton_type, optimize_for)
        construction_time = round(time.time() - construction_started, 2)

        if self.print_info:
            print(f'PTA Construction Time: {construction_time}')

    def compatibility_test(self, a, b):
        """
        Recursively decide whether nodes a and b can be merged.

        Returns:
            True if the nodes (and all children reachable over shared keys) are compatible, False otherwise
        """
        kind = self.automaton_type

        # Outputs are compared for every type except 'smm'.
        if kind != 'smm' and a.output != b.output:
            return False

        # A node without children imposes no further constraints.
        if not (a.children and b.children):
            return True

        # The statistical check is skipped for 'moore'.
        if kind != 'moore' and not self.diff_checker.check_difference(a, b):
            return False

        shared_keys = set(a.children.keys()).intersection(b.children.keys())
        return all(self.compatibility_test(a.children[key], b.children[key]) for key in shared_keys)

    def merge(self, q_r, q_b):
        """
        Redirect the transition leading to q_b so that it points to q_r, then fold q_b's subtree into q_r.
        """
        blue_reference = self.get_blue_node(q_b)
        prefix = q_b.getPrefix()

        # Walk the mutable tree to the parent of q_b.
        parent = self.mutableTreeRoot
        for symbol in prefix[:-1]:
            parent = parent.children[symbol]
        parent.children[prefix[-1]] = q_r

        self.fold(q_r, q_b, blue_reference)

    def fold(self, red, blue, blue_tree_node):
        """
        Add the children and frequencies of blue to red; frequencies are read from blue_tree_node.
        """
        for symbol, blue_child in blue.children.items():
            if symbol in red.children:
                # Shared transition: accumulate the count and continue with both children.
                red.input_frequency[symbol] += blue_tree_node.input_frequency[symbol]
                self.fold(red.children[symbol], blue_child, self.get_blue_node(blue_child))
                continue
            # Transition only present in blue: adopt the whole subtree.
            red.children[symbol] = blue_child
            red.input_frequency[symbol] = blue_tree_node.input_frequency[symbol]

    def run(self):
        """
        Run the merging loop and return the learned automaton.

        Nodes in `red` become the states of the returned model; `blue` holds successors of red nodes that
        are not red themselves.
        """
        started = time.time()

        def prefix_length(node):
            return len(node.getPrefix())

        red = [self.mutableTreeRoot]
        blue = self.mutableTreeRoot.successors()

        while blue:
            candidate = min(blue, key=prefix_length)

            for representative in red:
                if self.compatibility_test(self.get_blue_node(representative), self.get_blue_node(candidate)):
                    self.merge(representative, candidate)
                    break
            else:
                # No red node is compatible: the candidate becomes a new state.
                insort(red, candidate)

            # Recompute the frontier after the tree changed.
            blue.clear()
            blue.extend(succ for node in red for succ in node.successors() if succ not in red)

        assert sorted(red, key=prefix_length) == red

        # 'moore' models carry no probabilities, so nothing is normalized for them.
        if self.automaton_type != 'moore':
            self.normalize(red)

        for index, node in enumerate(red):
            node.state_id = f'q{index}'

        if self.print_info:
            print(f'Alergia Learning Time: {round(time.time() - started, 2)}')
            print(f'Alergia Learned {len(red)} state automaton.')

        return self.to_automaton(red)

    def normalize(self, red):
        """
        Set children_prob on every red node by turning its input_frequency counts into relative frequencies.

        For 'mc' counts are divided by the node's total count; otherwise by the total count of entries that
        share the same first key element.
        """
        for node in sorted(red, key=lambda n: len(n.getPrefix())):
            counts = node.input_frequency
            if self.automaton_type == 'mc':
                total = sum(counts.values())
                node.children_prob = {key: count / total for key, count in counts.items()}
            else:
                per_input = defaultdict(int)
                for key, count in counts.items():
                    per_input[key[0]] += count
                node.children_prob = {key: count / per_input[key[0]] for key, count in counts.items()}

    def get_blue_node(self, red_node):
        """
        Return the node used for compatibility checks and frequency look-ups.

        With optimize_for == 'memory' this is red_node itself; otherwise it is the node reached in the
        immutable tree by following red_node's prefix.
        """
        if self.optimize_for == 'memory':
            return red_node

        node = self.immutableTreeRoot
        for symbol in red_node.getPrefix():
            node = node.children[symbol]
        return node

    def to_automaton(self, red):
        """
        Build the automaton of the configured type from the red nodes.

        The node with an empty prefix becomes the initial state.
        """
        state_cls, automaton_cls = state_automaton_map[self.automaton_type]
        kind = self.automaton_type

        states = []
        initial_state = None
        state_by_prefix = dict()
        node_by_state_id = dict()

        for node in red:
            if kind == 'smm':
                state = state_cls(node.state_id)
            else:
                state = state_cls(node.state_id, output=node.output)

            prefix = node.getPrefix()
            state.prefix = prefix
            states.append(state)
            state_by_prefix[tuple(prefix)] = state
            node_by_state_id[state.state_id] = node
            if not prefix:
                initial_state = state

        for state in states:
            node = node_by_state_id[state.state_id]
            for key, child in node.children.items():
                destination = state_by_prefix[tuple(child.getPrefix())]
                # For 'mc' the key is used as is; otherwise its first element selects the transition.
                label = key if kind == 'mc' else key[0]
                if kind == 'mdp':
                    state.transitions[label].append((destination, node.children_prob[key]))
                elif kind == 'mc':
                    state.transitions.append((destination, node.children_prob[label]))
                elif kind == 'smm':
                    state.transitions[label].append((destination, key[1], node.children_prob[key]))
                else:
                    state.transitions[label] = destination

        return automaton_cls(initial_state, states)


def run_Alergia(data, automaton_type, eps=0.005, compatibility_checker=None, optimize_for='accuracy',
                print_info=False):
    """
    Run Alergia or IOAlergia on provided data.

    Args:

        data: data either in the form [[I,I,I],[I,I,I],...] if learning Markov Chains or
            [[O,(I,O),(I,O),...], [O,(I,O),...],...] if learning MDPs (I represents input, O output).
            Note that the first symbol of every entry should be the same (initial output of the MDP/MC).

        eps: epsilon value if you are using the default HoeffdingCompatibility. If it is set to 'auto' it will
            be computed as 10/(all steps in the data)

        automaton_type: either 'mdp' if you wish to learn an MDP, 'mc' if you want to learn Markov Chain,
            'moore' if you want to learn Moore Machine (underlying structure is deterministic), or 'smm' if
            you want to learn stochastic Mealy machine

        optimize_for: either 'memory' or 'accuracy'. memory will use 50% less memory, but will be more
            inaccurate.

        compatibility_checker: impl. of class CompatibilityChecker, HoeffdingCompatibility with eps value by
            default (note: not interchangeable, depends on data)

        print_info: if True, print construction/learning time and the size of the learned model

    Returns:

        the learned model of the requested automaton type
    """
    assert automaton_type in {'mdp', 'mc', 'moore', 'smm'}

    learner = Alergia(data, automaton_type=automaton_type, eps=eps, compatibility_checker=compatibility_checker,
                      optimize_for=optimize_for, print_info=print_info)
    learned_model = learner.run()

    # Drop the (potentially large) trees before returning.
    del learner.mutableTreeRoot, learner.immutableTreeRoot, learner
    return learned_model


def run_JAlergia(path_to_data_file, automaton_type, path_to_jAlergia_jar, eps=0.005,
                 heap_memory='-Xmx2048M', optimize_for='accuracy'):
    """
    Run Alergia or IOAlergia on provided data using the jAlergia jar.

    Args:

        path_to_data_file: either the data as a list of sequences or a path to a file containing the data.
            Form [[I,I,I],[I,I,I],...] if learning Markov Chains or [[O,I,O,I,O,...], [O,I,O,...],...]
            if learning MDPs (I represents input, O output). Note that the first symbol of every entry
            should be the same (initial output of the MDP/MC). When a list is given, every sequence is
            written as one comma-separated line to a temporary file 'jAlergiaInputs.txt'.

        automaton_type: either 'mdp' if you wish to learn an MDP, 'mc' if you want to learn Markov Chain,
            or 'smm' if you want to learn stochastic Mealy machine

        path_to_jAlergia_jar: path to the jAlergia jar file

        eps: epsilon value

        heap_memory: java heap memory flag, increase if heap is full

        optimize_for: either 'memory' or 'accuracy'. memory will use 50% less memory, but will be more
            inaccurate.

    Returns:

        learnedModel, or None if the jar or the input was not found/valid or jAlergia produced no model
    """
    # TODO rename path_to_data_file to data in next versions of AALpy after 20. may
    assert automaton_type in {'mdp', 'smm', 'mc'}
    assert optimize_for in {'memory', 'accuracy'}

    import os
    import subprocess

    save_file = "jAlergiaModel.dot"
    tmp_input_file = 'jAlergiaInputs.txt'
    delete_tmp_file = False

    # Remove a stale model so its presence afterwards proves that this run produced it.
    if os.path.exists(save_file):
        os.remove(save_file)

    if os.path.exists(path_to_jAlergia_jar):
        path_to_jAlergia_jar = os.path.abspath(path_to_jAlergia_jar)
    else:
        print(f'JAlergia jar not found at {path_to_jAlergia_jar}.')
        return

    if isinstance(path_to_data_file, str):
        if os.path.exists(path_to_data_file):
            abs_path = os.path.abspath(path_to_data_file)
        else:
            print('Input file not found.')
            return
    else:
        if not isinstance(path_to_data_file, (list, tuple)):
            print('Data should be either a list of sequences or a path to the data file.')
            return
        with open(tmp_input_file, 'w') as f:
            for seq in path_to_data_file:
                # One sequence per line; without the newline all sequences would merge into one.
                f.write(','.join([str(s) for s in seq]) + '\n')
        delete_tmp_file = True
        abs_path = os.path.abspath(tmp_input_file)

    try:
        subprocess.call(['java', heap_memory, '-jar', path_to_jAlergia_jar, '-input', abs_path, '-eps', str(eps),
                         '-type', automaton_type, '-optim', optimize_for[:3]])

        if not os.path.exists(save_file):
            print("JAlergia error occurred.")
            return

        from aalpy.utils.FileHandler import load_automaton_from_file
        model = load_automaton_from_file(save_file, automaton_type=automaton_type)
        os.remove(save_file)
        return model
    finally:
        # Clean up the temporary input file on success and on every failure path.
        if delete_tmp_file and os.path.exists(tmp_input_file):
            os.remove(tmp_input_file)
View Source
class Alergia:
    """
    Passive learner that builds a frequency prefix tree from data and merges compatible nodes into a model.

    Supported values of automaton_type are the keys of state_automaton_map: 'mc', 'mdp', 'moore' and 'smm'.
    """

    def __init__(self, data, automaton_type, eps=0.005, compatibility_checker=None, optimize_for='accuracy',
                 print_info=False):
        """
        Store the configuration and build the prefix trees from the data.

        Args:
            data: sequences the prefix tree is built from (passed to create_fpta)
            automaton_type: key of state_automaton_map selecting the kind of model to learn
            eps: value given to the default HoeffdingCompatibility checker, in (0, 2], or 'auto' to
                compute it as 10 / (sum over all sequences of len(sequence) - 1)
            compatibility_checker: object with a check_difference(a, b) method; when not given,
                HoeffdingCompatibility(eps) is used
            optimize_for: 'memory' or 'accuracy' (passed to create_fpta)
            print_info: if True, timing and size information is printed
        """
        assert eps == 'auto' or 0 < eps <= 2
        assert optimize_for in {'memory', 'accuracy'}

        self.automaton_type = automaton_type
        self.print_info = print_info
        self.optimize_for = optimize_for

        if eps == 'auto':
            # len - 1 to ignore the initial output of every sequence
            total_steps = sum(len(trace) - 1 for trace in data)
            eps = 10 / total_steps

        if compatibility_checker:
            self.diff_checker = compatibility_checker
        else:
            self.diff_checker = HoeffdingCompatibility(eps)

        construction_started = time.time()
        self.immutableTreeRoot, self.mutableTreeRoot = create_fpta(data, automaton_type, optimize_for)
        construction_time = round(time.time() - construction_started, 2)

        if self.print_info:
            print(f'PTA Construction Time: {construction_time}')

    def compatibility_test(self, a, b):
        """
        Recursively decide whether nodes a and b can be merged.

        Returns:
            True if the nodes (and all children reachable over shared keys) are compatible, False otherwise
        """
        kind = self.automaton_type

        # Outputs are compared for every type except 'smm'.
        if kind != 'smm' and a.output != b.output:
            return False

        # A node without children imposes no further constraints.
        if not (a.children and b.children):
            return True

        # The statistical check is skipped for 'moore'.
        if kind != 'moore' and not self.diff_checker.check_difference(a, b):
            return False

        shared_keys = set(a.children.keys()).intersection(b.children.keys())
        return all(self.compatibility_test(a.children[key], b.children[key]) for key in shared_keys)

    def merge(self, q_r, q_b):
        """
        Redirect the transition leading to q_b so that it points to q_r, then fold q_b's subtree into q_r.
        """
        blue_reference = self.get_blue_node(q_b)
        prefix = q_b.getPrefix()

        # Walk the mutable tree to the parent of q_b.
        parent = self.mutableTreeRoot
        for symbol in prefix[:-1]:
            parent = parent.children[symbol]
        parent.children[prefix[-1]] = q_r

        self.fold(q_r, q_b, blue_reference)

    def fold(self, red, blue, blue_tree_node):
        """
        Add the children and frequencies of blue to red; frequencies are read from blue_tree_node.
        """
        for symbol, blue_child in blue.children.items():
            if symbol in red.children:
                # Shared transition: accumulate the count and continue with both children.
                red.input_frequency[symbol] += blue_tree_node.input_frequency[symbol]
                self.fold(red.children[symbol], blue_child, self.get_blue_node(blue_child))
                continue
            # Transition only present in blue: adopt the whole subtree.
            red.children[symbol] = blue_child
            red.input_frequency[symbol] = blue_tree_node.input_frequency[symbol]

    def run(self):
        """
        Run the merging loop and return the learned automaton.

        Nodes in `red` become the states of the returned model; `blue` holds successors of red nodes that
        are not red themselves.
        """
        started = time.time()

        def prefix_length(node):
            return len(node.getPrefix())

        red = [self.mutableTreeRoot]
        blue = self.mutableTreeRoot.successors()

        while blue:
            candidate = min(blue, key=prefix_length)

            for representative in red:
                if self.compatibility_test(self.get_blue_node(representative), self.get_blue_node(candidate)):
                    self.merge(representative, candidate)
                    break
            else:
                # No red node is compatible: the candidate becomes a new state.
                insort(red, candidate)

            # Recompute the frontier after the tree changed.
            blue.clear()
            blue.extend(succ for node in red for succ in node.successors() if succ not in red)

        assert sorted(red, key=prefix_length) == red

        # 'moore' models carry no probabilities, so nothing is normalized for them.
        if self.automaton_type != 'moore':
            self.normalize(red)

        for index, node in enumerate(red):
            node.state_id = f'q{index}'

        if self.print_info:
            print(f'Alergia Learning Time: {round(time.time() - started, 2)}')
            print(f'Alergia Learned {len(red)} state automaton.')

        return self.to_automaton(red)

    def normalize(self, red):
        """
        Set children_prob on every red node by turning its input_frequency counts into relative frequencies.

        For 'mc' counts are divided by the node's total count; otherwise by the total count of entries that
        share the same first key element.
        """
        for node in sorted(red, key=lambda n: len(n.getPrefix())):
            counts = node.input_frequency
            if self.automaton_type == 'mc':
                total = sum(counts.values())
                node.children_prob = {key: count / total for key, count in counts.items()}
            else:
                per_input = defaultdict(int)
                for key, count in counts.items():
                    per_input[key[0]] += count
                node.children_prob = {key: count / per_input[key[0]] for key, count in counts.items()}

    def get_blue_node(self, red_node):
        """
        Return the node used for compatibility checks and frequency look-ups.

        With optimize_for == 'memory' this is red_node itself; otherwise it is the node reached in the
        immutable tree by following red_node's prefix.
        """
        if self.optimize_for == 'memory':
            return red_node

        node = self.immutableTreeRoot
        for symbol in red_node.getPrefix():
            node = node.children[symbol]
        return node

    def to_automaton(self, red):
        """
        Build the automaton of the configured type from the red nodes.

        The node with an empty prefix becomes the initial state.
        """
        state_cls, automaton_cls = state_automaton_map[self.automaton_type]
        kind = self.automaton_type

        states = []
        initial_state = None
        state_by_prefix = dict()
        node_by_state_id = dict()

        for node in red:
            if kind == 'smm':
                state = state_cls(node.state_id)
            else:
                state = state_cls(node.state_id, output=node.output)

            prefix = node.getPrefix()
            state.prefix = prefix
            states.append(state)
            state_by_prefix[tuple(prefix)] = state
            node_by_state_id[state.state_id] = node
            if not prefix:
                initial_state = state

        for state in states:
            node = node_by_state_id[state.state_id]
            for key, child in node.children.items():
                destination = state_by_prefix[tuple(child.getPrefix())]
                # For 'mc' the key is used as is; otherwise its first element selects the transition.
                label = key if kind == 'mc' else key[0]
                if kind == 'mdp':
                    state.transitions[label].append((destination, node.children_prob[key]))
                elif kind == 'mc':
                    state.transitions.append((destination, node.children_prob[label]))
                elif kind == 'smm':
                    state.transitions[label].append((destination, key[1], node.children_prob[key]))
                else:
                    state.transitions[label] = destination

        return automaton_cls(initial_state, states)
#  
Alergia(
data,
automaton_type,
eps=0.005,
compatibility_checker=None,
optimize_for='accuracy',
print_info=False
)
View Source
def __init__(self, data, automaton_type, eps=0.005, compatibility_checker=None, optimize_for='accuracy',
             print_info=False):
    """
    Store the configuration and build the prefix trees from the data.

    Args:
        data: sequences the prefix tree is built from (passed to create_fpta)
        automaton_type: kind of model to learn (passed to create_fpta and stored)
        eps: value given to the default HoeffdingCompatibility checker, in (0, 2], or 'auto' to
            compute it as 10 / (sum over all sequences of len(sequence) - 1)
        compatibility_checker: checker used instead of HoeffdingCompatibility(eps) when given
        optimize_for: 'memory' or 'accuracy' (passed to create_fpta)
        print_info: if True, the PTA construction time is printed
    """
    assert eps == 'auto' or 0 < eps <= 2
    assert optimize_for in {'memory', 'accuracy'}

    self.automaton_type = automaton_type
    self.print_info = print_info
    self.optimize_for = optimize_for

    if eps == 'auto':
        # len - 1 to ignore the initial output of every sequence
        total_steps = sum(len(trace) - 1 for trace in data)
        eps = 10 / total_steps

    if compatibility_checker:
        self.diff_checker = compatibility_checker
    else:
        self.diff_checker = HoeffdingCompatibility(eps)

    construction_started = time.time()
    self.immutableTreeRoot, self.mutableTreeRoot = create_fpta(data, automaton_type, optimize_for)
    construction_time = round(time.time() - construction_started, 2)

    if self.print_info:
        print(f'PTA Construction Time: {construction_time}')
View Source
def compatibility_test(self, a, b):
    """
    Recursively decide whether nodes a and b can be merged.

    Returns:
        True if the nodes (and all children reachable over shared keys) are compatible, False otherwise
    """
    kind = self.automaton_type

    # Outputs are compared for every type except 'smm'.
    if kind != 'smm' and a.output != b.output:
        return False

    # A node without children imposes no further constraints.
    if not (a.children and b.children):
        return True

    # The statistical check is skipped for 'moore'.
    if kind != 'moore' and not self.diff_checker.check_difference(a, b):
        return False

    shared_keys = set(a.children.keys()).intersection(b.children.keys())
    return all(self.compatibility_test(a.children[key], b.children[key]) for key in shared_keys)
View Source
def merge(self, q_r, q_b):
    """
    Redirect the transition leading to q_b so that it points to q_r, then fold q_b's subtree into q_r.
    """
    blue_reference = self.get_blue_node(q_b)
    prefix = q_b.getPrefix()

    # Walk the mutable tree to the parent of q_b.
    parent = self.mutableTreeRoot
    for symbol in prefix[:-1]:
        parent = parent.children[symbol]
    parent.children[prefix[-1]] = q_r

    self.fold(q_r, q_b, blue_reference)
View Source
def fold(self, red, blue, blue_tree_node):
    """
    Add the children and frequencies of blue to red; frequencies are read from blue_tree_node.
    """
    for symbol, blue_child in blue.children.items():
        if symbol in red.children:
            # Shared transition: accumulate the count and continue with both children.
            red.input_frequency[symbol] += blue_tree_node.input_frequency[symbol]
            self.fold(red.children[symbol], blue_child, self.get_blue_node(blue_child))
            continue
        # Transition only present in blue: adopt the whole subtree.
        red.children[symbol] = blue_child
        red.input_frequency[symbol] = blue_tree_node.input_frequency[symbol]
View Source
def run(self):
    """
    Run the merging loop and return the learned automaton.

    Nodes in `red` become the states of the returned model; `blue` holds successors of red nodes that
    are not red themselves.
    """
    started = time.time()

    def prefix_length(node):
        return len(node.getPrefix())

    red = [self.mutableTreeRoot]
    blue = self.mutableTreeRoot.successors()

    while blue:
        candidate = min(blue, key=prefix_length)

        for representative in red:
            if self.compatibility_test(self.get_blue_node(representative), self.get_blue_node(candidate)):
                self.merge(representative, candidate)
                break
        else:
            # No red node is compatible: the candidate becomes a new state.
            insort(red, candidate)

        # Recompute the frontier after the tree changed.
        blue.clear()
        blue.extend(succ for node in red for succ in node.successors() if succ not in red)

    assert sorted(red, key=prefix_length) == red

    # 'moore' models carry no probabilities, so nothing is normalized for them.
    if self.automaton_type != 'moore':
        self.normalize(red)

    for index, node in enumerate(red):
        node.state_id = f'q{index}'

    if self.print_info:
        print(f'Alergia Learning Time: {round(time.time() - started, 2)}')
        print(f'Alergia Learned {len(red)} state automaton.')

    return self.to_automaton(red)
View Source
def normalize(self, red):
    """
    Set children_prob on every red node by turning its input_frequency counts into relative frequencies.

    For 'mc' counts are divided by the node's total count; otherwise by the total count of entries that
    share the same first key element.
    """
    for node in sorted(red, key=lambda n: len(n.getPrefix())):
        counts = node.input_frequency
        if self.automaton_type == 'mc':
            total = sum(counts.values())
            node.children_prob = {key: count / total for key, count in counts.items()}
        else:
            per_input = defaultdict(int)
            for key, count in counts.items():
                per_input[key[0]] += count
            node.children_prob = {key: count / per_input[key[0]] for key, count in counts.items()}
View Source
def get_blue_node(self, red_node):
    """
    Return the node used for compatibility checks and frequency look-ups.

    With optimize_for == 'memory' this is red_node itself; otherwise it is the node reached in the
    immutable tree by following red_node's prefix.
    """
    if self.optimize_for == 'memory':
        return red_node

    node = self.immutableTreeRoot
    for symbol in red_node.getPrefix():
        node = node.children[symbol]
    return node
View Source
def to_automaton(self, red):
    """
    Build the automaton of the configured type from the red nodes.

    The node with an empty prefix becomes the initial state.
    """
    state_cls, automaton_cls = state_automaton_map[self.automaton_type]
    kind = self.automaton_type

    states = []
    initial_state = None
    state_by_prefix = dict()
    node_by_state_id = dict()

    for node in red:
        if kind == 'smm':
            state = state_cls(node.state_id)
        else:
            state = state_cls(node.state_id, output=node.output)

        prefix = node.getPrefix()
        state.prefix = prefix
        states.append(state)
        state_by_prefix[tuple(prefix)] = state
        node_by_state_id[state.state_id] = node
        if not prefix:
            initial_state = state

    for state in states:
        node = node_by_state_id[state.state_id]
        for key, child in node.children.items():
            destination = state_by_prefix[tuple(child.getPrefix())]
            # For 'mc' the key is used as is; otherwise its first element selects the transition.
            label = key if kind == 'mc' else key[0]
            if kind == 'mdp':
                state.transitions[label].append((destination, node.children_prob[key]))
            elif kind == 'mc':
                state.transitions.append((destination, node.children_prob[label]))
            elif kind == 'smm':
                state.transitions[label].append((destination, key[1], node.children_prob[key]))
            else:
                state.transitions[label] = destination

    return automaton_cls(initial_state, states)
#  
def
run_Alergia(
data,
automaton_type,
eps=0.005,
compatibility_checker=None,
optimize_for='accuracy',
print_info=False
):
View Source
def run_Alergia(data, automaton_type, eps=0.005, compatibility_checker=None, optimize_for='accuracy',
                print_info=False):
    """
    Run Alergia or IOAlergia on provided data.

    Args:

        data: data either in the form [[I,I,I],[I,I,I],...] if learning Markov Chains or
            [[O,(I,O),(I,O),...], [O,(I,O),...],...] if learning MDPs (I represents input, O output).
            Note that the first symbol of every entry should be the same (initial output of the MDP/MC).

        eps: epsilon value if you are using the default HoeffdingCompatibility. If it is set to 'auto' it will
            be computed as 10/(all steps in the data)

        automaton_type: either 'mdp' if you wish to learn an MDP, 'mc' if you want to learn Markov Chain,
            'moore' if you want to learn Moore Machine (underlying structure is deterministic), or 'smm' if
            you want to learn stochastic Mealy machine

        optimize_for: either 'memory' or 'accuracy'. memory will use 50% less memory, but will be more
            inaccurate.

        compatibility_checker: impl. of class CompatibilityChecker, HoeffdingCompatibility with eps value by
            default (note: not interchangeable, depends on data)

        print_info: passed on to Alergia to enable its informational printing

    Returns:

        the model returned by Alergia.run() for the requested automaton type
    """
    assert automaton_type in {'mdp', 'mc', 'moore', 'smm'}

    learner = Alergia(data, automaton_type=automaton_type, eps=eps, compatibility_checker=compatibility_checker,
                      optimize_for=optimize_for, print_info=print_info)
    learned_model = learner.run()

    # Drop the trees held by the learner before returning.
    del learner.mutableTreeRoot, learner.immutableTreeRoot, learner
    return learned_model
Run Alergia or IOAlergia on provided data.
Args:
data: data either in the form [[I,I,I],[I,I,I],...] if learning Markov Chains or [[O,(I,O),(I,O),...],
[O,(I,O),...],...] if learning MDPs (I represents input, O output).
Note that the first symbol of every entry in the data should be the same (the initial output of the MDP/MC).
eps: epsilon value if you are using default HoeffdingCompatibility. If it is set to 'auto' it will be computed
as 10/(all steps in the data)
automaton_type: either 'mdp' if you wish to learn an MDP, 'mc' if you want to learn Markov Chain, 'moore'
if you want to learn Moore Machine (underlying structure is deterministic), or 'smm' if you
want to learn stochastic Mealy machine
optimize_for: either 'memory' or 'accuracy'. memory will use 50% less memory, but will be more inaccurate.
compatibility_checker: impl. of class CompatibilityChecker, HoeffdingCompatibility with eps value by default
(note: not interchangeable, depends on data)
print_info: if True, print the PTA construction time, the learning time and the size of the learned model
Returns:
the learned model: an MDP, Markov chain, Moore machine or stochastic Mealy machine, depending on automaton_type
#  
def
run_JAlergia(
path_to_data_file,
automaton_type,
path_to_jAlergia_jar,
eps=0.005,
heap_memory='-Xmx2048M',
optimize_for='accuracy'
):
View Source
def run_JAlergia(path_to_data_file, automaton_type, path_to_jAlergia_jar, eps=0.005,
                 heap_memory='-Xmx2048M', optimize_for='accuracy'):
    """
    Run Alergia or IOAlergia on provided data using the jAlergia jar.

    Args:

        path_to_data_file: either the data as a list of sequences or a path to a file containing the data.
            Form [[I,I,I],[I,I,I],...] if learning Markov Chains or [[O,I,O,I,O,...], [O,I,O,...],...]
            if learning MDPs (I represents input, O output). Note that the first symbol of every entry
            should be the same (initial output of the MDP/MC). When a list is given, every sequence is
            written as one comma-separated line to a temporary file 'jAlergiaInputs.txt'.

        automaton_type: either 'mdp' if you wish to learn an MDP, 'mc' if you want to learn Markov Chain,
            or 'smm' if you want to learn stochastic Mealy machine

        path_to_jAlergia_jar: path to the jAlergia jar file

        eps: epsilon value

        heap_memory: java heap memory flag, increase if heap is full

        optimize_for: either 'memory' or 'accuracy'. memory will use 50% less memory, but will be more
            inaccurate.

    Returns:

        learnedModel, or None if the jar or the input was not found/valid or jAlergia produced no model
    """
    # TODO rename path_to_data_file to data in next versions of AALpy after 20. may
    assert automaton_type in {'mdp', 'smm', 'mc'}
    assert optimize_for in {'memory', 'accuracy'}

    import os
    import subprocess

    save_file = "jAlergiaModel.dot"
    tmp_input_file = 'jAlergiaInputs.txt'
    delete_tmp_file = False

    # Remove a stale model so its presence afterwards proves that this run produced it.
    if os.path.exists(save_file):
        os.remove(save_file)

    if os.path.exists(path_to_jAlergia_jar):
        path_to_jAlergia_jar = os.path.abspath(path_to_jAlergia_jar)
    else:
        print(f'JAlergia jar not found at {path_to_jAlergia_jar}.')
        return

    if isinstance(path_to_data_file, str):
        if os.path.exists(path_to_data_file):
            abs_path = os.path.abspath(path_to_data_file)
        else:
            print('Input file not found.')
            return
    else:
        if not isinstance(path_to_data_file, (list, tuple)):
            print('Data should be either a list of sequences or a path to the data file.')
            # Previously execution continued and failed while iterating the invalid object.
            return
        with open(tmp_input_file, 'w') as f:
            for seq in path_to_data_file:
                # One sequence per line; without the newline all sequences would merge into one.
                f.write(','.join([str(s) for s in seq]) + '\n')
        delete_tmp_file = True
        abs_path = os.path.abspath(tmp_input_file)

    try:
        subprocess.call(['java', heap_memory, '-jar', path_to_jAlergia_jar, '-input', abs_path, '-eps', str(eps),
                         '-type', automaton_type, '-optim', optimize_for[:3]])

        if not os.path.exists(save_file):
            print("JAlergia error occurred.")
            return

        # Imported only once a model exists, so the validation paths above do not need the loader.
        from aalpy.utils.FileHandler import load_automaton_from_file
        model = load_automaton_from_file(save_file, automaton_type=automaton_type)
        os.remove(save_file)
        return model
    finally:
        # Clean up the temporary input file on success and on every failure path.
        if delete_tmp_file and os.path.exists(tmp_input_file):
            os.remove(tmp_input_file)
Run Alergia or IOAlergia on provided data.
Args:
path_to_data_file: either the data as a list of sequences or a path to a file containing the data.
Form [[I,I,I],[I,I,I],...] if learning Markov Chains or
[[O,I,O,I,O,...], [O,I,O,...],...] if learning MDPs (I represents input, O output).
Note that the first symbol of every entry in the data should be the same (the initial output of the MDP/MC).
eps: epsilon value
heap_memory: java heap memory flag, increase if heap is full
optimize_for: either 'memory' or 'accuracy'. memory will use 50% less memory, but will be more inaccurate.
automaton_type: either 'mdp' if you wish to learn an MDP, 'mc' if you want to learn Markov Chain,
or 'smm' if you
want to learn stochastic Mealy machine
Returns:
learnedModel