#!/usr/bin/env python3 """ SolarWine Control System Verification Script ============================================= 10-step manual verification comparing DayAheadPlanner output against actual tracker behavior from ThingsBoard over the last N days. Steps: 1. Pull actual tracker angles from ThingsBoard 2. Pull actual energy production from Plant asset 3. Pull historical IMS weather (or use TB ambient sensor) 4. Run DayAheadPlanner for each day (with real weather as "perfect forecast") 5. Compute astronomical tracking angles for comparison 6. Compare planned angles vs actual angles (MAE, max deviation) 7. Validate InterventionGate decisions against conditions 8. Verify energy budget compliance 9. Cross-validate FvCB model outputs 10. Generate summary report / scorecard Usage ----- # Default: last 10 days python scripts/verify_control_system.py # Custom range python scripts/verify_control_system.py --days 7 # Save detailed JSON report python scripts/verify_control_system.py --output Data/verification_report.json """ from __future__ import annotations import json import logging import math import os import sys from datetime import date, datetime, timedelta, timezone from pathlib import Path import numpy as np import pandas as pd # Ensure project root is importable sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from config.settings import ( CANDIDATE_OFFSETS, NO_SHADE_BEFORE_HOUR, SEMILLON_TRANSITION_TEMP_C, SHADE_ELIGIBLE_GHI_ABOVE, SHADE_ELIGIBLE_TLEAF_ABOVE, SITE_LATITUDE, SITE_LONGITUDE, SYSTEM_CAPACITY_KW, TRACKER_ID_MAP, ) logger = logging.getLogger("verification") # --------------------------------------------------------------------------- # ThingsBoard connection # --------------------------------------------------------------------------- def get_tb_client(): """Create a ThingsBoard client with prod credentials.""" from src.data.thingsboard_client import ThingsBoardClient, ThingsBoardConfig config = ThingsBoardConfig( host=os.environ.get("THINGSBOARD_HOST", "https://web.seymouragri.com/"), username=os.environ.get("THINGSBOARD_USERNAME"), password=os.environ.get("THINGSBOARD_PASSWORD"), ) return ThingsBoardClient(config) # --------------------------------------------------------------------------- # Step 1: Pull actual tracker angles # --------------------------------------------------------------------------- def step1_tracker_angles(tb, start: datetime, end: datetime) -> dict[str, pd.DataFrame]: """Fetch tracker angle timeseries for all 4 trackers.""" print("\n[Step 1] Pulling actual tracker angles from ThingsBoard...") tracker_data = {} for tid, tname in TRACKER_ID_MAP.items(): try: df = tb.get_timeseries( tname, ["angle"], start, end, limit=10000, agg="NONE", ) if not df.empty: tracker_data[tname] = df print(f" {tname}: {len(df)} records, " f"angle range [{df['angle'].min():.1f}, {df['angle'].max():.1f}]°") else: print(f" {tname}: no data") except Exception as e: print(f" {tname}: ERROR - {e}") return tracker_data # --------------------------------------------------------------------------- # Step 2: Pull actual energy production # --------------------------------------------------------------------------- def step2_energy_production(tb, start: datetime, end: datetime) -> pd.DataFrame: """Fetch Plant asset energy production.""" print("\n[Step 2] Pulling actual energy production from Plant asset...") try: df = tb.get_asset_timeseries( "Plant", ["production"], start, end, limit=10000, interval_ms=3_600_000, # hourly aggregation agg="SUM", ) if not df.empty: # production is in Wh, convert to kWh daily = df.resample("D").sum() / 1000.0 for idx, row in daily.iterrows(): print(f" {idx.strftime('%Y-%m-%d')}: {row.get('production', 0):.1f} kWh") else: print(" No energy data available") return df except Exception as e: print(f" ERROR: {e}") return pd.DataFrame() # --------------------------------------------------------------------------- # Step 3: Pull historical weather (IMS or TB ambient sensor) # --------------------------------------------------------------------------- def step3_weather_data(tb, start: datetime, end: datetime) -> pd.DataFrame: """Fetch weather data from Air1 ambient sensor (temp, PAR for GHI proxy).""" print("\n[Step 3] Pulling weather data from Air1 (ambient sensor)...") keys = ["airTemperature", "PAR", "windSpeed"] try: df = tb.get_timeseries("Air1", keys, start, end, limit=10000, agg="NONE") if not df.empty: # Resample to 15-min df = df.resample("15min").mean() print(f" {len(df)} records (15-min resampled)") if "airTemperature" in df.columns: temps = df["airTemperature"].dropna() if len(temps): print(f" Temperature: {temps.min():.1f}–{temps.max():.1f}°C " f"(mean {temps.mean():.1f}°C)") if "PAR" in df.columns: par = df["PAR"].dropna() if len(par): # PAR (µmol/m²/s) → GHI proxy: GHI ≈ PAR / 2.1 print(f" PAR: {par.min():.0f}–{par.max():.0f} µmol/m²/s " f"(GHI proxy: {par.max()/2.1:.0f} W/m²)") else: print(" No weather data from Air1") return df except Exception as e: print(f" ERROR: {e}") return pd.DataFrame() # --------------------------------------------------------------------------- # Step 4: Run DayAheadPlanner for each day # --------------------------------------------------------------------------- def step4_run_planner(weather_df: pd.DataFrame, start_date: date, end_date: date) -> dict[str, dict]: """Run DayAheadPlanner for each day using real weather as forecast.""" print("\n[Step 4] Running DayAheadPlanner for each day...") from src.day_ahead_planner import DayAheadPlanner from src.energy_budget import EnergyBudgetPlanner planner = DayAheadPlanner() budget_planner = EnergyBudgetPlanner() # Get annual plan for budget allocation year = start_date.year try: annual = budget_planner.compute_annual_plan(year) month_budget = annual.get("monthly_budgets", {}) except Exception: month_budget = {} plans = {} current = start_date while current <= end_date: day_str = str(current) # Extract 96 temperature and GHI values for this day day_start = pd.Timestamp(current, tz="UTC") day_end = day_start + pd.Timedelta(hours=24) - pd.Timedelta(minutes=15) day_times = pd.date_range(day_start, periods=96, freq="15min") temps = [25.0] * 96 ghis = [0.0] * 96 if not weather_df.empty: day_weather = weather_df.loc[ (weather_df.index >= day_start) & (weather_df.index < day_start + pd.Timedelta(days=1)) ] for i, ts in enumerate(day_times): # Find closest weather record if len(day_weather) > 0: idx = day_weather.index.get_indexer([ts], method="nearest")[0] if idx >= 0 and idx < len(day_weather): row = day_weather.iloc[idx] if "airTemperature" in row and pd.notna(row["airTemperature"]): temps[i] = float(row["airTemperature"]) if "PAR" in row and pd.notna(row["PAR"]): # PAR → GHI proxy ghis[i] = float(row["PAR"]) / 2.1 # Daily budget: use monthly allocation / 30 as simple estimate month = current.month monthly_kwh = month_budget.get(month, 0.5) import calendar days_in_month = calendar.monthrange(current.year, current.month)[1] daily_budget = monthly_kwh / days_in_month if monthly_kwh > 0 else 0.5 try: plan = planner.plan_day( target_date=current, forecast_temps=temps, forecast_ghi=ghis, daily_budget_kwh=daily_budget, ) plans[day_str] = plan.to_dict() n_interv = plan.n_intervention_slots print(f" {day_str}: {len(plan.slots)} slots, " f"{n_interv} interventions, " f"cost {plan.total_energy_cost_kwh:.4f}/{daily_budget:.4f} kWh " f"({plan.budget_utilisation_pct:.1f}%)") except Exception as e: print(f" {day_str}: PLANNER ERROR - {e}") plans[day_str] = {"error": str(e)} current += timedelta(days=1) return plans # --------------------------------------------------------------------------- # Step 5: Compute astronomical tracking angles # --------------------------------------------------------------------------- def step5_astronomical_angles(start_date: date, end_date: date) -> pd.DataFrame: """Compute expected astronomical tracking angles for each 15-min slot.""" print("\n[Step 5] Computing astronomical tracking angles...") from src.shading.solar_geometry import ShadowModel shadow = ShadowModel() records = [] current = start_date while current <= end_date: day_start = pd.Timestamp(current, tz="UTC") times = pd.date_range(day_start, periods=96, freq="15min") solar_pos = shadow.get_solar_position(times) for i, ts in enumerate(times): elev = float(solar_pos.iloc[i]["solar_elevation"]) azim = float(solar_pos.iloc[i]["solar_azimuth"]) if elev > 2: tracker = shadow.compute_tracker_tilt(azim, elev) astro_angle = float(tracker["tracker_theta"]) else: astro_angle = 0.0 # night — stowed records.append({ "timestamp": ts, "date": str(current), "solar_elevation": elev, "solar_azimuth": azim, "astro_angle": astro_angle, }) current += timedelta(days=1) df = pd.DataFrame(records).set_index("timestamp") daylight = df[df["solar_elevation"] > 2] print(f" {len(daylight)} daylight slots computed") if len(daylight): print(f" Astro angle range: [{daylight['astro_angle'].min():.1f}, " f"{daylight['astro_angle'].max():.1f}]°") return df # --------------------------------------------------------------------------- # Step 6: Compare planned vs actual angles # --------------------------------------------------------------------------- def step6_compare_angles( plans: dict, tracker_data: dict[str, pd.DataFrame], astro_df: pd.DataFrame, ) -> pd.DataFrame: """Compare planned offsets + astronomical angles against actual tracker angles.""" print("\n[Step 6] Comparing planned vs actual tracker angles...") if not tracker_data: print(" No tracker data available — skipping comparison") return pd.DataFrame() # Use first available tracker for comparison tracker_name = next(iter(tracker_data)) actual_df = tracker_data[tracker_name].copy() print(f" Using {tracker_name} for comparison ({len(actual_df)} records)") comparisons = [] for day_str, plan in plans.items(): if "error" in plan: continue slots = plan.get("slots", []) for slot in slots: time_str = slot["time"] offset = slot["offset_deg"] # Build timestamp ts = pd.Timestamp(f"{day_str} {time_str}", tz="UTC") # Get astronomical angle if ts in astro_df.index: astro = astro_df.loc[ts, "astro_angle"] else: # Find nearest idx = astro_df.index.get_indexer([ts], method="nearest")[0] astro = astro_df.iloc[idx]["astro_angle"] if idx >= 0 else 0.0 planned_angle = astro + offset # Find nearest actual angle actual_angle = None if not actual_df.empty and "angle" in actual_df.columns: nearest_idx = actual_df.index.get_indexer([ts], method="nearest") if nearest_idx[0] >= 0: actual_row = actual_df.iloc[nearest_idx[0]] time_diff = abs((actual_df.index[nearest_idx[0]] - ts).total_seconds()) if time_diff < 1800: # within 30 min actual_angle = float(actual_row["angle"]) comparisons.append({ "timestamp": ts, "date": day_str, "time": time_str, "astro_angle": astro, "planned_offset": offset, "planned_angle": planned_angle, "actual_angle": actual_angle, "gate_passed": slot["gate_passed"], "deviation": abs(planned_angle - actual_angle) if actual_angle is not None else None, }) comp_df = pd.DataFrame(comparisons) if comp_df.empty: print(" No comparison data generated") return comp_df valid = comp_df.dropna(subset=["deviation"]) if len(valid): mae = valid["deviation"].mean() max_dev = valid["deviation"].max() within_2 = (valid["deviation"] <= 2.0).sum() print(f" Matched records: {len(valid)}") print(f" Mean Absolute Error: {mae:.2f}°") print(f" Max deviation: {max_dev:.2f}°") print(f" Within ±2° tolerance: {within_2}/{len(valid)} " f"({within_2/len(valid)*100:.0f}%)") # Per-day breakdown print(f"\n {'Date':<12} {'Slots':>6} {'MAE':>7} {'MaxDev':>7} {'Within2°':>9}") for day, grp in valid.groupby("date"): d_mae = grp["deviation"].mean() d_max = grp["deviation"].max() d_ok = (grp["deviation"] <= 2.0).sum() print(f" {day:<12} {len(grp):>6} {d_mae:>6.2f}° {d_max:>6.2f}° " f"{d_ok:>4}/{len(grp):<4}") else: print(" No matched actual vs planned angle data") return comp_df # --------------------------------------------------------------------------- # Step 7: Validate InterventionGate decisions # --------------------------------------------------------------------------- def step7_validate_gate(plans: dict, weather_df: pd.DataFrame) -> list[dict]: """Check gate decisions against weather conditions.""" print("\n[Step 7] Validating InterventionGate decisions...") violations = [] for day_str, plan in plans.items(): if "error" in plan: continue for slot in plan.get("slots", []): time_str = slot["time"] offset = slot["offset_deg"] gate = slot["gate_passed"] tags = slot.get("tags", []) ts = pd.Timestamp(f"{day_str} {time_str}", tz="UTC") hour = ts.hour + ts.minute / 60.0 # Get weather at this slot temp_c = None ghi = None if not weather_df.empty: idx = weather_df.index.get_indexer([ts], method="nearest") if idx[0] >= 0: row = weather_df.iloc[idx[0]] time_diff = abs((weather_df.index[idx[0]] - ts).total_seconds()) if time_diff < 1800: temp_c = row.get("airTemperature") par = row.get("PAR") if par is not None and pd.notna(par): ghi = par / 2.1 if temp_c is not None and pd.notna(temp_c): temp_c = float(temp_c) # Check violations if gate and offset > 0: # Intervention was allowed — verify conditions if hour < NO_SHADE_BEFORE_HOUR: violations.append({ "date": day_str, "time": time_str, "type": "SHADE_BEFORE_10", "detail": f"Shading at {hour:.1f}h (before {NO_SHADE_BEFORE_HOUR}:00)", "severity": "CRITICAL", }) if temp_c is not None and temp_c < SHADE_ELIGIBLE_TLEAF_ABOVE: violations.append({ "date": day_str, "time": time_str, "type": "SHADE_BELOW_TEMP", "detail": f"Shading at {temp_c:.1f}°C (threshold: {SHADE_ELIGIBLE_TLEAF_ABOVE}°C)", "severity": "WARNING", }) if ghi is not None and ghi < SHADE_ELIGIBLE_GHI_ABOVE: violations.append({ "date": day_str, "time": time_str, "type": "SHADE_LOW_GHI", "detail": f"Shading at GHI {ghi:.0f} W/m² (threshold: {SHADE_ELIGIBLE_GHI_ABOVE})", "severity": "WARNING", }) elif not gate and offset == 0: # Gate blocked — verify it SHOULD have been blocked if temp_c is not None and temp_c >= SHADE_ELIGIBLE_TLEAF_ABOVE and \ ghi is not None and ghi >= SHADE_ELIGIBLE_GHI_ABOVE and \ hour >= NO_SHADE_BEFORE_HOUR: # Conditions seem favorable but gate blocked — might be CWSI # Not necessarily a violation (CWSI proxy could block) pass if violations: critical = [v for v in violations if v["severity"] == "CRITICAL"] warnings = [v for v in violations if v["severity"] == "WARNING"] print(f" {len(critical)} CRITICAL violations, {len(warnings)} warnings") for v in critical: print(f" !! {v['date']} {v['time']}: {v['detail']}") for v in warnings[:5]: print(f" ? {v['date']} {v['time']}: {v['detail']}") if len(warnings) > 5: print(f" ... and {len(warnings) - 5} more warnings") else: print(" No gate violations found — all decisions are consistent") return violations # --------------------------------------------------------------------------- # Step 8: Verify energy budget compliance # --------------------------------------------------------------------------- def step8_budget_compliance(plans: dict) -> list[dict]: """Check that no plan exceeds its daily budget.""" print("\n[Step 8] Verifying energy budget compliance...") results = [] for day_str, plan in plans.items(): if "error" in plan: continue budget = plan.get("daily_budget_kwh", 0) cost = plan.get("total_energy_cost_kwh", 0) util = plan.get("budget_utilisation_pct", 0) exceeded = cost > budget result = { "date": day_str, "budget_kwh": budget, "cost_kwh": cost, "utilisation_pct": util, "exceeded": exceeded, } results.append(result) if exceeded: print(f" !! {day_str}: BUDGET EXCEEDED — cost {cost:.4f} > budget {budget:.4f} kWh") if results: total_budget = sum(r["budget_kwh"] for r in results) total_cost = sum(r["cost_kwh"] for r in results) any_exceeded = any(r["exceeded"] for r in results) print(f" Total budget: {total_budget:.4f} kWh, total cost: {total_cost:.4f} kWh " f"({total_cost/total_budget*100:.1f}%)" if total_budget > 0 else " No budget data") if not any_exceeded: print(" All days within budget — PASS") return results # --------------------------------------------------------------------------- # Step 9: Cross-validate FvCB model # --------------------------------------------------------------------------- def step9_fvcb_validation(weather_df: pd.DataFrame) -> list[dict]: """Run FvCB model on available weather data and check consistency.""" print("\n[Step 9] Cross-validating FvCB model outputs...") try: from src.models.farquhar_model import FarquharModel except ImportError: try: from src.farquhar_model import FarquharModel except ImportError: print(" FarquharModel not importable — skipping") return [] model = FarquharModel() results = [] if weather_df.empty: print(" No weather data for FvCB validation") return results # Sample some daylight records daylight = weather_df.between_time("05:00", "17:00") if daylight.empty: # Timestamps are UTC — Sde Boker is UTC+2/+3, so daylight is ~03:00–15:00 UTC daylight = weather_df[ (weather_df.index.hour >= 4) & (weather_df.index.hour <= 16) ] sample = daylight.dropna(subset=["airTemperature"]).head(50) for ts, row in sample.iterrows(): temp = float(row["airTemperature"]) par = float(row.get("PAR", 500)) if pd.notna(row.get("PAR")) else 500.0 try: result = model.calc_photosynthesis_semillon( PAR=par, Tleaf=temp, CO2=400.0, VPD=2.0, Tair=temp, ) if isinstance(result, tuple) and len(result) >= 3: A, limiting, shading_helps = result[0], result[1], result[2] else: A = result limiting = "unknown" shading_helps = temp >= SEMILLON_TRANSITION_TEMP_C # Consistency checks issues = [] if A < 0: issues.append("negative_A") if A > 40: issues.append("A_too_high") if temp < SEMILLON_TRANSITION_TEMP_C and shading_helps: issues.append("shading_helps_below_transition") if temp >= SEMILLON_TRANSITION_TEMP_C and not shading_helps and limiting == "rubisco": issues.append("rubisco_limited_but_shading_not_helpful") results.append({ "timestamp": str(ts), "temp_c": temp, "par": par, "A": round(A, 2), "limiting": str(limiting), "shading_helps": bool(shading_helps), "issues": issues, }) except Exception as e: results.append({ "timestamp": str(ts), "temp_c": temp, "par": par, "error": str(e), }) valid = [r for r in results if "error" not in r] issues_found = [r for r in valid if r.get("issues")] if valid: a_values = [r["A"] for r in valid] print(f" {len(valid)} slots evaluated") print(f" A range: {min(a_values):.2f}–{max(a_values):.2f} µmol/m²/s " f"(mean {sum(a_values)/len(a_values):.2f})") shading_count = sum(1 for r in valid if r["shading_helps"]) print(f" Shading helps: {shading_count}/{len(valid)} slots") if issues_found: print(f" {len(issues_found)} consistency issues found:") for r in issues_found[:5]: print(f" {r['timestamp']}: {r['issues']} " f"(T={r['temp_c']:.1f}°C, A={r['A']:.2f})") else: print(" No FvCB consistency issues — PASS") errors = [r for r in results if "error" in r] if errors: print(f" {len(errors)} FvCB computation errors") for r in errors[:3]: print(f" {r['timestamp']}: {r['error']}") return results # --------------------------------------------------------------------------- # Step 10: Summary scorecard # --------------------------------------------------------------------------- def step10_scorecard( plans: dict, comp_df: pd.DataFrame, violations: list[dict], budget_results: list[dict], fvcb_results: list[dict], tracker_data: dict, energy_df: pd.DataFrame, ) -> dict: """Generate and print the final verification scorecard.""" print("\n" + "=" * 70) print(" VERIFICATION SCORECARD") print("=" * 70) scorecard = { "generated_at": datetime.now(timezone.utc).isoformat(), "days_analyzed": len(plans), "checks": {}, } # 1. Data availability has_trackers = bool(tracker_data) has_energy = not energy_df.empty has_plans = any("error" not in p for p in plans.values()) print(f"\n Data Availability:") print(f" Tracker telemetry: {'YES' if has_trackers else 'NO'}") print(f" Energy production: {'YES' if has_energy else 'NO'}") print(f" Planner output: {'YES' if has_plans else 'NO'}") scorecard["checks"]["data_availability"] = { "trackers": has_trackers, "energy": has_energy, "plans": has_plans, } # 2. Angle alignment if not comp_df.empty: valid = comp_df.dropna(subset=["deviation"]) if len(valid): mae = valid["deviation"].mean() within_2 = (valid["deviation"] <= 2.0).mean() * 100 status = "PASS" if mae < 5.0 else "WARN" if mae < 15.0 else "FAIL" print(f"\n Angle Alignment ({status}):") print(f" MAE: {mae:.2f}°") print(f" Within ±2° tolerance: {within_2:.0f}%") scorecard["checks"]["angle_alignment"] = { "status": status, "mae_deg": round(mae, 2), "within_tolerance_pct": round(within_2, 1), } # 3. Gate compliance critical = [v for v in violations if v["severity"] == "CRITICAL"] warnings = [v for v in violations if v["severity"] == "WARNING"] gate_status = "PASS" if not critical else "FAIL" print(f"\n Gate Compliance ({gate_status}):") print(f" Critical violations: {len(critical)}") print(f" Warnings: {len(warnings)}") scorecard["checks"]["gate_compliance"] = { "status": gate_status, "critical": len(critical), "warnings": len(warnings), } # 4. Budget compliance exceeded = [r for r in budget_results if r.get("exceeded")] budget_status = "PASS" if not exceeded else "FAIL" print(f"\n Budget Compliance ({budget_status}):") print(f" Days exceeding budget: {len(exceeded)}/{len(budget_results)}") if budget_results: total_cost = sum(r["cost_kwh"] for r in budget_results) total_budget = sum(r["budget_kwh"] for r in budget_results) print(f" Total spend: {total_cost:.4f} / {total_budget:.4f} kWh") scorecard["checks"]["budget_compliance"] = { "status": budget_status, "days_exceeded": len(exceeded), } # 5. FvCB consistency fvcb_issues = [r for r in fvcb_results if r.get("issues")] fvcb_errors = [r for r in fvcb_results if "error" in r] fvcb_status = "PASS" if not fvcb_issues and not fvcb_errors else "WARN" if not fvcb_errors else "FAIL" print(f"\n FvCB Model ({fvcb_status}):") print(f" Consistency issues: {len(fvcb_issues)}") print(f" Computation errors: {len(fvcb_errors)}") scorecard["checks"]["fvcb_model"] = { "status": fvcb_status, "issues": len(fvcb_issues), "errors": len(fvcb_errors), } # 6. Overall all_statuses = [c.get("status", "PASS") for c in scorecard["checks"].values() if isinstance(c, dict)] if "FAIL" in all_statuses: overall = "FAIL" elif "WARN" in all_statuses: overall = "WARN" else: overall = "PASS" print(f"\n {'=' * 40}") print(f" OVERALL: {overall}") print(f" {'=' * 40}") scorecard["overall"] = overall return scorecard # --------------------------------------------------------------------------- # Per-day scorecard table # --------------------------------------------------------------------------- def print_daily_table(plans, comp_df, violations, budget_results): """Print the per-day scorecard table from Step 10.""" print("\n Per-Day Scorecard:") print(f" {'Date':<12} {'Interv':>7} {'Budget%':>8} {'MAE°':>6} " f"{'GateViol':>9} {'Status':>8}") print(f" {'-'*60}") days = sorted(plans.keys()) for day in days: plan = plans[day] if "error" in plan: print(f" {day:<12} {'ERROR':>7}") continue n_interv = plan.get("n_intervention_slots", 0) util = plan.get("budget_utilisation_pct", 0) # MAE for this day if not comp_df.empty: day_comp = comp_df[comp_df["date"] == day].dropna(subset=["deviation"]) mae = day_comp["deviation"].mean() if len(day_comp) else float("nan") else: mae = float("nan") # Gate violations for this day day_viol = [v for v in violations if v["date"] == day and v["severity"] == "CRITICAL"] status = "PASS" if day_viol: status = "FAIL" elif mae > 10: status = "WARN" mae_str = f"{mae:.1f}" if not math.isnan(mae) else "N/A" print(f" {day:<12} {n_interv:>7} {util:>7.1f}% {mae_str:>6} " f"{len(day_viol):>9} {status:>8}") # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): import argparse parser = argparse.ArgumentParser(description="SolarWine control system verification") parser.add_argument("--days", type=int, default=10, help="Number of days to verify (default: 10)") parser.add_argument("--output", type=str, help="Save JSON report to this path") parser.add_argument("--verbose", "-v", action="store_true") args = parser.parse_args() level = logging.DEBUG if args.verbose else logging.INFO logging.basicConfig( level=level, format="%(asctime)s %(name)-15s %(levelname)-7s %(message)s", datefmt="%H:%M:%S", ) end_date = date.today() - timedelta(days=1) # yesterday start_date = end_date - timedelta(days=args.days - 1) print("=" * 70) print(f" SolarWine Control System Verification") print(f" Period: {start_date} → {end_date} ({args.days} days)") print("=" * 70) # Connect to ThingsBoard tb = get_tb_client() start_dt = datetime(start_date.year, start_date.month, start_date.day, tzinfo=timezone.utc) end_dt = datetime(end_date.year, end_date.month, end_date.day, 23, 59, 59, tzinfo=timezone.utc) # Steps 1-3: Data collection tracker_data = step1_tracker_angles(tb, start_dt, end_dt) energy_df = step2_energy_production(tb, start_dt, end_dt) weather_df = step3_weather_data(tb, start_dt, end_dt) # Steps 4-5: Planning & astronomy plans = step4_run_planner(weather_df, start_date, end_date) astro_df = step5_astronomical_angles(start_date, end_date) # Steps 6-9: Analysis comp_df = step6_compare_angles(plans, tracker_data, astro_df) violations = step7_validate_gate(plans, weather_df) budget_results = step8_budget_compliance(plans) fvcb_results = step9_fvcb_validation(weather_df) # Step 10: Final scorecard scorecard = step10_scorecard( plans, comp_df, violations, budget_results, fvcb_results, tracker_data, energy_df, ) print_daily_table(plans, comp_df, violations, budget_results) # Save report if args.output: report = { "scorecard": scorecard, "plans": plans, "violations": violations, "budget_results": budget_results, "fvcb_results": fvcb_results, "comparison_summary": { "total_records": len(comp_df) if not comp_df.empty else 0, "matched_records": len(comp_df.dropna(subset=["deviation"])) if not comp_df.empty else 0, }, } Path(args.output).parent.mkdir(parents=True, exist_ok=True) with open(args.output, "w") as f: json.dump(report, f, indent=2, default=str) print(f"\nDetailed report saved to: {args.output}") print("\nVerification complete.") if __name__ == "__main__": main()