# BPO-Bench / api_candidate_source.py
# haroldshipibm's picture
# Upload folder using huggingface_hub
# d075a5b verified
"""
Candidate source APIs - compute metrics from actual data.
AUTO-GENERATED by scripts/generate_hf.sh - DO NOT EDIT DIRECTLY
Edit candidate_source.py in main repo and regenerate.
"""
from typing import Dict, List, Any, Optional, Union
import pandas as pd
from loguru import logger
from data_loader import get_data_loader
from models import (
RequisitionNotFoundResponse,
SLAPerSourceResponse,
TotalHiresBySourceResponse,
CandidateVolumeResponse,
FunnelConversionResponse,
MetadataResponse,
DefinitionsResponse,
SourceRecommendationResponse,
)
# Module-wide switch for API call tracing; disabled for deployment.
BPO_LOG_API_CALLS = False


def _log_api_call(msg: str) -> None:
    """Forward ``msg`` to the logger at INFO level when BPO_LOG_API_CALLS is on.

    Args:
        msg: The already-formatted message describing the API call.
    """
    if not BPO_LOG_API_CALLS:
        # Tracing is switched off: stay silent.
        return
    logger.info(msg)
def _check_requisition_valid(requisition_id: str) -> Optional[RequisitionNotFoundResponse]:
    """
    Validate a requisition ID against the data loader.

    Args:
        requisition_id: The requisition ID to look up.

    Returns:
        None when the loader reports the ID as valid; otherwise a
        RequisitionNotFoundResponse that carries the loader's suggested
        requisition IDs.
    """
    data_loader = get_data_loader()

    # Happy path first: nothing to report for a known requisition.
    if data_loader.is_valid_requisition(requisition_id):
        return None

    return RequisitionNotFoundResponse(
        error="requisition_not_found",
        message=f"No job can be found with the ID {requisition_id}.",
        suggested_requisition_ids=data_loader.get_suggested_requisitions(requisition_id),
    )
