"""
Self-contained evaluation runner for OCC stack.

Includes all core classes inline + all simulated benchmarks + ablations +
anti-gaming. Runs on CPU. Outputs JSON report.
"""
import json
import random
import sys
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np


# --- CORE CLASSES (INLINE) ---

@dataclass
class OracleResult:
    """Outcome of scoring one action with :class:`ImpactOracle`."""

    raw_score: float
    cost_adjusted_score: float
    confidence: float
    evidence: Dict[str, Any]
    reason: str
    failure_tags: List[str] = field(default_factory=list)
    reward_value: float = 0.0


class ImpactOracle:
    """Scores agent actions in one of three modes: code, retrieval_qa, debate.

    Each scorer reads plain dicts (``action``, ``context``, ``result``) and
    returns an :class:`OracleResult`.
    """

    def __init__(self, compute_penalty_rate=0.0001, calibration_weight=0.2,
                 abstention_bonus=1.0, hallucination_penalty=2.0,
                 confident_wrong_penalty=3.0, gaming_penalty=2.0,
                 code_weights=None, qa_weights=None, debate_weights=None):
        """Store penalty rates and per-mode weight tables.

        A weight table that is ``None`` (or empty) is replaced by the
        built-in default for that mode.
        """
        self.code_weights = code_weights or {
            "correctness": 1.0,
            "pass_at_k": 0.3,
            "regression": -0.5,
            "compute_penalty": 0.001,
        }
        self.qa_weights = qa_weights or {
            "correctness": 1.0,
            "evidence_support": 0.5,
            "calibration": 0.2,
            "abstention_utility": 1.0,
            "hallucination_penalty": 2.0,
            "confident_wrong_penalty": 3.0,
        }
        self.debate_weights = debate_weights or {
            "decision_quality": 1.0,
            "influence_efficiency": 0.5,
            "throughput": 0.3,
            "marginal_contribution": 0.5,
        }
        self.compute_penalty_rate = compute_penalty_rate
        self.calibration_weight = calibration_weight
        self.abstention_bonus = abstention_bonus
        self.hallucination_penalty = hallucination_penalty
        self.confident_wrong_penalty = confident_wrong_penalty
        self.gaming_penalty = gaming_penalty

    def score(self, mode, action, context, result, agent_id=""):
        """Dispatch to the scorer for ``mode``.

        An unrecognised mode yields an all-zero result tagged
        ``unknown_mode`` rather than raising.
        """
        if mode == "code":
            return self._score_code(action, context, result, agent_id)
        if mode == "retrieval_qa":
            return self._score_qa(action, context, result, agent_id)
        if mode == "debate":
            return self._score_debate(action, context, result, agent_id)
        return OracleResult(0.0, 0.0, 0.0, {}, f"unknown mode {mode}", ["unknown_mode"])

    def _score_code(self, action, context, result, agent_id):
        """Score a code attempt described by ``result``.

        Passing public tests while failing hidden tests is tagged
        ``gaming_hidden_tests`` and costs ``gaming_penalty``.
        """
        correctness = result.get("correctness", 0.0)
        pass_at_k = result.get("pass_at_k", 0.0)
        regression = result.get("regression", False)
        compute_cost = result.get("compute_cost", 0.0)
        hidden_pass = result.get("hidden_tests_pass", correctness)
        public_pass = result.get("public_pass", correctness)

        tags = []
        if public_pass and not hidden_pass:
            tags.append("gaming_hidden_tests")

        raw = (
            correctness * self.code_weights["correctness"]
            + pass_at_k * self.code_weights["pass_at_k"]
            + (self.code_weights["regression"] if regression else 0.0)
            - compute_cost * self.code_weights.get("compute_penalty", self.compute_penalty_rate)
        )
        if "gaming_hidden_tests" in tags:
            raw -= self.gaming_penalty

        # NOTE(review): compute cost is already subtracted inside ``raw``; the
        # cost-adjusted score subtracts it a second time. Kept as-is.
        cost_adj = raw - compute_cost * self.compute_penalty_rate
        return OracleResult(
            raw, cost_adj, result.get("confidence", correctness),
            {"correctness": correctness},
            f"corr={correctness:.2f}, cost={compute_cost}", tags, cost_adj,
        )

    def _score_qa(self, action, context, result, agent_id):
        """Score a retrieval-QA answer or abstention.

        Reads ``context["gold_answer"]`` and ``context["is_unanswerable"]``;
        ``action["abstained"]`` selects the abstention branch.
        """
        gold = context.get("gold_answer", "")
        answer = result.get("answer", "")
        confidence = result.get("confidence", 0.5)
        compute_cost = result.get("compute_cost", 0.0)
        abstained = action.get("abstained", False)

        if abstained:
            correct_abstention = context.get("is_unanswerable", False)
            raw = self.abstention_bonus if correct_abstention else -self.abstention_bonus
            tags = ["correct_abstention" if correct_abstention else "wrong_abstention"]
            # The abstention branch reports the raw score (not the
            # cost-adjusted one) as the reward value.
            return OracleResult(
                raw, raw - compute_cost * self.compute_penalty_rate, confidence, {},
                f"abstain correct={correct_abstention}", tags, raw,
            )

        correctness = self._fuzzy_match(answer, gold)
        evidence = result.get("evidence", {})
        entailment = evidence.get("entailment_score", 0.0)
        contradiction = evidence.get("contradiction_score", 0.0)
        hallucination = contradiction > 0.5
        confident_wrong = confidence > 0.8 and correctness < 0.5

        tags = []
        if hallucination:
            tags.append("hallucination")
        if confident_wrong:
            tags.append("confident_wrong")
        if compute_cost > 2000:
            tags.append("excessive_compute")
        if compute_cost > 500 and correctness < 0.5:
            tags.append("compute_waste")

        raw = (
            correctness * self.qa_weights["correctness"]
            + entailment * self.qa_weights.get("evidence_support", 0.5)
            - (self.hallucination_penalty if hallucination else 0.0)
            - (self.confident_wrong_penalty if confident_wrong else 0.0)
            - compute_cost * self.compute_penalty_rate
        )
        # Calibration bonus: (1 - Brier score) scaled by calibration_weight.
        brier = (confidence - correctness) ** 2
        raw += (1 - brier) * self.calibration_weight

        cost_adj = raw - compute_cost * self.compute_penalty_rate
        if compute_cost > 100 and raw < 0.5:
            cost_adj -= self.gaming_penalty * 0.5
        return OracleResult(
            raw, cost_adj, confidence, evidence,
            f"corr={correctness:.2f}, conf={confidence:.2f}", tags, cost_adj,
        )

    def _score_debate(self, action, context, result, agent_id):
        """Score one debate contribution; spam and collusion are penalised."""
        decision_quality = result.get("decision_quality", 0.0)
        marginal = result.get("marginal_contribution", 0.0)
        tokens = result.get("tokens", 0)
        compute_cost = result.get("compute_cost", tokens)
        spam = result.get("spam", False)
        collusion = result.get("collusion", False)

        tags = []
        if spam:
            tags.append("spam")
        if collusion:
            tags.append("collusion")
        if tokens > 5000:
            tags.append("verbose_waste")

        raw = (
            decision_quality * self.debate_weights["decision_quality"]
            + marginal * self.debate_weights["marginal_contribution"]
            + (1.0 / max(tokens, 1)) * self.debate_weights["influence_efficiency"]
            - compute_cost * self.compute_penalty_rate
        )
        if spam:
            raw -= self.gaming_penalty
        if collusion:
            raw -= self.gaming_penalty * 2

        cost_adj = raw - compute_cost * self.compute_penalty_rate
        return OracleResult(
            raw, cost_adj, result.get("confidence", 0.5), {"marginal": marginal},
            f"dq={decision_quality:.2f}, tokens={tokens}", tags, cost_adj,
        )

    def _fuzzy_match(self, a, b):
        """Return 1.0 for an exact match, 0.5 for containment, else 0.0.

        Comparison is case-insensitive and ignores surrounding whitespace.
        Missing, empty, or whitespace-only inputs score 0.0.
        """
        if not a or not b:
            return 0.0
        a, b = a.strip().lower(), b.strip().lower()
        # Re-check after stripping: "" is a substring of every string, so a
        # whitespace-only answer would otherwise earn partial credit.
        if not a or not b:
            return 0.0
        if a == b:
            return 1.0
        return 0.5 if (a in b or b in a) else 0.0


