Skip to content

vLLM Metrics

UbiOps provides a built-in metrics system that allows you to collect, store, and visualize custom metrics from your deployments. Alongside the default metrics, you can also add your own custom metrics. This how-to provides instructions on how vLLM metrics can be integrated into the UbiOps metrics system, allowing you to monitor the performance and usage of your vLLM server directly from the UbiOps web interface!

First, we will register which metrics we want to collect from the vLLM server in UbiOps. Next, code will be provided that runs a process to periodically collect metrics from the vLLM server and send our metrics from our deployment to UbiOps.

Prerequisites

Before you start, make sure you have a deployment running a vLLM server. You can use the Deploy vLLM server tutorial to set up a deployment running a vLLM server.

Collecting metrics

To collect metrics from the vLLM server, we will use the /metrics endpoint provided by vLLM. This endpoint provides various metrics about the server, such as request counts, latencies, and resource usage. In this how-to, we will focus on the following metrics:

  • vllm:num_requests_running: The number of requests currently being processed by the server.

  • vllm:num_requests_waiting: The number of requests currently waiting in the queue

  • vllm:prompt_tokens_total: The total number of prompt tokens processed by the server.

  • vllm:generation_tokens_total: The total number of generation tokens produced by the server.

Available metrics

For a full list of available metrics, see the vLLM metrics documentation.

Registering metrics in UbiOps

Before we can start sending metrics to UbiOps, we need to define the metrics in the UbiOps project. For an explanation of how to create custom metrics in UbiOps, see the UbiOps documentation. We will register the following metrics in UbiOps:

Name Type
custom.vllm_num_requests_running Gauge
custom.vllm_num_requests_waiting Gauge
custom.vllm_prompt_tokens_total Delta
custom.vllm_generation_tokens_total Delta

Note that Unit is optional and can be left empty and Metric Level should be kept unchanged as Deployment.

Code

We will create a workflow that periodically collects these metrics and pushes them to UbiOps. UbiOps aggregates metrics on a per-minute basis. To increase accuracy however, we will collect and push metrics every 10 seconds (but you can adjust this interval as needed). This script will run in a separate process alongside the main deployment process. The following 2 Python files are needed to create this workflow (and will thus be added to the deployment package ):

  • metric_helpers.py: This file contains helper functions to parse the metrics from the /metrics endpoint.

  • collect_metrics.py: This file contains the main logic to collect metrics and send them to UbiOps.

Click to expand code snippets metric_helpers.py
import logging
import re

_LABEL_RE = re.compile(r'(\w+)\s*=\s*"((?:[^"\\]|\\.)*)"')

def parse_labels(label_block: str) -> dict:
    """
    Parse a Prometheus-style label block (e.g., '{key="value"}') into a dictionary.
    """

    if not label_block.startswith("{") or not label_block.endswith("}"):
        logging.debug(f"Invalid label block format: {label_block}")
        return {}
    labels = {}
    for match in _LABEL_RE.finditer(label_block[1:-1]):
        key, raw_val = match.groups()
        val = bytes(raw_val, "utf-8").decode("unicode_escape")
        labels[key] = val
    return labels

def parse_lines_to_dict(lines):
    """
    Convert metric lines (Prometheus-style) into a structured dictionary.
    """

    metrics = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue

        parts = line.split()
        if len(parts) < 2:
            logging.debug(f"Too few parts in metric line: {line}")
            continue

        name_and_labels = parts[0]
        value_txt = parts[1]

        try:
            metric_value = float(value_txt)
        except ValueError:
            logging.debug(f"Skipping: could not parse value in metric line: {line}")
            continue

        # Split metric name and optional label block
        if "{" in name_and_labels:
            metric_name, rest = name_and_labels.split("{", 1)
            label_block = "{" + rest.split("}", 1)[0] + "}"
            labels = parse_labels(label_block)
            logging.debug(f"Parsed labels: {labels}")
        else:
            metric_name = name_and_labels
            labels = {}

        # Sort labels by key to ensure consistent ordering
        ordered_labels = dict(sorted(labels.items()))

        metrics.setdefault(metric_name, []).append({
            "labels": ordered_labels,
            "value": metric_value
        })

    return metrics
collect_metrics.py
import time
import logging
import sys
from typing import Callable, Dict, Optional, List

import requests
from ubiops.utils.metrics import MetricClient

from metric_helpers import parse_lines_to_dict

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


