# -*- coding: utf-8 -*-
import logging
import pydoc
import copy
import typing # noqa
from typing import Union, Dict, Any, Iterable
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator
from tensorflow.keras.models import Sequential
logger = logging.getLogger(__name__)
[docs]def import_locate(import_path: str) -> Any:
return pydoc.locate(import_path)
[docs]def from_definition(
pipe_definition: Union[str, Dict[str, Dict[str, Any]]]
) -> Union[FeatureUnion, Pipeline]:
"""
Construct a Pipeline or FeatureUnion from a definition.
Example
-------
>>> import yaml
>>> from gordo import serializer
>>> raw_config = '''
... sklearn.pipeline.Pipeline:
... steps:
... - sklearn.decomposition.PCA:
... n_components: 3
... - sklearn.pipeline.FeatureUnion:
... - sklearn.decomposition.PCA:
... n_components: 3
... - sklearn.pipeline.Pipeline:
... - sklearn.preprocessing.MinMaxScaler
... - sklearn.decomposition.TruncatedSVD:
... n_components: 2
... - sklearn.ensemble.RandomForestClassifier:
... max_depth: 3
... '''
>>> config = yaml.safe_load(raw_config)
>>> scikit_learn_pipeline = serializer.from_definition(config)
Parameters
---------
pipe_definition
List of steps for the Pipeline / FeatureUnion
constructor_class
What to place the list of transformers into,
either sklearn.pipeline.Pipeline/FeatureUnion
Returns
-------
sklearn.pipeline.Pipeline
pipeline
"""
# Avoid some mutation
definition = copy.deepcopy(pipe_definition)
return _build_step(definition)
def _build_branch(
definition: Iterable[Union[str, Dict[Any, Any]]],
constructor_class=Union[Pipeline, None],
):
"""
Builds a branch of the tree and optionally constructs the class with the given
leafs of the branch, if constructor_class is not none. Otherwise just the
built leafs are returned.
"""
steps = [_build_step(step) for step in definition]
return steps if constructor_class is None else constructor_class(steps)
def _build_scikit_branch(
definition: Iterable[Union[str, Dict[Any, Any]]],
constructor_class=Union[Pipeline, None],
):
"""
Exactly like :func:`~_build_branch` except it's expected this is going to
be a list of tuples, where the 0th element is the name of the step.
"""
steps = [(f"step_{i}", _build_step(step)) for i, step in enumerate(definition)]
return steps if constructor_class is None else constructor_class(steps)
def _build_step(
step: Union[str, Dict[str, Dict[str, Any]]]
) -> Union[FeatureUnion, Pipeline, BaseEstimator]:
"""
Build an isolated step within a transformer list, given a dict config
Parameters
----------
step: dict/str - A dict, with a single key and associated dict
where the associated dict are parameters for the
given step.
Example: {'sklearn.preprocessing.PCA':
{'n_components': 4}
}
Gives: PCA(n_components=4)
Alternatively, 'step' can be a single string, in
which case the step will be initiated w/ default
params.
Example: 'sklearn.preprocessing.PCA'
Gives: PCA()
Returns
-------
Scikit-Learn Transformer or BaseEstimator
"""
logger.debug(f"Building step: {step}")
# Here, 'step' _should_ be a dict with a single key
# and an associated dict containing parameters for the desired
# sklearn step. ie. {'sklearn.preprocessing.PCA': {'n_components': 2}}
if isinstance(step, dict):
if len(step.keys()) != 1:
return _load_param_classes(step)
import_str = list(step.keys())[0]
StepClass: Union[FeatureUnion, Pipeline, BaseEstimator] = import_locate(
import_str
)
if StepClass is None:
raise ImportError(f'Could not locate path: "{import_str}"')
params = step.get(import_str, dict())
if hasattr(StepClass, "from_definition"):
return getattr(StepClass, "from_definition")(params)
# Load any possible classes in the params if this is a dict of maybe kwargs
if isinstance(params, dict):
params = _load_param_classes(params)
# update any param values which are string locations to functions
if isinstance(params, dict):
for param, value in params.items():
if isinstance(value, str):
possible_func = import_locate(value)
if callable(possible_func):
params[param] = possible_func
# FeatureUnion or another Pipeline transformer
if any(StepClass == obj for obj in [FeatureUnion, Pipeline, Sequential]):
# Need to ensure the parameters to be supplied are valid FeatureUnion
# & Pipeline both take a list of transformers, but with different
# kwarg, here we pull out the list to keep _build_scikit_branch generic
if "transformer_list" in params:
params["transformer_list"] = _build_scikit_branch(
params["transformer_list"], None
)
elif "steps" in params:
params["steps"] = _build_scikit_branch(params["steps"], None)
# If params is an iterable, is has to be the first argument
# to the StepClass (FeatureUnion / Pipeline); a list of transformers
elif any(isinstance(params, obj) for obj in (tuple, list)):
steps = _build_scikit_branch(params, None)
return StepClass(steps)
elif isinstance(params, dict) and "layers" in params:
params["layers"] = _build_branch(params["layers"], None)
else:
raise ValueError(
f"Got {StepClass} but the supplied parameters"
f"seem invalid: {params}"
)
return StepClass(**params)
# If step is just a string, can initialize it without any params
# ie. "sklearn.preprocessing.PCA"
elif isinstance(step, str):
Step = import_locate(step) # type: Union[FeatureUnion, Pipeline, BaseEstimator]
if hasattr(Step, "from_definition"):
return getattr(Step, "from_definition")({})
else:
return Step() if Step is not None else step
else:
raise ValueError(
f"Expected step to be either a string or a dict," f"found: {type(step)}"
)
def _build_callbacks(definitions: list):
"""
Parameters
----------
definitions: List
List of callbacks definitions
Examples
--------
>>> callbacks=_build_callbacks([{'tensorflow.keras.callbacks.EarlyStopping': {'monitor': 'val_loss,', 'patience': 10}}])
>>> type(callbacks[0])
<class 'tensorflow.python.keras.callbacks.EarlyStopping'>
Returns
-------
dict
"""
callbacks = []
for callback in definitions:
callbacks.append(_build_step(callback))
return callbacks
def _load_param_classes(params: dict):
"""
Inspect the params' values and determine if any can be loaded as a class.
if so, update that param's key value as the instantiation of the class.
Additionally, if the value of the top level is a dict, and that dict's len(.keys()) == 1
AND that key can be loaded, it's assumed to be a class whose associated values
should be passed in as kwargs.
Parameters
----------
params: dict
key value pairs of kwargs, which can have full class paths defined.
Examples
--------
>>> params = {"key1": "value1"}
>>> assert _load_param_classes(params) == params # No modifications
# Load an actual model, without any kwargs
>>> from sklearn.ensemble import RandomForestRegressor
>>> params = {"base_estimator": "sklearn.ensemble.RandomForestRegressor"}
>>> print(_load_param_classes(params))
{'base_estimator': RandomForestRegressor()}
# Load an actual model, with kwargs
>>> params = {"base_estimator": {"sklearn.ensemble.RandomForestRegressor": {"n_estimators": 20}}}
>>> print(_load_param_classes(params))
{'base_estimator': RandomForestRegressor(n_estimators=20)}
Returns
-------
dict
Updated params which has any possible class paths loaded up as instantiated
objects
"""
params = copy.copy(params)
for key, value in params.items():
# If value is a simple string, try to load the model/class
if isinstance(value, str):
Model: Union[None, BaseEstimator, Pipeline] = import_locate(value)
if Model is not None:
if hasattr(Model, "from_definition"):
params[key] = getattr(Model, "from_definition")({})
elif isinstance(Model, type) and issubclass(Model, BaseEstimator):
params[key] = Model()
# For the next bit to work, the dict must have a single key (maybe) the class path,
# and its value must be a dict of kwargs
elif (
isinstance(value, dict)
and len(value.keys()) == 1
and isinstance(value[list(value.keys())[0]], dict)
):
import_path = list(value.keys())[0]
Model = import_locate(import_path)
sub_params = value[import_path]
if hasattr(Model, "from_definition"):
params[key] = getattr(Model, "from_definition")(sub_params)
elif Model is not None and isinstance(Model, type):
if issubclass(Model, Pipeline) or issubclass(Model, Sequential):
# Model is a Pipeline, so 'value' is the definition of that Pipeline
# Can can just re-use the entry to building a pipeline.
params[key] = from_definition(value)
else:
# Call this func again, incase there is nested occurances of this problem in these kwargs
kwargs = _load_param_classes(sub_params)
params[key] = Model(**kwargs) # type: ignore
elif key == "callbacks" and isinstance(value, list):
params[key] = _build_callbacks(value)
return params
[docs]def load_params_from_definition(definition: dict) -> dict:
"""
Deserialize each value from a dictionary. Could be used for preparing kwargs for methods
Parameters
----------
definition: dict
"""
if not isinstance(definition, dict):
raise ValueError(
"Expected definition to be a dict," f"found: {type(definition)}"
)
return _load_param_classes(definition)