@dataclass
class LedgerEntry:
    """One row of the credit ledger (earn, spend, or decay event)."""

    agent_id: str
    task_id: str
    action_id: str
    earned_credit: float
    spent_credit: float
    decayed_credit: float
    remaining_credit: float
    reason: str
    oracle_score: float
    compute_cost: float
    timestamp: float
    capability_scope: str = "global"


class CreditLedger:
    """Per-agent, per-capability credit balances with an append-only log.

    NOTE(review): decay is applied once per ledger operation (earn, spend,
    and also ``balance`` reads), not per unit of elapsed time; ``now`` is
    only used to timestamp entries.
    """

    def __init__(self, decay_lambda=0.05):
        self.entries = []
        self.balances = {}
        self.decay_lambda = decay_lambda

    def earn(self, agent_id, task_id, action_id, amount, oracle_score,
             compute_cost, reason, capability_scope="global"):
        """Decay the current balance, then credit ``amount`` and log it."""
        now = time.time()
        self._apply_decay(agent_id, now, capability_scope)
        new_bal = self._get(agent_id, capability_scope) + amount
        self.entries.append(LedgerEntry(
            agent_id, task_id, action_id, amount, 0.0, 0.0, new_bal,
            reason, oracle_score, compute_cost, now, capability_scope,
        ))
        self._set(agent_id, capability_scope, new_bal)

    def spend(self, agent_id, task_id, action_id, amount,
              capability_scope="global", reason="spend"):
        """Decay, then debit ``amount``.

        Returns ``False`` (and logs no spend entry) when the balance is
        insufficient, ``True`` otherwise.
        """
        now = time.time()
        self._apply_decay(agent_id, now, capability_scope)
        current = self._get(agent_id, capability_scope)
        if current < amount:
            return False
        new_bal = current - amount
        self.entries.append(LedgerEntry(
            agent_id, task_id, action_id, 0.0, amount, 0.0, new_bal,
            reason, 0.0, 0.0, now, capability_scope,
        ))
        self._set(agent_id, capability_scope, new_bal)
        return True

    def transfer(self, from_agent, to_agent, amount, capability_scope="global"):
        """Always refuse: credits are non-transferable."""
        return False  # non-transferable

    def balance(self, agent_id, capability_scope="global"):
        """Return the balance after applying one decay step (mutates state)."""
        now = time.time()
        self._apply_decay(agent_id, now, capability_scope)
        return self._get(agent_id, capability_scope)

    def _get(self, agent_id, cap):
        """Return the stored balance, or 0.0 if none is recorded."""
        return self.balances.get(agent_id, {}).get(cap, 0.0)

    def _set(self, agent_id, cap, val):
        """Store ``val`` as the balance for ``(agent_id, cap)``."""
        if agent_id not in self.balances:
            self.balances[agent_id] = {}
        self.balances[agent_id][cap] = val

    def _apply_decay(self, agent_id, now, cap):
        """Multiply a positive balance by ``1 - decay_lambda`` and log the loss."""
        current = self._get(agent_id, cap)
        if current <= 0:
            return
        decayed = current * (1 - self.decay_lambda)
        if decayed < current:
            self.entries.append(LedgerEntry(
                agent_id, "decay", "decay", 0.0, 0.0, current - decayed,
                decayed, "credit_decay", 0.0, 0.0, now, cap,
            ))
            self._set(agent_id, cap, decayed)

    def detect_collusion(self, window=10):
        """Report the agents seen in the last ``window`` entries.

        Returns ``None`` when fewer than two distinct agents appear.
        """
        recent = self.entries[-window:]
        agents = set(e.agent_id for e in recent)
        if len(agents) < 2:
            return None
        return {"suspicious_agents": list(agents), "count": len(recent)}


