vllm.benchmarks.serve

Benchmark online serving throughput.

On the server side, run the following command to launch the vLLM OpenAI API server:

vllm serve <your_model>

On the client side, run:

vllm bench serve \
    --backend <backend> \
    --label <label> \
    --model <your_model> \
    --dataset-name <dataset_name> \
    --input-len <input_len> \
    --output-len <output_len> \
    --request-rate <request_rate> \
    --num-prompts <num_prompts>

SpecDecodeMetrics dataclass

Speculative decoding metrics from the server's Prometheus endpoint.

Source code in vllm/benchmarks/serve.py
@dataclass
class SpecDecodeMetrics:
    """Speculative decoding metrics from the server's Prometheus endpoint."""

    num_drafts: int
    num_draft_tokens: int
    num_accepted_tokens: int
    accepted_per_pos: dict[int, int]
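
The raw counters map directly onto the usual speculative decoding summary statistics. As an illustrative sketch (this helper is not part of the module), the draft acceptance rate and the mean number of accepted tokens per draft can be derived as follows:

def summarize_spec_decode(m: SpecDecodeMetrics) -> dict[str, float]:
    # Guard against zero counters when no drafts were produced.
    acceptance_rate = (
        m.num_accepted_tokens / m.num_draft_tokens if m.num_draft_tokens else 0.0
    )
    mean_accepted_per_draft = (
        m.num_accepted_tokens / m.num_drafts if m.num_drafts else 0.0
    )
    return {
        "acceptance_rate": acceptance_rate,
        "mean_accepted_tokens_per_draft": mean_accepted_per_draft,
    }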

calculate_metrics

calculate_metrics(
    input_requests: list[SampleRequest],
    outputs: list[RequestFuncOutput],
    dur_s: float,
    tokenizer: TokenizerLike,
    selected_percentiles: list[float],
    goodput_config_dict: dict[str, float],
) -> tuple[BenchmarkMetrics, list[int]]

Calculate the metrics for the benchmark.

Parameters:

input_requests (list[SampleRequest]): The input requests. Required.
outputs (list[RequestFuncOutput]): The outputs of the requests. Required.
dur_s (float): The duration of the benchmark. Required.
tokenizer (TokenizerLike): The tokenizer to use. Required.
selected_percentiles (list[float]): The percentiles to select. Required.
goodput_config_dict (dict[str, float]): The goodput configuration. Required.

Returns:

tuple[BenchmarkMetrics, list[int]]: A tuple of the benchmark metrics and the actual output lengths.

Source code in vllm/benchmarks/serve.py
def calculate_metrics(
    input_requests: list[SampleRequest],
    outputs: list[RequestFuncOutput],
    dur_s: float,
    tokenizer: TokenizerLike,
    selected_percentiles: list[float],
    goodput_config_dict: dict[str, float],
) -> tuple[BenchmarkMetrics, list[int]]:
    """Calculate the metrics for the benchmark.

    Args:
        input_requests: The input requests.
        outputs: The outputs of the requests.
        dur_s: The duration of the benchmark.
        tokenizer: The tokenizer to use.
        selected_percentiles: The percentiles to select.
        goodput_config_dict: The goodput configuration.

    Returns:
        A tuple of the benchmark metrics and the actual output lengths.
    """
    actual_output_lens: list[int] = []
    total_input = 0
    completed = 0
    good_completed = 0
    itls: list[float] = []
    tpots: list[float] = []
    all_tpots: list[float] = []
    ttfts: list[float] = []
    e2els: list[float] = []
    input_audio_duration = 0.0
    for i in range(len(outputs)):
        if outputs[i].success:
            output_len = outputs[i].output_tokens

            if not output_len:
                if tokenizer is None:
                    output_len = 1
                else:
                    # We use the tokenizer to count the number of output tokens
                    # for some serving backends instead of looking at
                    # len(outputs[i].itl) since multiple output tokens may be
                    # bundled together
                    # Note : this may inflate the output token count slightly
                    output_len = len(
                        tokenizer(
                            outputs[i].generated_text, add_special_tokens=False
                        ).input_ids
                    )
            actual_output_lens.append(output_len)
            total_input += input_requests[i].prompt_len
            tpot = 0
            if output_len > 1:
                latency_minus_ttft = outputs[i].latency - outputs[i].ttft
                tpot = latency_minus_ttft / (output_len - 1)
                tpots.append(tpot)
            # Note: if output_len <= 1, we regard tpot as 0 for goodput
            all_tpots.append(tpot)
            itls += outputs[i].itl
            ttfts.append(outputs[i].ttft)
            e2els.append(outputs[i].latency)
            input_audio_duration += outputs[i].input_audio_duration
            completed += 1
        else:
            actual_output_lens.append(0)

    if goodput_config_dict:
        valid_metrics = []
        slo_values = []

        if "ttft" in goodput_config_dict:
            valid_metrics.append(ttfts)
            slo_values.append(
                goodput_config_dict["ttft"] / MILLISECONDS_TO_SECONDS_CONVERSION
            )
        if "tpot" in goodput_config_dict:
            valid_metrics.append(all_tpots)
            slo_values.append(
                goodput_config_dict["tpot"] / MILLISECONDS_TO_SECONDS_CONVERSION
            )
        if "e2el" in goodput_config_dict:
            valid_metrics.append(e2els)
            slo_values.append(
                goodput_config_dict["e2el"] / MILLISECONDS_TO_SECONDS_CONVERSION
            )

        for req_metric in zip(*valid_metrics):
            is_good_req = all([s >= r for s, r in zip(slo_values, req_metric)])
            if is_good_req:
                good_completed += 1

    if completed == 0:
        warnings.warn(
            "All requests failed. This is likely due to a misconfiguration "
            "on the benchmark arguments.",
            stacklevel=2,
        )

    # Calculate max output tokens per second metric
    max_output_tokens_per_s = 0.0
    max_concurrent_requests = 0

    # Find the time range across all successful requests
    successful_outputs = [output for output in outputs if output.success]
    failed_outputs = [output for output in outputs if not output.success]

    if len(failed_outputs) > 0:
        print("Failed requests during benchmark run detected (capping to 10):")
        for i, err in enumerate(failed_outputs[:10]):
            print(f"Error {i}: {err.error}")

    if successful_outputs:
        min_start_time = min(output.start_time for output in successful_outputs)
        max_end_time = max(
            output.start_time + output.latency for output in successful_outputs
        )

        # Create second buckets (ceiling to ensure we capture all time)
        duration_seconds = int(np.ceil(max_end_time - min_start_time)) + 1
        tokens_per_second = np.zeros(duration_seconds)
        concurrent_requests_per_second = np.zeros(duration_seconds)

        for i, output in enumerate(successful_outputs):
            # Calculate token generation timestamp using
            # start_time, ttft, and itl
            token_times = [output.start_time + output.ttft]
            current_time = token_times[0]
            for itl_value in output.itl:
                current_time += itl_value
                token_times.append(current_time)

            # Add tokens to second buckets
            for token_time in token_times:
                second_bucket = int(token_time - min_start_time)
                if 0 <= second_bucket < duration_seconds:
                    tokens_per_second[second_bucket] += 1

            # Track concurrent requests for each second this request was active
            request_start_second = int(output.start_time - min_start_time)
            request_end_second = int(
                (output.start_time + output.latency) - min_start_time
            )

            for second in range(request_start_second, request_end_second + 1):
                concurrent_requests_per_second[second] += 1

        # Find the maximum tokens per second and corresponding
        # concurrent requests
        if len(tokens_per_second) > 0:
            max_output_tokens_per_s = float(np.max(tokens_per_second))
            max_concurrent_requests = int(np.max(concurrent_requests_per_second))

        if TERM_PLOTLIB_AVAILABLE:
            import termplotlib as tpl

            fig = tpl.figure()
            fig.plot(
                np.arange(len(tokens_per_second)),
                tokens_per_second,
                title="Output tokens per second",
            )
            fig.plot(
                np.arange(len(concurrent_requests_per_second)),
                concurrent_requests_per_second,
                title="Concurrent requests per second",
            )
            fig.show()
        else:
            print("tip: install termplotlib and gnuplot to plot the metrics")

    metrics = BenchmarkMetrics(
        completed=completed,
        failed=len(failed_outputs),
        total_input=total_input,
        total_output=sum(actual_output_lens),
        request_throughput=completed / dur_s,
        request_goodput=good_completed / dur_s,
        output_throughput=sum(actual_output_lens) / dur_s,
        total_token_throughput=(total_input + sum(actual_output_lens)) / dur_s,
        mean_ttft_ms=np.mean(ttfts or 0)
        * 1000,  # ttfts is empty if streaming is not supported by the endpoint
        std_ttft_ms=np.std(ttfts or 0) * 1000,
        median_ttft_ms=np.median(ttfts or 0) * 1000,
        percentiles_ttft_ms=[
            (p, np.percentile(ttfts or 0, p) * 1000) for p in selected_percentiles
        ],
        mean_tpot_ms=np.mean(tpots or 0) * 1000,
        std_tpot_ms=np.std(tpots or 0) * 1000,
        median_tpot_ms=np.median(tpots or 0) * 1000,
        percentiles_tpot_ms=[
            (p, np.percentile(tpots or 0, p) * 1000) for p in selected_percentiles
        ],
        mean_itl_ms=np.mean(itls or 0) * 1000,
        std_itl_ms=np.std(itls or 0) * 1000,
        median_itl_ms=np.median(itls or 0) * 1000,
        percentiles_itl_ms=[
            (p, np.percentile(itls or 0, p) * 1000) for p in selected_percentiles
        ],
        mean_e2el_ms=np.mean(e2els or 0) * 1000,
        std_e2el_ms=np.std(e2els or 0) * 1000,
        median_e2el_ms=np.median(e2els or 0) * 1000,
        percentiles_e2el_ms=[
            (p, np.percentile(e2els or 0, p) * 1000) for p in selected_percentiles
        ],
        max_output_tokens_per_s=max_output_tokens_per_s,
        max_concurrent_requests=max_concurrent_requests,
        rtfx=input_audio_duration / dur_s,
    )

    return metrics, actual_output_lens
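
A request only counts toward request_goodput when every SLO configured in goodput_config_dict is met; the values are interpreted in milliseconds and converted to seconds before being compared against each request's TTFT, TPOT, and end-to-end latency. An illustrative configuration (the values below are examples, not defaults; an empty dict skips the goodput check entirely):

# Count a request as "good" only if TTFT <= 200 ms, TPOT <= 50 ms,
# and end-to-end latency <= 3000 ms.
goodput_config_dict = {
    "ttft": 200.0,
    "tpot": 50.0,
    "e2el": 3000.0,
}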

calculate_metrics_for_embeddings

calculate_metrics_for_embeddings(
    outputs: list[RequestFuncOutput],
    dur_s: float,
    selected_percentiles: list[float],
) -> EmbedBenchmarkMetrics

Calculate the metrics for the embedding requests.

Parameters:

outputs (list[RequestFuncOutput]): The outputs of the requests. Required.
dur_s (float): The duration of the benchmark. Required.
selected_percentiles (list[float]): The percentiles to select. Required.

Returns:

EmbedBenchmarkMetrics: The calculated benchmark metrics.

Source code in vllm/benchmarks/serve.py
def calculate_metrics_for_embeddings(
    outputs: list[RequestFuncOutput],
    dur_s: float,
    selected_percentiles: list[float],
) -> EmbedBenchmarkMetrics:
    """Calculate the metrics for the embedding requests.

    Args:
        outputs: The outputs of the requests.
        dur_s: The duration of the benchmark.
        selected_percentiles: The percentiles to select.

    Returns:
        The calculated benchmark metrics.
    """
    total_input = 0
    completed = 0
    failed = 0
    e2els: list[float] = []
    for i in range(len(outputs)):
        if outputs[i].success:
            e2els.append(outputs[i].latency)
            completed += 1
            total_input += outputs[i].prompt_len
        else:
            failed += 1

    if completed == 0:
        warnings.warn(
            "All requests failed. This is likely due to a misconfiguration "
            "on the benchmark arguments.",
            stacklevel=2,
        )
    metrics = EmbedBenchmarkMetrics(
        completed=completed,
        failed=failed,
        total_input=total_input,
        request_throughput=completed / dur_s,
        total_token_throughput=total_input / dur_s,
        mean_e2el_ms=np.mean(e2els or 0) * 1000,
        std_e2el_ms=np.std(e2els or 0) * 1000,
        median_e2el_ms=np.median(e2els or 0) * 1000,
        percentiles_e2el_ms=[
            (p, np.percentile(e2els or 0, p) * 1000) for p in selected_percentiles
        ],
    )
    return metrics
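
The latency statistics above are plain numpy aggregates over the per-request end-to-end latencies. A standalone sketch of that computation, using illustrative latency values in place of RequestFuncOutput objects:

import numpy as np

# Illustrative per-request end-to-end latencies in seconds.
e2els = [0.21, 0.35, 0.42, 0.58, 0.97]
selected_percentiles = [50.0, 90.0, 99.0]

mean_e2el_ms = float(np.mean(e2els)) * 1000
median_e2el_ms = float(np.median(e2els)) * 1000
percentiles_e2el_ms = [
    (p, float(np.percentile(e2els, p)) * 1000) for p in selected_percentiles
]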

fetch_spec_decode_metrics async

fetch_spec_decode_metrics(
    base_url: str, session: ClientSession
) -> SpecDecodeMetrics | None

Fetch speculative decoding metrics from the server's Prometheus endpoint.

Returns None if speculative decoding is not enabled or metrics are not available.

Source code in vllm/benchmarks/serve.py
async def fetch_spec_decode_metrics(
    base_url: str, session: aiohttp.ClientSession
) -> SpecDecodeMetrics | None:
    """Fetch speculative decoding metrics from the server's Prometheus endpoint.

    Returns None if speculative decoding is not enabled or metrics are not available.
    """
    metrics_url = f"{base_url}/metrics"
    try:
        async with session.get(metrics_url) as response:
            if response.status != 200:
                return None
            text = await response.text()

            num_drafts = 0
            num_draft_tokens = 0
            num_accepted_tokens = 0
            accepted_per_pos: dict[int, int] = {}
            found_spec_decode = False

            for line in text.split("\n"):
                line = line.strip()
                if not line or line.startswith("#"):
                    continue

                if line.startswith("vllm:spec_decode"):
                    found_spec_decode = True
                    parts = line.split()
                    if parts:
                        with contextlib.suppress(ValueError):
                            if "num_drafts" in line:
                                num_drafts += int(float(parts[-1]))
                            elif "num_draft_tokens" in line:
                                num_draft_tokens += int(float(parts[-1]))
                            elif "num_accepted_tokens_per_pos" in line:
                                pos_label = 'position="'
                                if pos_label in line:
                                    start = line.index(pos_label) + len(pos_label)
                                    end = line.index('"', start)
                                    pos = int(line[start:end])
                                    val = int(float(parts[-1]))
                                    accepted_per_pos[pos] = (
                                        accepted_per_pos.get(pos, 0) + val
                                    )
                            elif "num_accepted_tokens" in line:
                                num_accepted_tokens += int(float(parts[-1]))

            if not found_spec_decode:
                return None

            return SpecDecodeMetrics(
                num_drafts=num_drafts,
                num_draft_tokens=num_draft_tokens,
                num_accepted_tokens=num_accepted_tokens,
                accepted_per_pos=accepted_per_pos,
            )
    except (aiohttp.ClientError, asyncio.TimeoutError):
        return None
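
A hedged usage sketch (the base URL and printout are illustrative): the coroutine expects an existing aiohttp session and returns None when the /metrics endpoint is unreachable or exposes no vllm:spec_decode counters.

import asyncio

import aiohttp


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        spec_metrics = await fetch_spec_decode_metrics("http://localhost:8000", session)
        if spec_metrics is None:
            print("Speculative decoding metrics not available.")
        else:
            print(
                f"drafts={spec_metrics.num_drafts} "
                f"accepted={spec_metrics.num_accepted_tokens}"
            )


asyncio.run(main())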

get_first_model_from_server async

get_first_model_from_server(
    base_url: str,
    headers: dict | None = None,
    ssl_context: SSLContext | bool | None = None,
) -> tuple[str, str]

Fetch the first model from the server's /v1/models endpoint.

Source code in vllm/benchmarks/serve.py
async def get_first_model_from_server(
    base_url: str,
    headers: dict | None = None,
    ssl_context: ssl.SSLContext | bool | None = None,
) -> tuple[str, str]:
    """Fetch the first model from the server's /v1/models endpoint."""
    models_url = f"{base_url}/v1/models"
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    async with aiohttp.ClientSession(connector=connector) as session:
        try:
            async with session.get(models_url, headers=headers) as response:
                response.raise_for_status()
                data = await response.json()
                if "data" in data and len(data["data"]) > 0:
                    return data["data"][0]["id"], data["data"][0]["root"]
                else:
                    raise ValueError(
                        f"No models found on the server at {base_url}. "
                        "Make sure the server is running and has models loaded."
                    )
        except (aiohttp.ClientError, json.JSONDecodeError) as e:
            raise RuntimeError(
                f"Failed to fetch models from server at {models_url}. "
                "Check that:\n"
                "1. The server is running\n"
                "2. The server URL is correct\n"
                f"Error: {e}"
            ) from e
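
A hedged usage sketch, assuming a vLLM server is listening at the illustrative base URL below; headers and ssl_context are optional and default to None:

import asyncio


async def main() -> None:
    model_id, model_root = await get_first_model_from_server("http://localhost:8000")
    print(f"Benchmarking model {model_id} (root: {model_root})")


asyncio.run(main())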

get_request async

get_request(
    input_requests: list[SampleRequest],
    request_rate: float,
    burstiness: float = 1.0,
    ramp_up_strategy: Literal["linear", "exponential"]
    | None = None,
    ramp_up_start_rps: int | None = None,
    ramp_up_end_rps: int | None = None,
) -> AsyncGenerator[tuple[SampleRequest, float], None]

Asynchronously generates requests at a specified rate with OPTIONAL burstiness and OPTIONAL ramp-up strategy.

Parameters:

input_requests (list[SampleRequest]): A list of input requests, each represented as a SampleRequest. Required.
request_rate (float): The rate at which requests are generated (requests/s). Required.
burstiness (float, optional): The burstiness factor of the request generation. Only takes effect when request_rate is not inf. The default value is 1, which follows a Poisson process; otherwise, the request intervals follow a gamma distribution. A lower burstiness value (0 < burstiness < 1) results in more bursty requests, while a higher burstiness value (burstiness > 1) results in a more uniform arrival of requests. Default: 1.0.
ramp_up_strategy (optional): The ramp-up strategy. Can be "linear" or "exponential". If None, uses a constant request rate (specified by request_rate). Default: None.
ramp_up_start_rps (int, optional): The starting request rate for ramp-up. Default: None.
ramp_up_end_rps (int, optional): The ending request rate for ramp-up. Default: None.

Source code in vllm/benchmarks/serve.py
async def get_request(
    input_requests: list[SampleRequest],
    request_rate: float,
    burstiness: float = 1.0,
    ramp_up_strategy: Literal["linear", "exponential"] | None = None,
    ramp_up_start_rps: int | None = None,
    ramp_up_end_rps: int | None = None,
) -> AsyncGenerator[tuple[SampleRequest, float], None]:
    """
    Asynchronously generates requests at a specified rate
    with OPTIONAL burstiness and OPTIONAL ramp-up strategy.

    Args:
        input_requests:
            A list of input requests, each represented as a SampleRequest.
        request_rate:
            The rate at which requests are generated (requests/s).
        burstiness (optional):
            The burstiness factor of the request generation.
            Only takes effect when request_rate is not inf.
            Default value is 1, which follows a Poisson process.
            Otherwise, the request intervals follow a gamma distribution.
            A lower burstiness value (0 < burstiness < 1) results
            in more bursty requests, while a higher burstiness value
            (burstiness > 1) results in a more uniform arrival of requests.
        ramp_up_strategy (optional):
            The ramp-up strategy. Can be "linear" or "exponential".
            If None, uses constant request rate (specified by request_rate).
        ramp_up_start_rps (optional):
            The starting request rate for ramp-up.
        ramp_up_end_rps (optional):
            The ending request rate for ramp-up.
    """
    assert burstiness > 0, (
        f"A positive burstiness factor is expected, but given {burstiness}."
    )
    # Convert to list to get length for ramp-up calculations
    if isinstance(input_requests, Iterable) and not isinstance(input_requests, list):
        input_requests = list(input_requests)

    total_requests = len(input_requests)
    assert total_requests > 0, "No requests provided."

    # Precompute delays among requests to minimize request send laggings
    request_rates = []
    delay_ts = []
    for request_index, request in enumerate(input_requests):
        current_request_rate = _get_current_request_rate(
            ramp_up_strategy,
            ramp_up_start_rps,
            ramp_up_end_rps,
            request_index,
            total_requests,
            request_rate,
        )
        assert current_request_rate > 0.0, (
            f"Obtained non-positive request rate {current_request_rate}."
        )
        request_rates.append(current_request_rate)
        if current_request_rate == float("inf"):
            delay_ts.append(0)
        elif burstiness == float("inf"):
            # when burstiness tends to infinity, the delay time becomes constant
            # and tends to the inverse of the request rate
            delay_ts.append(1.0 / current_request_rate)
        else:
            theta = 1.0 / (current_request_rate * burstiness)

            # Sample the request interval from the gamma distribution.
            # If burstiness is 1, it follows exponential distribution.
            delay_ts.append(np.random.gamma(shape=burstiness, scale=theta))

    # Calculate the cumulative delay time from the first sent out requests.
    for i in range(1, len(delay_ts)):
        delay_ts[i] += delay_ts[i - 1]
    if ramp_up_strategy is None and delay_ts[-1] != 0:
        # When ramp_up_strategy is not set, we assume the request rate is fixed
        # and all requests should be sent in target_total_delay_s, the following
        # logic would re-scale delay time to ensure the final delay_ts
        # align with target_total_delay_s.
        #
        # NOTE: If we simply accumulate the random delta values
        # from the gamma distribution, their sum would have 1-2% gap
        # from target_total_delay_s. The purpose of the following logic is to
        # close the gap for stabilizing the throughput data
        # from different random seeds.
        target_total_delay_s = total_requests / request_rate
        normalize_factor = target_total_delay_s / delay_ts[-1]
        delay_ts = [delay * normalize_factor for delay in delay_ts]

    start_ts = time.time()
    for request_index, request in enumerate(input_requests):
        if delay_ts[request_index] > 0:
            current_ts = time.time()
            sleep_interval_s = start_ts + delay_ts[request_index] - current_ts
            if sleep_interval_s > 0:
                await asyncio.sleep(sleep_interval_s)
        yield request, request_rates[request_index]
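
The inter-arrival sampling above is easy to check in isolation: with shape=burstiness and scale=1/(request_rate * burstiness), the gamma distribution has mean 1/request_rate regardless of burstiness, while burstiness < 1 inflates the variance (burstier traffic) and burstiness > 1 shrinks it. A standalone sketch with illustrative parameter values:

import numpy as np

# Illustrative values: 4 requests/s with a mildly bursty arrival process.
request_rate = 4.0
burstiness = 0.5

theta = 1.0 / (request_rate * burstiness)
intervals = np.random.gamma(shape=burstiness, scale=theta, size=100_000)

print(intervals.mean())  # ~ 1 / request_rate = 0.25 s
print(intervals.var())   # ~ 1 / (burstiness * request_rate**2) = 0.125 s^2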