Improve the adaptability of your pipeline

By 15 March 2023March 16th, 2023Functionality, Product update, UbiOps

In our previous blogs on pipeline operators, we saw how we could use the pipeline operators to speed up our inference, to add unit tests to the input of our machine learning model and to conditionally routing the pipeline request to one way or the other. 

With UbiOps latest release, v2.22.0, we introduced a new pipeline operator: the pipeline variable, which can be used to store objects. The variable’s type can be any of the in- and outputs fields that we use in UbiOps: integers, strings, arrays and also files. These variables or files function as fixed variables that can be processed in your pipeline. They can be used to store hyperparameters, configuration files, or environment-specific information.

In this blogpost we will look at an inferencing pipeline, where we emphasize on storing our input data, and logging our inference results and some metadata. We use the pipeline variable to specify which locations to write these pieces of information to. This makes the pipelines more modular, and easier to reproduce in different environments. This can be useful when you work with a development and a production environment.

There are a lot of choices to be made on how to store your results. This blogpost will highlight how to write results to S3 buckets, because UbiOps offers these as a native storage facility.  We finish off with a sneak preview on how to write your inference results and metadata to a relational PostgreSQL database.

Using a pipeline variable to connect to log inference and logging results

In this example, we will use a pipeline variable of type ‘string’ to specify the S3 bucket to which we will write the information that we want to store. We run our model in production, and expect a lot of traffic. We want to log information around the inference process, so that we can monitor its operations and performance.

As our example model, we use a model that is trained on the MNIST dataset. We create a deployment that stores this model. Input data is passed to the model, and we write our operational metadata and the prediction to the output fields.The deployment has the following specifications

DescriptionA model that recognizes handwritten digits. Trained on the MNIST dataset.
Input fieldimage (File)
Output fields:prediction (integer)

logs  (String)

Sample images from the MNIST handwritten digits database

The `` of our deployment is shown below. Additionally, the deployment package that we use contains the model artifact that is referenced inside the deployment code, and a `requirements.txt` that pins all versions of the packages that are used.

When sending an input image to this deployment, the image is classified by the MNIST-model. This leads to a prediction (datatype: integer) and a probability (datatype: double).

The file containing the deployment code is required to be called '' and should contain the 'Deployment'
class and 'request' method.

import os
from keras.models import load_model
from imageio import imread
import numpy as np
import json
from datetime import datetime

