Source code for api.json_ai

# TODO: We need a better way to specify trainable_encoders
# TODO: lookup_encoder is awkward; similar to dtype, can we make a file with encoder_lookup? People may be interested
# in seeing where these come from and it's not clear that you need to look here.
# TODO: What do `target_class_distribution` and `positive_domain` do?
# TODO: generate_json_ai is really large; can we abstract it into smaller functions to make it more readable?
# TODO: add_implicit_values unit test ensures NO changes for a fully specified file.
# TODO: Please fix spelling on parallel_preped_encoders

from typing import Dict
from lightwood.helpers.templating import call, inline_dict, align
import black
from lightwood.api import dtype
import numpy as np
from lightwood.api.types import (
    JsonAI,
    TypeInformation,
    StatisticalAnalysis,
    Feature,
    Output,
    ProblemDefinition,
)


# Encoder class names that are given a `stop_after` time budget by `lookup_encoder`
# and that are counted when the encoder time budget is split in `generate_json_ai`.
trainable_encoders = (
    'PretrainedLangEncoder',
    'CategoricalAutoEncoder',
    'TimeSeriesEncoder',
    'ArrayEncoder',
)

# Encoder class names for which `generate_json_ai` adds the group-by columns
# (and the previous-target column) as feature dependencies.
ts_encoders = (
    'TimeSeriesEncoder',
    'TsNumericEncoder',
)


def lookup_encoder(
    col_dtype: str,
    col_name: str,
    is_target: bool,
    problem_defintion: ProblemDefinition,
    is_target_predicting_encoder: bool,
):
    """
    Choose the default encoder, and the arguments it is templated with, for a single column.

    The choice starts from a per-data-type default and is then adjusted when the column is the target, when the
    problem is a time-series one, and when the selected encoder is listed in ``trainable_encoders``. This runs for
    every column while the template JSON-AI is generated; users may later edit the generated result to plug in
    their own encoders.

    The returned ``"args"`` values are strings: either literal code snippets (e.g. ``'True'``) or ``$``-prefixed
    placeholders such as ``$statistical_analysis.positive_domain``.

    :param col_dtype: Data type of the column; must be a key of the default lookup table, otherwise ``KeyError`` is raised
    :param col_name: Name of the column
    :param is_target: Whether the column is the prediction target. Targets get ``is_target`` set and, for some data types, a different encoder than features would.
    :param problem_defintion: The ``ProblemDefinition``; its ``timeseries_settings`` and ``unbias_target`` are read here
    :param is_target_predicting_encoder: When true, ``embed_mode`` is set to ``'False'`` in the encoder arguments

    :returns: A dictionary with the keys ``"module"`` (``"<Namespace>.<EncoderClass>"``) and ``"args"``
    """  # noqa
    ts_settings = problem_defintion.timeseries_settings

    # Default encoder for every supported data type
    default_modules = {
        dtype.integer: 'Integer.NumericEncoder',
        dtype.float: 'Float.NumericEncoder',
        dtype.binary: 'Binary.BinaryEncoder',
        dtype.categorical: 'Categorical.CategoricalAutoEncoder',
        dtype.tags: 'Tags.MultiHotEncoder',
        dtype.date: 'Date.DatetimeEncoder',
        dtype.datetime: 'Datetime.DatetimeEncoder',
        dtype.image: 'Image.Img2VecEncoder',
        dtype.rich_text: 'Rich_Text.PretrainedLangEncoder',
        dtype.short_text: 'Short_Text.CategoricalAutoEncoder',
        dtype.array: 'Array.ArrayEncoder',
        dtype.quantity: 'Quantity.NumericEncoder',
    }

    # Data types whose encoder differs when the column is the target
    target_module_overrides = {
        dtype.rich_text: 'Rich_Text.VocabularyEncoder',
        dtype.categorical: 'Categorical.OneHotEncoder',
    }

    module = default_modules[col_dtype]
    args = {}

    # Target columns: flag them and apply the target-specific encoder and arguments
    if is_target:
        args = {'is_target': 'True'}
        module = target_module_overrides.get(col_dtype, module)

        if col_dtype in (dtype.categorical, dtype.binary) and problem_defintion.unbias_target:
            args['target_class_distribution'] = '$statistical_analysis.target_class_distribution'

        if col_dtype in (dtype.integer, dtype.float, dtype.array):
            args['positive_domain'] = '$statistical_analysis.positive_domain'

    # Time-series problems replace the encoder for ordering/historical columns, targets and previous-target columns
    if ts_settings.is_timeseries:
        gby = [] if ts_settings.group_by is None else ts_settings.group_by

        if col_name in ts_settings.order_by + ts_settings.historical_columns:
            module = f"{col_dtype.capitalize()}.TimeSeriesEncoder"
            args['original_type'] = f'"{col_dtype}"'
            args['target'] = "self.target"
            args['grouped_by'] = f"{gby}"

        if is_target:
            if col_dtype == dtype.integer:
                args['grouped_by'] = f"{gby}"
                module = "Integer.TsNumericEncoder"
            if col_dtype == dtype.float:
                args['grouped_by'] = f"{gby}"
                module = "Float.TsNumericEncoder"
            # Forecasting more than one step ahead takes precedence over the numeric encoders above
            if ts_settings.nr_predictions > 1:
                args['grouped_by'] = f"{gby}"
                args['timesteps'] = f"{ts_settings.nr_predictions}"
                module = 'Array.TsArrayNumericEncoder'

        if '__mdb_ts_previous' in col_name:
            module = f"{col_dtype.capitalize()}.ArrayEncoder"
            args['original_type'] = f'"{ts_settings.target_type}"'
            args['window'] = f'{ts_settings.window}'

    # Arguments that depend on which encoder was finally selected
    if not is_target and module == "Rich_Text.PretrainedLangEncoder":
        args['output_type'] = "$dtype_dict[$target]"

    if module.split(".")[1] in trainable_encoders:
        args["stop_after"] = "$problem_definition.seconds_per_encoder"

    if is_target_predicting_encoder:
        args['embed_mode'] = 'False'

    return {"module": module, "args": args}
