Source code for gordo.machine.model.utils

# -*- coding: utf-8 -*-

import typing
import functools
import logging
from typing import Optional, Union, List
from datetime import timedelta, datetime

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin

from gordo_dataset.sensor_tag import SensorTag

logger = logging.getLogger(__name__)


def metric_wrapper(metric, scaler: Optional[TransformerMixin] = None):
    """
    Ensures that a given metric works properly when the model itself returns
    a y which is shorter than the target y, and allows scaling the data
    before applying the metric.

    Parameters
    ----------
    metric
        Metric which must accept y_true and y_pred of the same length
    scaler : Optional[TransformerMixin]
        Transformer which will be applied on y and y_pred before the metric is
        calculated. Must have method `transform`, so for most scalers it must
        already be fitted on `y`.
    """

    @functools.wraps(metric)
    def _wrapper(y_true, y_pred, *args, **kwargs):
        if scaler:
            logger.debug(
                "Transformer provided to metrics wrapper, scaling y and y_pred before "
                "passing to metrics"
            )
            y_true = scaler.transform(y_true)
            y_pred = scaler.transform(y_pred)
        return metric(y_true[-len(y_pred) :], y_pred, *args, **kwargs)

    return _wrapper
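

# Usage sketch (illustrative only, not part of this module): wrapping
# sklearn's mean_squared_error so it tolerates a y_pred shorter than y_true
# and scales both with an already-fitted scaler before scoring.
#
#   from sklearn.metrics import mean_squared_error
#   from sklearn.preprocessing import MinMaxScaler
#
#   y_true = np.random.rand(100, 2)
#   y_pred = np.random.rand(90, 2)  # e.g. a model that drops the first 10 rows
#   scaler = MinMaxScaler().fit(y_true)
#
#   scaled_mse = metric_wrapper(mean_squared_error, scaler=scaler)
#   score = scaled_mse(y_true, y_pred)  # y_true is clipped to its last 90 rows

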
def make_base_dataframe(
    tags: typing.Union[typing.List[SensorTag], typing.List[str]],
    model_input: np.ndarray,
    model_output: np.ndarray,
    target_tag_list: Optional[Union[List[SensorTag], List[str]]] = None,
    index: typing.Optional[np.ndarray] = None,
    frequency: typing.Optional[timedelta] = None,
) -> pd.DataFrame:
    """
    Construct a dataframe which has a MultiIndex column consisting of top level keys
    'model-input' and 'model-output'. Takes care of aligning the model output if its
    length differs from the model input, as well as setting column names based on the
    passed tags and target_tag_list.

    Parameters
    ----------
    tags: List[Union[str, SensorTag]]
        Tags which will be assigned to ``model-input`` and/or ``model-output`` if
        the shapes match.
    model_input: np.ndarray
        Original input given to the model
    model_output: np.ndarray
        Raw model output
    target_tag_list: Optional[Union[List[SensorTag], List[str]]]
        Tags to be assigned to ``model-output``; if not given and the model output
        matches the model input, ``tags`` will be used.
    index: Optional[np.ndarray]
        The index which should be assigned to the resulting dataframe; it will be
        clipped to the length of ``model_output``, should the model output fewer
        rows than its input.
    frequency: Optional[datetime.timedelta]
        The spacing of the time between points.

    Returns
    -------
    pd.DataFrame
    """
    # Set target_tag_list to default to tags if not specified.
    target_tag_list = target_tag_list if target_tag_list is not None else tags

    # Match length of output, and ensure we're working with numpy arrays, not pandas.
    model_input = getattr(model_input, "values", model_input)[-len(model_output) :, :]
    model_output = getattr(model_output, "values", model_output)

    names_n_values = (("model-input", model_input), ("model-output", model_output))

    # Define the index which all series/dataframes will share
    index = (
        index[-len(model_output) :] if index is not None else range(len(model_output))
    )

    # Series to hold the start times for each point or just 'None' values
    start_series = pd.Series(
        index
        if isinstance(index, pd.DatetimeIndex)
        else (None for _ in range(len(index))),
        index=index,
    )

    # Calculate the end times if possible, or also all 'None's
    end_series = start_series.map(
        lambda start: (start + frequency).isoformat()
        if isinstance(start, datetime) and frequency is not None
        else None
    )

    # Convert to isoformatted string for JSON serialization.
    start_series = start_series.map(
        lambda start: start.isoformat() if hasattr(start, "isoformat") else None
    )

    # The resulting DF will be multiindex, so we define and initialize it here
    # with the start and end times from above.
    columns = pd.MultiIndex.from_product((("start", "end"), ("",)))
    data: pd.DataFrame = pd.DataFrame(
        {("start", ""): start_series, ("end", ""): end_series},
        columns=columns,
        index=index,
    )

    # Begin looping over the model-input and model-output; mapping them into
    # the multiindex column dataframe, and naming their second level labels as needed.
    name: str
    values: np.ndarray
    for (name, values) in filter(lambda nv: nv[1] is not None, names_n_values):

        _tags = tags if name == "model-input" else target_tag_list

        # Create the second level of column names, either as the tag names
        # or a simple range of numbers
        if values.shape[1] == len(_tags):
            # map(...) to satisfy mypy to match second possible outcome
            second_lvl_names = map(
                str, (tag.name if isinstance(tag, SensorTag) else tag for tag in _tags)
            )
        else:
            second_lvl_names = map(str, range(values.shape[1]))

        # Columns will be multi level with the title of the output on top
        # and specific names below, ie. ('model-output', 'tag-0') as a column
        columns = pd.MultiIndex.from_tuples(
            (name, sub_name) for sub_name in second_lvl_names
        )

        # Pass values, offsetting any differences in length compared to the index,
        # as set by the model-output size
        other = pd.DataFrame(
            values[-len(model_output) :], columns=columns, index=index
        )
        data = data.join(other)

    return data
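

# Usage sketch (illustrative only, not part of this module): building the
# MultiIndex dataframe from a two-tag input, a model output of equal width,
# and a 10-minute spacing between points.
#
#   idx = pd.date_range("2020-01-01", periods=5, freq="10min")
#   X = np.random.rand(5, 2)
#   y_hat = np.random.rand(5, 2)
#
#   df = make_base_dataframe(
#       tags=["tag-0", "tag-1"],
#       model_input=X,
#       model_output=y_hat,
#       index=idx,
#       frequency=timedelta(minutes=10),
#   )
#   # df.columns -> ('start', ''), ('end', ''), ('model-input', 'tag-0'),
#   #               ('model-input', 'tag-1'), ('model-output', 'tag-0'), ...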