def get_sla_per_source(requisition_id: str) -> Union[SLAPerSourceResponse, RequisitionNotFoundResponse]:
    """
    Compute, per sourcing channel, the share of reviewed candidates that met SLA.

    Args:
        requisition_id: Requisition ID whose similar-requisition data is analysed.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        an SLAPerSourceResponse whose ``metrics`` list holds one
        ``{"source_name", "sla_percentage"}`` entry per source, ordered by
        ascending whole-number percentage.
    """
    _log_api_call(f"API call: get_sla_per_source(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    frame = get_data_loader().get_similar_requisitions(requisition_id)

    # SLA only applies to candidates that were actually reviewed.
    reviewed_only = frame[frame['reviewed']]

    # Per source: how many SLA flags were recorded and how many were met.
    per_source = reviewed_only.groupby('source_name').agg(
        total=('sla_met', 'count'),
        sla_met=('sla_met', 'sum'),
    )
    pct_by_source = (per_source['sla_met'] / per_source['total'] * 100).round(0).astype(int)

    entries = [
        {"source_name": name, "sla_percentage": int(pct)}
        for name, pct in pct_by_source.items()
    ]
    # sorted() is stable, so sources with equal percentages keep groupby order.
    ranked = sorted(entries, key=lambda entry: entry['sla_percentage'])
    return SLAPerSourceResponse(metrics=ranked)
def get_total_hires_by_source(requisition_id: str) -> Union[TotalHiresBySourceResponse, RequisitionNotFoundResponse]:
    """
    Count hires per sourcing channel.

    Args:
        requisition_id: Requisition ID whose similar-requisition data is analysed.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a TotalHiresBySourceResponse with the overall hire count and one
        ``{"source_name", "total_hires"}`` entry per source that has at least
        one hired row, ordered by descending hire count.
    """
    _log_api_call(f"API call: get_total_hires_by_source(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    frame = get_data_loader().get_similar_requisitions(requisition_id)
    hired_flags = frame['hired']

    # Keep only hired rows, then count them per source.
    hires_per_source = frame[hired_flags].groupby('source_name').size()

    entries = [
        {"source_name": name, "total_hires": int(n_hires)}
        for name, n_hires in hires_per_source.items()
    ]
    ranked = sorted(entries, key=lambda entry: entry['total_hires'], reverse=True)

    return TotalHiresBySourceResponse(
        job_id=requisition_id,
        metrics=ranked,
        total_hires=int(hired_flags.sum()),
    )
def get_candidate_volume_by_source(
    requisition_id: str,
    sources: Optional[List[str]] = None
) -> Union[CandidateVolumeResponse, RequisitionNotFoundResponse]:
    """
    Count candidates per sourcing channel, with each channel's share of the total.

    Args:
        requisition_id: Requisition ID whose similar-requisition data is analysed.
        sources: Optional subset of source names to report (exact,
            case-sensitive match). None or an empty list reports every source.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a CandidateVolumeResponse. Percentages are always relative to the
        total candidate count, even when ``sources`` narrows the listing.
        Entries are ordered by descending volume.
    """
    _log_api_call(f"API call: get_candidate_volume_by_source(requisition_id={requisition_id}, sources={sources})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    frame = get_data_loader().get_similar_requisitions(requisition_id)
    total_volume = len(frame)

    entries = []
    for name, count in frame.groupby('source_name').size().items():
        # A falsy ``sources`` means "no filter"; otherwise skip unlisted names.
        if sources and name not in sources:
            continue
        entries.append({
            "source_name": name,
            "candidate_volume": int(count),
            "percentage": int(round(count/total_volume*100)),
        })

    entries.sort(key=lambda entry: entry['candidate_volume'], reverse=True)

    heading = (
        f"For requisitions similar to {requisition_id}, there were {total_volume} candidates over "
        "the past three years. Here's how many candidates came from each source "
        "(with percentages from the total number):"
    )
    return CandidateVolumeResponse(
        job_id=requisition_id,
        total_candidate_volume=total_volume,
        metrics=entries,
        heading=heading,
    )
def get_funnel_conversion_by_source(requisition_id: str) -> Union[FunnelConversionResponse, RequisitionNotFoundResponse]:
    """
    Compute funnel-stage percentages for each sourcing channel.

    Every percentage uses the source's total candidate count as denominator
    and is rounded to one decimal place.

    Args:
        requisition_id: Requisition ID whose similar-requisition data is analysed.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a FunnelConversionResponse with one entry per source, ordered by
        source name.
    """
    _log_api_call(f"API call: get_funnel_conversion_by_source(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    frame = get_data_loader().get_similar_requisitions(requisition_id)

    entries = []
    for name in frame['source_name'].unique():
        rows = frame[frame['source_name'] == name]
        total = len(rows)
        if not total:
            # No matching rows for this name: nothing to divide by.
            continue

        def share_of_total(count):
            """Express ``count`` as a one-decimal percentage of ``total``."""
            return round(count / total * 100, 1)

        # NOTE(review): "offer_acceptance_rate" is computed here as offers
        # extended / total candidates, whereas get_source_recommendation_summary
        # uses accepted / offers — confirm which definition is intended.
        entries.append({
            "source_name": name,
            "first_round_review_percentage": share_of_total(rows['reviewed'].sum()),
            "interview_rate": share_of_total(rows['interviewed'].sum()),
            "offer_acceptance_rate": share_of_total(rows['offer_extended'].sum()),
        })

    # Alphabetical order keeps the output stable between calls.
    entries.sort(key=lambda entry: entry['source_name'])
    return FunnelConversionResponse(
        job_id=requisition_id,
        metrics=entries,
    )
def get_metadata_and_timeframe(requisition_id: str) -> Union[MetadataResponse, RequisitionNotFoundResponse]:
    """
    Report the data timeframe, last update date, and the number of
    requisitions analysed.

    Args:
        requisition_id: The job requisition ID.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a MetadataResponse. The three date fields are fixed strings; only
        ``total_requisitions_analysed`` is derived from the loaded data.
    """
    _log_api_call(f"API call: get_metadata_and_timeframe(requisition_id={requisition_id})")

    # Check if requisition ID is valid
    error = _check_requisition_valid(requisition_id)
    if error:
        return error

    loader = get_data_loader()
    data = loader.get_similar_requisitions(requisition_id)

    # Count unique requisitions
    num_requisitions = data['requisition_id'].nunique()

    # The timeframe and last-updated dates are deliberately static so that
    # benchmark runs are reproducible; they are NOT read from the
    # 'applied_at' column.
    return MetadataResponse(
        job_id=requisition_id,
        time_frame_start="2023-10-09",
        time_frame_end="2025-03-15",
        data_last_updated="2025-04-29",
        total_requisitions_analysed=num_requisitions,
    )
def get_definitions_and_methodology(requisition_id: str) -> Union[DefinitionsResponse, RequisitionNotFoundResponse]:
    """
    Return metric definitions together with a short note on methodology.

    Args:
        requisition_id: The specific requisition ID for context.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a DefinitionsResponse with the metric glossary, a calculation note,
        and the list of top metrics considered.
    """
    _log_api_call(f"API call: get_definitions_and_methodology(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    data_loader = get_data_loader()
    similar = data_loader.get_similar_requisitions(requisition_id)

    # The requisition count comes from the loader's full dataset, while the
    # time span below is measured on the similar-requisition subset.
    num_total_requisitions = data_loader.data['requisition_id'].nunique()

    applied_at = similar['applied_at']
    span = applied_at.max() - applied_at.min()
    years = span.days / 365.25

    glossary = {
        "sla": "Percentage of candidates reviewed within the defined SLA window (e.g., 48 hours)",
        "time_to_fill": "Average time from job posting to accepted offer",
        "success_rate": "Ratio of candidates who accepted offers out of those interviewed",
    }
    notes = (
        f"Metrics are computed from {num_total_requisitions} requisitions over the last {years:.1f} years. "
        "Funnel stats are based on system timestamps and recruiter actions in ATS."
    )
    headline_metrics = [
        "SLA %",
        "First round review %",
        "Offer acceptance rate",
        "Candidate volume",
        "Total hires",
    ]

    return DefinitionsResponse(
        job_id=requisition_id,
        definitions=glossary,
        calculation_notes=notes,
        top_metrics_considered=headline_metrics,
    )
def get_source_recommendation_summary(requisition_id: str) -> Union[SourceRecommendationResponse, RequisitionNotFoundResponse]:
    """
    Build a per-source summary of jobs-filled %, review %, offer-accept rate,
    and total hires.

    Args:
        requisition_id: The job requisition ID.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a SourceRecommendationResponse with the number of distinct
        requisitions and one composite entry per source, ordered by source
        name.
    """
    _log_api_call(f"API call: get_source_recommendation_summary(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    frame = get_data_loader().get_similar_requisitions(requisition_id)
    num_requisitions = frame['requisition_id'].nunique()

    entries = []
    for name in frame['source_name'].unique():
        rows = frame[frame['source_name'] == name]
        total = len(rows)
        if not total:
            continue

        # Share of requisitions with at least one hire from this source.
        # NOTE(review): int() truncates here and for the review percentage,
        # while the offer acceptance rate uses round() — confirm the mixed
        # rounding is intended.
        filled_reqs = rows[rows['hired']]['requisition_id'].nunique()
        jobs_filled_pct = int(filled_reqs / num_requisitions * 100)

        review_pct = int(rows['reviewed'].sum() / total * 100)

        # Of the offers extended, how many were accepted; 0 when none were made.
        offers = rows['offer_extended'].sum()
        accepted = rows['offer_accepted'].sum()
        if offers > 0:
            offer_accept_rate = round(accepted / offers * 100)
        else:
            offer_accept_rate = 0

        entries.append({
            "source_name": name,
            "jobs_filled_percentage": jobs_filled_pct,
            "first_round_review_percentage": review_pct,
            "offer_acceptance_rate": offer_accept_rate,
            "total_hires": int(rows['hired'].sum()),
        })

    entries.sort(key=lambda entry: entry['source_name'])
    return SourceRecommendationResponse(
        total_requisitions=num_requisitions,
        metrics=entries,
    )