class Deployment:

    def __init__(self, base_directory, context):
        Initialisation method for the deployment. This method will run as soon as a deployment starts up.
        Load your external model files (such as pickles or .h5 files) here.

        print("Initialising deployment")

        # This is where we load the pre-trained weight file of our CNN model
        weights = os.path.join(base_directory, "cnn.h5")
        self.model = load_model(weights)
        # We use the context dictionary to extract some information that we want to log
        self.PROJECT_NAME = context["project"]
        self.DEPLOYMENT = context['deployment']
        self.DEPLOYMENT_VERSION = context['version']

    def request(self, data, context):
        Method for deployment requests, called separately for each individual request.

        print("Processing request")

        #Here we read the data that is passed to the model with the request.
        x = imread(data['image'])
        #Use the request context dictionary to extract the request id 
        request_id = context['id']

        x = x.reshape(1, 28, 28, 1)
        x = x.astype(np.float32) / 255
        #Model inference
        out = self.model.predict(x)
        #Extract results
        prediction = int(np.argmax(out))
        probability = float(np.max(out))
        #Set-up information that we want to log 
        logs = json.dumps({
                "prediction" : prediction,
                "probability": probability,
                "request_id" : request_id,
                "deployment" : self.DEPLOYMENT,
                "version" : self.DEPLOYMENT_VERSION,
                "project" : self.PROJECT_NAME,
                "timestamp" : str(,
                "message" : "Run completed succesfully!"
        #Write the prediction result and the logs to the outputs fields
        return {'prediction': prediction,
                'logs' : logs}

We use a second deployment is used to log the operational data to our bucket. This deployment takes as input fields the logs from the mnist-handwriting-recognition deployment, the input image that is classified, and the name of the bucket to which we want to write our results.

It contains two output fields, that are used to write the files to our bucket.

Name log-results
DescriptionA deployment that writes input images and metadata to a UbiOps-hosted S3 bucket.
Input fieldbucket-name (String)

image   (File)

logs  (String)

Output fields:logs (file)

image (file)


We set up this workflow using the deployment code below:

from datetime import datetime
import ubiops
import os
import configparser
import json

class Deployment:
    def __init__(self, base_directory, context):
    def request(self, data):
        Method for deployment requests, called separately for each individual request.
        #Load the bucket name to write information to
        bucket_name = data["bucket_name"]
        #Extract image to store
        image_uri = data["image"]
        #Load the input logs into a dictionary
        results_logging = json.loads(data["results_logging"])
        #Write logging information to a temporary text file, that we upload to our selected S3 bucket  
        log_file_tmp  = "results_logging.txt"
        with open(log_file_tmp, 'w') as f:
            for key, value in results_logging.items():
                f.write('%s:%s\n' % (key, value))
        logs_to_path = f'inference_logs/logs-{results_logging["request_id"]}.txt' 
        print(f"Uploading logs in '{log_file_tmp}' to path '{logs_to_path}' in bucket '{bucket_name}'")
        #Write the input data to our S3 bucket
        image_filename = image_uri.rsplit('/')[-1]
        prediction = results_logging["prediction"]
        image_to_path = f"images/predicted-class-{prediction}/image-{results_logging['request_id']}"
        print(f"Uploading image '{image_filename}' to path '{image_to_path}' in bucket '{bucket_name}'")
        return {
            "logs" : {
                "file": log_file_tmp,
                "bucket": bucket_name,
                "bucket_file": logs_to_path  
            "image" : {
                "file": data["image"],
                "bucket": bucket_name,
                "bucket_file": image_to_path

✔️ Creating the final pipeline

So let’s use all this to create our final pipeline! We create two branches from the start of our pipeline. One branch is our inference branch. It processes the input image and sends the result to the end of the pipeline request. The start of the pipeline request branches off to our ‘log-results’ deployment. We connect three objects to our ‘log-results’ deployment:

    1. The pipeline request start, which passes the input image and is connected to the input field ‘image’
    2. A pipeline variable named ‘bucket-name’, which specifies the name of our S3 bucket and is connected to the input field ‘bucket-name’
    3. The mnist-model, which passes the inference logging information and is connected to our ‘log-results’ deployment

✔️ Testing our results

After having set-up our final pipeline, we can initiate our S3 bucket! We name it ‘inference-logs’’, and grant read-and-write permissions to our ‘write-results’ deployment. Permissions to read and write from and to buckets need to be granted explicitly.

Now that everything is initiated, we can send as much request as we want to our pipeline, and be ensured of proper results logging! After trying out our model a little, our results_logs folder will contain two folders:

The ’inference_logs’ folder contains files with detailed information on each inference run.

The ‘images’ folder contains some subfolders, named after the predicted classes of the input images.

If we look into our folder, we can now easily analyze all images that were classified a certain way!

To wrap up, we have created a generic set-up that we can conveniently export and import in different environments, using our file system and pipeline variables. We can specify which bucket to write our logs to by changing the variable of our pipeline variable.

Instead of writing the metadata to an S3 bucket, you could try to configure this set-up such that it writes this information to a database service such as MongoDB or PostgreSQL. Connecting to these types of databases requires specifying a large number of settings. Instead of storing each credential as an environment variable, you can use a pipeline variable of type ‘file’, or ‘list of strings’! A final pipeline could then look as follows.

Can this set-up be of use to you? Is your use-case a little different? Always feel free to reach out via our support channels or schedule a call! We are always happy to discuss your ideas further!