Webhooks

Webhooks enable you to listen for events in your project, so you can automatically trigger follow-up actions in the rest of your stack. For example, you can trigger an internal service when a deployment finishes a request, or you can automatically notify your team members through Slack each time a deployment or pipeline request fails.

Configuring a webhook

Webhooks in UbiOps can be configured through the UbiOps WebApp, Python client library and API. Each webhook needs to be tied to a specific object in your project (deployment, pipeline, experiment). Webhooks can be found in the WebApp in the Monitoring section.

A webhook can listen to one of the following four events:

  • Request started: this event is triggered when a request to the configured object changes to status processing.
  • Request finished: this event is triggered when a request finishes, regardless of whether it finished in a failed or successful state.
  • Request completed: this event is triggered when a request completes successfully.
  • Request failed: this event is triggered when a request fails.

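Example using the Client Library:
import ubiops

# Set up an authenticated client; replace the placeholder with your own API token
configuration = ubiops.Configuration()
configuration.api_key['Authorization'] = 'Token <YOUR_API_TOKEN>'
api_client = ubiops.ApiClient(configuration)
core_api = ubiops.CoreApi(api_client)

project_name = 'project_name_example'  # str
data = ubiops.WebhookCreate(
    name="webhook-1",
    url="https://callback-url-webhook-1.com",
    object_type="deployment",
    object_name="deployment-1",
    event="deployment_request_finished"
)  # WebhookCreate

# Create the webhook
api_response = core_api.webhooks_create(project_name, data)
print(api_response)

# Close the connection
api_client.close()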

Webhook payload

The webhook payload that is sent to the configured callback URL contains the request id, request status, information about the object that the webhook was configured for, and a textual description of the event that can for example be used for Slack integration.

If the include_result parameter is set to True and the result is smaller than 16 kB, then the result of the request is included as well.

Below you can see an example webhook payload:

{
    "event": "deployment_request_completed",
    "deployment_id": "<uuid>",
    "deployment_name": "<name>",
    "deployment_version_id": "<uuid>",
    "deployment_version_name": "<version>",
    "request_id": "<uuid>",
    "status": "completed",
    "result": "...", // Only included if include_result is set and the result fits the 16 kB limit
    "text": "Deployment request <uuid> finished for deployment <name> <version> in project <project-name>"
}
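As a minimal sketch of how a receiving service might consume this payload, the snippet below extracts the fields shown in the example above. The function name `summarize_webhook_payload` is hypothetical, and the sample values are placeholders rather than real identifiers. The `result` key is treated as optional because it is only sent under the conditions described above.

```python
import json


def summarize_webhook_payload(raw_body: bytes) -> dict:
    """Pick out the fields a follow-up action typically needs (hypothetical helper)."""
    payload = json.loads(raw_body)
    return {
        "request_id": payload["request_id"],
        "status": payload["status"],
        "event": payload["event"],
        # "result" is optional, so fall back to None when it is absent
        "result": payload.get("result"),
        # "text" is the human-readable description, e.g. for a chat notification
        "message": payload.get("text", ""),
    }


# Placeholder payload modelled on the example above (no "result" key included)
example_body = json.dumps({
    "event": "deployment_request_completed",
    "request_id": "<uuid>",
    "status": "completed",
    "text": "Deployment request <uuid> finished",
}).encode()

summary = summarize_webhook_payload(example_body)
print(summary["status"], summary["message"])
```

In a real receiver the body should only be parsed this way after the request has been authenticated, for example through the HTTP signature check described later on this page.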

Headers for the webhook request

It is possible to configure additional HTTP headers for the webhook request that is made to your callback URL.

If the header contains sensitive data, you can mark that header as a secret by passing protected: True. This way the value of that header will not be visible in the WebApp and will not be returned when using the GET endpoint to retrieve the webhook details.

Below you can find an example configuration that specifies a secret header, which you can use to create a webhook via the API:

{
  "name": "webhook-1",
  "url": "https://callback-url-webhook-1.com",
  "headers": [
    {
      "key": "Authorization",
      "value": "Token 123",
      "protected": true
    }
  ],
  "object_type": "pipeline",
  "object_name": "pipeline-1",
  "version": "v1",
  "event": "pipeline_request_started"
}
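One way to avoid hand-written JSON mistakes is to build the configuration as a Python dictionary and serialize it. The sketch below assumes the field names from the example above; the helper `build_webhook_config` is hypothetical and not part of the UbiOps client library.

```python
import json


def build_webhook_config(name, url, object_type, object_name, event,
                         headers=None, version=None):
    """Build a webhook configuration dict (hypothetical helper, field names from the example)."""
    config = {
        "name": name,
        "url": url,
        "object_type": object_type,
        "object_name": object_name,
        "event": event,
    }
    if version is not None:
        config["version"] = version
    if headers:
        # Each header is given as a (key, value, protected) tuple
        config["headers"] = [
            {"key": key, "value": value, "protected": protected}
            for key, value, protected in headers
        ]
    return config


config = build_webhook_config(
    name="webhook-1",
    url="https://callback-url-webhook-1.com",
    object_type="pipeline",
    object_name="pipeline-1",
    event="pipeline_request_started",
    version="v1",
    headers=[("Authorization", "Token 123", True)],  # marked as a secret header
)

# json.dumps turns Python's True into JSON's true and always emits valid JSON
body = json.dumps(config, indent=2)
print(body)
```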

Testing webhooks

A test endpoint is available that lets you test your webhook configuration with a dummy event. You can do this in the UbiOps WebApp, either when creating a webhook or from the details page of an existing webhook, or via the API or the Python client library.

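Example using the Client Library:
import ubiops

# Set up an authenticated client; replace the placeholder with your own API token
configuration = ubiops.Configuration()
configuration.api_key['Authorization'] = 'Token <YOUR_API_TOKEN>'
api_client = ubiops.ApiClient(configuration)
core_api = ubiops.CoreApi(api_client)

project_name = 'project_name_example'  # str
data = ubiops.WebhookTestCreate(
    name="webhook-1",
    url="https://callback-url-webhook-1.com",
    object_type="deployment",
    object_name="deployment-1",
    event="deployment_request_finished"
)  # WebhookTestCreate

# Send a dummy event to the callback URL to test the configuration
api_response = core_api.webhook_tests_create(project_name, data)
print(api_response)

# Close the connection
api_client.close()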

Using webhooks for training runs

You can also use webhooks for training runs. To enable this, simply create a webhook that is tied to the training-base-deployment and points to the version with the name of your experiment. Every run within that experiment is treated as a request and therefore the webhook can listen to the same four events.
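As a rough sketch, a configuration for a training webhook could look like the dictionary below. It assumes that the experiment name goes in the same `version` field used in the pipeline example above and that the object type is `deployment`; the helper name, the experiment name `my-experiment`, and the callback URL are placeholders for illustration.

```python
def training_webhook_config(experiment_name: str, url: str,
                            event: str = "deployment_request_completed") -> dict:
    """Build a webhook config for runs of one experiment (hypothetical helper)."""
    return {
        "name": f"webhook-{experiment_name}",
        "url": url,
        # Training runs are requests to the training-base-deployment
        "object_type": "deployment",
        "object_name": "training-base-deployment",
        # Assumption: the experiment is addressed as a version of that deployment
        "version": experiment_name,
        "event": event,
    }


training_config = training_webhook_config(
    experiment_name="my-experiment",
    url="https://callback-url-webhook-1.com",
)
print(training_config)
```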

HTTP Signatures

UbiOps signs outgoing requests from its webhook servers, which allows called services to cryptographically verify the source of webhook messages. The outgoing requests are signed with HTTP signatures. The following is an example of the headers which are sent:

Signature: pyhms=:mgmf2e9wC0qf5dP3sw/THp0f611l8YApS3y/vrO/Ph5kvVBDW2jsuMLoSnQLv99D2+F2K+V6o+t4aMvdIzYLCw==:
Signature-Input: pyhms=("@method" "@authority" "@target-uri" "content-digest" "date");created=1705392742;keyid="sha512:9af50fa7a822616c33fe9bd6d7f274c0aed046ac967df015b43e674382c4a8bd75e35b3de75c9922f549cd9211fe4ed4f344c8ed8afebca36b8041479eddeff0";alg="ed25519"

A server receiving webhooks from UbiOps must specifically support HTTP Signature verification to use this feature.
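To make the header layout easier to read, the sketch below splits a Signature-Input value into its label, covered components, and parameters. It is a simplified parser meant for inspection and debugging only: it handles the shape shown above, not every structured field value, and the function name `parse_signature_input` is hypothetical. Actual verification should be left to a library that implements HTTP signatures. The key id in the sample is shortened for readability.

```python
import re


def parse_signature_input(header_value: str) -> dict:
    """Split a Signature-Input value into label, components and parameters (simplified)."""
    label, _, rest = header_value.partition("=")
    match = re.match(r'\((?P<components>[^)]*)\)(?P<params>.*)', rest)
    if match is None:
        raise ValueError("unexpected Signature-Input format")

    # Covered components are the quoted names inside the parentheses
    components = re.findall(r'"([^"]+)"', match.group("components"))

    # Parameters follow as ;name=value pairs, with string values in double quotes
    params = {}
    for part in match.group("params").split(";"):
        if not part:
            continue
        name, _, value = part.partition("=")
        params[name.strip()] = value.strip().strip('"')

    return {"label": label, "components": components, "params": params}


sample = (
    'pyhms=("@method" "@authority" "@target-uri" "content-digest" "date")'
    ';created=1705392742;keyid="sha512:9af50fa7";alg="ed25519"'
)
parsed_input = parse_signature_input(sample)
print(parsed_input["components"])
print(parsed_input["params"])
```

The `created` parameter is a Unix timestamp, and `keyid` identifies which public key the receiver should use to verify the signature.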

Example implementation using Python's built-in web server:
"""A listener for webhooks which implements HTTP signature verification"""
import argparse
import hashlib
import json
import sys

# Use the built-in HTTP library from python, which is not meant for production use but allows easy
# access to headers etc. Adapt this example to FastAPI, flask, etc. in order to fit your use case.
from http.server import BaseHTTPRequestHandler, HTTPServer

# The following add-on libraries are needed for HTTP signature verification: cryptography, requests-http-signature
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from requests import Request
from requests_http_signature import HTTPSignatureAuth, HTTPSignatureKeyResolver, VerifyResult, algorithms
from http_message_signatures.exceptions import InvalidSignature


def parse_args():
    """Parse command line arguments"""
    # Instantiate an argparse parser
    p = argparse.ArgumentParser(description=__doc__)

    # Accept a public key path with a sane default
    p.add_argument(
        '--pubkey',
        default='/run/secrets/webhooks-public-key', # this default path should work well for docker and kubernetes
        help='path of trusted PEM-encoded Ed25519 public key'
    )

    # Accept a listen address, defaulting to connections from the local machine only
    p.add_argument(
        '--ip',
        default='127.0.0.1', # by listening on loopback by default users cannot accidentally expose themselves to internet traffic
        help='IP on which to listen for HTTP connections'
    )

    # Accept a listening port number, defaulting to a non-privileged port (runnable without root)
    p.add_argument(
        '--port',
        default=8000, # by listening to port 8000 by default users are not incentivized to use sudo
        type=int, # convert inputs to int type automatically
        help='TCP port on which to listen for HTTP connections'
    )

    # Perform parsing and return the values
    return p.parse_args()


def main():
    """The entrypoint function"""
    # Parse the command line
    args = parse_args()

    # Private keys inherently contain public keys, so it is possible for a user to mistakenly
    # configure a private key as a trusted public key. We will check for this situation to
    # prevent insecure misconfigurations where a private key is unknowingly being distributed.
    with open(args.pubkey, 'rb') as fh:
        # Try to load the provided trusted key as a private key, and expect this operation to fail
        try:
            load_pem_private_key(fh.read(), password=None)
        # If loading a private key fails, this is actually a success as we do not want a private key
        except Exception:
            # This is exactly what we hoped for: loading the key as a private key failed, which means
            # that it is not a private key, which is good.
            pass
        # If a private key can be loaded the user has misconfigured the application
        else:
            # Loading as a private key did not fail, which means the user mistakenly provided a private key
            # instead of a public key. Refuse to run under these circumstances.
            raise ValueError('--pubkey must be a public key, not a private key')

    # Open the public key file in read-only mode
    with open(args.pubkey, 'rb') as fh:
        # Parse the PEM file and store it in the request handler class
        RequestHandler.public_key = load_pem_public_key(fh.read())

        # Create the necessary key resolver with the loaded public key object
        RequestHandler.key_resolver = PublicKeyResolver(RequestHandler.public_key)

        # Create the HTTP signature auth engine using the created key resolver
        RequestHandler.http_signature_auth = HTTPSignatureAuth(
            key_id=None,
            signature_algorithm=algorithms.ED25519,
            key_resolver=RequestHandler.key_resolver
        )

    # Build the listen address as a tuple of IP address and TCP port.
    listen_address = (args.ip, args.port)

    # Instantiate a web server
    # Caution: Not using TLS increases the likelihood of replay attacks using valid webhook requests
    server = HTTPServer(listen_address, RequestHandler)

    # Print a log message
    print(f'Listening on {args.ip}:{args.port}')

    # Listen and service incoming HTTP requests
    try:
        # This function never returns but can be interrupted
        server.serve_forever()
    # Handle ctrl-c and other service stop requests
    except KeyboardInterrupt:
        # Exit code 130 is the standard exit code for SIGINT (e.g. ctrl-c) interrupted POSIX applications
        return 130


class RequestHandler(BaseHTTPRequestHandler):
    """Handle incoming HTTP requests"""
    # Define class attributes for the HTTP signature related objects
    http_signature_auth: HTTPSignatureAuth = None
    public_key: Ed25519PublicKey = None
    key_resolver: HTTPSignatureKeyResolver = None

    def do_HEAD(self):
        """Handle HEAD requests for docker's HEALTHCHECK"""
        # Respond 204 No Content, which is like 200 OK but with no response body
        self.send_response_only(204)
        self.end_headers()

    def do_POST(self):
        """Handle incoming POST requests"""
        # Read and print the headers
        print(f"{self.headers}")

        # Check how much data was POSTed
        content_length = int(self.headers.get("Content-Length", 0))

        # Read the request body
        unverified_body = self.rfile.read(content_length)

        # Create a requests library prepared request to match the incoming request
        plain_request = Request(
            method="POST",
            url=f"http://{self.headers['host']}{self.path}",
            headers=self.headers
        )
        prepared_request = plain_request.prepare()
        prepared_request.body = unverified_body
        for key, value in self.headers.items():
            prepared_request.headers[key] = value

        # Perform verification of the incoming request's signature
        try:
            verify_result: VerifyResult = self.http_signature_auth.verify(
                message=prepared_request,
                signature_algorithm=algorithms.ED25519,
                key_resolver=self.key_resolver
            )

        # Handle HTTP signature verification failures
        except InvalidSignature as e:
            print(f"Bad signature: {str(e)}")
            self.send_response_only(403)
            self.end_headers()
            return

        # Handle successful HTTP signature verification
        else:
            # Build the dict of headers which can be trusted
            verified_headers = {
                # Strip the double quotes from the covered_components
                key.strip('"'): self.headers[key.strip('"')]

                # Loop over the covered components of the signature, ignoring meta-headers (e.g. @method)
                for key in verify_result.covered_components if not key.startswith('"@')
            }

            # Ensure the Content-Digest header is provided and protected by the HTTP signature
            if 'content-digest' not in verified_headers:
                print("Bad signature: Content-Digest header must be a covered signature component")
                self.send_response_only(403)
                self.end_headers()
                return

            # Ensure the Date header is provided and protected by the HTTP signature
            if 'date' not in verified_headers:
                print("Bad signature: Date header must be a covered signature component")
                self.send_response_only(403)
                self.end_headers()
                return
            else:
                # TODO: If you have a reliable time source it is a good idea to check the nearness of the Date
                # header in order to reduce the chances of a replay attack.
                pass

            # Print the verified headers and verified body
            print(f"Verified headers: {verified_headers!r}")
            print(f"Verified body: {verify_result.body!r}")

        # Try to parse the verified body as JSON
        try:
            parsed = json.loads(verify_result.body)

        # If the JSON parsing fails then just print the verified body as it is
        except Exception:
            print(verify_result.body)

        # If the JSON parsing succeeded then pretty print the JSON
        else:
            # Pretty print the JSON
            print("Parsed verified body: " + json.dumps(parsed, indent=2))

        # Respond with 204 No Content
        self.send_response_only(204)
        self.end_headers()


class PublicKeyResolver(HTTPSignatureKeyResolver):
    """
    A utility class for use with the requests_http_signature library
    """
    def __init__(self, key):
        self.key = key

    def resolve_public_key(self, key_id):
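        # Ensure the specified key ID matches the configured key. An explicit raise is used
        # instead of assert, because assert statements are stripped when Python runs with -O.
        if key_id != self._key_id():
            raise InvalidSignature(f"Unknown key id: {key_id}")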

        # Return the key object
        return self.key

    def _key_id(self):
        # Get the raw publicly shareable key bytes
        public_bytes = self.key.public_bytes_raw()

        # Calculate a secure digest of those bytes as the key id
        return "sha512:" + hashlib.sha512(public_bytes).hexdigest()


# Map the entry point function
if __name__ == '__main__':
    sys.exit(main())
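
The TODO in the listener above suggests checking how recent the Date header is, to reduce the chance of a replay attack. A minimal sketch of such a check is shown below, using only the standard library. The function name `date_header_is_fresh` and the five minute tolerance are assumptions for illustration; choose a tolerance that fits your own clock accuracy and network latency.

```python
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime, parsedate_to_datetime


def date_header_is_fresh(date_header, max_skew=timedelta(minutes=5), now=None):
    """Return True if the HTTP Date header lies within max_skew of the current time."""
    now = now or datetime.now(timezone.utc)
    try:
        sent_at = parsedate_to_datetime(date_header)
    except (TypeError, ValueError):
        # Unparseable or missing dates are treated as not fresh
        return False
    if sent_at.tzinfo is None:
        # Treat dates without an explicit offset as UTC
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    return abs(now - sent_at) <= max_skew


# Usage with a fixed reference time so the outcome is reproducible
reference = datetime(2024, 1, 16, 8, 12, 22, tzinfo=timezone.utc)
header = format_datetime(reference, usegmt=True)

fresh_now = date_header_is_fresh(header, now=reference)
fresh_later = date_header_is_fresh(header, now=reference + timedelta(hours=1))
fresh_garbage = date_header_is_fresh("not a date", now=reference)
print(fresh_now, fresh_later, fresh_garbage)
```

In the listener, this check would only be meaningful on the Date value taken from the verified headers, since an unsigned Date header can be changed by whoever replays the request.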