Monitoring Anomalies with a Custom Metric

Estimated Completion Time: 18m.

Overview

In this tutorial, you will learn how to create a custom anomaly detection metric for a specific use case.

As a use case, let's take the problem described in the previous Train & Deploy Census Income Classification Model tutorial, with the census income dataset as a data source. We will monitor a model that classifies whether the income of a given person exceeds $50,000 per year.

By the end of this tutorial you will know how to:

  • Train a monitoring model

  • Deploy a monitoring model with SDK

  • Manage custom metrics with UI

  • Upload a monitoring model with CLI

Prerequisites

For this tutorial, you need to have Hydrosphere Platform deployed and Hydrosphere CLI (hs) along with Python SDK (hydrosdk) installed on your local machine. If you don't have them yet, please follow these guides first:

  • Platform Installation

  • CLI

  • Python SDK

This tutorial is a sequel to the previous tutorial. Please complete it first to have a prepared dataset and a trained model deployed to the cluster:

  • Train & Deploy Census Income Classification Model

Train a Monitoring Model

We start with the steps we used for the common model. First, let's create a directory structure for our monitoring model with a /src folder containing an inference script func_main.py:

mkdir -p monitoring_model/src
cd monitoring_model
touch src/func_main.py

As a monitoring metric, we will use IsolationForest. You can learn how it works in its documentation.

To make sure that our monitoring model sees the same data as our prediction model, we will train it on the training data that was saved in the previous tutorial.

import joblib
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest

X_train = pd.read_csv('data/train.csv', index_col=0)

monitoring_model = IsolationForest(contamination=0.04)

train_pred = monitoring_model.fit_predict(X_train)

train_scores = monitoring_model.decision_function(X_train)

plt.hist(
    train_scores,
    bins=30,
    alpha=0.5,
    density=True,
    label="Train data outlier scores"
)

# Note: depending on your scikit-learn version, the learned threshold may be exposed as offset_ instead of threshold_
plt.vlines(monitoring_model.threshold_, 0, 1.9, label="Threshold for marking outliers")
plt.gcf().set_size_inches(10, 5)
plt.legend()

joblib.dump(monitoring_model, "monitoring_model/monitoring_model.joblib")
Distribution of outlier scores

This is what the distribution of our inliers looks like. By choosing the contamination parameter we can adjust the threshold that separates inliers from outliers. Choose it carefully to avoid critical prediction mistakes; alternatively, you can leave it at the default 'auto'. To create a monitoring metric, we have to deploy this IsolationForest model as a separate model on the Hydrosphere platform, so let's save the trained model for serving.

Deploy a Monitoring Model with SDK

Our inference script declares the serving function and its definitions. Put the following code inside the src/func_main.py file we created earlier:

func_main.py
import numpy as np
from joblib import load

monitoring_model = load('/model/files/monitoring_model.joblib')

features = ['age', 'workclass', 'fnlwgt',
            'education', 'educational-num', 'marital-status',
            'occupation', 'relationship', 'race', 'gender',
            'capital-gain', 'capital-loss', 'hours-per-week',
            'native-country']

def predict(**kwargs):
    x = np.array([kwargs[feature] for feature in features]).reshape(1, len(features))
    predicted = monitoring_model.decision_function(x)

    return {"value": predicted.item()}

Next, we need to install the necessary libraries. Create a requirements.txt and add the following libraries to it:

joblib==0.13.2
numpy==1.16.2
scikit-learn==0.23.1

Just like with common models, we can use SDK to upload our monitoring model and bind it to the trained one. The steps are almost the same, but with some slight differences. First, since we want to predict the anomaly score instead of sample class, we need to change the type of output field from 'int64' to 'float64'.

Secondly, we need to apply a couple of new methods to create a metric. MetricSpec is responsible for creating a metric for a specific model, with specific MetricSpecConfig.

from hydrosdk.monitoring import MetricSpec, MetricSpecConfig, ThresholdCmpOp
from hydrosdk.contract import SignatureBuilder, ModelContract
from hydrosdk.modelversion import LocalModel
from hydrosdk.image import DockerImage

path_mon = "monitoring_model/"
payload_mon = ['src/func_main.py', 
               'monitoring_model.joblib', 'requirements.txt']

monitoring_signature = SignatureBuilder('predict') 
for i in X_train.columns:
    monitoring_signature.with_input(i, 'int64', 'scalar')
monitor_signature = monitoring_signature.with_output('value', 'float64', 'scalar').build()

monitor_contract = ModelContract(predict=monitor_signature)

monitoring_model_local = LocalModel(name="adult_monitoring_model", 
                              install_command = 'pip install -r requirements.txt',
                              contract=monitor_contract,
                              runtime=DockerImage("hydrosphere/serving-runtime-python-3.7", "2.3.2", None),
                              payload=payload_mon,
                              path=path_mon)
monitoring_upload = monitoring_model_local.upload(cluster)
monitoring_upload.lock_till_released()

metric_config = MetricSpecConfig(monitoring_upload.id, monitoring_model.threshold_, ThresholdCmpOp.LESS)
# adult_model is the ModelVersion deployed in the previous tutorial
metric_spec = MetricSpec.create(cluster, "custom_metric", adult_model.id, metric_config)

Anomaly scores are obtained through traffic shadowing inside the Hydrosphere engine after a Servable is created, so you don't need to perform any additional manipulations.
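To see the metric in action, you can simply send prediction requests to the application created in the previous tutorial; each request is shadowed to the monitoring model, which produces an anomaly score for it. A minimal sketch, assuming the cluster connection, the adult-app application, and the test_X DataFrame from the previous tutorial are available:

from hydrosdk.application import Application

# Find the application deployed in the previous tutorial and get a predictor
app = Application.find(cluster, "adult-app")
predictor = app.predictor()

# Every prediction is shadowed to the monitoring model automatically
for sample in test_X.to_dict('records')[:100]:
    predictor.predict(sample)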

Managing Custom Metrics with UI

Go to the UI to observe and manage all your models. Here you will find 3 models on the left panel:

  • adult_model - a model that we trained for prediction in the previous tutorial

  • adult_monitoring_model - our monitoring model

  • adult_model_metric - a model that was created by Automatic Outlier Detection

Click on the trained model and then on Monitoring. On the monitoring dashboard you now have two external metrics: the first one is auto_od_metric, which was automatically generated by Automatic Outlier Detection, and the new one is custom_metric, which we have just created. You can also change settings for existing metrics and configure new ones in the Configure Metrics section:

During prediction, you will get anomaly scores for each sample in the form of a chart with two lines. The curved line shows scores, while the horizontal dotted one is our threshold. When the curve crosses the threshold, it might signal a potential anomaly. However, this is not always the case, since many factors can affect the score, so be careful with your final interpretation.

Uploading a Monitoring model with CLI

Just like with all other types of models, we can define and upload a monitoring model using a resource definition. We have to pack our model with a resource definition, like in the previous tutorial.

kind: Model
name: "adult_monitoring_model"
payload:
  - "src/"
  - "requirements.txt"
  - "monitoring_model.joblib"
runtime: "hydrosphere/serving-runtime-python-3.7:2.3.2"
install-command: "pip install -r requirements.txt"
contract:
  name: "predict"
  inputs:
    age:
      shape: scalar
      type: int64
    workclass:
      shape: scalar
      type: int64
    education:
      shape: scalar
      type: int64
    marital_status:
      shape: scalar
      type: int64
    occupation:
      shape: scalar
      type: int64
    relationship:
      shape: scalar
      type: int64
    race:
      shape: scalar
      type: int64
    sex:
      shape: scalar
      type: int64
    capital_gain:
      shape: scalar
      type: int64
    capital_loss:
      shape: scalar
      type: int64
    hours_per_week:
      shape: scalar
      type: int64
    country:
      shape: scalar
      type: int64
    classes:
      shape: scalar
      type: int64
  outputs:
    value:
      shape: scalar
      type: float64

Inputs of this model are the inputs of the target monitored model plus the outputs of that model. We will use the value field as an output for the monitoring model. The final directory structure should look like this:

.
├── monitoring_model.joblib
├── requirements.txt
├── serving.yaml
└── src
    └── func_main.py

From that folder, upload the model to the cluster:

hs apply -f serving.yaml

Now we have to attach the deployed Monitoring model as a custom metric. Let's create a monitoring metric for our pre-deployed classification model in the UI:

  1. From the Models section, select the target model you would like to monitor and select the desired model version.

  2. Open the Monitoring tab.

  3. At the bottom of the page click the Configure Metric button.

  4. From the opened window click the Add Metric button.

    1. Specify the name of the metric.

    2. Choose the monitoring model.

    3. Choose the version of the monitoring model.

    4. Select a comparison operator Greater. This means that if you have a metric value greater than a specified threshold, an alarm should be fired.

    5. Set the threshold value. In this case, it should be equal to the value of monitoring_model.threshold_ (see the snippet after this list).

    6. Click the Add Metric button.
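If your training session is no longer open, you can reload the saved model and print the learned threshold to paste into the UI. A minimal sketch; depending on your scikit-learn version the attribute may be exposed as offset_ rather than threshold_:

from joblib import load

# Load the model saved during training and print the threshold used for the metric
monitoring_model = load("monitoring_model/monitoring_model.joblib")
print(monitoring_model.threshold_)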

That's it. Now you have a monitored income classifier deployed on the Hydrosphere platform.

Tutorials

Contents

  • A/B Deployment and Traffic Shadowing

  • Using Deployment Configurations

  • Train & Deploy Census Income Classification Model

  • Monitor Anomalies with a Custom Metric

  • Monitor External Models

Overview

This section contains tutorials to help you get started with the Hydrosphere platform. A tutorial shows how to accomplish a goal rather than a single basic task.

Typically, a tutorial has several sections. When a tutorial section has several pieces of code to illustrate it, they can be shown as a group of tabs that you can switch between.

For guides on performing more basic technical steps, please look in the How-To section:

  • How-To

Using Deployment Configurations

Estimated completion time: 11m.

This tutorial is relevant only for Kubernetes installation of Hydrosphere. Please refer to How to Install Hydrosphere on Kubernetes cluster.

Overview

In this tutorial, you will learn how to configure deployed Applications.

By the end of this tutorial you will know how to:

  • Train and upload an example model version

  • Create a Deployment Configuration

  • Create an Application from the uploaded model version with previously created deployment configuration

  • Examine settings of a Kubernetes cluster

Prerequisites

  • Hydrosphere platform installed in Kubernetes cluster

  • Python SDK / CLI

Upload a Model

In this section, we describe the resources required to create and upload an example model used in further sections. If you have no prior experience with uploading models to the Hydrosphere platform we suggest that you visit the Getting Started Tutorial.

Here are the resources used to train sklearn.ensemble.GradientBoostingClassifier and upload it to the Hydrosphere cluster.

requirements.txt is a list of Python dependencies used while building the model image.

numpy~=1.18
scipy==1.4.1
scikit-learn~=0.23

serving.yaml is a resource definition that describes how the model should be built and uploaded to the Hydrosphere platform.

serving.yaml
kind: Model
name: my-model
runtime: hydrosphere/serving-runtime-python-3.7:2.3.2
install-command: pip install -r requirements.txt
payload:
  - src/
  - requirements.txt
  - model.joblib
contract:
  name: infer
  inputs:
    x:
      shape: [30]
      type: double
  outputs:
    y:
      shape: scalar
      type: int64

train.py is used to generate a model.joblib which is loaded from func_main.py during model serving.

Run python train.py to generate model.joblib

train.py
import joblib
import pandas as pd
from sklearn.datasets import make_blobs
from sklearn.ensemble import GradientBoostingClassifier

# initialize data
X, y = make_blobs(n_samples=3000, n_features=30)

# create a model
model = GradientBoostingClassifier(n_estimators=200)
model.fit(X, y)

# Save training data and model
pd.DataFrame(X).to_csv("training_data.csv", index=False)
joblib.dump(model, "model.joblib")

func_main.py is a script which serves requests and produces responses.

func_main.py
import joblib
import numpy as np

# Load model once
model = joblib.load("/model/files/model.joblib")


def infer(x):
    # Make a prediction
    y = model.predict(x[np.newaxis])

    # Return the scalar representation of y
    return {"y": np.asscalar(y)}

Our folder structure should look like this:

dep_config_tutorial
├── model.joblib
├── train.py
├── requirements.txt
├── serving.yaml
└── src
    └── func_main.py

Do not forget to run python train.py to generate model.joblib!

After we have made sure that all files are placed correctly, we can upload the model to the Hydrosphere platform by running hs upload from the command line.

hs upload

Create a Deployment Configuration

Next, we are going to create and upload an instance of Deployment Configuration to the Hydrosphere platform.

Deployment Configurations describe the Kubernetes settings with which Hydrosphere should deploy servables. You can specify Pod Affinity and Tolerations, the desired number of pods in a deployment, ResourceRequirements and Environment Variables for the model container, and HorizontalPodAutoscaler settings.

Created Deployment Configurations can be attached to Servables and Model Variants inside of an Application.

Deployment Configurations are immutable and cannot be changed after they've been uploaded to the Hydrosphere platform.

You can create and upload Deployment Configuration to Hydrosphere via YAML Resource definition or via Python SDK.

For this tutorial, we'll create a deployment configuration with 2 initial pods per deployment, an HPA, and a FOO environment variable with the value bar.

Create the deployment configuration resource definition:

deployment_configuration.yaml
kind: DeploymentConfiguration
name: my-dep-config
deployment:
  replicaCount: 2
hpa:
  minReplicas: 2
  maxReplicas: 4
  cpuUtilization: 70
container:
  env:
    FOO: bar

To upload it to the Hydrosphere platform, run:

hs apply -f deployment_configuration.yaml
Alternatively, the same deployment configuration can be created with the Python SDK:

from hydrosdk import Cluster, DeploymentConfigurationBuilder

cluster = Cluster("http://localhost")

dep_config_builder = DeploymentConfigurationBuilder("my-dep-config", cluster)
dep_config = dep_config_builder. \
    with_replicas(replica_count=2). \
    with_env({"FOO":"bar"}). \
    with_hpa(max_replicas=4,
             min_replicas=2,
             target_cpu_utilization_percentage=70).build()

Create an Application

Create the application resource definition:

application.yaml
kind: Application
name: my-app-with-config
pipeline:
  - - model: my-model:1
      weight: 100
      deploymentConfiguration: my-dep-config

To upload it to the Hydrosphere platform, run:

hs apply -f application.yaml
Alternatively, with the Python SDK:

from hydrosdk.application import ApplicationBuilder, ExecutionStageBuilder
from hydrosdk import ModelVersion, Cluster, DeploymentConfiguration

cluster = Cluster('http://localhost')
my_model = ModelVersion.find(cluster, "my-model", 1)
my_config = DeploymentConfiguration.find(cluster, "my-dep-config")

stage = ExecutionStageBuilder().with_model_variant(model_version=my_model,
                                                   weight=100,
                                                   deployment_configuration=my_config).build()

app = ApplicationBuilder(cluster, "my-app-with-config").with_stage(stage).build()

Examine Kubernetes Settings

Replicas

You can check whether with_replicas was successful by calling kubectl get deployment -A -o wide and checking the READY column.

HPA

To check whether with_hpa was successful, you should get a list of all created HorizontalPodAutoscaler resources. You can do so by calling kubectl get hpa -A.

The output is similar to:

NAME                        REFERENCE                                            TARGETS    MINPODS   MAXPODS   REPLICAS   AGE
my-model-1-tumbling-star    CrossVersionObjectReference/my-model-1-tumbling-star 20%/70%    2         4         2          1d

Environment Variables

To list all environment variables, run kubectl exec -it my-model-1-tumbling-star -- /bin/bash and then execute the printenv command, which prints all environment variables.

The output is similar to:

MY_MODEL_1_TUMBLING_STAR_SERVICE_PORT_GRPC=9091
...
FOO=bar

A/B Analysis for a Recommendation Model

Estimated completion time: 14 min.

Overview

In this tutorial, you will learn how to retrospectively compare the behavior of two different models.

By the end of this tutorial you will know how to:

  • Set up an A/B application

  • Analyze production data

Prerequisites

  • Installed Hydrosphere platform

  • Python SDK

Set Up an A/B Application

Prepare a model for uploading

Upload Model A

We train and upload our model with 5 components as movie_rec:v1

Upload Model B

Next, we train and upload a new version of our original model with 20 components as movie_rec:v2

We can check that we have multiple versions of our model by running hs model list.

Create an Application

To create an A/B deployment we need to create an Application with a single execution stage consisting of two model variants. These model variants are our Model A and Model B correspondingly.

The following code will create such an application:

Invoking movie-ab-app

We'll simulate production data flow by repeatedly asking our model for recommendations.

Analyze production data

Read Data from parquet

Each request-response pair is stored in S3 (or in minio if deployed locally) in parquet files. We'll use the fastparquet package to read these files and the s3fs package to connect to S3.

The only entry in the feature-lake folder is ['feature-lake/movie_rec']. Data in S3 is stored under the following path: feature-lake/MODEL_NAME/MODEL_VERSION/YEAR/MONTH/DAY/*.parquet
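For example, a glob pattern covering every stored request of a given model version can be built directly from this layout. A minimal sketch, assuming the movie_rec model and the s3 filesystem object created in the code listings at the end of this tutorial:

# Build a glob pattern following feature-lake/MODEL_NAME/MODEL_VERSION/YEAR/MONTH/DAY
model_name = "movie_rec"
model_version = 1
pattern = f"feature-lake/{model_name}/{model_version}/*/*/*/*.parquet"
paths = s3.glob(pattern)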

Now that we have loaded the data, we can start analyzing it.

Compare production data with new labeled data

To compare differences between model versions we'll use two metrics:

  1. Latency - we compare the time delay between the request received and the response produced.

  2. Mean Top-3 Hit Rate - we compare the top-3 recommendations to the items the user has actually rated; each match increases the hit rate by 1. Averaging over the complete test set gives the mean hit rate (a per-user sketch is shown right after this list).
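A minimal per-user sketch of this metric, assuming test_data is the dense MovieLens test matrix and top_items holds the three recommended item ids (the full computation over both model versions appears in the code listings at the end of this tutorial):

def top3_hit_rate(user_id, top_items, test_data):
    # Count how many of the 3 recommended items the user actually rated 4 or higher
    return sum(int(test_data[user_id, item_id] >= 4) for item_id in top_items)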

Latencies

Let's calculate the 95th percentile of our latency distributions per model version and plot them. Latencies are stored in the _hs_latency column in our dataframes.

In our case, the output was 13.0ms against 12.0ms. Results may differ.

Furthermore, we can visualize our data. To plot latency distribution we'll use the Matplotlib library.

Mean Top-3 Hit Rate

Next, we'll calculate hit rates. To do so, we need new labeled data. For recommender systems, this data is usually available after a user has clicked, watched, liked, or rated the item we've recommended. We'll use the test part of MovieLens as labeled data.

To measure how well our models recommend movies we'll use a hit rate metric. It counts how many of the 3 recommended movies the user has watched and rated with a 4 or 5.

In our case the mean_hit_rate variable is {'v1': 0.137, 'v2': 0.141}, which means that the second model version is better in terms of hit rate.

You have successfully completed the tutorial! 🚀

Now you know how to read and analyze automatically stored data.

requirements.txt
lightfm==1.15
numpy~=1.18
joblib~=0.15
train_model.py
import sys

import joblib
from lightfm import LightFM
from lightfm.datasets import fetch_movielens

if __name__ == "__main__":
    no_components = int(sys.argv[1])
    print(f"Number of components is set to {no_components}")

    # Load the MovieLens 100k dataset. Only five
    # star ratings are treated as positive.
    data = fetch_movielens(min_rating=5.0)

    # Instantiate and train the model
    model = LightFM(no_components=no_components, loss='warp')
    model.fit(data['train'], epochs=30, num_threads=2)

    # Save the model
    joblib.dump(model, "model.joblib")
src/func_main.py
import joblib
import numpy as np
from lightfm import LightFM

# Load model once
model: LightFM = joblib.load("/model/files/model.joblib")

# Get all item ids
item_ids = np.arange(0, 1682)


def get_top_rank_item(user_id):
    # Calculate scores per item id
    y = model.predict(user_ids=[user_id], item_ids=item_ids)

    # Pick top 3
    top_3 = y.argsort()[:-4:-1]

    # Return {'top_1': ..., 'top_2': ..., 'top_3': ...}
    return dict([(f"top_{i + 1}", item_id) for i, item_id in enumerate(top_3)])
setup_runtime.sh
apt install --yes gcc
pip install -r requirements.txt
serving.yaml
kind: Model
name: movie_rec
runtime: hydrosphere/serving-runtime-python-3.7:2.3.2
install-command: chmod a+x setup_runtime.sh && ./setup_runtime.sh
payload:
  - src/
  - requirements.txt
  - model.joblib
  - setup_runtime.sh
contract:
  name: get_top_rank_item
  inputs:
    user_id:
      shape: scalar
      type: int64
  outputs:
    top_1:
      shape: scalar
      type: int64
    top_2:
      shape: scalar
      type: int64
    top_3:
      shape: scalar
      type: int64
python train_model.py 5
hs upload
python train_model.py 20
hs upload
hs model list
from hydrosdk import ModelVersion, Cluster
from hydrosdk.application import ApplicationBuilder, ExecutionStageBuilder

cluster = Cluster('http://localhost')

model_a = ModelVersion.find(cluster, "movie_rec", 1)
model_b = ModelVersion.find(cluster, "movie_rec", 2)

stage_builder = ExecutionStageBuilder()
stage = stage_builder.with_model_variant(model_version=model_a, weight=50). \
    with_model_variant(model_version=model_b, weight=50). \
    build()

app = ApplicationBuilder(cluster, "movie-ab-app").with_stage(stage).build()
import numpy as np
from hydrosdk import Cluster, Application
from tqdm.auto import tqdm

cluster = Cluster("http://localhost", grpc_address="localhost:9090")

app = Application.find(cluster, "movie-ab-app")
predictor = app.predictor()

user_ids = np.arange(0, 943)

for uid in tqdm(np.random.choice(user_ids, 2000, replace=True)):
    result = predictor.predict({"user_id": uid})
import fastparquet as fp
import s3fs

s3 = s3fs.S3FileSystem(client_kwargs={'endpoint_url': 'http://localhost:9000'},
                       key='minio', secret='minio123')

# The data is stored in `feature-lake` bucket by default 
# Lets print files in this folder
s3.ls("feature-lake/")
# We fetch all parquet files with glob
version_1_paths = s3.glob("feature-lake/movie_rec/1/*/*/*/*.parquet")
version_2_paths = s3.glob("feature-lake/movie_rec/2/*/*/*/*.parquet")

myopen = s3.open

# use s3fs as the filesystem to read parquet files into a pandas dataframe
fp_obj = fp.ParquetFile(version_1_paths, open_with=myopen)
df_1 = fp_obj.to_pandas()

fp_obj = fp.ParquetFile(version_2_paths, open_with=myopen)
df_2 = fp_obj.to_pandas()
latency_v1 = df_1._hs_latency
latency_v2 = df_2._hs_latency

p95_v1 =  latency_v1.quantile(0.95)
p95_v2 = latency_v2.quantile(0.95)
import matplotlib.pyplot as plt

# Resize the canvas
plt.gcf().set_size_inches(10, 5)

# Plot latency histograms
plt.hist(latency_v1, range=(0, 20),
 density=True, bins=20, alpha=0.6, label="Latency Model v1")
plt.hist(latency_v2, range=(0, 20),
 density=True, bins=20, alpha=0.6, label="Latency Model v2")

# Plot previously computed percentiles
plt.vlines(p95_v1, 0, 0.1, color="#1f77b4",
 label="95th percentile for model version 1")
plt.vlines(p95_v2, 0, 0.1, color="#ff7f0e",
 label="95th percentile for model version 2")

plt.legend()
plt.title("Latency Comparison between v1 and v2")
from lightfm.datasets import fetch_movielens

test_data = fetch_movielens(min_rating=5.0)['test']
test_data = test_data.toarray()

# Dict with model version as key and mean hit rate as value
mean_hit_rate = {}
for version, df in {"v1": df_1, "v2": df_2}.items():

    # Dict with user id as key and hit rate as value
    hit_rates = {}
    for x in df.itertuples():
        hit_rates[x.user_id] = 0

        for top_x in ("top_1", "top_2", "top_3"):
            hit_rates[x.user_id] += test_data[x.user_id, getattr(x, top_x)] >= 4

    mean_hit_rate[version] = round(sum(hit_rates.values()) / len(hit_rates), 3)

Train & Deploy Census Income Classification Model

Overview

In this tutorial, you will learn how to train and deploy a model for a classification task based on the Adult Dataset. The main steps of this process are data preparation, training a model, uploading a model to the cluster, and making a prediction on test samples.

By the end of this tutorial you will know how to:

  • Prepare data

  • Train a model

  • Deploy a model with SDK

  • Explore models via UI

  • Deploy a model with CLI and resource definition

Prerequisites

For this tutorial, you need to have Hydrosphere Platform deployed and Hydrosphere CLI (hs) along with Python SDK (hydrosdk) installed on your local machine. If you don't have them yet, please follow these guides first:

  • Platform Installation

  • CLI

  • Python SDK

For this tutorial, you can use a local cluster. To ensure that, run hs cluster in your terminal. This command will show the name and server address of a cluster you’re currently using. If it shows that you're not using a local cluster, you can configure one with the following commands:

hs cluster add --name local --server http://localhost
hs cluster use local

Data preparation

Model training always requires some amount of initial preparation, most of which is data preparation. The Adult Dataset consists of 14 descriptors, 5 of which are numerical and 9 categorical, including the class column.

Categorical features are usually presented as strings, which is not an appropriate data type for feeding into a model, so we need to encode them first. We also remove rows that contain question marks (missing values) in some samples. Once the preprocessing is complete, you can delete the DataFrame (df):

import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder  

df = pd.read_csv('adult.csv', sep = ',').replace({'?':np.nan}).dropna()

categorical_encoder = LabelEncoder()
categorical_features = ["workclass", "education", "marital-status", 
                        "occupation", "relationship", "race", "gender", 
                        "capital-gain", "capital-loss", "native-country", 'income']

numerical_features = ['age', 'fnlwgt', 'educational-num', 
                      'capital-gain', 'capital-loss', 'hours-per-week']

for column in categorical_features:
    df[column] = categorical_encoder.fit_transform(df[column])

X, y = df.drop('income', axis = 1), df['income']

del df

Training a model

There are many classifiers that you can potentially use for this step. In this example, we’ll apply the Random Forest classifier. After preprocessing, the dataset is separated into train and test subsets. The test set will be used to check whether our deployed model can process requests on the cluster. After the training step, we save the model with joblib.dump() into a model/ folder.

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import joblib

random_seed = 42  # any fixed seed for reproducibility

train_X, test_X, train_y, test_y = train_test_split(X, y.astype(int),
                                                    stratify=y,
                                                    test_size=0.2,
                                                    random_state=random_seed)
clf = RandomForestClassifier(n_estimators=20,
                             max_depth=10,
                             n_jobs=5,
                             random_state=random_seed).fit(train_X, train_y)

# Save the training features for later use as training data and in the monitoring tutorial
train_X.to_csv('data/train.csv')

joblib.dump(clf, 'model/model.joblib')

Deploy a model with SDK

The easiest way to upload a model to your cluster is by using Hydrosphere SDK. SDK allows Python developers to configure and manage the model lifecycle on the Hydrosphere platform. Before uploading a model, you need to connect to your cluster:

from hydrosdk.contract import SignatureBuilder, ModelContract
from hydrosdk.cluster import Cluster
from grpc import ssl_channel_credentials

cluster = Cluster("http-cluster-address",
                  grpc_address="grpc-cluster-address", ssl=True,
                  grpc_credentials=ssl_channel_credentials())

Next, we need to create an inference script to be uploaded to the Hydrosphere platform. This script will be executed each time you are instantiating a model servable. Let's name our function file func_main.py and store it in the src folder inside the directory where your model is stored. Your directory structure should look like this:

.
└── model
    └── model.joblib
    └── src
        └── func_main.py

The code in the func_main.py should be as follows:

func_main.py
import pandas as pd
from joblib import load


clf = load('/model/files/model.joblib')

cols = ['age', 'workclass', 'fnlwgt',
 'education', 'educational-num', 'marital-status',
 'occupation', 'relationship', 'race', 'gender',
 'capital-gain', 'capital-loss', 'hours-per-week',
 'native-country']

def predict(**kwargs):
    X = pd.DataFrame.from_dict({'input': kwargs}, 
                               orient='index', columns = cols)
    predicted = clf.predict(X)

    return {"y": predicted[0]}

It’s important to make sure that variables are in the right order after we transform our dictionary for a prediction. That is why cols preserves the column names as a list ordered by their appearance in the DataFrame.

To start working with the model in a cluster, we need to install the necessary libraries used in func_main.py. Create a requirements.txt in the folder with your model and add the following libraries to it:

pandas==1.0.5
scikit-learn==0.23.2
joblib==0.16.0

After this, your model directory with all necessary dependencies should look as follows:

.
└── model
    └── model.joblib
    └── requirements.txt
    └── src
        └── func_main.py

Now we are ready to upload our model to the cluster.

Hydrosphere Serving has a strictly typed inference engine, so before uploading our model we need to specify its signature with SignatureBuilder. A signature contains information about which method inside func_main.py should be called, as well as the shapes and types of its inputs and outputs.

Use X.dtypes to check which data type you have for each column. You can use int64 fields for all variables, including income, which is our dependent variable; we name it 'y' in the signature for further prediction.
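For instance, you can quickly inspect the encoded columns:

# Inspect the data type of each encoded column
print(X.dtypes)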

In addition, you can specify the type of profiling for each variable using ProfilingType so Hydrosphere knows what each variable represents and analyzes it accordingly. For this purpose, we create a dictionary with our variables as keys and profiling types as values. Alternatively, you can describe them one by one as a parameter of each input.

Finally, we can complete our signature with the .build() method.

from hydrosdk.contract import SignatureBuilder, ModelContract, ProfilingType as PT

signature = SignatureBuilder('predict') 

col_types = {
  **dict.fromkeys(numerical_features, PT.NUMERICAL), 
  **dict.fromkeys(categorical_features, PT.CATEGORICAL)}

for i in X.columns:
    signature.with_input(i, 'int64', 'scalar', col_types[i])

signature = signature.with_output('y', 'int64', 'scalar', PT.NUMERICAL).build()

Next, we need to specify which files will be uploaded to the cluster. We use path to define the root model folder and payload to point out paths to all files that we need to upload.

At this point, we can combine all our efforts into the LocalModel object. LocalModels are models before they get uploaded to the cluster. They contain all the information required to instantiate a ModelVersion in a Hydrosphere cluster. We’ll name this model adult_model.

Additionally, we need to specify the environment in which our model will run. Such environments are called Runtimes. In this tutorial, we will use the default Python 3.7 runtime. This runtime uses the src/func_main.py script as an entry point, which is the reason we organized our files the way we did.

One more parameter that you can define is a path to the training data of your model, required if you want to utilize additional services of Hydrosphere (for example, Automatic Outlier Detection).

from hydrosdk.modelversion import LocalModel
from hydrosdk.image import DockerImage

path = "model/"
payload = ['src/func_main.py', 'requirements.txt', 'model.joblib']
contract = ModelContract(predict=signature)

local_model = LocalModel(name="adult_model", 
                         install_command = 'pip install -r requirements.txt',
                         contract=contract, payload=payload,
                         runtime=DockerImage("hydrosphere/serving-runtime-python-3.7", "2.3.2", None),
                         path=path, training_data = 'data/train.csv')

Now we are ready to upload our model to the cluster. This process consists of several steps:

  1. Once LocalModel is prepared we can apply the upload method to upload it.

  2. Then we lock any interaction with the model until it is successfully uploaded.

  3. ModelVersion helps to check whether our model was successfully uploaded to the platform by looking for it.

from hydrosdk.modelversion import ModelVersion

uploaded_model = local_model.upload(cluster)
uploaded_model.lock_till_released()
uploaded_model.upload_training_data()

# Check that model was uploaded successfully
adult_model = ModelVersion.find(cluster, name="adult_model", 
                               version=uploaded_model.version)

To deploy a model you should create an Application - a linear pipeline of ModelVersions with monitoring and other benefits. Applications provide Predictor objects, which should be used for data inference purposes.

from hydrosdk.application import ExecutionStageBuilder, Application, ApplicationBuilder

stage = ExecutionStageBuilder().with_model_variant(adult_model).build()
app = ApplicationBuilder(cluster, "adult-app").with_stage(stage).build()

predictor = app.predictor()

Predictors provide a predict method which we can use to send our data to the model. We can make predictions for our test set, which is converted to a list of dictionaries beforehand. You can check the results using the name you gave to the output of the signature and store them in any format you prefer. Before making a prediction, give the application a short pause to finish deploying.

results = []
for x in test_X.to_dict('records'):
    result = predictor.predict(x)
    results.append(result['y'])
print(results[:10])

Explore the UI

If you want to interact with your model via Hydrosphere UI, you can go to http://localhost. Here you can find all your models. Click on a model to view information about it: versions, building logs, created applications, model's environments, and other services associated with deployed models.

You might notice that after some time an additional model appears with the metric suffix at the end of its name. This is your automatically created monitoring model for outlier detection. Learn more about the Automatic Outlier Detection feature here.

🎉 You have successfully finished this tutorial! 🎉

Next Steps

Next, you can:

  1. Go to the next tutorial and learn how to create a custom Monitoring Metric and attach it to your deployed model:

  2. Explore the extended part of this tutorial to learn how to use YAML resource definitions to upload a ModelVersion and create an Application.

Deploy a model with CLI and Resource Definitions

Another way to upload your model is to apply a resource definition. This process repeats all the previous steps like data preparation and training. The difference is that instead of SDK, we are using CLI to apply a resource definition.

A resource definition is a file that defines the inputs and outputs of a model, a signature function, and some other metadata required for serving. Go to the root directory of the model and create a serving.yaml file. You should get the following file structure:

.
└── model
    └── model.joblib
    └── serving.yaml
    └── requirements.txt
    └── src
        └── func_main.py

Model deployment with a resource definition repeats all the steps of SDK-based deployment, but in one file. A considerable advantage of using a resource definition is that, besides describing your model, it allows you to create an application by simply adding an Application object after the separator line (---) at the bottom. Just name your application and provide the name and version of the model you want to tie to it.

kind: Model
name: "adult_model"
payload:
  - "src/"
  - "requirements.txt"
  - "model.joblib"
runtime: "hydrosphere/serving-runtime-python-3.7:2.3.2"
install-command: "pip install -r requirements.txt"
training-data: data/train.csv
contract:
  name: "predict"
  inputs:
    age:
      shape: scalar
      type: int64
      profile: numerical
    workclass:
      shape: scalar
      type: int64
      profile: categorical
    fnlwgt:
      shape: scalar
      type: int64
      profile: numerical
    education:
      shape: scalar
      type: int64
      profile: categorical
    educational-num:
      shape: scalar
      type: int64
      profile: numerical
    marital_status:
      shape: scalar
      type: int64
      profile: categorical
    occupation:
      shape: scalar
      type: int64
      profile: categorical
    relationship:
      shape: scalar
      type: int64
      profile: categorical
    race:
      shape: scalar
      type: int64
      profile: categorical
    sex:
      shape: scalar
      type: int64
      profile: categorical
    capital_gain:
      shape: scalar
      type: int64
      profile: numerical
    capital_loss:
      shape: scalar
      type: int64
      profile: numerical
    hours_per_week:
      shape: scalar
      type: int64
      profile: numerical
    country:
      shape: scalar
      type: int64
      profile: categorical
  outputs:
    class:
      shape: scalar
      type: int64
      profile: numerical
---
kind: Application
name: adult_application
singular:
  model: adult_model:1

To start uploading, run hs apply -f serving.yaml. To monitor your model you can use Hydrosphere UI as was previously shown.

Monitoring External Models

Overview

Monitoring can be used to track the behavior of external models running outside of the Hydrosphere platform. This tutorial describes how to register an external model, trigger analysis over your requests, and retrieve results.

By the end of this tutorial you will know how to:

  • Register a model

  • Upload training data

  • Assign custom metrics

  • Invoke analysis

  • Retrieve metrics

Prerequisites

For this tutorial, you need to have Hydrosphere Platform deployed on your local machine with Sonar component enabled. If you don't have it yet, please follow this guide first:

  • Platform Installation

You also need a running external model, capable of producing predictions. Inputs and outputs of that model will be fed into Hydrosphere for monitoring purposes.

Model registration

First, you have to register an external model. To do that, submit a JSON document, defining your model.

Request document structure

This section describes the structure of the JSON document used to register external models within the platform.

Top-level members

The document must contain the following top-level members, describing the interface of your model:

  • name: the name of the registered model. This name uniquely identifies a collection of model versions, registered within the Hydrosphere platform.

  • contract: the interface of the registered model. This member describes inputs and outputs of the model, as well as other complementary metadata, such as model signature, and data profile for each field.

A document may contain additional top-level members, describing other details of your model.

  • metadata: the metadata of the registered model. The structure of the object is not strictly defined. The only constraint is that the object must have a key-value structure, where a value can only be of a simple data type (string, number, boolean).

  • monitoringConfiguration: monitoring configuration to be used for this model.

This example shows how a model can be defined at the top level:

{  
    "name": "external-model-example",
    "metadata": {
        "architecture": "Feed-forward neural network",
        "description": "Sample external model example",
        "author": "Hydrosphere.io",
        "training-data": "s3://bucket/external-model-example/data/",
        "endpoint": "http://example.com/api/external-model/"
    },
    "monitoringConfiguration": {
        "batchSize": 100
    },
    "contract": {
        ...
    }
}

MonitoringConfiguration object

monitoringConfiguration object defines a monitoring configuration to be used for the model version. The object must contain the following members:

  • batchSize: size of the batch to be used for aggregations.

The example below shows how a monitoringConfiguration object can be defined.

{
    "batchSize": 100
}

Contract object

The contract object appears in the document to define the interface of the model. The contract object must contain the following members:

  • modelName: the original name of the model. It should be the same as the name of the registered model, defined on the level above;

  • predict: the signature of the model. It defines the inputs and the outputs of the model.

The example below shows how a contract object can be defined.

{
    "modelName": "external-model-example",
    "predict": {
        ...
    }
}

Predict object

predict object describes the signature of the model. The signature object must contain the following members:

  • signatureName: The signature of the model, used to process the request;

  • inputs: A collection of fields, defining the inputs of the model. Each item in the collection describes a single data entry, its type, shape, and profile. A collection must contain at least one item;

  • outputs: A collection of fields, defining the outputs of the model. Each item in the collection describes a single data entry, its type, shape, and profile. A collection must contain at least one item.

The example below shows how a predict object can be defined.

{
    "signatureName": "predict",
    "inputs": [
        ...
    ],
    "outputs": [
        ...
    ]
}

Field object

Items in the inputs / outputs collections are collectively called "fields". The field object must contain the following members:

  • name: Name of the field;

  • dtype: Data type of the field.

  • profile: Data profile of the field.

  • shape: Shape of the field.

The only valid options for dtype are:

  • DT_STRING;

  • DT_BOOL;

  • DT_VARIANT;

  • DT_HALF;

  • DT_FLOAT;

  • DT_DOUBLE;

  • DT_INT8;

  • DT_INT16;

  • DT_INT32;

  • DT_INT64;

  • DT_UINT8;

  • DT_UINT16;

  • DT_UINT32;

  • DT_UINT64;

  • DT_QINT8;

  • DT_QINT16;

  • DT_QINT32;

  • DT_QUINT8;

  • DT_QUINT16;

  • DT_COMPLEX64;

  • DT_COMPLEX128;

The only valid options for profile are:

  • NONE

  • NUMERICAL

  • TEXT

  • IMAGE

The example below shows how a single field object can be defined.

{
    "name": "age",
    "dtype": "DT_INT32",
    "profile": "NUMERICAL",
    "shape": {
        ...
    }
}

Shape object

shape object defines the shape of the data that the model is processing. The shape object must contain the following members:

  • dim: A collection of items, describing each dimension. A collection may be empty — in that case, the tensor will be interpreted as a scalar value.

  • unknownRank: Boolean value. Identifies whether the defined shape is of unknown rank.

The example below shows how a shape object can be defined.

{
    "dim": [
        ...
    ],
    "unknownRank": false
}

Dim object

dim object defines a dimension of the field. The dim object must contain the following members:

  • size: Size of the dimension.

  • name: Name of the dimension.

The example below shows how a dim object can be defined.

{
    "size": 10,
    "name": "example"
}

Registering external model

A model can be registered by sending a POST request to the /api/v2/externalmodel endpoint. The request must include a model definition as primary data.

The request below shows an example of an external model registration.

POST /api/v2/externalmodel HTTP/1.1
Content-Type: application/json
Accept: application/json

{
    "name": "external-model-example",
    "metadata": {
        "architecture": "Feed-forward neural network",
        "description": "Sample external model example",
        "author": "Hydrosphere.io",
        "training-data": "s3://bucket/external-model-example/data/",
        "endpoint": "http://example.com/api/external-model/"
    },
    "monitoringConfiguration": {
        "batchSize": 100
    },
    "contract": {
        "modelName": "external-model-example",
        "predict": {
            "signatureName": "predict",
            "inputs": [
                {
                    "name": "in",
                    "dtype": "DT_DOUBLE",
                    "profile": "NUMERICAL",
                    "shape": {
                        "dim": [],
                        "unknownRank": false
                    }
                }
            ],
            "outputs": [
                {
                    "name": "out",
                    "dtype": "DT_DOUBLE",
                    "profile": "NUMERICAL",
                    "shape": {
                        "dim": [],
                        "unknownRank": false
                    }
                }
            ]
        }
    }
}
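The same registration can be performed with a small script. A minimal sketch using the Python requests library, assuming model_definition holds the JSON document shown above and <hydrosphere> is the address of your instance:

import requests

# model_definition is the dictionary shown in the example above
response = requests.post("http://<hydrosphere>/api/v2/externalmodel", json=model_definition)
response.raise_for_status()

registered = response.json()
print("Registered model version id:", registered["id"])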

As a response, the server will return a JSON object with complementary metadata, identifying a registered model version.

Response document structure

The response object from the external model registration request contains the following fields:

  • id: Model version ID, uniquely identifying a registered model version within Hydrosphere platform;

  • model: An object, representing a model collection, registered in Hydrosphere platform;

  • modelVersion: Model version number in the model collection;

  • modelContract: Contract of the model, similar to the one defined in the request section above;

  • metadata: Metadata of the model, similar to the one defined in the request section above;

  • monitoringConfiguration: MonitoringConfiguration of the model, similar to the one defined in the request section above;

  • created: Timestamp, indicating when the model was registered.

Note the id field. It will be referred to as MODEL_VERSION_ID later throughout the article.

Model object

model object represents a collection of model versions, registered in the platform. The response model object contains the following fields:

  • id: ID of the model collection;

  • name: Name of the model collection.

The example below shows a sample server response to an external model registration request.

HTTP/1.1 200 OK
Content-Type: application/json

{
    "id": 1,
    "model": {
        "id": 1,
        "name": "external-model-example"
    },
    "modelVersion": 1,
    "created": "2020-01-09T16:25:02.915Z",
    "modelContract": { 
        "modelName": "external-model-example",
        "predict": {
            "signatureName": "predict",
            "inputs": [
                {
                    "name": "in",
                    "dtype": "DT_DOUBLE",
                    "profile": "NUMERICAL",
                    "shape": {
                        "dim": [],
                        "unknownRank": false
                    }
                }
            ],
            "outputs": [
                {
                    "name": "out",
                    "dtype": "DT_DOUBLE",
                    "profile": "NUMERICAL",
                    "shape": {
                        "dim": [],
                        "unknownRank": false
                    }
                }
            ]
        }
    },
    "metadata": { 
        "architecture": "Feed-forward neural network",
        "description": "Sample external model example",
        "author": "Hydrosphere.io",
        "training-data": "s3://bucket/external-model-example/data/",
        "endpoint": "http://example.com/api/external-model/"
    },
    "monitoringConfiguration": {
        "batchSize": 100
    }
}

Training data upload

To let Hydrosphere calculate the metrics of your requests, you have to submit the training data. You can do so by:

  • using CLI

  • using HTTP endpoint

In each case your training data should be represented as a CSV document, containing fields named exactly as in the interface of your model.

Currently, we support uploading training data as .csv files and utilizing it for NUMERICAL, CATEGORICAL, and TEXT profiles only.
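For the example model registered above (a single in field and a single out field), preparing such a file could look like this. A minimal sketch; the values are placeholders and the column names must match the fields of your own model:

import pandas as pd

# Column names must match the field names of the registered model
training_df = pd.DataFrame({
    "in":  [0.87, 0.35, 0.12],
    "out": [0.74, 0.68, 0.23],
})
training_df.to_csv("external-model-data.csv", index=False)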

Upload using CLI

Switch to the cluster, suitable for your current flow.

$ hs cluster use example-cluster
Switched to cluster '{'cluster': {'server': '<hydrosphere>'}, 'name': 'example-cluster'}'

If you don't have a defined cluster yet, create one using the following command.

$ hs cluster add --server <hydrosphere> --name example-cluster
Cluster 'example-cluster' @ <hydrosphere> added successfully
$ hs cluster use example-cluster

Make sure you have a local copy of the training data that you want to submit.

$ head external-model-data.csv
in,out
0.8744973,0.74737076
0.35367096,0.68612554
0.12600919,0.23873545
0.22988156,0.01602719
0.09958467,0.81491237
0.50324137,0.23527377
0.02184051,0.37468397
0.23937149,0.66311923
0.48611933,0.65467976
0.98475208,0.28292798

Submit the training data. You must specify two parameters:

  • --model-version: A string indicating the model version to which you want to submit the data. The string should be formatted in the following way <model-name>:<model-version>;

  • --filename: Path to a filename, that you want to submit.

If you already have your training data uploaded to S3, you can specify a path to that object URI using --s3path parameter instead of --filename. The object behind this URI should be available to the Hydrosphere instance.

$ hs profile push \
    --model-version external-model-example:1 \
    --filename external-model-data.csv

Depending on the size of your data, you will have to wait for the data to be uploaded. If you don't want to wait, you can use the --async flag.

Upload using an HTTP endpoint

To upload your data using an HTTP endpoint, stream it to the /monitoring/profiles/batch/<MODEL_VERSION_ID> endpoint.

In the code snippets below you can see how data can be uploaded using sample HTTP clients.

from argparse import ArgumentParser
from urllib.parse import urljoin

import requests


def read_in_chunks(filename, chunk_size=1024):
    """ Generator to read a file peace by peace. """
    with open(filename, "rb") as file:
        while True:
            data = file.read(chunk_size)
            if not data:
                break
            yield data


if __name__ == "__main__": 
    parser = ArgumentParser()
    parser.add_argument("--hydrosphere", type=str, required=True)
    parser.add_argument("--model-version-id", type=int, required=True)
    parser.add_argument("--filename", required=True)
    parser.add_argument("--chunk-size", default=1024)
    args, unknown = parser.parse_known_args()
    if unknown:
        print("Parsed unknown arguments: %s", unknown)

    endpoint_uri = "/monitoring/profiles/batch/{}".format(args.model_version_id)
    endpoint_uri = urljoin(args.hydrosphere, endpoint_uri) 

    gen = read_in_chunks(args.filename, chunk_size=args.chunk_size)
    response = requests.post(endpoint_uri, data=gen, stream=True)
    if response.status_code != 200:
        print("Got error:", response.text)
    else:
        print("Uploaded data:", response.text)
import com.google.common.io.Files;

import java.io.*;
import java.net.*;


public class DataUploader {
    private String endpointUrl = "/monitoring/profiles/batch/";

    private String composeUrl(String base, long modelVersionId) throws java.net.URISyntaxException {
        return new URI(base).resolve(this.endpointUrl + modelVersionId).toString();
    }

    public int upload(String baseUrl, String filePath, long modelVersionId) throws Exception {
        String composedUrl = this.composeUrl(baseUrl, modelVersionId);
        HttpURLConnection connection = (HttpURLConnection) new URL(composedUrl).openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setChunkedStreamingMode(4096);

        OutputStream output = connection.getOutputStream();
        Files.copy(new File(filePath), output);
        output.flush();

        return connection.getResponseCode();
    }

    public static void main(String[] args) throws Exception {
        DataUploader dataUploader = new DataUploader();
        int responseCode = dataUploader.upload(
            "http://<hydrosphere>/", "/path/to/data.csv", 1);
        System.out.println(responseCode);
    }
}

You can acquire MODEL_VERSION_ID by sending a GET request to the /model/version/<MODEL_NAME>/<MODEL_VERSION> endpoint. The response document will have a structure similar to the one described above.
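A minimal sketch of that lookup with the Python requests library, assuming <hydrosphere> is the address of your instance and the model registered above:

import requests

response = requests.get("http://<hydrosphere>/model/version/external-model-example/1")
response.raise_for_status()

model_version_id = response.json()["id"]
print("MODEL_VERSION_ID:", model_version_id)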

Custom metrics assignment

This step is optional. If you wish to assign a custom monitoring metric to a model, you can do it by:

  • using Hydrosphere UI

  • using HTTP endpoint

Using Hydrosphere UI

To find out how to assign metrics using Hydrosphere UI, refer to this page.

Using HTTP endpoint

To assign metrics using HTTP endpoint, you will have to submit a JSON document, defining a monitoring specification.

Top-level members

The document must contain the following top-level members.

  • name: The name of the monitoring metric;

  • modelVersionId: Unique identifier of the model to which you want to assign a metric;

  • config: Object, representing a configuration of the metric, which will be applied to the model.

The example below shows how a metric can be defined on the top level.

{
    "name": "string",
    "modelVersionId": 1,
    "config": {
        ...
    }
}

Config object

config object defines a configuration of the monitoring metric that will monitor the model. The object must contain the following members:

  • modelVersionId: Unique identifier of the model that will monitor requests;

  • threshold: Threshold value, against which monitoring values will be compared using a comparison operator;

  • thresholdCmpOperator: Object, representing a comparison operator.

The example below shows how a config object can be defined.

{
    "modelVersionId": 2,
    "threshold": 0.5,
    "thresholdCmpOperator": {
        ...
    }
}

ThresholdCmpOperator object

thresholdCmpOperator object defines the kind of comparison operator that will be used when comparing a value produced by the metric against the threshold. The object must contain the following members:

  • kind: Kind of comparison operator.

The only valid options for kind are:

  • Eq;

  • NotEq;

  • Greater;

  • Less;

  • GreaterEq;

  • LessEq.

The example below shows how a comparison operator can be defined.

{
    "kind": "LessEq"
}

The request below shows an example of assigning a monitoring metric. At this moment, both the monitoring model and the actual prediction model should be registered or uploaded to the platform.

POST /monitoring/metricspec HTTP/1.1
Content-Type: application/json
Accept: application/json

{
    "name": "string",
    "modelVersionId": 1,
    "config": {
        "modelVersionId": 2,
        "threshold": 0.5,
        "thresholdCmpOperator": {
            "kind": "LessEq"
        }
    }
}
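The same metric assignment can be sent programmatically. A minimal sketch using the Python requests library, assuming <hydrosphere> is the address of your instance:

import requests

metric_spec = {
    "name": "string",
    "modelVersionId": 1,
    "config": {
        "modelVersionId": 2,
        "threshold": 0.5,
        "thresholdCmpOperator": {"kind": "LessEq"}
    }
}

response = requests.post("http://<hydrosphere>/monitoring/metricspec", json=metric_spec)
response.raise_for_status()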

Analysis invocation

To send a request for analysis you have to use the gRPC endpoint. We have already predefined ProtoBuf messages for reference.

  1. Create an ExecutionMetadata message that contains metadata information of the model, used to process a given request:

  2. Create a PredictRequest message that contains the original request passed to the serving model for the prediction:

  3. Create a PredictResponse message that contains inferenced output of the model:

  4. Assemble an ExecutionInformation from the above-created messages.

  5. Submit the ExecutionInformation proto to Sonar for analysis. Use the Analyze RPC method of the MonitoringService to calculate metrics.

In the code snippets below you can see how analysis can be triggered with sample gRPC clients.

import uuid
import grpc
import random
import hydro_serving_grpc as hs

HYDROSPHERE_INSTANCE_GRPC_URI = "<hydrosphere>"  # gRPC endpoint of your Hydrosphere instance
use_ssl_connection = True
if use_ssl_connection:
    creds = grpc.ssl_channel_credentials()
    channel = grpc.secure_channel(HYDROSPHERE_INSTANCE_GRPC_URI, credentials=creds)
else:
    channel = grpc.insecure_channel(HYDROSPHERE_INSTANCE_GRPC_URI) 
monitoring_stub = hs.MonitoringServiceStub(channel)

# 1. Create an ExecutionMetadata message. ExecutionMetadata is used to define, 
# which model, registered within Hydrosphere platform, was used to process a 
# given request.
trace_id = str(uuid.uuid4())  # uuid used as an example
execution_metadata_proto = hs.ExecutionMetadata(
    model_name="external-model-example",
    modelVersion_id=2,
    model_version=3,
    signature_name="predict",
    request_id=trace_id,
    latency=0.014,
)

# 2. Create a PredictRequest message. PredictRequest is used to define the data 
# passed to the model for inference.
predict_request_proto = hs.PredictRequest(
    model_spec=hs.ModelSpec(
        name="external-model-example",
        signature_name="predict", 
    ),
    inputs={
        "in": hs.TensorProto(
            dtype=hs.DT_DOUBLE, 
            double_val=[random.random()], 
            tensor_shape=hs.TensorShapeProto()
        ),
    }, 
)

# 3. Create a PredictResponse message. PredictResponse is used to define the 
# outputs of the model inference.
predict_response_proto = hs.PredictResponse(
    outputs={
        "out": hs.TensorProto(
            dtype=hs.DT_DOUBLE, 
            double_val=[random.random()], 
            tensor_shape=hs.TensorShapeProto()
        ),
    },
)

# 4. Create an ExecutionInformation message. ExecutionInformation contains all 
# request data and all auxiliary information about request execution, required 
# to calculate metrics.
execution_information_proto = hs.ExecutionInformation(
    request=predict_request_proto,
    response=predict_response_proto,
    metadata=execution_metadata_proto,
)

# 5. Use the RPC method Analyze of the MonitoringService to calculate metrics
monitoring_stub.Analyze(execution_information_proto)

import io.hydrosphere.serving.monitoring.MonitoringServiceGrpc;
import io.hydrosphere.serving.monitoring.MonitoringServiceGrpc.MonitoringServiceBlockingStub;
import io.hydrosphere.serving.monitoring.Metadata.ExecutionMetadata;
import io.hydrosphere.serving.monitoring.Api.ExecutionInformation;
import io.hydrosphere.serving.tensorflow.api.Predict.PredictRequest;
import io.hydrosphere.serving.tensorflow.api.Predict.PredictResponse;
import io.hydrosphere.serving.tensorflow.TensorProto;
import io.hydrosphere.serving.tensorflow.TensorShapeProto;
import io.hydrosphere.serving.tensorflow.DataType;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


public class HydrosphereClient {

    private final String modelName;         // Actual model name, registered within Hydrosphere platform
    private final long modelVersion;        // Model version of the registered model within Hydrosphere platform
    private final long modelVersionId;      // Model version Id, which uniquely identifies any model within Hydrosphere platform
    private final ManagedChannel channel;
    private final MonitoringServiceBlockingStub blockingStub;

    public HydrosphereClient(String target, String modelName, long modelVersion, long modelVersionId) {
        this(ManagedChannelBuilder.forTarget(target).build(), modelName, modelVersion, modelVersionId);
    }

    HydrosphereClient(ManagedChannel channel, String modelName, long modelVersion, long modelVersionId) {
        this.channel = channel;
        this.modelName = modelName;
        this.modelVersion = modelVersion;
        this.modelVersionId = modelVersionId;
        this.blockingStub = MonitoringServiceGrpc.newBlockingStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    private double getLatency() {
        /*
        Random value is used as an example. Acquire the actual latency
        value, during which a model processed a request.
        */
        return new Random().nextDouble();
    }

    private String getTraceId() {
        /*
        UUID used as an example. Use this value to track down your
        requests within Hydrosphere platform.
        */
        return UUID.randomUUID().toString();
    }

    private TensorProto generateDoubleTensorProto() {
        /*
        Helper method generating TensorProto object with random double values.
        */
        return TensorProto.newBuilder()
                .addDoubleVal(new Random().nextDouble())
                .setDtype(DataType.DT_DOUBLE)
                .setTensorShape(TensorShapeProto.newBuilder().build())  // Empty TensorShape indicates scalar shape
                .build();
    }

    private PredictRequest generatePredictRequest() {
        /*
        PredictRequest is used to define the data passed to the model for inference.
        */
        return PredictRequest.newBuilder()
                .putInputs("in", this.generateDoubleTensorProto()).build();
    }

    private PredictResponse generatePredictResponse() {
        /*
        PredictResponse is used to define the outputs of the model inference.
        */
        return PredictResponse.newBuilder()
                .putOutputs("out", this.generateDoubleTensorProto()).build();
    }

    private ExecutionMetadata generateExecutionMetadata() {
        /*
        ExecutionMetadata is used to define, which model, registered within Hydrosphere
        platform, was used to process a given request.
        */
        return ExecutionMetadata.newBuilder()
                .setModelName(this.modelName)
                .setModelVersion(this.modelVersion)
                .setModelVersionId(this.modelVersionId)
                .setSignatureName("predict")                // Use default signature of the model
                .setLatency(this.getLatency())              // Get latency for a given request
                .setRequestId(this.getTraceId())            // Get traceId to track a given request within Hydrosphere platform
                .build();
    }

    public ExecutionInformation generateExecutionInformation() {
        /*
        ExecutionInformation contains all request data and all auxiliary information
        about request execution, required to calculate metrics.
        */
        return ExecutionInformation.newBuilder()
                .setRequest(this.generatePredictRequest())
                .setResponse(this.generatePredictResponse())
                .setMetadata(this.generateExecutionMetadata())
                .build();
    }

    public void analyzeExecution(ExecutionInformation executionInformation) {
        /*
        The actual use of the RPC method Analyze of the MonitoringService to invoke
        metrics calculation.
        */
        this.blockingStub.analyze(executionInformation);
    }

    public static void main(String[] args) throws Exception {
        /*
        Test client functionality by sending randomly generated data for analysis.
        */
        HydrosphereClient client = new HydrosphereClient("<hydrosphere>", "external-model-example", 1, 1);
        try {
            int requestAmount = 10;
            System.out.printf("Analysing %d randomly generated samples\n", requestAmount);
            for (int i = 0; i < requestAmount; i++) {
                ExecutionInformation executionInformation = client.generateExecutionInformation();
                client.analyzeExecution(executionInformation);
            }
        } finally {
            System.out.println("Shutting down client");
            client.shutdown();
        }
    }
}

Metrics retrieval

Once triggered, the Analyze method does not return anything. To fetch the calculated metrics for a model version, make a GET request to the /monitoring/checks/all/<MODEL_VERSION_ID> endpoint.

A request must contain the following parameters:

  • limit: how many requests to fetch;

  • offset: which offset to make from the beginning.

An example request is shown below.

GET /monitoring/checks/all/1?limit=1&offset=0 HTTP/1.1
Accept: application/json
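
The same request can be made from Python, for example with the requests library. The host placeholder and the model version ID below are assumptions; use the values from your own deployment.

import requests

HYDROSPHERE_HTTP_URI = "http://<hydrosphere>"  # placeholder, replace with your instance address
MODEL_VERSION_ID = 1                           # ID of the monitored model version

# Fetch the calculated checks, one request at a time, starting from the beginning.
response = requests.get(
    f"{HYDROSPHERE_HTTP_URI}/monitoring/checks/all/{MODEL_VERSION_ID}",
    params={"limit": 1, "offset": 0},
    headers={"Accept": "application/json"},
)
response.raise_for_status()
checks = response.json()
print(checks)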

Calculated metrics have a dynamic structure, which is dependent on the model interface.

Response object structure

A response object contains the original data submitted for prediction, the model's response, calculated metrics, and other supplementary metadata. Every field produced by Hydrosphere is prefixed with _hs_.

  • _id: ID of the request, generated internally by Hydrosphere;

  • _hs_request_id: ID of the request, specified by user;

  • _hs_model_name: Name of the model that processed a request;

  • _hs_model_incremental_version: Version of the model that processed a request;

  • _hs_model_version_id: ID of the model version, which processed a request;

  • _hs_raw_checks: Raw checks calculated by Hydrosphere based on the training data;

  • _hs_metric_checks: Metrics produced by monitoring models;

  • _hs_latency: Latency, indicating how long it took to process a request;

  • _hs_error: Error message that occurred during request processing;

  • _hs_score: The number of all successful checks divided by the number of all checks;

  • _hs_overall_score: The number of all successful metric values (those not exceeding the specified threshold) divided by the number of all metric values;

  • _hs_timestamp: Timestamp in nanoseconds, when the object was generated;

  • _hs_year: Year when the object was generated;

  • _hs_month: Month when the object was generated;

  • _hs_day: Day when the object was generated;

Apart from the fields defined above, each object will have additional fields specific to the particular model version and its interface.

  • _hs_<field_name>_score: The number of all successful checks calculated for this specific field divided by the total number of all checks calculated for this specific field;

  • <field_name>: The value of the field.

Raw checks object

The _hs_raw_checks object contains all fields for which checks have been calculated.

The example below shows how the _hs_raw_checks object can be defined.

{
    "<field_name>": [
        ...
    ]
}

Check object

The check object declares the check that has been calculated for a particular field. The following members will be present in the object.

  • check: Boolean value indicating whether the check has been passed;

  • description: Description of the check that has been calculated;

  • threshold: Threshold of the check;

  • value: Value of the field;

  • metricSpecId: Metric specification ID. For each check object this value will be set to null.

The example below shows how the check object can be defined.

{
    "check": true,
    "description": "< max",
    "threshold": 0.9321230184950273,
    "value": 0.2081205412912307,
    "metricSpecId": null
}

Metrics object

The _hs_metric_checks object contains all fields for which metrics have been calculated.

The example below shows how the _hs_metric_checks object can be defined.

{
    "<field_name>": {
        ...
    }
}

Metric object

The metric object declares the metric that has been calculated for a particular field. The following members will be present in the object.

  • check: Boolean value indicating whether the metric has not been fired (i.e., the metric value did not violate the threshold);

  • description: Name of the metric that has been calculated;

  • threshold: Threshold of the metric;

  • value: Value of the metric;

  • metricSpecId: Metric specification ID.

The example below shows how the metric object can be defined.

{
    "check": true, 
    "description": "string", 
    "threshold": 5.0,
    "value": 4.0,
    "metricSpecId": "bbb34c1f-13e1-4d1c-ad29-6e27c5461c37"
}

The example below shows a fully composed server response.

HTTP/1.1 200 OK
Content-Type: application/json

[
    {
        "_id": "5e1717f687a34b00086f58d8",
        "in": 0.2081205412912307,
        "out": 0.5551249161117925,
        "_hs_in_score": 1.0,
        "_hs_out_score": 1.0,
        "_hs_raw_checks": {
            "in": [
                {
                    "check": true,
                    "description": "< max",
                    "threshold": 0.9321230184950273,
                    "value": 0.2081205412912307,
                    "metricSpecId": null
                },
                {
                    "check": true,
                    "description": "> min",
                    "threshold": 0.0001208467391203,
                    "value": 0.2081205412912307,
                    "metricSpecId": null
                }
            ],
            "out": [
                {
                    "check": true,
                    "description": "< max",
                    "threshold": 0.9921230184950273,
                    "value": 0.5551249161117925,
                    "metricSpecId": null
                },
                {
                    "check": true,
                    "description": "> min",
                    "threshold": 0.0201208467391203,
                    "value": 0.5551249161117925,
                    "metricSpecId": null
                }
            ]
        },
        "_hs_metric_checks": {
            "string": {
                "check": true, 
                "description": "KNN", 
                "threshold": 5.0,
                "value": 4.0,
                "metricSpecId": "bbb34c1f-13e1-4d1c-ad29-6e27c5461c37"
            }
        },
        "_hs_latency": 0.7166033601366634,
        "_hs_error": "string",
        "_hs_score": 1.0,
        "_hs_overall_score": 1.0,
        "_hs_model_version_id": 1,
        "_hs_model_name": "external-model-example",
        "_hs_model_incremental_version": 1,
        "_hs_request_id": "395ae721-5e68-46e1-8ed6-74e360c614c1",
        "_hs_timestamp": 1578571766000,
        "_hs_year": 2020,
        "_hs_month": 1,
        "_hs_day": 9
    }
]
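
As an illustration of how such a response can be consumed, the hedged sketch below iterates over the returned entries, recomputes the share of passed raw checks (mirroring how _hs_score is described above), and reports any metrics that fired. It assumes checks is the JSON list fetched in the retrieval example earlier.

# Illustrative post-processing sketch; `checks` is assumed to be the list
# returned by the /monitoring/checks/all/<MODEL_VERSION_ID> endpoint above.
def summarize_entry(entry: dict) -> None:
    raw_checks = entry.get("_hs_raw_checks", {})
    results = [c["check"] for field in raw_checks.values() for c in field]
    passed_share = sum(results) / len(results) if results else None
    print(f"request {entry.get('_hs_request_id')}: "
          f"{sum(results)}/{len(results)} raw checks passed "
          f"(reported _hs_score={entry.get('_hs_score')}, recomputed={passed_share})")

    # Metrics produced by monitoring models: report the ones that fired.
    for field, metric in entry.get("_hs_metric_checks", {}).items():
        if not metric["check"]:
            print(f"  metric '{metric['description']}' on '{field}' fired: "
                  f"value={metric['value']}, threshold={metric['threshold']}")

for entry in checks:
    summarize_entry(entry)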