class Decision(Enum):
    """Possible outcomes of a resource request."""

    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    DOWNGRADE = "downgrade"
    ESCALATE = "escalate"
    ASK_JUSTIFICATION = "ask_justification"


@dataclass
class ResourceDecision:
    """Broker verdict for one capability request."""

    decision: Decision
    reason: str
    capability: str
    downgrade_to: Optional[str] = None


class ResourceBroker:
    """Grants or refuses capabilities based on credit balance and risk class."""

    RESOURCE_RISK = {
        "model_call": "medium",
        "retrieval_call": "low",
        "verifier_call": "medium",
        "debate_turn": "low",
        "file_write": "high",
        "shell_execute": "high",
        "memory_write": "medium",
        "human_escalation": "high",
        "larger_model": "medium",
    }
    DEFAULT_THRESHOLDS = {"low": 0.5, "medium": 2.0, "high": 5.0}

    def __init__(self, thresholds=None, urgency_boost=0.5):
        self.thresholds = thresholds or self.DEFAULT_THRESHOLDS.copy()
        self.urgency_boost = urgency_boost
        self.denial_history = {}

    def request(self, capability, agent_id, credit_balance, task_state=None,
                risk_score=0.0, gaming_flags=None):
        """Decide whether ``agent_id`` may use ``capability``.

        Checks run in order: gaming flags (deny), high-risk capability with
        ``risk_score > 0.7`` (approval), sufficient balance (allow), at least
        half the required balance (downgrade for medium risk, otherwise ask
        for justification), and finally deny -- or escalate once the agent
        has more than three recorded denials.
        """
        task_state = task_state or {}
        gaming_flags = gaming_flags or []
        risk_class = self.RESOURCE_RISK.get(capability, "medium")
        threshold = self.thresholds.get(risk_class, 2.0)
        urgency = task_state.get("urgency", 0.0)
        # Urgency lowers the required balance, floored at 0.1.
        adjusted = max(0.1, threshold - urgency * self.urgency_boost)

        if gaming_flags:
            return ResourceDecision(Decision.DENY, f"Gaming: {gaming_flags}", capability)
        if risk_class == "high" and risk_score > 0.7:
            return ResourceDecision(
                Decision.REQUIRE_APPROVAL, f"High risk {risk_score:.2f}", capability)
        if credit_balance >= adjusted:
            return ResourceDecision(
                Decision.ALLOW, f"Balance {credit_balance:.2f} >= {adjusted:.2f}", capability)
        if credit_balance >= adjusted * 0.5:
            if risk_class == "medium":
                return ResourceDecision(
                    Decision.DOWNGRADE, f"Downgrading from {capability}",
                    capability, "retrieval_call")
            return ResourceDecision(
                Decision.ASK_JUSTIFICATION, "Justification required", capability)

        denials = self.denial_history.get(agent_id, 0)
        if denials > 3:
            return ResourceDecision(Decision.ESCALATE, f"Denied {denials} times", capability)
        self.denial_history[agent_id] = denials + 1
        return ResourceDecision(
            Decision.DENY, f"Balance {credit_balance:.2f} < {adjusted:.2f}", capability)