class MetricsReporter:
    """
    Periodically fetches Prometheus-style metrics from a vLLM endpoint and
    pushes selected custom metrics to UbiOps Metrics.
    """

    # URL of the vLLM metrics endpoint
    DEFAULT_URL = "http://0.0.0.0:8000/metrics"

    GAUGE_METRICS = [
        "vllm:num_requests_running",
        "vllm:num_requests_waiting"
    ]

    COUNTER_METRICS = [
        "vllm:prompt_tokens_total",
        "vllm:generation_tokens_total",
    ]

    def __init__(
            self,
            project_name: str,
            deployment_version_id: str,
            url: str = DEFAULT_URL,
            interval_seconds: int = 10,
            request_timeout: int = 5
    ):
        self.project_name = project_name
        self.deployment_version_id = deployment_version_id
        self.url = url
        self.interval_seconds = interval_seconds
        self.request_timeout = request_timeout

        self.last_counter_metric_values = {}

        self.metric_client = MetricClient(project_name=self.project_name)

    @staticmethod
    def _parse_float_metric(lines: List[str], metric_prefix: str) -> Optional[float]:
        for line in lines:
            if line.startswith(metric_prefix):
                try:
                    return float(line.split()[-1])
                except ValueError:
                    return None
        return None

    def _fetch_lines(self) -> Optional[List[str]]:
        try:
            resp = requests.get(self.url, timeout=self.request_timeout)
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to fetch metrics from {self.url}: {e}")
            return None

        if resp.status_code != 200:
            logging.error(f"Unexpected status code {resp.status_code} from {self.url}")
            return None

        return resp.text.splitlines()

    def collect_selected_metrics(self):
        """
        Returns a dict like:
        {
            "time_to_first_token_seconds": 0.123,
            "num_requests_running": 2.0,
            "num_requests_waiting": 1.0
        }
        Values may be None if parsing failed or metric not present.
        """
        metrics_dict = self.collect_all_as_dict()
        if not metrics_dict:
            logging.warning("No metrics collected from the endpoint.")
            return {}

        result = {}
        for name in self.GAUGE_METRICS:
            try:
                result[name] = metrics_dict[name][0]["value"]
            except (KeyError, IndexError):
                logging.warning(f"Gauge metric {name} not found in collected metrics.")

        for name in self.COUNTER_METRICS:
            last_value = self.last_counter_metric_values.get(name, 0)
            try:
                current_value = metrics_dict[name][0]["value"]
            except (KeyError, IndexError):
                logging.warning(f"Counter metric {name} not found in collected metrics.")
                continue

            delta = current_value - last_value
            if delta < 0:
                logging.warning(f"Counter metric {name} decreased, unknown reason.")
                continue
            result[name] = delta
            self.last_counter_metric_values[name] = current_value
        return result

    def collect_all_as_dict(self) -> Dict[str, list]:
        """
        Full parse of the endpoint into a Prometheus-like dict using metric_helpers.
        Not currently pushed anywhere; kept for debugging/extensions.
        """
        lines = self._fetch_lines()
        if lines is None:
            return {}
        return parse_lines_to_dict(lines)

    def report_once(self) -> None:
        """
        Fetch + push the selected metrics once.
        """
        metrics = self.collect_selected_metrics()
        if not metrics:
            logging.warning("No metrics collected this cycle.")
            return

        for name, value in metrics.items():
            if value is None:
                continue
            try:
                self.metric_client.log_metric(
                    metric_name=f"custom.{name.replace(":", "_")}",
                    labels={"deployment_version_id": self.deployment_version_id},
                    value=value,
                )
            except Exception as e:
                logging.error(f"Failed to log metric {name}: {e}")

    def run_forever(self) -> None:
        """
        Start the MetricClient and loop forever at the configured interval.
        """
        self.metric_client.start()
        logging.info(
            f"Starting metrics reporter (interval={self.interval_seconds}s, url={self.url})"
        )
        while True:
            self.report_once()
            time.sleep(self.interval_seconds)

def main(argv: List[str]) -> int:
    """
    Entry point for running the metrics reporter as a script.
    Expects 2 arguments: <project_name> <deployment_version_id>
    """
    if len(argv) != 3:
        print(f"Usage: python {argv[0]} <project_name> <deployment_version_id>")
        return 1

    project_name = argv[1]
    deployment_version_id = argv[2]

    reporter = MetricsReporter(
        project_name=project_name,
        deployment_version_id=deployment_version_id,
        url=MetricsReporter.DEFAULT_URL,
        interval_seconds=10,
    )
    reporter.run_forever()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))

Add different metrics

In order to add different metrics in the code, add these metrics to the COUNTER_METRICS or GAUGE_METRICS list in collect_metrics.py, depending on the metric type.

Running the metrics reporter

We run the metrics reporter in a separate process alongside the main deployment process upon initialization of the deployment. This will cause the metric reporter to run indefinitely while an instance is running. We will also ensure that the metrics reporter process is terminated when the deployment instance is shut down. First, we define two functions to start and stop the metrics reporter process. Secondly, we show how to integrate these functions into the Deployment class in the deployment.py file.

Functions

The following functions can be added to our deployment.py file:

import subprocess
import logging

def start_metrics_reporting(project, version_id):
    """
    Start the metrics reporting subprocess.
    """
    logging.error(f"Starting metrics reporting for project={project}, version_id={version_id}")
    return subprocess.Popen(
        [
            "python3", "collect_metrics.py",
            project,
            version_id,
        ]
    )

def stop_metrics_reporting(metrics_process):
    """
    Stop the metrics reporting subprocess.
    """
    if metrics_process and metrics_process.poll() is None:
        logging.info("Stopping metrics reporting subprocess...")
        metrics_process.terminate()
        metrics_process.wait()

Note that the metrics_process argument of stop_metrics_reporting is the return value of start_metrics_reporting.
The previously mentioned functions can be integrated into your __init__ function in your Deployment class in the deployment.py file in the following way:

Integration

We can integrate the aformentioned functions into the Deployment class in the deployment.py file as follows:

import atexit


class Deployment:
    def __init__(self, context):
        # Your existing initialization code here...

        if int(context["process_id"]) == 0:
            # Only register the metrics reporter in one process
            self.metrics_process = start_metrics_reporting(
                project=context["project"],
                version_id=context["version_id"]
            )
            atexit.register(stop_metrics_reporting, self.metrics_process)

    def __del__(self):
        stop_metrics_reporting(self.metrics_process)

Now, when you deploy your deployment, the metrics reporter will start in a separate process and will periodically collect and send metrics to UbiOps. It will also be properly terminated when the deployment instance is shut down.

That's all there is to integrate vLLM metrics into UbiOps!
If you run into any issues though, do not hesitate to contact support.