# -*- coding: utf-8 -*-
"""
This module contains code for generating the Gordo server Flask application.
Running this module will run the application using Flask's development webserver.
Gunicorn can be used to run the application as `gevent` async workers by using the
:func:`~gordo.server.server.run_server` function.
"""
import os
import json
import logging
import timeit
import typing
import subprocess
from functools import wraps
import yaml
from flask import Flask, g, request, current_app, make_response, jsonify
from typing import Optional, Any, Dict
from gordo.server import views
from gordo.dependencies import configure_once
from gordo import __version__
from prometheus_client import CollectorRegistry
from .prometheus import GordoServerPrometheusMetrics
logger = logging.getLogger(__name__)
[docs]def enable_prometheus():
return os.getenv("ENABLE_PROMETHEUS", "false") != "false"
[docs]class Config:
"""Server config"""
def __init__(self):
self.MODEL_COLLECTION_DIR_ENV_VAR = "MODEL_COLLECTION_DIR"
self.EXPECTED_MODELS = yaml.safe_load(os.getenv("EXPECTED_MODELS", "[]"))
self.ENABLE_PROMETHEUS = enable_prometheus()
self.PROJECT = os.getenv("PROJECT")
[docs]def adapt_proxy_deployment(wsgi_app: typing.Callable) -> typing.Callable:
"""
Decorator specific to fixing behind-proxy-issues when on Kubernetes and
using Envoy proxy.
Parameters
----------
wsgi_app: typing.Callable
The underlying WSGI application of a flask app, for example
Notes
-----
Special note about deploying behind Ambassador, or prefixed proxy paths in general:
When deployed on kubernetes/ambassador there is a prefix in-front of the
server. ie::
/gordo/v0/some-project-name/some-target
The server itself only knows about routes to the right of such a prefix:
such as ``/metadata`` or ``/predictions`` when in reality, the full path is::
/gordo/v0/some-project-name/some-target/metadata
This is solved by getting the current application's assigned prefix,
where ``HTTP_X_ENVOY_ORIGINAL_PATH`` is the *full* path, including the prefix.
and ``PATH_INFO`` is the actual relative path the server knows about.
This function wraps the WSGI app itself to map the current full path
to the assigned route function.
ie. ``/metadata`` -> metadata route function, by default, but updates
``/gordo/v0/some-project-name/some-target/metadata`` -> metadata route function
Returns
-------
Callable
Example
-------
>>> app = Flask(__name__)
>>> app.wsgi_app = adapt_proxy_deployment(app.wsgi_app)
"""
@wraps(wsgi_app)
def wrapper(environ, start_response):
# Script name can be "/gordo/v0/some-project-name/some-target/metadata"
script_name = environ.get("HTTP_X_ENVOY_ORIGINAL_PATH", "")
if script_name:
# PATH_INFO could be either "/" or some local route such as "/metadata"
path_info = environ.get("PATH_INFO", "")
if path_info.rstrip("/"):
# PATH_INFO must be something like "/metadata" or other local path
# To figure out the prefix/script_name we remove it from the
# full HTTP_X_ENVOY_ORIGINAL_PATH, so that something such as
# /gordo/v0/some-project-name/some-target/metadata, becomes
# /gordo/v0/some-project-name/some-target/
script_name = script_name.replace(path_info, "")
environ["SCRIPT_NAME"] = script_name
# Now we can just ensure the PATH_INFO reflects the locally known path
# such as /metadata and not /gordo/v0/some-project-name/some-target/metadata
if path_info.startswith(script_name):
environ["PATH_INFO"] = path_info[len(script_name) :]
scheme = environ.get("HTTP_X_FORWARDED_PROTO", "")
if scheme:
environ["wsgi.url_scheme"] = scheme
return wsgi_app(environ, start_response)
return wrapper
[docs]def create_prometheus_metrics(
project: Optional[str] = None, registry: Optional[CollectorRegistry] = None
) -> GordoServerPrometheusMetrics:
arg_labels = [("gordo_name", "model")]
info = {"version": __version__}
if project is not None:
info["project"] = project
else:
arg_labels.append(("gordo_project", "project"))
return GordoServerPrometheusMetrics(
args_labels=arg_labels,
info=info,
ignore_paths=["/healthcheck"],
registry=registry,
)
[docs]def build_app(
config: Optional[Dict[str, Any]] = None,
prometheus_registry: Optional[CollectorRegistry] = None,
):
"""
Build app and any associated routes
"""
configure_once()
app = Flask(__name__)
app.config.from_object(Config())
if config is not None:
app.config.update(**config)
app.register_blueprint(views.base_blueprint)
app.register_blueprint(views.anomaly_blueprint)
app.wsgi_app = adapt_proxy_deployment(app.wsgi_app) # type: ignore
app.url_map.strict_slashes = False # /path and /path/ are ok.
if app.config["ENABLE_PROMETHEUS"]:
prometheus_metrics = create_prometheus_metrics(
project=app.config.get("PROJECT"), registry=prometheus_registry
)
prometheus_metrics.prepare_app(app)
elif prometheus_registry is not None:
logger.warning("Ignoring non empty prometheus_registry argument")
@app.before_request
def _start_timer():
g.start_time = timeit.default_timer()
@app.before_request
def _set_revision_and_collection_dir():
g.collection_dir = os.environ[
current_app.config["MODEL_COLLECTION_DIR_ENV_VAR"]
]
g.current_revision = os.path.basename(g.collection_dir)
# If a specific revision was requested, update collection_dir
g.revision = request.args.get("revision") or request.headers.get("revision")
if g.revision:
g.collection_dir = os.path.join(g.collection_dir, "..", g.revision)
try:
os.listdir(g.collection_dir) # List dir to ensure it exists
except FileNotFoundError:
return make_response(
jsonify({"error": f"Revision '{g.revision}' not found."}), 410
)
else:
g.revision = g.current_revision
@app.after_request
def _revision_used(response):
if response.is_json:
data = response.get_json()
data["revision"] = g.revision
response.set_data(json.dumps(data).encode())
response.headers["revision"] = g.revision
return response
@app.after_request
def _log_time_taken(response):
runtime_s = timeit.default_timer() - g.start_time
logger.debug(f"Total runtime for request: {runtime_s}s")
response.headers["Server-Timing"] = f"request_walltime_s;dur={runtime_s}"
return response
@app.route("/healthcheck")
def base_healthcheck():
return "", 200
@app.route("/server-version")
def server_version():
return jsonify({"version": __version__})
return app
[docs]def run_cmd(cmd):
"""
Run a shell command and handle CalledProcessError and OSError types
Note
----
This function is abstracted from :func:`~gordo.server.server.run_server`
in order to test the calling of commands that would allow the subprocess call to
break, depending on how it is parameterized. For example, calling this without
sending `stderr` to stdout will cause a segmentation fault when calling an
executable that does not exist.
"""
subprocess.check_call(cmd, stderr=subprocess.STDOUT)
[docs]def run_server(
host: str,
port: int,
workers: int,
log_level: str,
config_module: Optional[str] = None,
worker_connections: Optional[int] = None,
threads: Optional[int] = None,
worker_class: str = "gthread",
server_app: str = "gordo.server.server:build_app()",
):
"""
Run application with Gunicorn server using Gevent Async workers
Parameters
----------
host: str
The host to run the server on.
port: int
The port to run the server on.
workers: int
The number of worker processes for handling requests.
log_level: str
The log level for the `gunicorn` webserver. Valid log level names can be found
in the [gunicorn documentation](http://docs.gunicorn.org/en/stable/settings.html#loglevel).
config_module: str
The config module. Will be passed with `python:` [prefix](https://docs.gunicorn.org/en/stable/settings.html#config).
worker_connections: int
The maximum number of simultaneous clients per worker process.
threads: str
The number of worker threads for handling requests.
worker_class: str
The type of workers to use.
server_app: str
The application to run
"""
cmd = [
"gunicorn",
"--bind",
f"{host}:{port}",
"--log-level",
log_level,
"--error-logfile",
"-",
"--access-logfile",
"-",
"--worker-class",
worker_class,
"--worker-tmp-dir",
"/dev/shm",
"--workers",
str(workers),
]
if config_module is not None:
cmd.extend(("--config", "python:" + config_module))
if worker_class == "gthread":
if threads is not None:
cmd.extend(("--threads", str(threads)))
else:
if worker_connections is not None:
cmd.extend(("--worker-connections", str(worker_connections)))
cmd.append(server_app)
run_cmd(cmd)
if __name__ == "__main__":
app = build_app()
# Run development webserver
app.run("0.0.0.0", 5555, debug=True, threaded=False)