In [None]:
import sys
import json
from getpass import getpass
import subprocess
import os
from datetime import datetime
import pandas as pd
import numpy as np
from huggingface_hub import notebook_login, create_inference_endpoint, list_inference_endpoints, whoami, get_inference_endpoint, get_token
from pathlib import Path
from tqdm.notebook import tqdm

In [None]:
# Interactive Hugging Face Hub login. Later cells call get_token() to obtain
# the credential for the endpoint container and for the benchmark client.
notebook_login()

In [None]:
# All paths are resolved relative to the directory the notebook is run from.
proj_dir = Path.cwd()
# Folder expected to hold the LLMPerf sources (token_benchmark_ray.py is looked up here).
LLMPerf_path = proj_dir.joinpath('llmperf')
print(proj_dir)

# Config

In [None]:
# Endpoint
ENDPOINT_NAME = "tgi-benchmark-sp"  # prefix; the sweep appends "--tgibs-<n>" per endpoint
NAMESPACE = 'hf-test-lab'  # namespace used to look up / create endpoints
MODEL = 'meta-llama/Meta-Llama-3-8B-Instruct'  # repository served and benchmarked
INSTANCE_TYPE = 'nvidia-a100_2'  # also names the results sub-directory

# Simulation
RESULTS_DIR = proj_dir.joinpath('tgi_benchmark_results', INSTANCE_TYPE)
# MAX_BATCH_SIZE values to sweep: 8, 16, ..., 64
tgi_bss = list(range(8, 65, 8))

# Endpoint setup

In [None]:
def create_endpoint(MAX_BATCH_SIZE, name, instance_type):
    """Return a ready Inference Endpoint called ``name``, creating it if needed.

    An endpoint with this name in ``NAMESPACE`` is reused when it can be
    fetched and waited on; otherwise a new text-generation-inference endpoint
    serving ``MODEL`` is created and waited on.

    Args:
        MAX_BATCH_SIZE: Value passed to the container as the ``MAX_BATCH_SIZE``
            environment variable.
        name: Name of the endpoint to look up or create.
        instance_type: Currently not read by this function; the request below
            is pinned to ``instance_type='nvidia-a100'`` / ``instance_size='x1'``.
            NOTE(review): confirm whether this argument should drive the request.

    Returns:
        The endpoint object after ``wait()`` returned, or ``None`` when the
        endpoint could not be created (the error is printed).
    """
    try:
        endpoint = get_inference_endpoint(name=name, namespace=NAMESPACE)
        endpoint.wait()
        return endpoint
    except Exception as lookup_error:
        # Reuse is best-effort: fall through to creation on any ordinary error,
        # but report why, and let KeyboardInterrupt/SystemExit propagate
        # instead of being swallowed by a bare ``except``.
        print(
            f"Could not reuse endpoint '{name}' "
            f"({type(lookup_error).__name__}: {lookup_error}); creating it."
        )

    try:
        endpoint = create_inference_endpoint(
            name,
            repository=MODEL,
            task="text-generation",
            framework="pytorch",
            region="us-east-1",
            vendor="aws",
            accelerator="gpu",
            instance_size="x1",
            instance_type='nvidia-a100',
            min_replica=0,
            max_replica=1,
            namespace=NAMESPACE,
            custom_image={
                "health_route": "/health",
                "env": {
                    "MAX_INPUT_LENGTH": "3050",
                    "MAX_TOTAL_TOKENS": "3300",
                    "MAX_BATCH_SIZE": f"{MAX_BATCH_SIZE}",
                    "HF_TOKEN": get_token(),
                    "MODEL_ID": "/repository",
                },
                "url": "ghcr.io/huggingface/text-generation-inference:2.0.4",
            },
            type="protected",
        )
        endpoint.wait()
    except Exception as create_error:
        print(f"Failed to create inference endpoint: {str(create_error)}")
        return None

    return endpoint