def generate_json_ai(
    type_information: TypeInformation,
    statistical_analysis: StatisticalAnalysis,
    problem_definition: ProblemDefinition,
) -> JsonAI:
    """
    Given ``TypeInformation``, ``StatisticalAnalysis``, and the ``ProblemDefinition``, generate a JSON config file with the necessary elements of the ML pipeline populated.

    The default mixers, the target encoder, one encoder per input column and the accuracy functions are chosen
    here. ``problem_definition`` is updated in place: ``time_aim``, ``seconds_per_encoder`` and
    ``seconds_per_mixer`` are filled in whenever a time budget has to be derived.

    :param type_information: Specifies what data types each column within the dataset are
    :param statistical_analysis: Statistics of the dataset; only ``nr_rows`` is read here, to size the default time budget
    :param problem_definition: Specifies details of the model training/building procedure, as defined by ``ProblemDefinition``

    :returns: JSON-AI object with features, outputs and accuracy functions populated; the remaining phases are left as ``None``
    :raises Exception: If there is no default accuracy function for the data type of the target
    """  # noqa
    target = problem_definition.target
    tss = problem_definition.timeseries_settings

    # Every valid, non-identifier column other than the target is an input
    input_cols = [
        col_name
        for col_name, col_dtype in type_information.dtypes.items()
        if col_name not in type_information.identifiers
        and col_dtype not in (dtype.invalid, dtype.empty)
        and col_name != target
    ]

    # Single text column classification.
    # The one-element tuple needs its trailing comma: `(dtype.rich_text)` is just a string, which turns `in` into a
    # substring test.
    is_target_predicting_encoder = (
        len(input_cols) == 1
        and type_information.dtypes[input_cols[0]] in (dtype.rich_text,)
        and type_information.dtypes[target] in (dtype.categorical, dtype.binary)
    )

    if is_target_predicting_encoder:
        mixers = [{
            'module': 'Unit',
            'args': {
                'target_encoder': '$encoders[self.target]',
                'stop_after': '$problem_definition.seconds_per_mixer'
            }
        }]
    else:
        mixers = [{
            'module': 'Neural',
            'args': {
                'fit_on_dev': True,
                'stop_after': '$problem_definition.seconds_per_mixer',
                'search_hyperparameters': True
            }
        }]

        if not tss.is_timeseries or tss.nr_predictions <= 1:
            mixers.extend([
                {
                    'module': 'LightGBM',
                    'args': {
                        'stop_after': '$problem_definition.seconds_per_mixer',
                        'fit_on_dev': True
                    }
                },
                {
                    'module': 'Regression',
                    'args': {
                        'stop_after': '$problem_definition.seconds_per_mixer',
                    }
                }
            ])
        elif tss.nr_predictions > 1:
            mixers.extend([{
                'module': 'LightGBMArray',
                'args': {
                    'fit_on_dev': True,
                    'stop_after': '$problem_definition.seconds_per_mixer',
                    'n_ts_predictions': '$problem_definition.timeseries_settings.nr_predictions'
                }
            }])

            if tss.use_previous_target:
                mixers.extend([
                    {
                        'module': 'SkTime',
                        'args': {
                            'stop_after': '$problem_definition.seconds_per_mixer',
                            'n_ts_predictions': '$problem_definition.timeseries_settings.nr_predictions',
                        },
                    }
                ])

    output = Output(
        data_dtype=type_information.dtypes[target],
        encoder=None,
        mixers=mixers,
        ensemble={
            'module': 'BestOf',
            'args': {
                'accuracy_functions': '$accuracy_functions',
            }
        }
    )
    outputs = {target: output}

    # Multi-step forecasts are emitted as arrays, whatever the original type of the target
    if tss.is_timeseries and tss.nr_predictions > 1:
        output.data_dtype = dtype.array

    # The encoder lookup deliberately uses the original data type of the target
    output.encoder = lookup_encoder(
        type_information.dtypes[target], target, True, problem_definition, False
    )

    features: Dict[str, Feature] = {}
    for col_name in input_cols:
        col_dtype = type_information.dtypes[col_name]
        dependency = []
        encoder = lookup_encoder(
            col_dtype, col_name, False, problem_definition, is_target_predicting_encoder
        )

        # Time-series encoders depend on the group-by columns and, optionally, on the previous target values
        if tss.is_timeseries and encoder['module'].split(".")[1] in ts_encoders:
            if tss.group_by is not None:
                dependency.extend(tss.group_by)

            if tss.use_previous_target:
                dependency.append(f"__mdb_ts_previous_{target}")

        if len(dependency) > 0:
            features[col_name] = Feature(encoder=encoder, dependency=dependency)
        else:
            features[col_name] = Feature(encoder=encoder)

    # Decide on the accuracy functions to use
    output_dtype = output.data_dtype
    if output_dtype in [dtype.integer, dtype.float, dtype.date, dtype.datetime]:
        accuracy_functions = ['r2_score']
    elif output_dtype in [dtype.categorical, dtype.tags, dtype.binary]:
        accuracy_functions = ['balanced_accuracy_score']
    elif output_dtype in [dtype.array]:
        accuracy_functions = ['evaluate_array_accuracy']
    else:
        # The message previously formatted an undefined name, which raised NameError instead of this exception
        raise Exception(f'Please specify a custom accuracy function for output type {output_dtype}')

    # Derive an overall time budget when neither it nor both per-phase budgets were provided.
    # Text, array, video, audio and image columns weigh four times as much as any other column.
    if problem_definition.time_aim is None and (
            problem_definition.seconds_per_mixer is None or problem_definition.seconds_per_encoder is None):
        problem_definition.time_aim = 1000 + np.log(
            statistical_analysis.nr_rows / 10 + 1) * np.sum(
            [4 if x in [dtype.rich_text, dtype.short_text, dtype.array, dtype.video, dtype.audio, dtype.image] else 1
             for x in type_information.dtypes.values()]) * 200

    # Split the overall budget between the trainable encoders and the mixers
    if problem_definition.time_aim is not None:
        nr_trainable_encoders = len([x for x in features.values()
                                     if x.encoder['module'].split('.')[1] in trainable_encoders])
        nr_mixers = len(output.mixers)
        # NOTE(review): the second operand is always >= 1.5, so this behaves as a multiplier rather than a
        # percentage - confirm that this is intended.
        encoder_time_budget_pct = max(3.3 / 5, 1.5 + np.log(nr_trainable_encoders + 1) / 5)

        if nr_trainable_encoders == 0:
            problem_definition.seconds_per_encoder = 0
        else:
            problem_definition.seconds_per_encoder = int(
                problem_definition.time_aim * (encoder_time_budget_pct / nr_trainable_encoders))
        problem_definition.seconds_per_mixer = int(
            problem_definition.time_aim * ((1 / encoder_time_budget_pct) / nr_mixers))

    return JsonAI(
        cleaner=None,
        splitter=None,
        analyzer=None,
        explainer=None,
        features=features,
        outputs=outputs,
        imports=None,
        problem_definition=problem_definition,
        identifiers=type_information.identifiers,
        timeseries_transformer=None,
        timeseries_analyzer=None,
        accuracy_functions=accuracy_functions,
    )
