vLLM Metrics¶
UbiOps provides a built-in metrics system that allows you to collect, store, and visualize metrics from your deployments. Alongside the default metrics, you can also add your own custom metrics. This how-to explains how to integrate vLLM metrics into the UbiOps metrics system, allowing you to monitor the performance and usage of your vLLM server directly from the UbiOps web interface!
First, we will register the metrics we want to collect from the vLLM server in UbiOps. Next, we provide code that runs a separate process to periodically collect these metrics from the vLLM server and push them from the deployment to UbiOps.
Prerequisites¶
Before you start, make sure you have a deployment running a vLLM server. You can use the Deploy vLLM server tutorial to set up a deployment running a vLLM server.
Collecting metrics¶
To collect metrics from the vLLM server, we will use the /metrics endpoint provided by vLLM. This endpoint provides various metrics about the server, such as request counts, latencies, and resource usage. In this how-to, we will focus on the following metrics:
- `vllm:num_requests_running`: The number of requests currently being processed by the server.
- `vllm:num_requests_waiting`: The number of requests currently waiting in the queue.
- `vllm:prompt_tokens_total`: The total number of prompt tokens processed by the server.
- `vllm:generation_tokens_total`: The total number of generation tokens produced by the server.
Available metrics
For a full list of available metrics, see the vLLM metrics documentation.
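To get a feel for the raw data, you can inspect the /metrics endpoint yourself. The snippet below is a minimal sketch that fetches the endpoint and prints only the lines for the metrics listed above; it assumes the vLLM server is reachable on localhost:8000, and the example output (labels and values) is purely illustrative.

```python
# Minimal check of the vLLM /metrics endpoint (assumes the server listens on localhost:8000).
import requests

PREFIXES = (
    "vllm:num_requests_running",
    "vllm:num_requests_waiting",
    "vllm:prompt_tokens_total",
    "vllm:generation_tokens_total",
)

resp = requests.get("http://localhost:8000/metrics", timeout=5)
for line in resp.text.splitlines():
    if line.startswith(PREFIXES):
        print(line)

# Illustrative output (labels and values depend on your model and traffic):
# vllm:num_requests_running{model_name="my-model"} 2.0
# vllm:num_requests_waiting{model_name="my-model"} 0.0
# vllm:prompt_tokens_total{model_name="my-model"} 15360.0
# vllm:generation_tokens_total{model_name="my-model"} 4096.0
```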
Registering metrics in UbiOps¶
Before we can start sending metrics to UbiOps, we need to define the metrics in the UbiOps project. For an explanation of how to create custom metrics in UbiOps, see the UbiOps documentation. We will register the following metrics in UbiOps:
| Name | Type |
|---|---|
| custom.vllm_num_requests_running | Gauge |
| custom.vllm_num_requests_waiting | Gauge |
| custom.vllm_prompt_tokens_total | Delta |
| custom.vllm_generation_tokens_total | Delta |
Note that the Unit field is optional and can be left empty, and the Metric Level should be kept at its default value, Deployment.
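If you prefer to register these metrics programmatically rather than through the web interface, a sketch along the following lines should work with the UbiOps Python client. Note that the method `metrics_create`, the `MetricCreate` model, and the accepted `metric_type` values ("gauge" / "delta") are assumptions here; verify them against the UbiOps client reference for your client version.

```python
# Hedged sketch: registering the custom metrics programmatically with the UbiOps
# Python client. The method `metrics_create`, the `MetricCreate` model and the
# accepted `metric_type` values are assumptions; check the client reference.
import ubiops

configuration = ubiops.Configuration()
configuration.api_key["Authorization"] = "Token <YOUR_API_TOKEN>"
api_client = ubiops.ApiClient(configuration)
api = ubiops.CoreApi(api_client)

metric_definitions = [
    ("custom.vllm_num_requests_running", "gauge"),
    ("custom.vllm_num_requests_waiting", "gauge"),
    ("custom.vllm_prompt_tokens_total", "delta"),
    ("custom.vllm_generation_tokens_total", "delta"),
]

for name, metric_type in metric_definitions:
    api.metrics_create(
        project_name="<YOUR_PROJECT>",
        data=ubiops.MetricCreate(
            name=name,
            metric_type=metric_type,
            description=f"vLLM metric {name}",
        ),
    )

api_client.close()
```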
Code¶
We will create a workflow that periodically collects these metrics and pushes them to UbiOps. UbiOps aggregates metrics on a per-minute basis; to improve accuracy, we will collect and push metrics every 10 seconds (you can adjust this interval as needed). This script will run in a separate process alongside the main deployment process. The following two Python files are needed to create this workflow (and will thus be added to the deployment package):
- `metric_helpers.py`: This file contains helper functions to parse the metrics from the `/metrics` endpoint.
- `collect_metrics.py`: This file contains the main logic to collect metrics and send them to UbiOps.
metric_helpers.py

```python
import logging
import re
_LABEL_RE = re.compile(r'(\w+)\s*=\s*"((?:[^"\\]|\\.)*)"')
def parse_labels(label_block: str) -> dict:
"""
Parse a Prometheus-style label block (e.g., '{key="value"}') into a dictionary.
"""
if not label_block.startswith("{") or not label_block.endswith("}"):
logging.debug(f"Invalid label block format: {label_block}")
return {}
labels = {}
for match in _LABEL_RE.finditer(label_block[1:-1]):
key, raw_val = match.groups()
val = bytes(raw_val, "utf-8").decode("unicode_escape")
labels[key] = val
return labels
def parse_lines_to_dict(lines):
"""
Convert metric lines (Prometheus-style) into a structured dictionary.
"""
metrics = {}
for raw in lines:
line = raw.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) < 2:
logging.debug(f"Too few parts in metric line: {line}")
continue
name_and_labels = parts[0]
value_txt = parts[1]
try:
metric_value = float(value_txt)
except ValueError:
logging.debug(f"Skipping: could not parse value in metric line: {line}")
continue
# Split metric name and optional label block
if "{" in name_and_labels:
metric_name, rest = name_and_labels.split("{", 1)
label_block = "{" + rest.split("}", 1)[0] + "}"
labels = parse_labels(label_block)
logging.debug(f"Parsed labels: {labels}")
else:
metric_name = name_and_labels
labels = {}
# Sort labels by key to ensure consistent ordering
ordered_labels = dict(sorted(labels.items()))
metrics.setdefault(metric_name, []).append({
"labels": ordered_labels,
"value": metric_value
})
    return metrics
```
collect_metrics.py

```python
import time
import logging
import sys
from typing import Callable, Dict, Optional, List
import requests
from ubiops.utils.metrics import MetricClient
from metric_helpers import parse_lines_to_dict
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
class MetricsReporter:
"""
Periodically fetches Prometheus-style metrics from a vLLM endpoint and
pushes selected custom metrics to UbiOps Metrics.
"""
# URL of the vLLM metrics endpoint
DEFAULT_URL = "http://0.0.0.0:8000/metrics"
GAUGE_METRICS = [
"vllm:num_requests_running",
"vllm:num_requests_waiting"
]
COUNTER_METRICS = [
"vllm:prompt_tokens_total",
"vllm:generation_tokens_total",
]
def __init__(
self,
project_name: str,
deployment_version_id: str,
url: str = DEFAULT_URL,
interval_seconds: int = 10,
request_timeout: int = 5
):
self.project_name = project_name
self.deployment_version_id = deployment_version_id
self.url = url
self.interval_seconds = interval_seconds
self.request_timeout = request_timeout
self.last_counter_metric_values = {}
self.metric_client = MetricClient(project_name=self.project_name)
@staticmethod
def _parse_float_metric(lines: List[str], metric_prefix: str) -> Optional[float]:
for line in lines:
if line.startswith(metric_prefix):
try:
return float(line.split()[-1])
except ValueError:
return None
return None
def _fetch_lines(self) -> Optional[List[str]]:
try:
resp = requests.get(self.url, timeout=self.request_timeout)
except requests.exceptions.RequestException as e:
logging.error(f"Failed to fetch metrics from {self.url}: {e}")
return None
if resp.status_code != 200:
logging.error(f"Unexpected status code {resp.status_code} from {self.url}")
return None
return resp.text.splitlines()
def collect_selected_metrics(self):
"""
        Returns a dict keyed by metric name, for example:
            {
                "vllm:num_requests_running": 2.0,
                "vllm:num_requests_waiting": 1.0,
                "vllm:prompt_tokens_total": 128.0,
                "vllm:generation_tokens_total": 256.0
            }
        Gauge metrics hold the latest value; counter metrics hold the delta since
        the previous collection. Metrics that could not be parsed or were not
        present are omitted.
"""
metrics_dict = self.collect_all_as_dict()
if not metrics_dict:
logging.warning("No metrics collected from the endpoint.")
return {}
result = {}
for name in self.GAUGE_METRICS:
try:
result[name] = metrics_dict[name][0]["value"]
except (KeyError, IndexError):
logging.warning(f"Gauge metric {name} not found in collected metrics.")
for name in self.COUNTER_METRICS:
last_value = self.last_counter_metric_values.get(name, 0)
try:
current_value = metrics_dict[name][0]["value"]
except (KeyError, IndexError):
logging.warning(f"Counter metric {name} not found in collected metrics.")
continue
delta = current_value - last_value
if delta < 0:
logging.warning(f"Counter metric {name} decreased, unknown reason.")
continue
result[name] = delta
self.last_counter_metric_values[name] = current_value
return result
def collect_all_as_dict(self) -> Dict[str, list]:
"""
Full parse of the endpoint into a Prometheus-like dict using metric_helpers.
Not currently pushed anywhere; kept for debugging/extensions.
"""
lines = self._fetch_lines()
if lines is None:
return {}
return parse_lines_to_dict(lines)
def report_once(self) -> None:
"""
Fetch + push the selected metrics once.
"""
metrics = self.collect_selected_metrics()
if not metrics:
logging.warning("No metrics collected this cycle.")
return
for name, value in metrics.items():
if value is None:
continue
try:
self.metric_client.log_metric(
metric_name=f"custom.{name.replace(":", "_")}",
labels={"deployment_version_id": self.deployment_version_id},
value=value,
)
except Exception as e:
logging.error(f"Failed to log metric {name}: {e}")
def run_forever(self) -> None:
"""
Start the MetricClient and loop forever at the configured interval.
"""
self.metric_client.start()
logging.info(
f"Starting metrics reporter (interval={self.interval_seconds}s, url={self.url})"
)
while True:
self.report_once()
time.sleep(self.interval_seconds)
def main(argv: List[str]) -> int:
"""
Entry point for running the metrics reporter as a script.
Expects 2 arguments: <project_name> <deployment_version_id>
"""
if len(argv) != 3:
print(f"Usage: python {argv[0]} <project_name> <deployment_version_id>")
return 1
project_name = argv[1]
deployment_version_id = argv[2]
reporter = MetricsReporter(
project_name=project_name,
deployment_version_id=deployment_version_id,
url=MetricsReporter.DEFAULT_URL,
interval_seconds=10,
)
reporter.run_forever()
return 0
if __name__ == "__main__":
    sys.exit(main(sys.argv))
```
Add different metrics
To collect additional metrics, add them to the GAUGE_METRICS or COUNTER_METRICS list in collect_metrics.py, depending on the metric type, and register a matching custom metric in UbiOps, as shown in the sketch below.
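For example, a hedged sketch of extending the lists (the extra metric names are assumptions; verify them against your vLLM version's /metrics output and register the corresponding custom.* metrics in UbiOps first):

```python
# Sketch: extending the metric lists in collect_metrics.py.
# The added metric names are assumptions; check your vLLM version's /metrics
# output and register matching custom.* metrics (gauge / delta) in UbiOps first.
GAUGE_METRICS = [
    "vllm:num_requests_running",
    "vllm:num_requests_waiting",
    "vllm:gpu_cache_usage_perc",       # KV-cache usage, if exposed by your vLLM version
]
COUNTER_METRICS = [
    "vllm:prompt_tokens_total",
    "vllm:generation_tokens_total",
    "vllm:num_preemptions_total",      # request preemptions, if exposed by your vLLM version
]
```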
Running the metrics reporter¶
We run the metrics reporter in a separate process alongside the main deployment process, starting it when the deployment initializes. This way, the metrics reporter runs for as long as the instance is running. We will also ensure that the metrics reporter process is terminated when the deployment instance is shut down. First, we define two functions to start and stop the metrics reporter process. Second, we show how to integrate these functions into the Deployment class in the deployment.py file.
Functions¶
The following functions can be added to our deployment.py file:
```python
import subprocess
import logging
def start_metrics_reporting(project, version_id):
"""
Start the metrics reporting subprocess.
"""
logging.error(f"Starting metrics reporting for project={project}, version_id={version_id}")
return subprocess.Popen(
[
"python3", "collect_metrics.py",
project,
version_id,
]
)
def stop_metrics_reporting(metrics_process):
"""
Stop the metrics reporting subprocess.
"""
if metrics_process and metrics_process.poll() is None:
logging.info("Stopping metrics reporting subprocess...")
metrics_process.terminate()
        metrics_process.wait()
```
Note that the metrics_process argument of stop_metrics_reporting is the return value of start_metrics_reporting.
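For example (with hypothetical project and version values), the lifecycle looks like this:

```python
# Hypothetical example: start the reporter, do work, then stop it again.
metrics_process = start_metrics_reporting(project="my-project", version_id="my-version-id")
# ... deployment handles requests ...
stop_metrics_reporting(metrics_process)
```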
Integration¶
We can integrate the aforementioned functions into the Deployment class in the deployment.py file as follows:
```python
import atexit
class Deployment:
def __init__(self, context):
# Your existing initialization code here...
if int(context["process_id"]) == 0:
# Only register the metrics reporter in one process
self.metrics_process = start_metrics_reporting(
project=context["project"],
version_id=context["version_id"]
)
atexit.register(stop_metrics_reporting, self.metrics_process)
    def __del__(self):
        # Only the process that started the reporter has a metrics_process attribute
        if getattr(self, "metrics_process", None) is not None:
            stop_metrics_reporting(self.metrics_process)
```
Now, when your deployment spins up, the metrics reporter will start in a separate process and periodically collect and send metrics to UbiOps. It will also be properly terminated when the deployment instance is shut down.
That's all there is to integrating vLLM metrics into UbiOps!
If you run into any issues, do not hesitate to contact support.