# --- BENCHMARK SIMULATIONS (INLINE) ---

@dataclass
class CodeProblem:
    """A simulated coding task; difficulties are floats drawn by the generator."""

    task_id: str
    difficulty: float
    hidden_test_difficulty: float
    public_test_difficulty: float


class SimulatedCodeAgent:
    """Stochastic stand-in for a code-writing agent."""

    def __init__(self, agent_id, pass_rate_easy=0.9, pass_rate_hard=0.3,
                 hidden_test_falloff=0.15, cost_per_attempt=200):
        self.agent_id = agent_id
        self.pass_rate_easy = pass_rate_easy
        self.pass_rate_hard = pass_rate_hard
        self.hidden_test_falloff = hidden_test_falloff
        self.cost_per_attempt = cost_per_attempt
        self.attempts = 0

    def solve(self, problem):
        """Sample public/hidden test outcomes for ``problem``.

        The public pass probability interpolates between the easy and hard
        pass rates by difficulty; the hidden probability is that value
        reduced by ``hidden_test_falloff * hidden_test_difficulty``.
        """
        self.attempts += 1
        base_acc = (self.pass_rate_easy * (1 - problem.difficulty)
                    + self.pass_rate_hard * problem.difficulty)
        public_pass = random.random() < base_acc
        hidden_acc = max(
            0.0, base_acc - self.hidden_test_falloff * problem.hidden_test_difficulty)
        hidden_pass = random.random() < hidden_acc
        return {"public_pass": public_pass, "hidden_pass": hidden_pass,
                "compute_cost": self.cost_per_attempt}