def add_implicit_values(json_ai: JsonAI) -> JsonAI:
    """
    To enable brevity in writing, auto-generate the "unspecified/missing" details required in the ML pipeline.

    The object is completed in place: the imports needed by the generated code are merged into ``json_ai.imports``,
    missing mixer/ensemble arguments and feature attributes receive their defaults, and every phase that is still
    ``None`` (cleaner, splitter, analyzer, explainer and, for time-series problems, the time-series transformer and
    analyzer) is given a default definition. Values that are already specified are left untouched.

    :param json_ai: ``JsonAI`` object that describes the ML pipeline that may not have every detail fully specified.

    :returns: The same ``JsonAI`` object with all necessary parameters that were previously left unmentioned filled in.
    """  # noqa
    problem_definition = json_ai.problem_definition
    tss = problem_definition.timeseries_settings

    imports = [
        'from lightwood.mixer import Neural',
        'from lightwood.mixer import LightGBM',
        'from lightwood.mixer import LightGBMArray',
        'from lightwood.mixer import SkTime',
        'from lightwood.mixer import Unit',
        'from lightwood.mixer import Regression',
        'from lightwood.ensemble import BestOf',
        'from lightwood.data import cleaner',
        'from lightwood.data import transform_timeseries, timeseries_analyzer',
        'from lightwood.data import splitter',
        'from lightwood.analysis import model_analyzer, explain',
        'from sklearn.metrics import r2_score, balanced_accuracy_score, accuracy_score',
        'import pandas as pd',
        'from lightwood.helpers.seed import seed',
        'from lightwood.helpers.log import log',
        'import lightwood',
        'from lightwood.api import *',
        'from lightwood.mixer import BaseMixer',
        'from lightwood.encoder import BaseEncoder, __ts_encoders__',
        'from lightwood.encoder import Array, Binary, Categorical, Date, Datetime, Float, Image, Integer, Quantity, Rich_Text, Short_Text, Tags',  # noqa
        'from lightwood.ensemble import BaseEnsemble',
        'from typing import Dict, List',
        'from lightwood.helpers.parallelism import mut_method_call',
        'from lightwood.data.encoded_ds import ConcatedEncodedDs',
        'from lightwood import ProblemDefinition']

    # Encoders given by a bare name (no "Namespace." prefix) need an explicit import of their own
    for feature in [list(json_ai.outputs.values())[0], *json_ai.features.values()]:
        encoder_import = feature.encoder['module']
        if "." in encoder_import:
            continue
        imports.append(f"from lightwood.encoder import {encoder_import}")

    if tss.use_previous_target:
        imports.append('from lightwood.encoder import ArrayEncoder')

    # Merge only after the list is complete, so that the encoder imports are kept even when the caller supplied
    # its own imports, and skip lines that are already present so that repeated calls do not pile up duplicates.
    if json_ai.imports is None:
        json_ai.imports = []
    for import_line in imports:
        if import_line not in json_ai.imports:
            json_ai.imports.append(import_line)

    # Add implicit arguments
    # @TODO: Consider removing once we have a proper editor in studio
    default_net = '"DefaultNet"' if not tss.is_timeseries or not tss.use_previous_target else '"ArNet"'
    # Defaults per mixer module, in the order in which they are added; 'Unit' has no implicit arguments
    implicit_mixer_args = {
        'Neural': {
            'target_encoder': '$encoders[self.target]',
            'target': '$target',
            'dtype_dict': '$dtype_dict',
            'input_cols': '$input_cols',
            'timeseries_settings': '$problem_definition.timeseries_settings',
            'net': default_net,
        },
        'LightGBM': {
            'target': '$target',
            'dtype_dict': '$dtype_dict',
            'input_cols': '$input_cols',
        },
        'Regression': {
            'target': '$target',
            'dtype_dict': '$dtype_dict',
            'target_encoder': '$encoders[self.target]',
        },
        'LightGBMArray': {
            'target': '$target',
            'dtype_dict': '$dtype_dict',
            'input_cols': '$input_cols',
        },
        'SkTime': {
            'target': '$target',
            'dtype_dict': '$dtype_dict',
            'ts_analysis': '$ts_analysis',
        },
    }
    mixers = json_ai.outputs[json_ai.problem_definition.target].mixers
    for mixer in mixers:
        for arg_name, default in implicit_mixer_args.get(mixer['module'], {}).items():
            mixer['args'].setdefault(arg_name, default)

    ensemble = json_ai.outputs[json_ai.problem_definition.target].ensemble
    ensemble['args'].setdefault('target', '$target')
    ensemble['args'].setdefault('data', 'test_data')
    ensemble['args'].setdefault('mixers', '$mixers')

    for feature in json_ai.features.values():
        if feature.dependency is None:
            feature.dependency = []
        if feature.data_dtype is None:
            # The namespace part of "Namespace.Encoder", lower-cased, is used as the data type
            feature.data_dtype = feature.encoder['module'].split(".")[0].lower()

    # Add implicit phases
    # @TODO: Consider removing once we have a proper editor in studio
    if json_ai.cleaner is None:
        json_ai.cleaner = {
            "module": "cleaner",
            "args": {
                "pct_invalid": "$problem_definition.pct_invalid",
                "ignore_features": "$problem_definition.ignore_features",
                "identifiers": "$identifiers",
                "data": "data",
                "dtype_dict": "$dtype_dict",
                "target": "$target",
                "mode": "$mode",
                "timeseries_settings": "$problem_definition.timeseries_settings",
                "anomaly_detection": "$problem_definition.anomaly_detection",
            },
        }

    if json_ai.splitter is None:
        json_ai.splitter = {
            'module': 'splitter',
            'args': {
                'tss': '$problem_definition.timeseries_settings',
                'data': 'data',
                'k': 'nsubsets'
            }
        }

    if json_ai.analyzer is None:
        json_ai.analyzer = {
            "module": "model_analyzer",
            "args": {
                "stats_info": "$statistical_analysis",
                "ts_cfg": "$problem_definition.timeseries_settings",
                "accuracy_functions": "$accuracy_functions",
                "predictor": "$ensemble",
                "data": "test_data",
                "train_data": "train_data",
                "target": "$target",
                "disable_column_importance": "False",
                "dtype_dict": "$dtype_dict",
                "fixed_significance": None,
                "confidence_normalizer": False,
                "positive_domain": "$statistical_analysis.positive_domain",
            },
        }

    if json_ai.explainer is None:
        json_ai.explainer = {
            "module": "explain",
            "args": {
                "timeseries_settings": "$problem_definition.timeseries_settings",
                "positive_domain": "$statistical_analysis.positive_domain",
                "fixed_confidence": "$problem_definition.fixed_confidence",
                "anomaly_detection": "$problem_definition.anomaly_detection",
                "anomaly_error_rate": "$problem_definition.anomaly_error_rate",
                "anomaly_cooldown": "$problem_definition.anomaly_cooldown",
                "data": "data",
                "encoded_data": "encoded_data",
                "predictions": "df",
                "analysis": "$runtime_analyzer",
                "ts_analysis": "$ts_analysis" if tss.is_timeseries else None,
                "target_name": "$target",
                "target_dtype": "$dtype_dict[self.target]",
            },
        }

    if tss.is_timeseries:
        if json_ai.timeseries_transformer is None:
            json_ai.timeseries_transformer = {
                "module": "transform_timeseries",
                "args": {
                    "timeseries_settings": "$problem_definition.timeseries_settings",
                    "data": "data",
                    "dtype_dict": "$dtype_dict",
                    "target": "$target",
                    "mode": "$mode",
                },
            }

        if json_ai.timeseries_analyzer is None:
            json_ai.timeseries_analyzer = {
                "module": "timeseries_analyzer",
                "args": {
                    "timeseries_settings": "$problem_definition.timeseries_settings",
                    "data": "data",
                    "dtype_dict": "$dtype_dict",
                    "target": "$target",
                },
            }

    return json_ai
