| |
| """ |
| SolarWine Control System Verification Script |
| ============================================= |
| |
| 10-step manual verification comparing DayAheadPlanner output |
| against actual tracker behavior from ThingsBoard over the last N days. |
| |
| Steps: |
| 1. Pull actual tracker angles from ThingsBoard |
| 2. Pull actual energy production from Plant asset |
| 3. Pull historical IMS weather (or use TB ambient sensor) |
| 4. Run DayAheadPlanner for each day (with real weather as "perfect forecast") |
| 5. Compute astronomical tracking angles for comparison |
| 6. Compare planned angles vs actual angles (MAE, max deviation) |
| 7. Validate InterventionGate decisions against conditions |
| 8. Verify energy budget compliance |
| 9. Cross-validate FvCB model outputs |
| 10. Generate summary report / scorecard |
| |
| Usage |
| ----- |
| # Default: last 10 days |
| python scripts/verify_control_system.py |
| |
| # Custom range |
| python scripts/verify_control_system.py --days 7 |
| |
| # Save detailed JSON report |
| python scripts/verify_control_system.py --output Data/verification_report.json |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import math |
| import os |
| import sys |
| from datetime import date, datetime, timedelta, timezone |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
# Put the directory two levels above this file at the front of the import path
# so the `config` and `src` packages imported below can be resolved.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
| from config.settings import ( |
| CANDIDATE_OFFSETS, |
| NO_SHADE_BEFORE_HOUR, |
| SEMILLON_TRANSITION_TEMP_C, |
| SHADE_ELIGIBLE_GHI_ABOVE, |
| SHADE_ELIGIBLE_TLEAF_ABOVE, |
| SITE_LATITUDE, |
| SITE_LONGITUDE, |
| SYSTEM_CAPACITY_KW, |
| TRACKER_ID_MAP, |
| ) |
|
|
# Module-level logger; its level and format are configured in main().
logger = logging.getLogger("verification")
|
|
|
|
| |
| |
| |
|
|
def get_tb_client():
    """Build a ThingsBoard client from environment settings.

    The host is read from ``THINGSBOARD_HOST`` (falling back to the URL
    below). Username and password are read from ``THINGSBOARD_USERNAME`` and
    ``THINGSBOARD_PASSWORD`` and are passed through as ``None`` when unset.
    """
    # Imported inside the function, so importing this module does not by
    # itself require the client package.
    from src.data.thingsboard_client import ThingsBoardClient, ThingsBoardConfig

    env = os.environ
    host = env.get("THINGSBOARD_HOST", "https://web.seymouragri.com/")
    user = env.get("THINGSBOARD_USERNAME")
    secret = env.get("THINGSBOARD_PASSWORD")
    return ThingsBoardClient(
        ThingsBoardConfig(host=host, username=user, password=secret)
    )
|
|
|
|
| |
| |
| |
|
|
def step1_tracker_angles(tb, start: datetime, end: datetime) -> dict[str, pd.DataFrame]:
    """Fetch the raw ``angle`` timeseries of every tracker in ``TRACKER_ID_MAP``.

    Args:
        tb: ThingsBoard client exposing ``get_timeseries``.
        start: Start of the query window.
        end: End of the query window.

    Returns:
        Mapping of tracker device name to its telemetry frame. Trackers that
        returned no rows, returned no ``angle`` column, or whose request
        failed are left out, so downstream steps only see usable frames.
    """
    print("\n[Step 1] Pulling actual tracker angles from ThingsBoard...")
    tracker_data: dict[str, pd.DataFrame] = {}
    # Only the device names (the map's values) are needed for the query.
    for tname in TRACKER_ID_MAP.values():
        try:
            df = tb.get_timeseries(
                tname, ["angle"], start, end,
                limit=10000, agg="NONE",
            )
            if df.empty:
                print(f" {tname}: no data")
                continue
            if "angle" not in df.columns:
                # Previously such a frame was stored and then the summary
                # print raised KeyError; a frame without angles is useless to
                # the comparison step, so report it and skip it.
                print(f" {tname}: ERROR - no 'angle' column in response")
                continue
            angles = df["angle"]
            summary = (f" {tname}: {len(df)} records, "
                       f"angle range [{angles.min():.1f}, {angles.max():.1f}]°")
            tracker_data[tname] = df
            print(summary)
        except Exception as e:
            # Best effort: one failing tracker must not abort the whole run.
            print(f" {tname}: ERROR - {e}")
    return tracker_data
|
|
|
|
| |
| |
| |
|
|
def step2_energy_production(tb, start: datetime, end: datetime) -> pd.DataFrame:
    """Fetch hourly-summed ``production`` for the ``Plant`` asset.

    Prints one line per calendar day with the daily sum divided by 1000
    (presumably Wh -> kWh, given the printed unit — confirm against the
    telemetry definition).

    Returns:
        The frame exactly as returned by the client (not the daily totals),
        or an empty frame when the request or the summary fails.
    """
    print("\n[Step 2] Pulling actual energy production from Plant asset...")
    try:
        hourly = tb.get_asset_timeseries(
            "Plant", ["production"],
            start, end,
            limit=10000,
            interval_ms=3_600_000,
            agg="SUM",
        )
        if hourly.empty:
            print(" No energy data available")
            return hourly

        per_day = hourly.resample("D").sum() / 1000.0
        for day, totals in per_day.iterrows():
            print(f" {day.strftime('%Y-%m-%d')}: {totals.get('production', 0):.1f} kWh")
        return hourly
    except Exception as e:
        print(f" ERROR: {e}")
        return pd.DataFrame()
|
|
|
|
| |
| |
| |
|
|
def step3_weather_data(tb, start: datetime, end: datetime) -> pd.DataFrame:
    """Fetch ``Air1`` telemetry and average it onto a 15-minute grid.

    Requests ``airTemperature``, ``PAR`` and ``windSpeed``, resamples with a
    15-minute mean and prints temperature / PAR ranges when those columns are
    present. PAR is divided by 2.1 to print a rough GHI figure.

    Returns:
        The resampled frame, the empty frame returned by the client when
        there is no data, or a new empty frame on any error.
    """
    print("\n[Step 3] Pulling weather data from Air1 (ambient sensor)...")
    channels = ["airTemperature", "PAR", "windSpeed"]
    try:
        frame = tb.get_timeseries("Air1", channels, start, end, limit=10000, agg="NONE")
        if frame.empty:
            print(" No weather data from Air1")
            return frame

        frame = frame.resample("15min").mean()
        print(f" {len(frame)} records (15-min resampled)")

        if "airTemperature" in frame.columns:
            air_t = frame["airTemperature"].dropna()
            if len(air_t):
                print(f" Temperature: {air_t.min():.1f}–{air_t.max():.1f}°C "
                      f"(mean {air_t.mean():.1f}°C)")

        if "PAR" in frame.columns:
            light = frame["PAR"].dropna()
            if len(light):
                print(f" PAR: {light.min():.0f}–{light.max():.0f} µmol/m²/s "
                      f"(GHI proxy: {light.max()/2.1:.0f} W/m²)")
        return frame
    except Exception as e:
        print(f" ERROR: {e}")
        return pd.DataFrame()
|
|
|
|
| |
| |
| |
|
|
def step4_run_planner(weather_df: pd.DataFrame, start_date: date, end_date: date) -> dict[str, dict]:
    """Run DayAheadPlanner for each day, using measured weather as the forecast.

    For every date in ``[start_date, end_date]`` a 96-slot (15-minute, UTC)
    temperature and GHI forecast is assembled from ``weather_df`` by
    nearest-timestamp lookup within that day. Slots without a measurement
    keep the defaults (25.0 °C, 0.0 W/m²). GHI is approximated as PAR / 2.1.

    The daily energy budget is the month's budget from
    ``EnergyBudgetPlanner.compute_annual_plan`` divided by the number of days
    in that month. When no positive monthly budget is available, the daily
    fallback of 0.5 kWh is used.

    Args:
        weather_df: Weather frame from step 3; may be empty. Its index must
            be comparable with tz-aware UTC timestamps.
        start_date: First day to plan (inclusive).
        end_date: Last day to plan (inclusive).

    Returns:
        Mapping of ISO date string to ``plan.to_dict()``, or to
        ``{"error": message}`` when planning that day failed.
    """
    print("\n[Step 4] Running DayAheadPlanner for each day...")
    import calendar

    from src.day_ahead_planner import DayAheadPlanner
    from src.energy_budget import EnergyBudgetPlanner

    planner = DayAheadPlanner()
    budget_planner = EnergyBudgetPlanner()

    slots_per_day = 96
    par_per_ghi = 2.1
    fallback_daily_kwh = 0.5

    # Annual plans are computed lazily per calendar year so a date range that
    # crosses New Year uses the right year's monthly budgets.
    budgets_by_year: dict[int, dict] = {}

    def monthly_budgets(year: int) -> dict:
        if year not in budgets_by_year:
            try:
                annual = budget_planner.compute_annual_plan(year)
                budgets_by_year[year] = annual.get("monthly_budgets", {})
            except Exception as e:
                # Keep going with the fallback budget, but say so instead of
                # silently hiding the failure.
                print(f" Budget plan for {year} unavailable ({e}) — "
                      f"using fallback of {fallback_daily_kwh} kWh/day")
                budgets_by_year[year] = {}
        return budgets_by_year[year]

    plans: dict[str, dict] = {}
    current = start_date
    while current <= end_date:
        day_str = str(current)

        day_start = pd.Timestamp(current, tz="UTC")
        day_times = pd.date_range(day_start, periods=slots_per_day, freq="15min")

        temps = [25.0] * slots_per_day
        ghis = [0.0] * slots_per_day

        if not weather_df.empty:
            in_day = (weather_df.index >= day_start) & (
                weather_df.index < day_start + pd.Timedelta(days=1)
            )
            day_weather = weather_df.loc[in_day]
            if len(day_weather) > 0:
                has_temp = "airTemperature" in day_weather.columns
                has_par = "PAR" in day_weather.columns
                # One nearest-neighbour lookup for all slots of the day.
                nearest = day_weather.index.get_indexer(day_times, method="nearest")
                for i, pos in enumerate(nearest):
                    if pos < 0:
                        continue
                    row = day_weather.iloc[pos]
                    if has_temp and pd.notna(row["airTemperature"]):
                        temps[i] = float(row["airTemperature"])
                    if has_par and pd.notna(row["PAR"]):
                        ghis[i] = float(row["PAR"]) / par_per_ghi

        # The fallback is a *daily* figure. Previously a missing month fell
        # back to 0.5 as a monthly total (about 0.016 kWh/day) while a
        # non-positive month fell back to 0.5 per day.
        monthly_kwh = monthly_budgets(current.year).get(current.month)
        days_in_month = calendar.monthrange(current.year, current.month)[1]
        if monthly_kwh is not None and monthly_kwh > 0:
            daily_budget = monthly_kwh / days_in_month
        else:
            daily_budget = fallback_daily_kwh

        try:
            plan = planner.plan_day(
                target_date=current,
                forecast_temps=temps,
                forecast_ghi=ghis,
                daily_budget_kwh=daily_budget,
            )
            plans[day_str] = plan.to_dict()
            n_interv = plan.n_intervention_slots
            print(f" {day_str}: {len(plan.slots)} slots, "
                  f"{n_interv} interventions, "
                  f"cost {plan.total_energy_cost_kwh:.4f}/{daily_budget:.4f} kWh "
                  f"({plan.budget_utilisation_pct:.1f}%)")
        except Exception as e:
            # A failing day is recorded and the remaining days still run.
            print(f" {day_str}: PLANNER ERROR - {e}")
            plans[day_str] = {"error": str(e)}

        current += timedelta(days=1)

    return plans
|
|
|
|
| |
| |
| |
|
|
def step5_astronomical_angles(start_date: date, end_date: date) -> pd.DataFrame:
    """Compute the astronomical tracking angle for every 15-minute UTC slot.

    For each day in ``[start_date, end_date]`` the solar position is taken
    from ``ShadowModel.get_solar_position``. When the sun is more than 2°
    above the horizon the angle is ``compute_tracker_tilt(...)["tracker_theta"]``;
    otherwise it is recorded as 0.0.

    Returns:
        Frame indexed by ``timestamp`` with columns ``date``,
        ``solar_elevation``, ``solar_azimuth`` and ``astro_angle``. An empty
        date range yields an empty frame with the same columns.
    """
    print("\n[Step 5] Computing astronomical tracking angles...")
    from src.shading.solar_geometry import ShadowModel

    shadow = ShadowModel()
    min_elevation_deg = 2
    columns = ["timestamp", "date", "solar_elevation", "solar_azimuth", "astro_angle"]
    records = []

    current = start_date
    while current <= end_date:
        day_str = str(current)
        day_start = pd.Timestamp(current, tz="UTC")
        times = pd.date_range(day_start, periods=96, freq="15min")
        solar_pos = shadow.get_solar_position(times)

        for i, ts in enumerate(times):
            sun = solar_pos.iloc[i]
            elev = float(sun["solar_elevation"])
            azim = float(sun["solar_azimuth"])

            if elev > min_elevation_deg:
                tracker = shadow.compute_tracker_tilt(azim, elev)
                astro_angle = float(tracker["tracker_theta"])
            else:
                astro_angle = 0.0

            records.append({
                "timestamp": ts,
                "date": day_str,
                "solar_elevation": elev,
                "solar_azimuth": azim,
                "astro_angle": astro_angle,
            })

        current += timedelta(days=1)

    # Passing the column list keeps set_index() valid when no day was
    # processed (start_date after end_date) instead of raising KeyError.
    df = pd.DataFrame(records, columns=columns).set_index("timestamp")
    daylight = df[df["solar_elevation"] > min_elevation_deg]
    print(f" {len(daylight)} daylight slots computed")
    if len(daylight):
        print(f" Astro angle range: [{daylight['astro_angle'].min():.1f}, "
              f"{daylight['astro_angle'].max():.1f}]°")
    return df
|
|
|
|
| |
| |
| |
|
|
def step6_compare_angles(
    plans: dict,
    tracker_data: dict[str, pd.DataFrame],
    astro_df: pd.DataFrame,
) -> pd.DataFrame:
    """Compare planned angles (astronomical + offset) with measured angles.

    Only the first tracker in ``tracker_data`` is used. For each planned slot
    the astronomical angle is looked up in ``astro_df`` (exact match, else
    nearest row, else 0.0) and the measured angle is the telemetry sample
    nearest in time, accepted only when it is less than 30 minutes away.

    Args:
        plans: Mapping of date string to plan dict (``slots`` list) or to
            ``{"error": ...}``; errored days are skipped.
        tracker_data: Mapping of tracker name to a frame with an ``angle``
            column and a datetime index (assumed comparable with tz-aware UTC
            timestamps — confirm against the client).
        astro_df: Frame from step 5 with an ``astro_angle`` column.

    Returns:
        One row per planned slot with planned/actual angle and absolute
        ``deviation`` (missing where no measurement matched). Empty when
        there is no tracker data or no plan slots.
    """
    print("\n[Step 6] Comparing planned vs actual tracker angles...")

    if not tracker_data:
        print(" No tracker data available — skipping comparison")
        return pd.DataFrame()

    tracker_name = next(iter(tracker_data))
    actual_df = tracker_data[tracker_name]
    print(f" Using {tracker_name} for comparison ({len(actual_df)} records)")

    max_gap_s = 1800  # accept a measurement only within 30 minutes of the slot
    has_actual = not actual_df.empty and "angle" in actual_df.columns
    if has_actual:
        # Nearest-neighbour lookup needs a unique, monotonic index. Raw
        # telemetry is not guaranteed to be either, so de-duplicate (keeping
        # the last sample per timestamp) and sort a private copy.
        actual_df = actual_df[~actual_df.index.duplicated(keep="last")].sort_index()
        actual_index = actual_df.index
        actual_angles = actual_df["angle"]

    comparisons = []
    for day_str, plan in plans.items():
        if "error" in plan:
            continue
        for slot in plan.get("slots", []):
            time_str = slot["time"]
            offset = slot["offset_deg"]
            ts = pd.Timestamp(f"{day_str} {time_str}", tz="UTC")

            if ts in astro_df.index:
                astro = astro_df.loc[ts, "astro_angle"]
            else:
                idx = astro_df.index.get_indexer([ts], method="nearest")[0]
                astro = astro_df.iloc[idx]["astro_angle"] if idx >= 0 else 0.0

            planned_angle = astro + offset

            actual_angle = None
            if has_actual:
                pos = actual_index.get_indexer([ts], method="nearest")[0]
                if pos >= 0:
                    gap_s = abs((actual_index[pos] - ts).total_seconds())
                    if gap_s < max_gap_s:
                        actual_angle = float(actual_angles.iloc[pos])

            deviation = abs(planned_angle - actual_angle) if actual_angle is not None else None
            comparisons.append({
                "timestamp": ts,
                "date": day_str,
                "time": time_str,
                "astro_angle": astro,
                "planned_offset": offset,
                "planned_angle": planned_angle,
                "actual_angle": actual_angle,
                "gate_passed": slot["gate_passed"],
                "deviation": deviation,
            })

    comp_df = pd.DataFrame(comparisons)
    if comp_df.empty:
        print(" No comparison data generated")
        return comp_df

    valid = comp_df.dropna(subset=["deviation"])
    if len(valid):
        mae = valid["deviation"].mean()
        max_dev = valid["deviation"].max()
        within_2 = (valid["deviation"] <= 2.0).sum()
        print(f" Matched records: {len(valid)}")
        print(f" Mean Absolute Error: {mae:.2f}°")
        print(f" Max deviation: {max_dev:.2f}°")
        print(f" Within ±2° tolerance: {within_2}/{len(valid)} "
              f"({within_2/len(valid)*100:.0f}%)")

        # Per-day breakdown of the same metrics.
        print(f"\n {'Date':<12} {'Slots':>6} {'MAE':>7} {'MaxDev':>7} {'Within2°':>9}")
        for day, grp in valid.groupby("date"):
            d_mae = grp["deviation"].mean()
            d_max = grp["deviation"].max()
            d_ok = (grp["deviation"] <= 2.0).sum()
            print(f" {day:<12} {len(grp):>6} {d_mae:>6.2f}° {d_max:>6.2f}° "
                  f"{d_ok:>4}/{len(grp):<4}")
    else:
        print(" No matched actual vs planned angle data")

    return comp_df
|
|
|
|
| |
| |
| |
|
|
def step7_validate_gate(plans: dict, weather_df: pd.DataFrame) -> list[dict]:
    """Check planned shading slots against the gate thresholds.

    Only slots where the gate passed and the offset is positive are checked.
    For each such slot three rules are applied:

    * hour of the slot timestamp is before ``NO_SHADE_BEFORE_HOUR`` -> CRITICAL
    * measured air temperature is below ``SHADE_ELIGIBLE_TLEAF_ABOVE`` -> WARNING
    * GHI proxy (PAR / 2.1) is below ``SHADE_ELIGIBLE_GHI_ABOVE`` -> WARNING

    Weather is the nearest row of ``weather_df`` and is used only when it is
    less than 30 minutes from the slot. Slots where shading conditions were
    met but no shading was planned are not reported.

    NOTE(review): the hour is taken from the slot timestamp interpreted as
    UTC — confirm that ``NO_SHADE_BEFORE_HOUR`` is expressed in the same zone.

    Returns:
        List of violation dicts with ``date``, ``time``, ``type``, ``detail``
        and ``severity``.
    """
    print("\n[Step 7] Validating InterventionGate decisions...")
    violations: list[dict] = []
    have_weather = not weather_df.empty
    max_gap_s = 1800

    for day_str, plan in plans.items():
        if "error" in plan:
            continue

        for slot in plan.get("slots", []):
            time_str = slot["time"]
            offset = slot["offset_deg"]
            gate = slot["gate_passed"]

            # Only an actual shading decision can violate a rule, so skip the
            # weather lookup for every other slot.
            if not (gate and offset > 0):
                continue

            ts = pd.Timestamp(f"{day_str} {time_str}", tz="UTC")
            hour = ts.hour + ts.minute / 60.0

            temp_c = None
            ghi = None
            if have_weather:
                pos = weather_df.index.get_indexer([ts], method="nearest")[0]
                if pos >= 0:
                    gap_s = abs((weather_df.index[pos] - ts).total_seconds())
                    if gap_s < max_gap_s:
                        row = weather_df.iloc[pos]
                        raw_temp = row.get("airTemperature")
                        if raw_temp is not None and pd.notna(raw_temp):
                            temp_c = float(raw_temp)
                        par = row.get("PAR")
                        if par is not None and pd.notna(par):
                            ghi = par / 2.1

            if hour < NO_SHADE_BEFORE_HOUR:
                violations.append({
                    "date": day_str, "time": time_str,
                    "type": "SHADE_BEFORE_10",
                    "detail": f"Shading at {hour:.1f}h (before {NO_SHADE_BEFORE_HOUR}:00)",
                    "severity": "CRITICAL",
                })
            if temp_c is not None and temp_c < SHADE_ELIGIBLE_TLEAF_ABOVE:
                violations.append({
                    "date": day_str, "time": time_str,
                    "type": "SHADE_BELOW_TEMP",
                    "detail": f"Shading at {temp_c:.1f}°C (threshold: {SHADE_ELIGIBLE_TLEAF_ABOVE}°C)",
                    "severity": "WARNING",
                })
            if ghi is not None and ghi < SHADE_ELIGIBLE_GHI_ABOVE:
                violations.append({
                    "date": day_str, "time": time_str,
                    "type": "SHADE_LOW_GHI",
                    "detail": f"Shading at GHI {ghi:.0f} W/m² (threshold: {SHADE_ELIGIBLE_GHI_ABOVE})",
                    "severity": "WARNING",
                })

    if violations:
        critical = [v for v in violations if v["severity"] == "CRITICAL"]
        warnings = [v for v in violations if v["severity"] == "WARNING"]
        print(f" {len(critical)} CRITICAL violations, {len(warnings)} warnings")
        for v in critical:
            print(f" !! {v['date']} {v['time']}: {v['detail']}")
        # Warnings can be numerous; show the first five only.
        for v in warnings[:5]:
            print(f" ? {v['date']} {v['time']}: {v['detail']}")
        if len(warnings) > 5:
            print(f" ... and {len(warnings) - 5} more warnings")
    else:
        print(" No gate violations found — all decisions are consistent")

    return violations
|
|
|
|
| |
| |
| |
|
|
def step8_budget_compliance(plans: dict) -> list[dict]:
    """Check that no successful plan spends more than its daily budget.

    A day counts as exceeded only when its cost is above the budget by more
    than a small tolerance (1e-9 kWh), so floating-point rounding in an
    exactly-used budget is not reported as a violation. Missing or ``None``
    values are treated as 0.

    Args:
        plans: Mapping of date string to plan dict, or to ``{"error": ...}``;
            errored days are skipped.

    Returns:
        One dict per checked day with ``date``, ``budget_kwh``, ``cost_kwh``,
        ``utilisation_pct`` and ``exceeded``.
    """
    print("\n[Step 8] Verifying energy budget compliance...")
    tolerance_kwh = 1e-9
    results = []

    for day_str, plan in plans.items():
        if "error" in plan:
            continue

        budget = plan.get("daily_budget_kwh") or 0
        cost = plan.get("total_energy_cost_kwh") or 0
        util = plan.get("budget_utilisation_pct") or 0
        exceeded = (cost - budget) > tolerance_kwh

        results.append({
            "date": day_str,
            "budget_kwh": budget,
            "cost_kwh": cost,
            "utilisation_pct": util,
            "exceeded": exceeded,
        })

        if exceeded:
            print(f" !! {day_str}: BUDGET EXCEEDED — cost {cost:.4f} > budget {budget:.4f} kWh")

    if results:
        total_budget = sum(r["budget_kwh"] for r in results)
        total_cost = sum(r["cost_kwh"] for r in results)
        if total_budget > 0:
            print(f" Total budget: {total_budget:.4f} kWh, total cost: {total_cost:.4f} kWh "
                  f"({total_cost/total_budget*100:.1f}%)")
        else:
            print(" No budget data")
        if not any(r["exceeded"] for r in results):
            print(" All days within budget — PASS")
    return results
|
|
|
|
| |
| |
| |
|
|
def step9_fvcb_validation(weather_df: pd.DataFrame) -> list[dict]:
    """Run the FvCB model on measured weather and flag implausible outputs.

    Up to the first 50 rows between 05:00 and 17:00 (index time) that have an
    air temperature are evaluated with
    ``FarquharModel.calc_photosynthesis_semillon``. Leaf temperature is set
    to air temperature, CO2 to 400 and VPD to 2.0; PAR defaults to 500 when
    missing.

    Flags raised per slot:
        ``negative_A``, ``A_too_high`` (A > 40),
        ``shading_helps_below_transition`` and
        ``rubisco_limited_but_shading_not_helpful``, the last two relative to
        ``SEMILLON_TRANSITION_TEMP_C``.

    Returns:
        One dict per evaluated slot, with either the model outputs and an
        ``issues`` list or an ``error`` message. Empty when there is no
        usable weather data or the model cannot be imported.
    """
    print("\n[Step 9] Cross-validating FvCB model outputs...")

    # Check the data before importing the model, so an unusable frame is
    # reported as such and never reaches dropna(subset=...) with a missing
    # column (which would raise KeyError and abort the whole run).
    if weather_df.empty:
        print(" No weather data for FvCB validation")
        return []
    if "airTemperature" not in weather_df.columns:
        print(" No airTemperature column in weather data — skipping FvCB validation")
        return []

    try:
        from src.models.farquhar_model import FarquharModel
    except ImportError:
        try:
            from src.farquhar_model import FarquharModel
        except ImportError:
            print(" FarquharModel not importable — skipping")
            return []

    model = FarquharModel()
    results: list[dict] = []

    daylight = weather_df.between_time("05:00", "17:00")
    if daylight.empty:
        # Fallback kept from the original: select by whole hours 4..16.
        daylight = weather_df[
            (weather_df.index.hour >= 4) & (weather_df.index.hour <= 16)
        ]

    sample = daylight.dropna(subset=["airTemperature"]).head(50)

    for ts, row in sample.iterrows():
        temp = float(row["airTemperature"])
        raw_par = row.get("PAR")
        par = float(raw_par) if pd.notna(raw_par) else 500.0

        try:
            result = model.calc_photosynthesis_semillon(
                PAR=par,
                Tleaf=temp,
                CO2=400.0,
                VPD=2.0,
                Tair=temp,
            )
            if isinstance(result, tuple) and len(result) >= 3:
                A, limiting, shading_helps = result[0], result[1], result[2]
            else:
                # Scalar result: derive the shading flag from temperature.
                A = result
                limiting = "unknown"
                shading_helps = temp >= SEMILLON_TRANSITION_TEMP_C

            issues = []
            if A < 0:
                issues.append("negative_A")
            if A > 40:
                issues.append("A_too_high")
            if temp < SEMILLON_TRANSITION_TEMP_C and shading_helps:
                issues.append("shading_helps_below_transition")
            if temp >= SEMILLON_TRANSITION_TEMP_C and not shading_helps and limiting == "rubisco":
                issues.append("rubisco_limited_but_shading_not_helpful")

            results.append({
                "timestamp": str(ts),
                "temp_c": temp,
                "par": par,
                "A": round(A, 2),
                "limiting": str(limiting),
                "shading_helps": bool(shading_helps),
                "issues": issues,
            })
        except Exception as e:
            # Record the failure for this slot and keep evaluating the rest.
            results.append({
                "timestamp": str(ts),
                "temp_c": temp,
                "par": par,
                "error": str(e),
            })

    valid = [r for r in results if "error" not in r]
    issues_found = [r for r in valid if r.get("issues")]

    if valid:
        a_values = [r["A"] for r in valid]
        print(f" {len(valid)} slots evaluated")
        print(f" A range: {min(a_values):.2f}–{max(a_values):.2f} µmol/m²/s "
              f"(mean {sum(a_values)/len(a_values):.2f})")

        shading_count = sum(1 for r in valid if r["shading_helps"])
        print(f" Shading helps: {shading_count}/{len(valid)} slots")

        if issues_found:
            print(f" {len(issues_found)} consistency issues found:")
            for r in issues_found[:5]:
                print(f" {r['timestamp']}: {r['issues']} "
                      f"(T={r['temp_c']:.1f}°C, A={r['A']:.2f})")
        else:
            print(" No FvCB consistency issues — PASS")

    errors = [r for r in results if "error" in r]
    if errors:
        print(f" {len(errors)} FvCB computation errors")
        for r in errors[:3]:
            print(f" {r['timestamp']}: {r['error']}")

    return results
|
|
|
|
| |
| |
| |
|
|
def step10_scorecard(
    plans: dict,
    comp_df: pd.DataFrame,
    violations: list[dict],
    budget_results: list[dict],
    fvcb_results: list[dict],
    tracker_data: dict,
    energy_df: pd.DataFrame,
) -> dict:
    """Print the verification scorecard and return it as a dict.

    Checks and their status rules:

    * ``data_availability`` — FAIL when no planner run succeeded, WARN when
      tracker or energy telemetry is missing, else PASS. Without this status
      a run that verified nothing would be reported as an overall PASS.
    * ``angle_alignment`` — only present when matched deviations exist;
      PASS for MAE < 5°, WARN for MAE < 15°, else FAIL.
    * ``gate_compliance`` — FAIL when any CRITICAL violation exists.
    * ``budget_compliance`` — FAIL when any day exceeded its budget.
    * ``fvcb_model`` — FAIL on computation errors, WARN on consistency
      issues only, else PASS.

    The overall verdict is the worst status among the checks.

    Returns:
        Dict with ``generated_at`` (UTC ISO timestamp), ``days_analyzed``
        (number of entries in ``plans``, errored days included), ``checks``
        and ``overall``.
    """
    print("\n" + "=" * 70)
    print(" VERIFICATION SCORECARD")
    print("=" * 70)

    checks: dict[str, dict] = {}
    scorecard = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "days_analyzed": len(plans),
        "checks": checks,
    }

    # --- Data availability ---
    has_trackers = bool(tracker_data)
    has_energy = not energy_df.empty
    has_plans = any("error" not in p for p in plans.values())
    if not has_plans:
        data_status = "FAIL"
    elif not (has_trackers and has_energy):
        data_status = "WARN"
    else:
        data_status = "PASS"
    print(f"\n Data Availability ({data_status}):")
    print(f" Tracker telemetry: {'YES' if has_trackers else 'NO'}")
    print(f" Energy production: {'YES' if has_energy else 'NO'}")
    print(f" Planner output: {'YES' if has_plans else 'NO'}")
    checks["data_availability"] = {
        "status": data_status,
        "trackers": has_trackers, "energy": has_energy, "plans": has_plans,
    }

    # --- Angle alignment ---
    if not comp_df.empty:
        valid = comp_df.dropna(subset=["deviation"])
        if len(valid):
            mae = valid["deviation"].mean()
            within_2 = (valid["deviation"] <= 2.0).mean() * 100
            status = "PASS" if mae < 5.0 else "WARN" if mae < 15.0 else "FAIL"
            print(f"\n Angle Alignment ({status}):")
            print(f" MAE: {mae:.2f}°")
            print(f" Within ±2° tolerance: {within_2:.0f}%")
            checks["angle_alignment"] = {
                "status": status, "mae_deg": round(mae, 2),
                "within_tolerance_pct": round(within_2, 1),
            }

    # --- Gate compliance ---
    critical = [v for v in violations if v["severity"] == "CRITICAL"]
    warnings = [v for v in violations if v["severity"] == "WARNING"]
    gate_status = "PASS" if not critical else "FAIL"
    print(f"\n Gate Compliance ({gate_status}):")
    print(f" Critical violations: {len(critical)}")
    print(f" Warnings: {len(warnings)}")
    checks["gate_compliance"] = {
        "status": gate_status,
        "critical": len(critical),
        "warnings": len(warnings),
    }

    # --- Budget compliance ---
    exceeded = [r for r in budget_results if r.get("exceeded")]
    budget_status = "PASS" if not exceeded else "FAIL"
    print(f"\n Budget Compliance ({budget_status}):")
    print(f" Days exceeding budget: {len(exceeded)}/{len(budget_results)}")
    if budget_results:
        total_cost = sum(r["cost_kwh"] for r in budget_results)
        total_budget = sum(r["budget_kwh"] for r in budget_results)
        print(f" Total spend: {total_cost:.4f} / {total_budget:.4f} kWh")
    checks["budget_compliance"] = {
        "status": budget_status, "days_exceeded": len(exceeded),
    }

    # --- FvCB model ---
    fvcb_issues = [r for r in fvcb_results if r.get("issues")]
    fvcb_errors = [r for r in fvcb_results if "error" in r]
    if fvcb_errors:
        fvcb_status = "FAIL"
    elif fvcb_issues:
        fvcb_status = "WARN"
    else:
        fvcb_status = "PASS"
    print(f"\n FvCB Model ({fvcb_status}):")
    print(f" Consistency issues: {len(fvcb_issues)}")
    print(f" Computation errors: {len(fvcb_errors)}")
    checks["fvcb_model"] = {
        "status": fvcb_status,
        "issues": len(fvcb_issues),
        "errors": len(fvcb_errors),
    }

    # --- Overall: worst status wins ---
    all_statuses = {c.get("status", "PASS") for c in checks.values()}
    if "FAIL" in all_statuses:
        overall = "FAIL"
    elif "WARN" in all_statuses:
        overall = "WARN"
    else:
        overall = "PASS"

    print(f"\n {'=' * 40}")
    print(f" OVERALL: {overall}")
    print(f" {'=' * 40}")
    scorecard["overall"] = overall

    return scorecard
|
|
|
|
| |
| |
| |
|
|
def print_daily_table(plans, comp_df, violations, budget_results):
    """Print one scorecard row per day, in ascending date order.

    A day is FAIL when it has at least one CRITICAL gate violation, WARN when
    its mean absolute angle deviation is above 10°, and PASS otherwise. Days
    whose plan errored are printed as ERROR. ``budget_results`` is accepted
    but not used.
    """
    print("\n Per-Day Scorecard:")
    print(f" {'Date':<12} {'Interv':>7} {'Budget%':>8} {'MAE°':>6} "
          f"{'GateViol':>9} {'Status':>8}")
    print(f" {'-'*60}")

    have_comparison = not comp_df.empty
    for day in sorted(plans.keys()):
        plan = plans[day]
        if "error" in plan:
            print(f" {day:<12} {'ERROR':>7}")
            continue

        n_interv = plan.get("n_intervention_slots", 0)
        util = plan.get("budget_utilisation_pct", 0)

        # Mean deviation over this day's matched slots; NaN when none exist.
        mae = float("nan")
        if have_comparison:
            matched = comp_df[comp_df["date"] == day].dropna(subset=["deviation"])
            if len(matched):
                mae = matched["deviation"].mean()

        n_critical = sum(
            1 for v in violations
            if v["date"] == day and v["severity"] == "CRITICAL"
        )

        # NaN compares False against 10, so days without data stay PASS.
        if n_critical:
            status = "FAIL"
        elif mae > 10:
            status = "WARN"
        else:
            status = "PASS"

        mae_str = "N/A" if math.isnan(mae) else f"{mae:.1f}"
        print(f" {day:<12} {n_interv:>7} {util:>7.1f}% {mae_str:>6} "
              f"{n_critical:>9} {status:>8}")
|
|
|
|
| |
| |
| |
|
|
def main():
    """Command-line entry point: run all ten verification steps in order.

    Options:
        --days N      Number of whole days to verify, ending yesterday (UTC).
                      Must be at least 1; default 10.
        --output PATH Also write the detailed report as JSON to PATH.
        --verbose/-v  Enable DEBUG logging.
    """
    import argparse

    parser = argparse.ArgumentParser(description="SolarWine control system verification")
    parser.add_argument("--days", type=int, default=10, help="Number of days to verify (default: 10)")
    parser.add_argument("--output", type=str, help="Save JSON report to this path")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()

    # A non-positive day count would produce an empty or inverted date range.
    if args.days < 1:
        parser.error("--days must be at least 1")

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)-15s %(levelname)-7s %(message)s",
        datefmt="%H:%M:%S",
    )

    # All query windows below are UTC days, so "yesterday" is taken in UTC as
    # well. With the local date, a machine ahead of UTC could select a UTC day
    # that has not finished yet.
    end_date = datetime.now(timezone.utc).date() - timedelta(days=1)
    start_date = end_date - timedelta(days=args.days - 1)

    print("=" * 70)
    print(" SolarWine Control System Verification")
    print(f" Period: {start_date} → {end_date} ({args.days} days)")
    print("=" * 70)

    tb = get_tb_client()

    start_dt = datetime(start_date.year, start_date.month, start_date.day, tzinfo=timezone.utc)
    end_dt = datetime(end_date.year, end_date.month, end_date.day, 23, 59, 59, tzinfo=timezone.utc)

    # Steps 1-3: pull telemetry.
    tracker_data = step1_tracker_angles(tb, start_dt, end_dt)
    energy_df = step2_energy_production(tb, start_dt, end_dt)
    weather_df = step3_weather_data(tb, start_dt, end_dt)

    # Steps 4-5: compute plans and reference angles.
    plans = step4_run_planner(weather_df, start_date, end_date)
    astro_df = step5_astronomical_angles(start_date, end_date)

    # Steps 6-9: individual checks.
    comp_df = step6_compare_angles(plans, tracker_data, astro_df)
    violations = step7_validate_gate(plans, weather_df)
    budget_results = step8_budget_compliance(plans)
    fvcb_results = step9_fvcb_validation(weather_df)

    # Step 10: summary.
    scorecard = step10_scorecard(
        plans, comp_df, violations, budget_results, fvcb_results,
        tracker_data, energy_df,
    )
    print_daily_table(plans, comp_df, violations, budget_results)

    if args.output:
        has_comparison = not comp_df.empty
        report = {
            "scorecard": scorecard,
            "plans": plans,
            "violations": violations,
            "budget_results": budget_results,
            "fvcb_results": fvcb_results,
            "comparison_summary": {
                "total_records": len(comp_df) if has_comparison else 0,
                "matched_records": len(comp_df.dropna(subset=["deviation"])) if has_comparison else 0,
            },
        }
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_path, "w", encoding="utf-8") as f:
            # default=str stringifies anything json cannot encode natively
            # (e.g. timestamps inside the plan dicts).
            json.dump(report, f, indent=2, default=str)
        print(f"\nDetailed report saved to: {args.output}")

    print("\nVerification complete.")
|
|
|
|
# Run the verification only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|