def gen_code_problems(n, seed):
    """Seed ``random`` and ``numpy.random`` and return ``n`` random problems."""
    random.seed(seed)
    np.random.seed(seed)
    return [
        CodeProblem(f"task_{i}", random.random(), random.random(), random.random())
        for i in range(n)
    ]


def run_code_baseline(problems, agent):
    """Run a single agent once per problem and report public-test accuracy.

    Returns zeroed metrics for an empty problem list.
    """
    if not problems:
        return {"accuracy": 0.0, "total_compute": 0, "mean_compute": 0.0}
    total = 0
    results = []
    for p in problems:
        r = agent.solve(p)
        total += r["compute_cost"]
        results.append(r)
    acc = sum(1 for r in results if r["public_pass"]) / len(results)
    return {"accuracy": acc, "total_compute": total,
            "mean_compute": total / len(problems)}


def run_code_occ(problems, agents, oracle, ledger, broker, max_attempts=3):
    """Run the code benchmark with oracle scoring and ledger updates.

    Agents are tried cheapest-per-expected-quality first, up to
    ``max_attempts`` per problem. ``broker`` is accepted but not consulted.
    Returns zeroed metrics for an empty problem list.
    """
    # Seed every agent with credit proportional to its mean pass rate.
    for a in agents:
        q = (a.pass_rate_easy + a.pass_rate_hard) / 2
        ledger.earn(a.agent_id, "seed", "seed", q * 20, 0.0, 0.0, "initial", "model_call")

    if not problems:
        return {"accuracy": 0.0, "total_compute": 0, "mean_compute": 0.0,
                "mean_agents": 0.0}

    total = 0
    results = []
    for p in problems:
        solved = False
        cost = 0
        used = []
        ranked = sorted(
            agents,
            key=lambda a: a.cost_per_attempt
            / max(0.1, (a.pass_rate_easy + a.pass_rate_hard) / 2),
        )
        for agent in ranked:
            if solved or len(used) >= max_attempts:
                break
            r = agent.solve(p)
            cost += r["compute_cost"]
            total += r["compute_cost"]
            used.append(agent.agent_id)
            solved = r["public_pass"]
            hidden = r["hidden_pass"]
            # The oracle sees the cumulative cost spent on this problem so far.
            ora = oracle.score(
                "code", {"attempt": len(used)}, {},
                {"correctness": 1.0 if solved else 0.0,
                 "pass_at_k": 1.0 if hidden else 0.0,
                 "compute_cost": cost,
                 "public_pass": solved,
                 "hidden_tests_pass": hidden},
                agent_id=agent.agent_id,
            )
            if ora.raw_score > 0:
                ledger.earn(agent.agent_id, p.task_id, "solve", ora.raw_score * 5,
                            ora.raw_score, cost, "pass", "model_call")
            else:
                ledger.spend(agent.agent_id, p.task_id, "solve", 1.0, "model_call", "fail")
            # NOTE(review): this also stops escalation when hidden tests pass
            # but public tests fail, leaving the problem unsolved -- confirm
            # that is intended.
            if hidden:
                break
        results.append({"solved": solved, "cost": cost, "agents": used})

    acc = sum(1 for r in results if r["solved"]) / len(results)
    return {"accuracy": acc, "total_compute": total,
            "mean_compute": total / len(problems),
            "mean_agents": sum(len(r["agents"]) for r in results) / len(results)}


def create_qa_dataset(seed=42, n=50):
    """Seed ``random`` and build ``n`` synthetic QA items of mixed types."""
    random.seed(seed)
    evidence_pool = ["alpha", "beta", "gamma", "delta"]
    questions = []
    for i in range(n):
        q_type = random.choice(
            ["answerable", "unanswerable", "misleading", "incomplete", "conflicting"])
        answer = random.choice(["paris", "42", "yes", "no", "tokyo"])
        evidence = random.sample(evidence_pool, k=random.randint(1, 3))
        questions.append({"id": f"q_{i}", "question": f"Q{i}", "type": q_type,
                          "answer": answer, "evidence": evidence,
                          "is_unanswerable": q_type == "unanswerable"})
    return questions