def code_from_json_ai(json_ai: JsonAI) -> str:
    """
    Generates a custom ``PredictorInterface`` given the specifications from ``JsonAI`` object.

    Missing values are first filled in with ``add_implicit_values`` (which updates ``json_ai`` in place). The
    bodies of ``learn``, ``predict`` and ``predict_proba`` are then assembled from string templates, inserted into
    a ``Predictor`` class template, and the result is formatted with ``black``.

    When ``use_previous_target`` is set in the time-series settings, a ``__mdb_ts_previous_<target>`` feature is
    also added to ``json_ai.features`` and ``timeseries_settings.target_type`` is set as a side effect.

    :param json_ai: ``JsonAI`` object with fully specified parameters

    :returns: Automated syntax of the ``PredictorInterface`` object.
    """
    # NOTE(review): the line breaks and indentation inside the code templates below were re-laid out from a
    # whitespace-collapsed copy of this file - confirm them against the canonical source.
    # NOTE(review): `call`, `inline_dict` and `align` come from lightwood.helpers.templating; presumably they
    # render a module/args dict as a call expression, render a dict as inline code, and indent a code snippet by
    # the given number of levels - confirm there.

    # Fill in any missing values
    json_ai = add_implicit_values(json_ai)

    # Per-column code snippets, keyed by column name; the target is always included
    encoder_dict = {json_ai.problem_definition.target: call(list(json_ai.outputs.values())[0].encoder)}
    dependency_dict = {}
    dtype_dict = {
        json_ai.problem_definition.target: f"""'{list(json_ai.outputs.values())[0].data_dtype}'"""
    }

    # Features listed in `ignore_features` get no encoder, dependency or dtype entry
    for col_name, feature in json_ai.features.items():
        if col_name not in json_ai.problem_definition.ignore_features:
            encoder_dict[col_name] = call(feature.encoder)
            dependency_dict[col_name] = feature.dependency
            dtype_dict[col_name] = f"""'{feature.data_dtype}'"""

    # @TODO: Move into json-ai creation function (I think? Maybe? Let's discuss)
    if json_ai.problem_definition.timeseries_settings.use_previous_target:
        # Register the previous-target column as an extra feature that shares the data type of the output
        col_name = f'__mdb_ts_previous_{json_ai.problem_definition.target}'
        json_ai.problem_definition.timeseries_settings.target_type = list(json_ai.outputs.values())[0].data_dtype
        encoder_dict[col_name] = call(lookup_encoder(list(json_ai.outputs.values())[0].data_dtype,
                                                     col_name,
                                                     False,
                                                     json_ai.problem_definition,
                                                     False,
                                                     ))
        dependency_dict[col_name] = []
        dtype_dict[col_name] = f"""'{list(json_ai.outputs.values())[0].data_dtype}'"""
        json_ai.features[col_name] = Feature(encoder=encoder_dict[col_name])

    # Build the comma-separated, single-quoted list of input column names, with quotes inside names escaped.
    # The ignore filter is applied to the escaped names.
    ignored_cols = json_ai.problem_definition.ignore_features
    input_cols = [x.replace("'", "\\'").replace('"', '\\"') for x in json_ai.features]
    input_cols = ','.join([f"""'{name}'""" for name in input_cols if name not in ignored_cols])

    # Time-series specific snippets; they stay empty unless the matching phase is defined
    ts_transform_code = ""
    ts_analyze_code = ""
    ts_encoder_code = ""

    if json_ai.timeseries_transformer is not None:
        ts_transform_code = f"""
log.info('Transforming timeseries data')
data = {call(json_ai.timeseries_transformer)}
"""
        ts_analyze_code = f"""
self.ts_analysis = {call(json_ai.timeseries_analyzer)}
"""

    if json_ai.timeseries_analyzer is not None:
        ts_encoder_code = """
if type(encoder) in __ts_encoders__:
    kwargs['ts_analysis'] = self.ts_analysis
"""

    if json_ai.problem_definition.timeseries_settings.is_timeseries:
        ts_target_code = """
if encoder.is_target:
    encoder.normalizers = self.ts_analysis['target_normalizers']
    encoder.group_combinations = self.ts_analysis['group_combinations']
"""
    else:
        ts_target_code = ""

    # First half of `learn`: set up state, clean/transform/split the data and prepare the encoders.
    # Doubled braces are literal braces in the generated code; single braces are filled in now.
    dataprep_body = f"""
# The type of each column
self.problem_definition = ProblemDefinition.from_dict({json_ai.problem_definition.to_dict()})
self.accuracy_functions = {json_ai.accuracy_functions}
self.identifiers = {json_ai.identifiers}
self.dtype_dict = {inline_dict(dtype_dict)}
self.statistical_analysis = lightwood.data.statistical_analysis(data, self.dtype_dict, {json_ai.identifiers}, self.problem_definition)
self.mode = 'train'
# How columns are encoded
self.encoders = {inline_dict(encoder_dict)}
# Which column depends on which
self.dependencies = {inline_dict(dependency_dict)}
#
self.input_cols = [{input_cols}]

log.info('Cleaning the data')
data = {call(json_ai.cleaner)}

{ts_transform_code}
{ts_analyze_code}

nsubsets = {json_ai.problem_definition.nsubsets}
log.info(f'Splitting the data into {{nsubsets}} subsets')
subsets = {call(json_ai.splitter)}

log.info('Preparing the encoders')

encoder_preping_dict = {{}}
enc_preping_data = pd.concat(subsets[0:nsubsets-1])
for col_name, encoder in self.encoders.items():
    if not encoder.is_nn_encoder:
        encoder_preping_dict[col_name] = [encoder, enc_preping_data[col_name], 'prepare']
        log.info(f'Encoder preping dict length of: {{len(encoder_preping_dict)}}')

parallel_preped_encoders = mut_method_call(encoder_preping_dict)
for col_name, encoder in parallel_preped_encoders.items():
    self.encoders[col_name] = encoder

if self.target not in parallel_preped_encoders:
    self.encoders[self.target].prepare(enc_preping_data[self.target])

for col_name, encoder in self.encoders.items():
    if encoder.is_nn_encoder:
        priming_data = pd.concat(subsets[0:nsubsets-1])
        kwargs = {{}}
        if self.dependencies[col_name]:
            kwargs['dependency_data'] = {{}}
            for col in self.dependencies[col_name]:
                kwargs['dependency_data'][col] = {{
                    'original_type': self.dtype_dict[col],
                    'data': priming_data[col]
                }}
            {align(ts_encoder_code, 3)}

        # This assumes target encoders are also prepared in parallel, might not be true
        if hasattr(encoder, 'uses_target'):
            kwargs['encoded_target_values'] = parallel_preped_encoders[self.target].encode(priming_data[self.target])

        encoder.prepare(priming_data[col_name], **kwargs)

    {align(ts_target_code, 1)}
"""
    dataprep_body = align(dataprep_body, 2)

    # Second half of `learn`: encode the subsets, train the mixers, build the ensemble and analyze it
    learn_body = f"""
log.info('Featurizing the data')
encoded_ds_arr = lightwood.encode(self.encoders, subsets, self.target)
train_data = encoded_ds_arr[0:int(nsubsets*0.9)]
test_data = encoded_ds_arr[int(nsubsets*0.9):]

log.info('Training the mixers')
self.mixers = [{', '.join([call(x) for x in list(json_ai.outputs.values())[0].mixers])}]
trained_mixers = []
for mixer in self.mixers:
    try:
        mixer.fit(train_data)
        trained_mixers.append(mixer)
    except Exception as e:
        log.warning(f'Exception: {{e}} when training mixer: {{mixer}}')
        if {json_ai.problem_definition.strict_mode} and mixer.stable:
            raise e

self.mixers = trained_mixers

log.info('Ensembling the mixer')
self.ensemble = {call(list(json_ai.outputs.values())[0].ensemble)}
self.supports_proba = self.ensemble.supports_proba

log.info('Analyzing the ensemble')
self.model_analysis, self.runtime_analyzer = {call(json_ai.analyzer)}

# Enable partial fit of model, after its trained, on validation data. This is ONLY to be used in cases where there is
# an expectation of testing data and a continuously evolving pipeline; this assumes that all data available is
# important to train with.

for mixer in self.mixers:
    if {json_ai.problem_definition.fit_on_validation}:
        mixer.partial_fit(test_data, train_data)
"""
    learn_body = align(learn_body, 2)

    # Shared start of `predict` and `predict_proba`: clean, optionally transform, then encode the data
    predict_common_body = f"""
self.mode = 'predict'
log.info('Cleaning the data')
data = {call(json_ai.cleaner)}

{ts_transform_code}

encoded_ds = lightwood.encode(self.encoders, data, self.target)[0]
encoded_data = encoded_ds.get_encoded_data(include_target=False)
"""
    predict_common_body = align(predict_common_body, 2)

    predict_body = f"""
df = self.ensemble(encoded_ds)
insights = {call(json_ai.explainer)}
return insights
"""
    predict_body = align(predict_body, 2)

    # Same as `predict_body`, except that the ensemble is called with `predict_proba=True`
    predict_proba_body = f"""
df = self.ensemble(encoded_ds, predict_proba=True)
insights = {call(json_ai.explainer)}
return insights
"""
    predict_proba_body = align(predict_proba_body, 2)

    imports = "\n".join(json_ai.imports)
    # Final module: the imports followed by the `Predictor` class wrapping the bodies assembled above
    predictor_code = f"""
{imports}
from lightwood.api import PredictorInterface


class Predictor(PredictorInterface):
    target: str
    mixers: List[BaseMixer]
    encoders: Dict[str, BaseEncoder]
    ensemble: BaseEnsemble
    mode: str

    def __init__(self):
        seed({json_ai.problem_definition.seed_nr})
        self.target = '{json_ai.problem_definition.target}'
        self.mode = 'innactive'

    def learn(self, data: pd.DataFrame) -> None:
{dataprep_body}
{learn_body}

    def predict(self, data: pd.DataFrame) -> pd.DataFrame:
{predict_common_body}
{predict_body}

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
{predict_common_body}
{predict_proba_body}
"""

    # Normalize the layout of the generated module with black
    predictor_code = black.format_str(predictor_code, mode=black.FileMode())

    return predictor_code
def validate_json_ai(json_ai: JsonAI) -> bool:
    """
    Check whether a ``JsonAI`` object can be turned into a working predictor.

    Code is generated from the object and a predictor is built from that code; any exception raised along the way
    marks the object as invalid.

    :param json_ai: A ``JsonAI`` object

    :returns: ``True`` if code generation and predictor construction both succeed, ``False`` if either raises.
    """  # noqa
    # Imported at call time, as in the original implementation
    from lightwood.api.high_level import predictor_from_code, code_from_json_ai

    try:
        generated_code = code_from_json_ai(json_ai)
        predictor_from_code(generated_code)
    except Exception:
        return False
    return True