Source code for gordo.workflow.config_elements.normalized_config

# -*- coding: utf-8 -*-

from typing import List, Optional, Type, Dict, Any
from copy import copy

from gordo.machine.validators import fix_runtime
from gordo.workflow.workflow_generator.helpers import patch_dict
from gordo.machine import Machine
from gordo import __version__
from packaging.version import parse
from pydantic import parse_obj_as, BaseModel

from .schemas import BuilderPodRuntime, PodRuntime, Volume


def _calculate_influx_resources(nr_of_machines):
    return {
        "requests": {
            # The requests must be limited to keep the machine schedulable
            "memory": min(3000 + (220 * nr_of_machines), 28000),  # Between 3G and 28G
            "cpu": min(500 + (10 * nr_of_machines), 4000),  # Between 500m and 4000m
        },
        "limits": {
            "memory": min(3000 + (220 * nr_of_machines), 48000),
            "cpu": 10000 + (20 * nr_of_machines),
        },
    }


# Type alias for a loosely-typed configuration mapping: string keys, arbitrary
# (possibly nested) values. Used to annotate the class-level config constants.
ConfigDict = Dict[str, Any]


class NormalizedConfig:
    """
    Handles the conversion of a single Machine representation in config format
    and updates it with any features which are 'left out' inside of ``globals``
    key or the default config globals held here.

    After construction the instance exposes:

    - ``project_name``: the project name passed in.
    - ``machines``: one ``Machine`` per entry of ``config["machines"]``, each
      built with the patched globals.
    - ``globals``: the defaults patched with the user supplied ``globals``.
    """

    # Per-component docker images used for gordo versions older than
    # ``UNIFYING_GORDO_VERSION``.
    SPLITED_DOCKER_IMAGES: ConfigDict = {
        "runtime": {
            "deployer": {"image": "gordo-deploy"},
            "server": {"image": "gordo-model-server"},
            "prometheus_metrics_server": {"image": "gordo-model-server"},
            "builder": {"image": "gordo-model-builder"},
            "client": {"image": "gordo-client"},
        }
    }

    # First gordo version for which every component uses the same base image.
    UNIFYING_GORDO_VERSION: str = "1.2.0"

    # Single shared docker image used from ``UNIFYING_GORDO_VERSION`` onwards.
    UNIFIED_DOCKER_IMAGES: ConfigDict = {
        "runtime": {
            "deployer": {"image": "gordo-base"},
            "server": {"image": "gordo-base"},
            "prometheus_metrics_server": {"image": "gordo-base"},
            "builder": {"image": "gordo-base"},
            "client": {"image": "gordo-base"},
        }
    }

    # Baseline globals; user supplied ``globals`` are patched on top of these.
    # Treat as read-only: ``get_default_globals`` hands out independent copies.
    DEFAULT_CONFIG_GLOBALS: ConfigDict = {
        "runtime": {
            "reporters": [],
            "server": {
                "resources": {
                    "requests": {"memory": 3000, "cpu": 1000},
                    "limits": {"memory": 6000, "cpu": 2000},
                }
            },
            "prometheus_metrics_server": {
                "resources": {
                    "requests": {"memory": 200, "cpu": 100},
                    "limits": {"memory": 1000, "cpu": 200},
                }
            },
            "builder": {
                "resources": {
                    "requests": {"memory": 3900, "cpu": 1001},
                    "limits": {"memory": 31200, "cpu": 1001},
                },
                "remote_logging": {"enable": False},
            },
            "client": {
                "resources": {
                    "requests": {"memory": 3500, "cpu": 100},
                    "limits": {"memory": 4000, "cpu": 2000},
                },
                "max_instances": 30,
            },
            "influx": {"enable": True},
        },
        "evaluation": {
            "cv_mode": "full_build",
            "scoring_scaler": "sklearn.preprocessing.MinMaxScaler",
            "metrics": [
                "explained_variance_score",
                "r2_score",
                "mean_squared_error",
                "mean_absolute_error",
            ],
        },
    }

    def __init__(
        self,
        config: dict,
        project_name: str,
        gordo_version: Optional[str] = None,
        model_builder_env: Optional[dict] = None,
    ):
        """
        Parameters
        ----------
        config: dict
            Project configuration. Must contain a ``machines`` key holding the
            machine configs; may contain a ``globals`` key with overrides.
        project_name: str
            Name of the project, forwarded to every ``Machine``.
        gordo_version: Optional[str]
            Version used to select the default docker images. Falls back to
            the installed gordo ``__version__`` when ``None``.
        model_builder_env: Optional[dict]
            Environment for the builder runtime. Only applied when the passed
            ``globals`` do not already define ``runtime.builder.env``.
        """
        if gordo_version is None:
            gordo_version = __version__
        default_globals = self.get_default_globals(gordo_version)
        # Influx resources scale with the number of machines in the project.
        default_globals["runtime"]["influx"][  # type: ignore
            "resources"
        ] = _calculate_influx_resources(  # type: ignore
            len(config["machines"])
        )

        # ``globals`` may be missing, or present but empty (``None``); both
        # cases are normalized to an empty dict before patching.
        passed_globals = config.get("globals") or {}

        # keeping it for back-compatibility
        if model_builder_env is not None and not (
            passed_globals
            and "runtime" in passed_globals
            and "builder" in passed_globals["runtime"]
            and "env" in passed_globals["runtime"]["builder"]
        ):
            if "builder" not in default_globals["runtime"]:
                default_globals["runtime"]["builder"] = {}
            default_globals["runtime"]["builder"]["env"] = model_builder_env

        patched_globals = patch_dict(default_globals, passed_globals)
        patched_globals = self.prepare_patched_globals(patched_globals)

        self.project_name = project_name
        self.machines: List[Machine] = [
            Machine.from_config(
                conf, project_name=project_name, config_globals=patched_globals
            )
            for conf in config["machines"]
        ]
        self.globals: dict = patched_globals

    @staticmethod
    def prepare_runtime(runtime: dict) -> dict:
        """
        Validate and normalize selected parts of a ``runtime`` mapping.

        The ``builder`` entry (if present) is parsed against
        ``BuilderPodRuntime`` and the ``volumes`` entry (if present) against a
        list of ``Volume``; each is replaced by its dict form with ``None``
        fields excluded. ``runtime`` is modified in place and also returned.
        """

        def prepare_pod_runtime(name: str, schema: Type[BaseModel] = PodRuntime):
            if name in runtime:
                # TODO handling pydantic.ValidationError
                pod_runtime = parse_obj_as(schema, runtime[name])
                runtime[name] = pod_runtime.dict(exclude_none=True)

        prepare_pod_runtime("builder", BuilderPodRuntime)
        if "volumes" in runtime:
            volumes = parse_obj_as(List[Volume], runtime["volumes"])
            runtime["volumes"] = [volume.dict(exclude_none=True) for volume in volumes]
        return runtime

    @classmethod
    def prepare_patched_globals(cls, patched_globals: dict) -> dict:
        """
        Post-process already patched globals.

        The ``runtime`` entry is passed through ``fix_runtime`` and then
        ``prepare_runtime``; the result is stored back under ``runtime``.
        ``patched_globals`` is modified in place and also returned.
        """
        runtime = fix_runtime(patched_globals.get("runtime"))
        runtime = cls.prepare_runtime(runtime)
        patched_globals["runtime"] = runtime
        return patched_globals

    @classmethod
    def get_default_globals(cls, gordo_version: str) -> dict:
        """
        Return the default globals for ``gordo_version``.

        ``DEFAULT_CONFIG_GLOBALS`` is patched with ``UNIFIED_DOCKER_IMAGES``
        when ``gordo_version`` is at least ``UNIFYING_GORDO_VERSION``, and
        with ``SPLITED_DOCKER_IMAGES`` otherwise.

        The returned mapping is meant to be freely mutated by the caller
        (``__init__`` writes into its nested ``runtime`` dicts), so the class
        level constants are deep-copied before patching.
        """
        # Local import: the module only imports the shallow ``copy`` helper.
        from copy import deepcopy

        current_version = parse(gordo_version)
        unifying_version = parse(cls.UNIFYING_GORDO_VERSION)
        if current_version >= unifying_version:
            docker_images = cls.UNIFIED_DOCKER_IMAGES
        else:
            docker_images = cls.SPLITED_DOCKER_IMAGES
        # A shallow copy would still share the nested dicts with the class
        # attributes; deep copies keep the constants isolated regardless of
        # whether ``patch_dict`` copies its inputs.
        return patch_dict(
            deepcopy(cls.DEFAULT_CONFIG_GLOBALS), deepcopy(docker_images)
        )