def run_qa_occ(dataset, agent_params, oracle, ledger, broker):
    """Run the QA benchmark with broker gating and oracle scoring.

    ``agent_params["acc"]`` is the probability of producing the item's
    answer on questions that are not of type ``unanswerable``; otherwise the
    agent abstains. Items denied by the broker are skipped. Returns zeroed
    metrics for an empty dataset.
    """
    total_compute = 0
    correct = 0
    ledger.earn("qa_agent", "seed", "seed", 20, 0.0, 0.0, "initial", "retrieval_call")
    if not dataset:
        return {"accuracy": 0.0, "total_compute": 0, "mean_compute": 0.0}

    for item in dataset:
        balance = ledger.balance("qa_agent", "retrieval_call")
        dec = broker.request("retrieval_call", "qa_agent", balance,
                             task_state={"urgency": 0.5})
        if dec.decision == Decision.DENY:
            continue
        tokens = 200 if dec.decision == Decision.ALLOW else 100
        total_compute += tokens

        should_answer = item["type"] != "unanswerable"
        ans = item["answer"] if (should_answer and random.random() < agent_params["acc"]) else None
        conf = 0.9 if ans else 0.3

        # Dataset items keep the gold answer under "answer", but the oracle
        # reads "gold_answer"; without this mapping every answer scores wrong.
        context = {**item, "gold_answer": item.get("gold_answer", item["answer"])}
        ora = oracle.score(
            "retrieval_qa", {"abstained": ans is None}, context,
            {"answer": ans, "confidence": conf, "evidence": {}, "compute_cost": tokens},
            "qa_agent",
        )
        if ora.raw_score > 0:
            ledger.earn("qa_agent", item["id"], "ans", ora.raw_score * 3,
                        ora.raw_score, tokens, "correct", "retrieval_call")
            correct += 1
        else:
            ledger.spend("qa_agent", item["id"], "ans", 0.5, "retrieval_call", "wrong")

    return {"accuracy": correct / len(dataset), "total_compute": total_compute,
            "mean_compute": total_compute / len(dataset)}


def run_debate_occ(n_debates, n_agents, agent_configs, oracle, ledger, broker, seed=42):
    """Simulate majority-vote debates gated by the broker.

    ``n_agents`` and ``oracle`` are accepted but not used; the participants
    come from ``agent_configs``. Ties in the vote go to the option that was
    voted for first, so results are reproducible for a given seed. Returns
    zeroed metrics when ``n_debates`` is not positive.
    """
    random.seed(seed)
    if n_debates <= 0:
        return {"accuracy": 0.0, "consensus_reached": 0.0,
                "total_compute": 0, "mean_compute": 0.0}

    correct = 0
    total_compute = 0
    consensus = 0
    for _ in range(n_debates):
        truth = random.choice(["A", "B", "C"])
        agents = []
        for cfg in agent_configs:
            acc = cfg["acc"] if cfg["honest"] else random.random() * 0.4
            agents.append({"honest": cfg["honest"], "acc": acc, "id": cfg["id"],
                           "tokens": cfg.get("tokens", 200)})

        votes = []
        for a in agents:
            balance = ledger.balance(a["id"], "debate_turn")
            dec = broker.request("debate_turn", a["id"], balance)
            if dec.decision == Decision.DENY:
                continue
            total_compute += a["tokens"]
            if a["honest"] and random.random() < a["acc"]:
                vote = truth
            else:
                vote = random.choice(["A", "B", "C"])
            votes.append((a["id"], vote, a["honest"], a["acc"]))
            ledger.spend(a["id"], "debate", "turn", 1.0, "debate_turn", "participate")
        if not votes:
            continue

        # Insertion-ordered tally: max() returns the first maximal key, so
        # tie-breaking does not depend on string hash randomisation.
        tally: Dict[str, int] = {}
        for _, vote, _, _ in votes:
            tally[vote] = tally.get(vote, 0) + 1
        final = max(tally, key=tally.get)

        if final == truth:
            correct += 1
            for vid, _, honest, _ in votes:
                if honest:
                    ledger.earn(vid, "debate", "consensus", 2.0, 1.0, 0,
                                "consensus", "debate_turn")
        if len(tally) == 1:
            consensus += 1

    n = n_debates
    return {"accuracy": correct / n, "consensus_reached": consensus / n,
            "total_compute": total_compute, "mean_compute": total_compute / n}


# --- ABLATIONS & ANTI-GAMING ---

