api / scripts /verify_control_system.py
safraeli's picture
Deploy: 2026 sensor migration + redesign + bucket B endpoints
13fc29d verified
#!/usr/bin/env python3
"""
SolarWine Control System Verification Script
=============================================
10-step manual verification comparing DayAheadPlanner output
against actual tracker behavior from ThingsBoard over the last N days.
Steps:
1. Pull actual tracker angles from ThingsBoard
2. Pull actual energy production from Plant asset
3. Pull historical IMS weather (or use TB ambient sensor)
4. Run DayAheadPlanner for each day (with real weather as "perfect forecast")
5. Compute astronomical tracking angles for comparison
6. Compare planned angles vs actual angles (MAE, max deviation)
7. Validate InterventionGate decisions against conditions
8. Verify energy budget compliance
9. Cross-validate FvCB model outputs
10. Generate summary report / scorecard
Usage
-----
# Default: last 10 days
python scripts/verify_control_system.py
# Custom range
python scripts/verify_control_system.py --days 7
# Save detailed JSON report
python scripts/verify_control_system.py --output Data/verification_report.json
"""
from __future__ import annotations
import json
import logging
import math
import os
import sys
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
import numpy as np
import pandas as pd
# Ensure project root is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from config.settings import (
CANDIDATE_OFFSETS,
NO_SHADE_BEFORE_HOUR,
SEMILLON_TRANSITION_TEMP_C,
SHADE_ELIGIBLE_GHI_ABOVE,
SHADE_ELIGIBLE_TLEAF_ABOVE,
SITE_LATITUDE,
SITE_LONGITUDE,
SYSTEM_CAPACITY_KW,
TRACKER_ID_MAP,
)
logger = logging.getLogger("verification")
# ---------------------------------------------------------------------------
# ThingsBoard connection
# ---------------------------------------------------------------------------
def get_tb_client():
"""Create a ThingsBoard client with prod credentials."""
from src.data.thingsboard_client import ThingsBoardClient, ThingsBoardConfig
config = ThingsBoardConfig(
host=os.environ.get("THINGSBOARD_HOST", "https://web.seymouragri.com/"),
username=os.environ.get("THINGSBOARD_USERNAME"),
password=os.environ.get("THINGSBOARD_PASSWORD"),
)
return ThingsBoardClient(config)
# ---------------------------------------------------------------------------
# Step 1: Pull actual tracker angles
# ---------------------------------------------------------------------------
def step1_tracker_angles(tb, start: datetime, end: datetime) -> dict[str, pd.DataFrame]:
"""Fetch tracker angle timeseries for all 4 trackers."""
print("\n[Step 1] Pulling actual tracker angles from ThingsBoard...")
tracker_data = {}
for tid, tname in TRACKER_ID_MAP.items():
try:
df = tb.get_timeseries(
tname, ["angle"], start, end,
limit=10000, agg="NONE",
)
if not df.empty:
tracker_data[tname] = df
print(f" {tname}: {len(df)} records, "
f"angle range [{df['angle'].min():.1f}, {df['angle'].max():.1f}]°")
else:
print(f" {tname}: no data")
except Exception as e:
print(f" {tname}: ERROR - {e}")
return tracker_data
# ---------------------------------------------------------------------------
# Step 2: Pull actual energy production
# ---------------------------------------------------------------------------
def step2_energy_production(tb, start: datetime, end: datetime) -> pd.DataFrame:
"""Fetch Plant asset energy production."""
print("\n[Step 2] Pulling actual energy production from Plant asset...")
try:
df = tb.get_asset_timeseries(
"Plant", ["production"],
start, end,
limit=10000,
interval_ms=3_600_000, # hourly aggregation
agg="SUM",
)
if not df.empty:
# production is in Wh, convert to kWh
daily = df.resample("D").sum() / 1000.0
for idx, row in daily.iterrows():
print(f" {idx.strftime('%Y-%m-%d')}: {row.get('production', 0):.1f} kWh")
else:
print(" No energy data available")
return df
except Exception as e:
print(f" ERROR: {e}")
return pd.DataFrame()
# ---------------------------------------------------------------------------
# Step 3: Pull historical weather (IMS or TB ambient sensor)
# ---------------------------------------------------------------------------
def step3_weather_data(tb, start: datetime, end: datetime) -> pd.DataFrame:
"""Fetch weather data from Air1 ambient sensor (temp, PAR for GHI proxy)."""
print("\n[Step 3] Pulling weather data from Air1 (ambient sensor)...")
keys = ["airTemperature", "PAR", "windSpeed"]
try:
df = tb.get_timeseries("Air1", keys, start, end, limit=10000, agg="NONE")
if not df.empty:
# Resample to 15-min
df = df.resample("15min").mean()
print(f" {len(df)} records (15-min resampled)")
if "airTemperature" in df.columns:
temps = df["airTemperature"].dropna()
if len(temps):
print(f" Temperature: {temps.min():.1f}{temps.max():.1f}°C "
f"(mean {temps.mean():.1f}°C)")
if "PAR" in df.columns:
par = df["PAR"].dropna()
if len(par):
# PAR (µmol/m²/s) → GHI proxy: GHI ≈ PAR / 2.1
print(f" PAR: {par.min():.0f}{par.max():.0f} µmol/m²/s "
f"(GHI proxy: {par.max()/2.1:.0f} W/m²)")
else:
print(" No weather data from Air1")
return df
except Exception as e:
print(f" ERROR: {e}")
return pd.DataFrame()
# ---------------------------------------------------------------------------
# Step 4: Run DayAheadPlanner for each day
# ---------------------------------------------------------------------------
def step4_run_planner(weather_df: pd.DataFrame, start_date: date, end_date: date) -> dict[str, dict]:
"""Run DayAheadPlanner for each day using real weather as forecast."""
print("\n[Step 4] Running DayAheadPlanner for each day...")
from src.day_ahead_planner import DayAheadPlanner
from src.energy_budget import EnergyBudgetPlanner
planner = DayAheadPlanner()
budget_planner = EnergyBudgetPlanner()
# Get annual plan for budget allocation
year = start_date.year
try:
annual = budget_planner.compute_annual_plan(year)
month_budget = annual.get("monthly_budgets", {})
except Exception:
month_budget = {}
plans = {}
current = start_date
while current <= end_date:
day_str = str(current)
# Extract 96 temperature and GHI values for this day
day_start = pd.Timestamp(current, tz="UTC")
day_end = day_start + pd.Timedelta(hours=24) - pd.Timedelta(minutes=15)
day_times = pd.date_range(day_start, periods=96, freq="15min")
temps = [25.0] * 96
ghis = [0.0] * 96
if not weather_df.empty:
day_weather = weather_df.loc[
(weather_df.index >= day_start) & (weather_df.index < day_start + pd.Timedelta(days=1))
]
for i, ts in enumerate(day_times):
# Find closest weather record
if len(day_weather) > 0:
idx = day_weather.index.get_indexer([ts], method="nearest")[0]
if idx >= 0 and idx < len(day_weather):
row = day_weather.iloc[idx]
if "airTemperature" in row and pd.notna(row["airTemperature"]):
temps[i] = float(row["airTemperature"])
if "PAR" in row and pd.notna(row["PAR"]):
# PAR → GHI proxy
ghis[i] = float(row["PAR"]) / 2.1
# Daily budget: use monthly allocation / 30 as simple estimate
month = current.month
monthly_kwh = month_budget.get(month, 0.5)
import calendar
days_in_month = calendar.monthrange(current.year, current.month)[1]
daily_budget = monthly_kwh / days_in_month if monthly_kwh > 0 else 0.5
try:
plan = planner.plan_day(
target_date=current,
forecast_temps=temps,
forecast_ghi=ghis,
daily_budget_kwh=daily_budget,
)
plans[day_str] = plan.to_dict()
n_interv = plan.n_intervention_slots
print(f" {day_str}: {len(plan.slots)} slots, "
f"{n_interv} interventions, "
f"cost {plan.total_energy_cost_kwh:.4f}/{daily_budget:.4f} kWh "
f"({plan.budget_utilisation_pct:.1f}%)")
except Exception as e:
print(f" {day_str}: PLANNER ERROR - {e}")
plans[day_str] = {"error": str(e)}
current += timedelta(days=1)
return plans
# ---------------------------------------------------------------------------
# Step 5: Compute astronomical tracking angles
# ---------------------------------------------------------------------------
def step5_astronomical_angles(start_date: date, end_date: date) -> pd.DataFrame:
"""Compute expected astronomical tracking angles for each 15-min slot."""
print("\n[Step 5] Computing astronomical tracking angles...")
from src.shading.solar_geometry import ShadowModel
shadow = ShadowModel()
records = []
current = start_date
while current <= end_date:
day_start = pd.Timestamp(current, tz="UTC")
times = pd.date_range(day_start, periods=96, freq="15min")
solar_pos = shadow.get_solar_position(times)
for i, ts in enumerate(times):
elev = float(solar_pos.iloc[i]["solar_elevation"])
azim = float(solar_pos.iloc[i]["solar_azimuth"])
if elev > 2:
tracker = shadow.compute_tracker_tilt(azim, elev)
astro_angle = float(tracker["tracker_theta"])
else:
astro_angle = 0.0 # night — stowed
records.append({
"timestamp": ts,
"date": str(current),
"solar_elevation": elev,
"solar_azimuth": azim,
"astro_angle": astro_angle,
})
current += timedelta(days=1)
df = pd.DataFrame(records).set_index("timestamp")
daylight = df[df["solar_elevation"] > 2]
print(f" {len(daylight)} daylight slots computed")
if len(daylight):
print(f" Astro angle range: [{daylight['astro_angle'].min():.1f}, "
f"{daylight['astro_angle'].max():.1f}]°")
return df
# ---------------------------------------------------------------------------
# Step 6: Compare planned vs actual angles
# ---------------------------------------------------------------------------
def step6_compare_angles(
plans: dict,
tracker_data: dict[str, pd.DataFrame],
astro_df: pd.DataFrame,
) -> pd.DataFrame:
"""Compare planned offsets + astronomical angles against actual tracker angles."""
print("\n[Step 6] Comparing planned vs actual tracker angles...")
if not tracker_data:
print(" No tracker data available — skipping comparison")
return pd.DataFrame()
# Use first available tracker for comparison
tracker_name = next(iter(tracker_data))
actual_df = tracker_data[tracker_name].copy()
print(f" Using {tracker_name} for comparison ({len(actual_df)} records)")
comparisons = []
for day_str, plan in plans.items():
if "error" in plan:
continue
slots = plan.get("slots", [])
for slot in slots:
time_str = slot["time"]
offset = slot["offset_deg"]
# Build timestamp
ts = pd.Timestamp(f"{day_str} {time_str}", tz="UTC")
# Get astronomical angle
if ts in astro_df.index:
astro = astro_df.loc[ts, "astro_angle"]
else:
# Find nearest
idx = astro_df.index.get_indexer([ts], method="nearest")[0]
astro = astro_df.iloc[idx]["astro_angle"] if idx >= 0 else 0.0
planned_angle = astro + offset
# Find nearest actual angle
actual_angle = None
if not actual_df.empty and "angle" in actual_df.columns:
nearest_idx = actual_df.index.get_indexer([ts], method="nearest")
if nearest_idx[0] >= 0:
actual_row = actual_df.iloc[nearest_idx[0]]
time_diff = abs((actual_df.index[nearest_idx[0]] - ts).total_seconds())
if time_diff < 1800: # within 30 min
actual_angle = float(actual_row["angle"])
comparisons.append({
"timestamp": ts,
"date": day_str,
"time": time_str,
"astro_angle": astro,
"planned_offset": offset,
"planned_angle": planned_angle,
"actual_angle": actual_angle,
"gate_passed": slot["gate_passed"],
"deviation": abs(planned_angle - actual_angle) if actual_angle is not None else None,
})
comp_df = pd.DataFrame(comparisons)
if comp_df.empty:
print(" No comparison data generated")
return comp_df
valid = comp_df.dropna(subset=["deviation"])
if len(valid):
mae = valid["deviation"].mean()
max_dev = valid["deviation"].max()
within_2 = (valid["deviation"] <= 2.0).sum()
print(f" Matched records: {len(valid)}")
print(f" Mean Absolute Error: {mae:.2f}°")
print(f" Max deviation: {max_dev:.2f}°")
print(f" Within ±2° tolerance: {within_2}/{len(valid)} "
f"({within_2/len(valid)*100:.0f}%)")
# Per-day breakdown
print(f"\n {'Date':<12} {'Slots':>6} {'MAE':>7} {'MaxDev':>7} {'Within2°':>9}")
for day, grp in valid.groupby("date"):
d_mae = grp["deviation"].mean()
d_max = grp["deviation"].max()
d_ok = (grp["deviation"] <= 2.0).sum()
print(f" {day:<12} {len(grp):>6} {d_mae:>6.2f}° {d_max:>6.2f}° "
f"{d_ok:>4}/{len(grp):<4}")
else:
print(" No matched actual vs planned angle data")
return comp_df
# ---------------------------------------------------------------------------
# Step 7: Validate InterventionGate decisions
# ---------------------------------------------------------------------------
def step7_validate_gate(plans: dict, weather_df: pd.DataFrame) -> list[dict]:
"""Check gate decisions against weather conditions."""
print("\n[Step 7] Validating InterventionGate decisions...")
violations = []
for day_str, plan in plans.items():
if "error" in plan:
continue
for slot in plan.get("slots", []):
time_str = slot["time"]
offset = slot["offset_deg"]
gate = slot["gate_passed"]
tags = slot.get("tags", [])
ts = pd.Timestamp(f"{day_str} {time_str}", tz="UTC")
hour = ts.hour + ts.minute / 60.0
# Get weather at this slot
temp_c = None
ghi = None
if not weather_df.empty:
idx = weather_df.index.get_indexer([ts], method="nearest")
if idx[0] >= 0:
row = weather_df.iloc[idx[0]]
time_diff = abs((weather_df.index[idx[0]] - ts).total_seconds())
if time_diff < 1800:
temp_c = row.get("airTemperature")
par = row.get("PAR")
if par is not None and pd.notna(par):
ghi = par / 2.1
if temp_c is not None and pd.notna(temp_c):
temp_c = float(temp_c)
# Check violations
if gate and offset > 0:
# Intervention was allowed — verify conditions
if hour < NO_SHADE_BEFORE_HOUR:
violations.append({
"date": day_str, "time": time_str,
"type": "SHADE_BEFORE_10",
"detail": f"Shading at {hour:.1f}h (before {NO_SHADE_BEFORE_HOUR}:00)",
"severity": "CRITICAL",
})
if temp_c is not None and temp_c < SHADE_ELIGIBLE_TLEAF_ABOVE:
violations.append({
"date": day_str, "time": time_str,
"type": "SHADE_BELOW_TEMP",
"detail": f"Shading at {temp_c:.1f}°C (threshold: {SHADE_ELIGIBLE_TLEAF_ABOVE}°C)",
"severity": "WARNING",
})
if ghi is not None and ghi < SHADE_ELIGIBLE_GHI_ABOVE:
violations.append({
"date": day_str, "time": time_str,
"type": "SHADE_LOW_GHI",
"detail": f"Shading at GHI {ghi:.0f} W/m² (threshold: {SHADE_ELIGIBLE_GHI_ABOVE})",
"severity": "WARNING",
})
elif not gate and offset == 0:
# Gate blocked — verify it SHOULD have been blocked
if temp_c is not None and temp_c >= SHADE_ELIGIBLE_TLEAF_ABOVE and \
ghi is not None and ghi >= SHADE_ELIGIBLE_GHI_ABOVE and \
hour >= NO_SHADE_BEFORE_HOUR:
# Conditions seem favorable but gate blocked — might be CWSI
# Not necessarily a violation (CWSI proxy could block)
pass
if violations:
critical = [v for v in violations if v["severity"] == "CRITICAL"]
warnings = [v for v in violations if v["severity"] == "WARNING"]
print(f" {len(critical)} CRITICAL violations, {len(warnings)} warnings")
for v in critical:
print(f" !! {v['date']} {v['time']}: {v['detail']}")
for v in warnings[:5]:
print(f" ? {v['date']} {v['time']}: {v['detail']}")
if len(warnings) > 5:
print(f" ... and {len(warnings) - 5} more warnings")
else:
print(" No gate violations found — all decisions are consistent")
return violations
# ---------------------------------------------------------------------------
# Step 8: Verify energy budget compliance
# ---------------------------------------------------------------------------
def step8_budget_compliance(plans: dict) -> list[dict]:
"""Check that no plan exceeds its daily budget."""
print("\n[Step 8] Verifying energy budget compliance...")
results = []
for day_str, plan in plans.items():
if "error" in plan:
continue
budget = plan.get("daily_budget_kwh", 0)
cost = plan.get("total_energy_cost_kwh", 0)
util = plan.get("budget_utilisation_pct", 0)
exceeded = cost > budget
result = {
"date": day_str,
"budget_kwh": budget,
"cost_kwh": cost,
"utilisation_pct": util,
"exceeded": exceeded,
}
results.append(result)
if exceeded:
print(f" !! {day_str}: BUDGET EXCEEDED — cost {cost:.4f} > budget {budget:.4f} kWh")
if results:
total_budget = sum(r["budget_kwh"] for r in results)
total_cost = sum(r["cost_kwh"] for r in results)
any_exceeded = any(r["exceeded"] for r in results)
print(f" Total budget: {total_budget:.4f} kWh, total cost: {total_cost:.4f} kWh "
f"({total_cost/total_budget*100:.1f}%)" if total_budget > 0 else " No budget data")
if not any_exceeded:
print(" All days within budget — PASS")
return results
# ---------------------------------------------------------------------------
# Step 9: Cross-validate FvCB model
# ---------------------------------------------------------------------------
def step9_fvcb_validation(weather_df: pd.DataFrame) -> list[dict]:
"""Run FvCB model on available weather data and check consistency."""
print("\n[Step 9] Cross-validating FvCB model outputs...")
try:
from src.models.farquhar_model import FarquharModel
except ImportError:
try:
from src.farquhar_model import FarquharModel
except ImportError:
print(" FarquharModel not importable — skipping")
return []
model = FarquharModel()
results = []
if weather_df.empty:
print(" No weather data for FvCB validation")
return results
# Sample some daylight records
daylight = weather_df.between_time("05:00", "17:00")
if daylight.empty:
# Timestamps are UTC — Sde Boker is UTC+2/+3, so daylight is ~03:00–15:00 UTC
daylight = weather_df[
(weather_df.index.hour >= 4) & (weather_df.index.hour <= 16)
]
sample = daylight.dropna(subset=["airTemperature"]).head(50)
for ts, row in sample.iterrows():
temp = float(row["airTemperature"])
par = float(row.get("PAR", 500)) if pd.notna(row.get("PAR")) else 500.0
try:
result = model.calc_photosynthesis_semillon(
PAR=par,
Tleaf=temp,
CO2=400.0,
VPD=2.0,
Tair=temp,
)
if isinstance(result, tuple) and len(result) >= 3:
A, limiting, shading_helps = result[0], result[1], result[2]
else:
A = result
limiting = "unknown"
shading_helps = temp >= SEMILLON_TRANSITION_TEMP_C
# Consistency checks
issues = []
if A < 0:
issues.append("negative_A")
if A > 40:
issues.append("A_too_high")
if temp < SEMILLON_TRANSITION_TEMP_C and shading_helps:
issues.append("shading_helps_below_transition")
if temp >= SEMILLON_TRANSITION_TEMP_C and not shading_helps and limiting == "rubisco":
issues.append("rubisco_limited_but_shading_not_helpful")
results.append({
"timestamp": str(ts),
"temp_c": temp,
"par": par,
"A": round(A, 2),
"limiting": str(limiting),
"shading_helps": bool(shading_helps),
"issues": issues,
})
except Exception as e:
results.append({
"timestamp": str(ts),
"temp_c": temp,
"par": par,
"error": str(e),
})
valid = [r for r in results if "error" not in r]
issues_found = [r for r in valid if r.get("issues")]
if valid:
a_values = [r["A"] for r in valid]
print(f" {len(valid)} slots evaluated")
print(f" A range: {min(a_values):.2f}{max(a_values):.2f} µmol/m²/s "
f"(mean {sum(a_values)/len(a_values):.2f})")
shading_count = sum(1 for r in valid if r["shading_helps"])
print(f" Shading helps: {shading_count}/{len(valid)} slots")
if issues_found:
print(f" {len(issues_found)} consistency issues found:")
for r in issues_found[:5]:
print(f" {r['timestamp']}: {r['issues']} "
f"(T={r['temp_c']:.1f}°C, A={r['A']:.2f})")
else:
print(" No FvCB consistency issues — PASS")
errors = [r for r in results if "error" in r]
if errors:
print(f" {len(errors)} FvCB computation errors")
for r in errors[:3]:
print(f" {r['timestamp']}: {r['error']}")
return results
# ---------------------------------------------------------------------------
# Step 10: Summary scorecard
# ---------------------------------------------------------------------------
def step10_scorecard(
plans: dict,
comp_df: pd.DataFrame,
violations: list[dict],
budget_results: list[dict],
fvcb_results: list[dict],
tracker_data: dict,
energy_df: pd.DataFrame,
) -> dict:
"""Generate and print the final verification scorecard."""
print("\n" + "=" * 70)
print(" VERIFICATION SCORECARD")
print("=" * 70)
scorecard = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"days_analyzed": len(plans),
"checks": {},
}
# 1. Data availability
has_trackers = bool(tracker_data)
has_energy = not energy_df.empty
has_plans = any("error" not in p for p in plans.values())
print(f"\n Data Availability:")
print(f" Tracker telemetry: {'YES' if has_trackers else 'NO'}")
print(f" Energy production: {'YES' if has_energy else 'NO'}")
print(f" Planner output: {'YES' if has_plans else 'NO'}")
scorecard["checks"]["data_availability"] = {
"trackers": has_trackers, "energy": has_energy, "plans": has_plans,
}
# 2. Angle alignment
if not comp_df.empty:
valid = comp_df.dropna(subset=["deviation"])
if len(valid):
mae = valid["deviation"].mean()
within_2 = (valid["deviation"] <= 2.0).mean() * 100
status = "PASS" if mae < 5.0 else "WARN" if mae < 15.0 else "FAIL"
print(f"\n Angle Alignment ({status}):")
print(f" MAE: {mae:.2f}°")
print(f" Within ±2° tolerance: {within_2:.0f}%")
scorecard["checks"]["angle_alignment"] = {
"status": status, "mae_deg": round(mae, 2),
"within_tolerance_pct": round(within_2, 1),
}
# 3. Gate compliance
critical = [v for v in violations if v["severity"] == "CRITICAL"]
warnings = [v for v in violations if v["severity"] == "WARNING"]
gate_status = "PASS" if not critical else "FAIL"
print(f"\n Gate Compliance ({gate_status}):")
print(f" Critical violations: {len(critical)}")
print(f" Warnings: {len(warnings)}")
scorecard["checks"]["gate_compliance"] = {
"status": gate_status,
"critical": len(critical),
"warnings": len(warnings),
}
# 4. Budget compliance
exceeded = [r for r in budget_results if r.get("exceeded")]
budget_status = "PASS" if not exceeded else "FAIL"
print(f"\n Budget Compliance ({budget_status}):")
print(f" Days exceeding budget: {len(exceeded)}/{len(budget_results)}")
if budget_results:
total_cost = sum(r["cost_kwh"] for r in budget_results)
total_budget = sum(r["budget_kwh"] for r in budget_results)
print(f" Total spend: {total_cost:.4f} / {total_budget:.4f} kWh")
scorecard["checks"]["budget_compliance"] = {
"status": budget_status, "days_exceeded": len(exceeded),
}
# 5. FvCB consistency
fvcb_issues = [r for r in fvcb_results if r.get("issues")]
fvcb_errors = [r for r in fvcb_results if "error" in r]
fvcb_status = "PASS" if not fvcb_issues and not fvcb_errors else "WARN" if not fvcb_errors else "FAIL"
print(f"\n FvCB Model ({fvcb_status}):")
print(f" Consistency issues: {len(fvcb_issues)}")
print(f" Computation errors: {len(fvcb_errors)}")
scorecard["checks"]["fvcb_model"] = {
"status": fvcb_status,
"issues": len(fvcb_issues),
"errors": len(fvcb_errors),
}
# 6. Overall
all_statuses = [c.get("status", "PASS") for c in scorecard["checks"].values() if isinstance(c, dict)]
if "FAIL" in all_statuses:
overall = "FAIL"
elif "WARN" in all_statuses:
overall = "WARN"
else:
overall = "PASS"
print(f"\n {'=' * 40}")
print(f" OVERALL: {overall}")
print(f" {'=' * 40}")
scorecard["overall"] = overall
return scorecard
# ---------------------------------------------------------------------------
# Per-day scorecard table
# ---------------------------------------------------------------------------
def print_daily_table(plans, comp_df, violations, budget_results):
"""Print the per-day scorecard table from Step 10."""
print("\n Per-Day Scorecard:")
print(f" {'Date':<12} {'Interv':>7} {'Budget%':>8} {'MAE°':>6} "
f"{'GateViol':>9} {'Status':>8}")
print(f" {'-'*60}")
days = sorted(plans.keys())
for day in days:
plan = plans[day]
if "error" in plan:
print(f" {day:<12} {'ERROR':>7}")
continue
n_interv = plan.get("n_intervention_slots", 0)
util = plan.get("budget_utilisation_pct", 0)
# MAE for this day
if not comp_df.empty:
day_comp = comp_df[comp_df["date"] == day].dropna(subset=["deviation"])
mae = day_comp["deviation"].mean() if len(day_comp) else float("nan")
else:
mae = float("nan")
# Gate violations for this day
day_viol = [v for v in violations if v["date"] == day and v["severity"] == "CRITICAL"]
status = "PASS"
if day_viol:
status = "FAIL"
elif mae > 10:
status = "WARN"
mae_str = f"{mae:.1f}" if not math.isnan(mae) else "N/A"
print(f" {day:<12} {n_interv:>7} {util:>7.1f}% {mae_str:>6} "
f"{len(day_viol):>9} {status:>8}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
import argparse
parser = argparse.ArgumentParser(description="SolarWine control system verification")
parser.add_argument("--days", type=int, default=10, help="Number of days to verify (default: 10)")
parser.add_argument("--output", type=str, help="Save JSON report to this path")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(name)-15s %(levelname)-7s %(message)s",
datefmt="%H:%M:%S",
)
end_date = date.today() - timedelta(days=1) # yesterday
start_date = end_date - timedelta(days=args.days - 1)
print("=" * 70)
print(f" SolarWine Control System Verification")
print(f" Period: {start_date}{end_date} ({args.days} days)")
print("=" * 70)
# Connect to ThingsBoard
tb = get_tb_client()
start_dt = datetime(start_date.year, start_date.month, start_date.day, tzinfo=timezone.utc)
end_dt = datetime(end_date.year, end_date.month, end_date.day, 23, 59, 59, tzinfo=timezone.utc)
# Steps 1-3: Data collection
tracker_data = step1_tracker_angles(tb, start_dt, end_dt)
energy_df = step2_energy_production(tb, start_dt, end_dt)
weather_df = step3_weather_data(tb, start_dt, end_dt)
# Steps 4-5: Planning & astronomy
plans = step4_run_planner(weather_df, start_date, end_date)
astro_df = step5_astronomical_angles(start_date, end_date)
# Steps 6-9: Analysis
comp_df = step6_compare_angles(plans, tracker_data, astro_df)
violations = step7_validate_gate(plans, weather_df)
budget_results = step8_budget_compliance(plans)
fvcb_results = step9_fvcb_validation(weather_df)
# Step 10: Final scorecard
scorecard = step10_scorecard(
plans, comp_df, violations, budget_results, fvcb_results,
tracker_data, energy_df,
)
print_daily_table(plans, comp_df, violations, budget_results)
# Save report
if args.output:
report = {
"scorecard": scorecard,
"plans": plans,
"violations": violations,
"budget_results": budget_results,
"fvcb_results": fvcb_results,
"comparison_summary": {
"total_records": len(comp_df) if not comp_df.empty else 0,
"matched_records": len(comp_df.dropna(subset=["deviation"])) if not comp_df.empty else 0,
},
}
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
with open(args.output, "w") as f:
json.dump(report, f, indent=2, default=str)
print(f"\nDetailed report saved to: {args.output}")
print("\nVerification complete.")
if __name__ == "__main__":
main()