In [None]:
def run_command(batch_size, endpoint, tgi_bs):
    """Run one LLMPerf benchmark against ``endpoint`` in a subprocess.

    Args:
        batch_size: Number of concurrent client requests; the run is capped at
            ``min(batch_size * 8, 1500)`` completed requests.
        endpoint: Object whose ``url`` attribute is exported to the benchmark
            as ``HUGGINGFACE_API_BASE``.
        tgi_bs: Server-side batch size; only used to label the results folder.

    Returns:
        ``(output, success)`` where ``output`` is the combined stdout/stderr
        text of the benchmark (or ``"Script not found"``) and ``success`` is
        ``True`` only when the subprocess exited with status 0.
    """
    prefix = f'tgibs_{tgi_bs}__bs_{batch_size}'
    vu = batch_size

    # Environment for the child process: endpoint URL, token and import path.
    env = os.environ.copy()
    env['HUGGINGFACE_API_BASE'] = endpoint.url
    env['HUGGINGFACE_API_KEY'] = get_token()
    # Put the LLMPerf folder first on PYTHONPATH; join only non-empty parts so
    # an unset PYTHONPATH does not leave a dangling separator (empty entry).
    env['PYTHONPATH'] = os.pathsep.join(
        part for part in (str(LLMPerf_path), env.get('PYTHONPATH', '')) if part
    )

    # Define the benchmark script path
    benchmark_script = str(LLMPerf_path / "token_benchmark_ray.py")

    if not os.path.isfile(benchmark_script):
        print(f"LLMPerf script not found at {benchmark_script}, please ensure the path is correct.")
        return "Script not found", False

    # Calculate the max number of completed requests
    max_requests = vu * 8

    # Timestamped results directory so repeated runs never overwrite each other
    date_str = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    results_dir = RESULTS_DIR / f"{date_str}_{prefix}"

    # Use the interpreter running this notebook rather than whatever "python"
    # resolves to on PATH, so the benchmark sees the same installed packages.
    command = [
        sys.executable, benchmark_script,
        "--model", f"huggingface/{MODEL}",
        "--mean-input-tokens", "3000",
        "--stddev-input-tokens", "10",
        "--mean-output-tokens", "240",
        "--stddev-output-tokens", "5",
        "--max-num-completed-requests", str(min(max_requests, 1500)),
        "--timeout", "7200",
        "--num-concurrent-requests", str(vu),
        "--results-dir", str(results_dir),
        "--llm-api", "litellm",
        "--additional-sampling-params", '{}'
    ]

    # Run the command with the modified environment. Decoding is lenient so a
    # stray non-UTF-8 byte in the benchmark output cannot raise here.
    try:
        raw_output = subprocess.check_output(command, stderr=subprocess.STDOUT, env=env)
        return raw_output.decode('utf-8', errors='replace'), True
    except subprocess.CalledProcessError as e:
        error_output = e.output.decode('utf-8', errors='replace')
        print(f"Error with batch size {batch_size}: {error_output}")
        return error_output, False

def find_max_working_batch_size(endpoint, tgi_bs):
    """Find the largest client concurrency that benchmarks successfully.

    Tries 8, 16, 32, 64, 128 and 256 concurrent requests in order via
    ``run_command`` and stops at the first failing run.

    Returns:
        The last concurrency that succeeded, or the string
        ``"No working batch size found in the provided list"`` when even the
        first one failed.
    """
    candidate_sizes = [8, 16, 32, 64, 128, 256]
    largest_ok = None
    for n_clients in tqdm(candidate_sizes):
        tqdm.write(f"Running: TGIBS {tgi_bs} Client Requests {n_clients}")
        _, succeeded = run_command(n_clients, endpoint, tgi_bs)
        if not succeeded:
            # Larger sizes are not attempted after the first failure.
            break
        largest_ok = n_clients
    return (
        "No working batch size found in the provided list"
        if largest_ok is None
        else largest_ok
    )

In [None]:
# Sweep the server-side batch sizes: one endpoint per value, benchmarked and
# then deleted again.
for tgi_bs in tqdm(tgi_bss):
    name = f"{ENDPOINT_NAME}--tgibs-{tgi_bs}"
    endpoint = create_endpoint(MAX_BATCH_SIZE=tgi_bs, name=name, instance_type=INSTANCE_TYPE)
    if endpoint is None:
        # create_endpoint returns None (after printing the error) when the
        # endpoint could not be created; skip instead of crashing on None.
        tqdm.write(f"Skipping TGIBS {tgi_bs}: endpoint {name} is not available")
        continue
    try:
        endpoint.wait()
        tqdm.write(f"Endpoint Created: {name}")
        max_batch_size = find_max_working_batch_size(endpoint=endpoint, tgi_bs=tgi_bs)
        tqdm.write(f"TGIBS {tgi_bs}: max working client batch size: {max_batch_size}")
    finally:
        # Always delete the endpoint, even if the benchmark raised or was
        # interrupted, so no GPU endpoint is left behind.
        endpoint.delete()
        tqdm.write(f"Endpoint Deleted: {name}")