# (name, description, decay_lambda, gaming_penalty, compute_penalty_rate,
#  anti_gaming_enabled, broker_threshold_overrides)
ABLATIONS = [
    ("default", "Full OCC", 0.02, 2.0, 0.0001, True, {}),
    ("no_decay", "No credit decay", 0.0, 2.0, 0.0001, True, {}),
    ("fast_decay", "Aggressive decay", 0.1, 2.0, 0.0001, True, {}),
    ("no_gaming_penalty", "No gaming penalties", 0.02, 0.0, 0.0001, True, {}),
    ("high_gaming_penalty", "Severe gaming penalties", 0.02, 5.0, 0.0001, True, {}),
    ("lenient_broker", "Lenient broker", 0.02, 2.0, 0.0001, True,
     {"low": 0.25, "medium": 1.0, "high": 2.5}),
    ("strict_broker", "Strict broker", 0.02, 2.0, 0.0001, True,
     {"low": 1.0, "medium": 4.0, "high": 10.0}),
    ("high_compute_cost", "High compute penalty", 0.02, 2.0, 0.001, True, {}),
    ("low_compute_cost", "Low compute penalty", 0.02, 2.0, 0.00001, True, {}),
    ("anti_gaming_off", "Anti-gaming disabled", 0.02, 2.0, 0.0001, False, {}),
]


