Webhooks¶
Webhooks enable you to listen for events in your project, so you can automatically trigger follow-up actions in the rest of your stack. For example, you can trigger an internal service when a deployment finishes a request, or you can automatically notify your team members through Slack each time a deployment or pipeline request fails.
Configuring a webhook¶
Webhooks in UbiOps can be configured through the UbiOps WebApp, Python client library and API. Each webhook needs to be tied to a specific object in your project (deployment, pipeline, experiment). Webhooks can be found in the WebApp in the Monitoring section.
A webhook can listen to one of the following four events:
- Request started: this event is triggered when a request to the configured object changes to status
processing
. - Request finished: this event is triggered when a request finishes, regardless of whether it finished in a failed or successful state.
- Request completed: this event is triggered when a request completes successfully.
- Request failed: this event is triggered when a request fails.
When configuring a webhook via our API, events are restricted to the names of the four event types started
, finished
, completed
or failed
, prefixed with either pipeline_request
or deployment_request
. So for an event where you want to listen to failed pipeline requests, the event will be pipeline_request_failed
.
Click here for an example of creating a webhook through the Client Library
project_name = 'project_name_example' # str
data = ubiops.WebhookCreate(
name="webhook-1",
url="https://callback-url-webhook-1.com",
object_type="deployment",
object_name="deployment-1",
event="deployment_request_finished"
) # WebhookCreate
# Create webhooks
api_response = core_api.webhooks_create(project_name, data)
print(api_response)
Webhook payload¶
The webhook payload sent to the configured callback URL includes the request ID, request status, details about the object the webhook was set up for, and a textual description of the event. This description can be used in communication channels like Slack or Teams. The payload is delivered via an HTTP POST request to the specified URL. The request status can be any of the available request statuses.
If the include_result
parameter is set to True
and the result is smaller than 16 kB, then the result of the request is included as well.
Below you can see an example webhook payload:
{
"event": "deployment_request_completed",
"deployment_id": "<uuid>",
"deployment_name": "<uuid>",
"deployment_version_id": "<uuid>",
"deployment_version_name": "<uuid>",
"request_id": "<uuid>",
"status": "completed",
"result": "...", // !!! Only if <= 16kb
"text": "Deployment request <uuid> finished for deployment <name> <version> in project <project-name>"
}
Headers for the webhook request¶
It is possible to configure additional HTTP headers for the webhook request that is made to your callback URL.
If the header contains sensitive data, you can mark that header as a secret by passing protected: True
. This way the value of that header will not be visible in the WebApp and will not be returned when using the GET endpoint to retrieve the webhook details.
Below you can find an example configuration that specifies a secret header, which you can use to create a webhook via the API:
{
"name": "webhook-1",
"url": "https://callback-url-webhook-1.com",
"headers": [
{
"key": "Authorization",
"value": "Token 123",
"protected": true
}
],
"object_type": "pipeline",
"object_name": "pipeline-1",
"version": "v1",
"event": "pipeline_request_started",
}
Testing webhooks¶
A test endpoint is available which enables you to test your webhook configuration with a dummy event. You can do this in the UbiOps WebApp, either when creating a Webhook or from the details page of an existing webhook. Ensure that the receiving server is able to handle the webhook test request format. The actual webhook request follows the exact same format as the webhook test request (see an example below). You can also test it via the API or the Python client library.
Testing webhooks with output fields that include datatype file
When testing your webhook with output fields that include datatype file, we provide a dummy URI that does not link to an existing file. To test the webhook, ensure to catch the case where the URI does not link to an existing file. Otherwise the test will fail.
Click here for more information on the test request
The dummy event involves sending a HTTP POST request sent to the specified webhook URL, This request includes all relevant headers, with the body containing sample data for testing purposes.
A dummy event is shown below, where the dummy event has the following configuration:
Event Trigger
: Request completed (successful)Object Selection
: DeploymentDeployment configuration
:- Input:
input_int
: Integerinput_str
: String
- Output:
output_int
: Integeroutput_str
: String
- Input:
Headers
:sample_key_1
:sample_value_1
Payload
: Yes
The dummy event request headers are as follows:
POST / HTTP/1.1
Host: ...
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Digest: sha-256=:...=:
Content-Length: 511
Content-Type: application/json
Date: <Current GMT time>
Sample_key_1: sample_value_1
Signature: pyhms=:...==:
Signature-Input: pyhms=("@method" "@authority" "@target-uri" "date" "content-digest");created=...;keyid="sha512:9af50fa7a822616c33fe9bd6d7f274c0aed046ac967df015b43e674382c4a8bd75e35b3de75c9922f549cd9211fe4ed4f344c8ed8afebca36b8041479eddeff0";alg="ed25519"
User-Agent: UbiOps Webhook
The dummy event request body is the same as the request body in the webhook payload section, except that the result
field will be filled with sample values. An example of the result
field with sample values for the previously outlined deployment configuration is shown below:
{
"result": {
"output_int": 1,
"output_str": "test"
}
}
Click here for an example of testing a webhook using the Client Library
project_name = 'project_name_example' # str
data = ubiops.WebhookTestCreate(
name="webhook-1",
url="https://callback-url-webhook-1.com",
object_type="deployment",
object_name="deployment-1",
event="deployment_request_finished"
) # WebhookTestCreate
# Create webhook tests
api_response = core_api.webhook_tests_create(project_name, data)
print(api_response)
Webhook retry logic¶
It is possible to enable a retry mechanism for webhooks. This means that if the webhook request fails (either when receiving a 4xx
or a 5xx
code, or when no response is returned after the the timeout period), UbiOps will retry the request immediately once more. If this request fails again, no further retries will be attempted.
Using webhooks for training runs¶
You can also use webhooks for training runs. To enable this, simply create a webhook that is tied to the training-base-deployment
and points to the version with the name of your experiment. Every run within that experiment is treated as a request and therefore the webhook can listen to the same four events.
HTTP Signatures¶
UbiOps signs outgoing requests from its webhook servers, which allows called services to cryptographically verify the source of webhook messages. The outgoing requests are signed with HTTP signatures. The following is an example of the headers which are sent:
Signature: pyhms=:mgmf2e9wC0qf5dP3sw/THp0f611l8YApS3y/vrO/Ph5kvVBDW2jsuMLoSnQLv99D2+F2K+V6o+t4aMvdIzYLCw==:
Signature-Input: pyhms=("@method" "@authority" "@target-uri" "content-digest" "date");created=1705392742;keyid="sha512:9af50fa7a822616c33fe9bd6d7f274c0aed046ac967df015b43e674382c4a8bd75e35b3de75c9922f549cd9211fe4ed4f344c8ed8afebca36b8041479eddeff0";alg="ed25519"
A server receiving webhooks from UbiOps must specifically support HTTP Signature verification to use this feature.
Public Key
The following public key is used to verify the signature (PEM format):-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAZc9ffGBYWDhRUVZUY433NwzS9yKmH9gfvN7MbJlDqxY=
-----END PUBLIC KEY-----
Click here for an example implementation using Python's built-in web server
"""A listener for webhooks which implements HTTP signature verification"""
import argparse
import hashlib
import json
import sys
# Use the built-in HTTP library from python, which is not meant for production use but allows easy
# access to headers etc. Adapt this example to FastAPI, flask, etc. in order to fit your use case.
from http.server import BaseHTTPRequestHandler, HTTPServer
# The following add-on libraries are needed for HTTP signature verification: cryptography, requests-http-signature
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from requests import Request
from requests_http_signature import HTTPSignatureAuth, HTTPSignatureKeyResolver, VerifyResult, algorithms
from http_message_signatures.exceptions import InvalidSignature
def parse_args():
"""Parse command line arguments"""
# Instantiate an argparse parser
p = argparse.ArgumentParser(description=__doc__)
# Accept a public key path with a sane default
p.add_argument(
'--pubkey',
default='/run/secrets/webhooks-public-key', # this default path should work well for docker and kubernetes
help='path of trusted PEM-encoded Ed25519 public key'
)
# Accept a listen address, defaulting to connections from the local machine only
p.add_argument(
'--ip',
default='127.0.0.1', # by listening on loopback by default users cannot accidentally expose themselves to internet traffic
help='IP on which to listen for HTTP connections'
)
# Accept a listening port number, defaulting to a non-privileged port (runnable without root)
p.add_argument(
'--port',
default=8000, # by listening to port 8000 by default users are not incentivized to use sudo
type=int, # convert inputs to int type automatically
help='TCP port on which to listen for HTTP connections'
)
# Perform parsing and return the values
return p.parse_args()
def main():
"""The entrypoint function"""
# Parse the command line
args = parse_args()
# Private keys inherently contain public keys, so it is possible for a user to mistakenly
# configure a private key as a trusted public key. We will check for this situation to
# prevent insecure misconfigurations where a private key is unknowingly being distributed.
with open(args.pubkey, 'rb') as fh:
# Try to load the provided trusted key as a private key, and expect this operation to fail
try:
load_pem_private_key(fh.read(), password=None)
# If loading a private key fails, this is actually a success as we do not want a private key
except Exception:
# This is exactly what we hoped for: loading the key as a private key failed, which means
# that it is not a private key, which is good.
pass
# If a private key can be loaded the user has misconfigured the application
else:
# Loading as a private key did not fail, which means the user mistakenly provided a private key
# instead of a public key. Refuse to run under these circumstances.
raise ValueError('--pubkey must be a public key, not a private key')
# Open the public key file in read-only mode
with open(args.pubkey, 'rb') as fh:
# Parse the PEM file and store it in the request handler class
RequestHandler.public_key = load_pem_public_key(fh.read())
# Create the necessary key resolver with the loaded public key object
RequestHandler.key_resolver = PublicKeyResolver(RequestHandler.public_key)
# Create the HTTP signature auth engine using the created key resolver
RequestHandler.http_signature_auth = HTTPSignatureAuth(
key_id=None,
signature_algorithm=algorithms.ED25519,
key_resolver=RequestHandler.key_resolver
)
# Build the listen address as a tuple of IP address and TCP port.
listen_address = tuple([args.ip, args.port])
# Instantiate a web server
# Caution: Not using TLS increases the likelihood of replay attacks using valid webhook requests
server = HTTPServer(listen_address, RequestHandler)
# Print a log message
print(f'Listening on {args.ip}:{args.port}')
# Listen and service incoming HTTP requests
try:
# This function never returns but can be interrupted
server.serve_forever()
# Handle ctrl-c and other service stop requests
except KeyboardInterrupt:
# Exit code 130 is the standard exit code for SIGINT (e.g. ctrl-c) interrupted POSIX applications
return 130
class RequestHandler(BaseHTTPRequestHandler):
"""Handle incoming HTTP requests"""
# Define class attributes for the HTTP signature related objects
http_signature_auth: HTTPSignatureAuth = None
public_key: Ed25519PublicKey = None
key_resolver: HTTPSignatureKeyResolver = None
def do_HEAD(self):
"""Handle HEAD requests for docker's HEALTHCHECK"""
# Respond 204 No Content, which is like 200 OK but with no response body
self.send_response_only(204)
self.end_headers()
def do_POST(self):
"""Handle incoming POST requests"""
# Read and print the headers
print(f"{self.headers}")
# Check how much data was POSTed
content_length = int(self.headers.get("Content-Length", 0))
# Read the request body
unverified_body = self.rfile.read(content_length)
# Create a requests library prepared request to match the incoming request
plain_request = Request(
method="POST",
url=f"http://{self.headers['host']}{self.path}",
headers=self.headers
)
prepared_request = plain_request.prepare()
prepared_request.body = unverified_body
for key, value in self.headers.items():
prepared_request.headers[key] = value
# Perform verification of the incoming request's signature
try:
verify_result: VerifyResult = self.http_signature_auth.verify(
message=prepared_request,
signature_algorithm=algorithms.ED25519,
key_resolver=self.key_resolver
)
# Handle HTTP signature verification failures
except InvalidSignature as e:
print(f"Bad signature: {str(e)}")
self.send_response_only(403)
self.end_headers()
return
# Handle successful HTTP signature verification
else:
# Build the dict of headers which can be trusted
verified_headers = {
# Strip the double quotes from the covered_components
key.strip('"'): self.headers[key.strip('"')]
# Loop over the covered components of the signature, ignoring meta-headers (e.g. @method)
for key in verify_result.covered_components if not key.startswith('"@')
}
# Ensure the Content-Digest header is provided and protected by the HTTP signature
if 'content-digest' not in verified_headers:
print(f"Bad signature: Content-Digest header must be a covered signature component")
self.send_response_only(403)
self.end_headers()
return
# Ensure the Date header is provided and protected by the HTTP signature
if 'date' not in verified_headers:
print(f"Bad signature: Date header must be a covered signature component")
self.send_response_only(403)
self.end_headers()
return
else:
# TODO: If you have a reliable time source it is a good idea to check the nearness of the Date
# header in order to reduce the chances of a replay attack.
pass
# Print the verified headers and verified body
print(f"Verified headers: {verified_headers!r}")
print(f"Verified body: {verify_result.body!r}")
# Try to parse the verified body as JSON
try:
parsed = json.loads(verify_result.body)
# If the JSON parsing fails then just print the verified body as it is
except Exception:
print(verify_result.body)
# If the JSON parsing succeeded then pretty print the JSON
else:
# Pretty print the JSON
print("Parsed verified body: " + json.dumps(parsed, indent=2))
# Respond with 204 No Content
self.send_response_only(204)
self.end_headers()
class PublicKeyResolver(HTTPSignatureKeyResolver):
"""
A utility class for use with the requests_http_signature library
"""
def __init__(self, key):
self.key = key
def resolve_public_key(self, key_id):
# Ensure the specified key ID matches the configured key
assert key_id == self._key_id()
# Return the key object
return self.key
def _key_id(self):
# Get the raw publicly shareable key bytes
public_bytes = self.key.public_bytes_raw()
# Calculate a secure digest of those bytes as the key id
return "sha512:" + hashlib.sha512(public_bytes).hexdigest()
# Map the entry point function
if __name__ == '__main__':
sys.exit(main())