def run_all(output_dir="/app/occ/reports"):
    """Run every ablation and anti-gaming check, save a JSON report, and return it.

    Args:
        output_dir: Directory that receives ``eval_runner_results.json``;
            created if missing.

    Returns:
        Dict with ``"ablations"`` and ``"anti_gaming"`` sections.
    """
    print("=" * 60)
    print("OCC UNIFIED EVALUATION RUNNER (SELF-CONTAINED)")
    print("=" * 60)

    seed = 42
    n_problems = 100
    n_qa = 100
    n_debates = 50
    results = {"ablations": {}, "anti_gaming": {}}

    # Ablations
    for name, desc, decay, game_pen, comp_pen, anti_on, broker_thresh in ABLATIONS:
        print(f"\n--- ABLATION: {name} ---")
        oracle = ImpactOracle(compute_penalty_rate=comp_pen,
                              gaming_penalty=game_pen if anti_on else 0.0)
        ledger = CreditLedger(decay_lambda=decay)
        broker = ResourceBroker(thresholds=broker_thresh if broker_thresh else None)

        problems = gen_code_problems(n_problems, seed)
        cheap = SimulatedCodeAgent("cheap", 0.65, 0.15, 0.20, 60)
        medium = SimulatedCodeAgent("medium", 0.85, 0.35, 0.15, 150)
        expensive = SimulatedCodeAgent("expensive", 0.95, 0.65, 0.10, 350)
        code_res = run_code_occ(problems, [cheap, medium, expensive],
                                oracle, ledger, broker, max_attempts=3)
        print(f"  Code: acc={code_res['accuracy']:.3f}, compute={code_res['total_compute']:.0f}")

        qa_data = create_qa_dataset(seed=seed, n=n_qa)
        qa_res = run_qa_occ(qa_data, {"acc": 0.85}, oracle, ledger, broker)
        print(f"  QA: acc={qa_res['accuracy']:.3f}, compute={qa_res['total_compute']:.0f}")

        # The debate benchmark gets its own ledger and broker.
        ledger2 = CreditLedger(decay_lambda=decay)
        broker2 = ResourceBroker(thresholds=broker_thresh if broker_thresh else None)
        for i in range(3):
            ledger2.earn(f"f{i}", "seed", "seed", 5, 0, 0, "initial", "debate_turn")
        debate_res = run_debate_occ(
            n_debates, 3,
            [{"id": f"f{i}", "honest": True, "acc": 0.9, "tokens": 200} for i in range(3)],
            oracle, ledger2, broker2, seed=seed,
        )
        print(f"  Debate: acc={debate_res['accuracy']:.3f}, compute={debate_res['total_compute']:.0f}")

        results["ablations"][name] = {"description": desc, "code": code_res,
                                      "qa": qa_res, "debate": debate_res}

    # Anti-gaming
    print("\n--- ANTI-GAMING TESTS ---")

    # Hidden-test gaming
    oracle = ImpactOracle(gaming_penalty=2.0)
    normal_res = []
    gamer_res = []
    for _ in range(50):
        public_pass = random.random() < 0.9
        hidden_pass = random.random() < 0.5
        ora_normal = oracle.score(
            "code", {}, {},
            {"correctness": 1.0 if public_pass else 0.0,
             "pass_at_k": 1.0 if hidden_pass else 0.0,
             "compute_cost": 150,
             "public_pass": public_pass,
             "hidden_tests_pass": hidden_pass},
        )
        normal_res.append(ora_normal.raw_score)
        # Gamer: always passes public, fails hidden
        ora_gamer = oracle.score(
            "code", {}, {},
            {"correctness": 1.0, "pass_at_k": 0.0, "compute_cost": 100,
             "public_pass": True, "hidden_tests_pass": False},
        )
        gamer_res.append(ora_gamer.raw_score)
    results["anti_gaming"]["hidden_test_gaming"] = {
        "normal_mean_raw": sum(normal_res) / len(normal_res),
        "gamer_mean_raw": sum(gamer_res) / len(gamer_res),
        "gamer_penalized_rate": sum(1 for r in gamer_res if r < 0) / len(gamer_res),
    }
    print(f"  Hidden-test gaming: normal={results['anti_gaming']['hidden_test_gaming']['normal_mean_raw']:.2f}, "
          f"gamer={results['anti_gaming']['hidden_test_gaming']['gamer_mean_raw']:.2f}")

    # Collusion
    ledger = CreditLedger()
    ledger.earn("alice", "seed", "seed", 10, 0, 0, "initial")
    ledger.earn("bob", "seed", "seed", 1, 0, 0, "initial")
    ok = ledger.transfer("alice", "bob", 5.0)
    results["anti_gaming"]["collusion"] = {
        "transfer_allowed": ok,
        "alice_balance": ledger.balance("alice"),
        "bob_balance": ledger.balance("bob"),
        "blocked": not ok,
    }
    print(f"  Collusion: transfer_allowed={ok}, alice={ledger.balance('alice'):.1f}, bob={ledger.balance('bob'):.1f}")

    # Over-abstention
    oracle = ImpactOracle()
    abstention_rewards = []
    for _ in range(10):
        res = oracle.score(
            "retrieval_qa", {"abstained": True},
            {"is_unanswerable": False, "gold_answer": "yes"},
            {"answer": None, "confidence": 0.9, "evidence": {}, "compute_cost": 50},
        )
        abstention_rewards.append(res.reward_value)
    results["anti_gaming"]["abstention"] = {
        "mean_reward": sum(abstention_rewards) / len(abstention_rewards),
        "negative": sum(abstention_rewards) < 0,
    }
    print(f"  Abstention: mean_reward={results['anti_gaming']['abstention']['mean_reward']:.2f}, negative={results['anti_gaming']['abstention']['negative']}")

    # Spam
    oracle = ImpactOracle()
    spam_res = oracle.score(
        "retrieval_qa", {}, {"gold_answer": "paris"},
        {"answer": "london", "confidence": 0.1, "evidence": {}, "compute_cost": 5000},
    )
    results["anti_gaming"]["spam"] = {
        "reward": spam_res.reward_value,
        "tags": spam_res.failure_tags,
    }
    print(f"  Spam: reward={spam_res.reward_value:.2f}, tags={spam_res.failure_tags}")

    # Save
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    with open(out / "eval_runner_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\nSaved to {out / 'eval_runner_results.json'}")

    # Summary table
    print("\n" + "=" * 60)
    print("ABLATION SUMMARY")
    print("=" * 60)
    print(f"{'Name':<20} {'Code Acc':>10} {'Code Comp':>10} {'QA Acc':>10} {'QA Comp':>10} {'Deb Acc':>10} {'Deb Comp':>10}")
    for name, data in results["ablations"].items():
        print(f"{name:<20} {data['code']['accuracy']:>10.3f} {data['code']['total_compute']:>10.0f} "
              f"{data['qa']['accuracy']:>10.3f} {data['qa']['total_compute']:>10.0f} "
              f"{data['debate']['accuracy']:>10.3f} {data['debate']['total_compute']:>10.0f}")

    return results


if __name__ == "__main__":
    run_all()