Source code for micromet.qaqc.netrad_limits

"""
AmeriFlux-like Timestamp Alignment QA/QC

Implements the core ideas from the AmeriFlux Timestamp Alignment Module:
- Compute potential incoming shortwave at the top of atmosphere (SW_IN_POT)
  in local standard time (no DST).
- Build 15-day non-overlapping "maximum diurnal composites".
- Compute (1) % of time composite observed radiation exceeds potential and
  (2) lag (in time steps) at which cross-correlation between observed and
      potential is maximized.

Also provides heuristic flags for:
- Time zone mismatch / DST usage
- Timestamp START vs END mis-specification
- Stream desynchronization between SW_IN and PPFD_IN
- Possible radiation sensor issues (shading/not level/unexpectedly high)

References
----------
AmeriFlux Data QA/QC: Timestamp Alignment Module
(Design: 15-day max diurnal composite; local standard time; exceedance &
 cross-correlation interpretation). See:
https://ameriflux.lbl.gov/data/flux-data-products/data-qaqc/timestamp-alignment-module/
"""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from datetime import datetime
import pytz
from math import sin, cos, radians, pi, asin

# Constants
SOLAR_CONSTANT = 1367  # W/m²
ALBEDO = 0.25  # average for natural terrain
EMISSIVITY = 0.98  # ground emissivity
STEFAN_BOLTZMANN = 5.67e-8  # W/m²/K⁴
LATITUDE = 39.5  # Utah average latitude in degrees
LONGITUDE = -111.5  # Utah average longitude in degrees


[docs] def solar_declination(doy: int) -> float: """ Calculate the solar declination angle for a given day of the year. The solar declination is the angle between the Earth's equatorial plane and the line connecting the centers of the Earth and the Sun. Parameters ---------- doy : int The day of the year (1-365 or 1-366). Returns ------- float The solar declination angle in radians. """ return radians(23.44) * sin(2 * pi * (284 + doy) / 365)
[docs] def hour_angle(hour: int) -> float: """ Calculate the hour angle for a given hour of the day. The hour angle is the angular displacement of the Sun east or west of the local meridian due to Earth's rotation. Parameters ---------- hour : int The hour of the day (0-23). Returns ------- float The hour angle in radians. """ return radians(15 * (hour - 12))
[docs] def solar_elevation(doy: int, hour: int, latitude: float = LATITUDE) -> float: """ Calculate the solar elevation angle. The solar elevation angle is the angle of the Sun above the horizon. Parameters ---------- doy : int The day of the year. hour : int The hour of the day. latitude : float, optional The latitude in degrees. Defaults to LATITUDE. Returns ------- float The solar elevation angle in radians. """ decl = solar_declination(doy) lat_rad = radians(latitude) ha = hour_angle(hour) sin_elev = sin(lat_rad) * sin(decl) + cos(lat_rad) * cos(decl) * cos(ha) return max(0, asin(sin_elev)) # radians
[docs] def clear_sky_radiation(doy: int, hour: int, latitude: float = LATITUDE) -> float: """ Estimate the clear-sky incoming shortwave radiation. This function provides a simplified estimation of the incoming shortwave radiation on a clear day, taking into account the solar constant, Earth-Sun distance, and atmospheric transmissivity. Parameters ---------- doy : int The day of the year. hour : int The hour of the day. latitude : float, optional The latitude in degrees. Defaults to LATITUDE. Returns ------- float The estimated clear-sky radiation in W/m^2. """ elev_rad = solar_elevation(doy, hour, latitude) if elev_rad <= 0: return 0.0 d_r = 1 + 0.033 * cos(2 * pi * doy / 365) # Earth-Sun distance factor cos_zenith = cos(pi / 2 - elev_rad) rs = SOLAR_CONSTANT * d_r * cos_zenith * 0.75 # ~0.75 = clear sky transmissivity return rs
[docs] def longwave_radiation(T_kelvin): """ Estimate the longwave radiation using the Stefan-Boltzmann law. This function calculates the longwave radiation emitted by a surface based on its temperature, using the Stefan-Boltzmann law. Parameters ---------- T_kelvin : float The temperature of the surface in Kelvin. Returns ------- float The estimated longwave radiation in W/m^2. """ return EMISSIVITY * STEFAN_BOLTZMANN * T_kelvin**4
[docs] def estimate_net_radiation_range(doy, hour): """ Estimate the min/max net radiation for a given hour and day of the year. This function calculates a plausible range for net radiation by estimating the components of the radiation budget (shortwave and longwave radiation) under typical conditions. Parameters ---------- doy : int The day of the year. hour : int The hour of the day. Returns ------- tuple[float, float] A tuple containing the minimum and maximum estimated net radiation in W/m^2. """ rs_down = clear_sky_radiation(doy, hour) # Net shortwave rs_net = rs_down * (1 - ALBEDO) # Assume typical diurnal surface temperature range in K Tmin_K = 273.15 + 5 # typical min in early morning Tmax_K = 273.15 + 40 # hot afternoon summer temp # Incoming longwave (simplified as ~cloudless sky) lw_down_min = longwave_radiation(Tmin_K - 5) # colder sky at night lw_down_max = longwave_radiation(Tmax_K - 15) # warmer sky in afternoon # Outgoing longwave from surface lw_up_min = longwave_radiation(Tmin_K) lw_up_max = longwave_radiation(Tmax_K) # Net radiation range Rn_min = rs_net + lw_down_min - lw_up_max Rn_max = rs_net + lw_down_max - lw_up_min return Rn_min, Rn_max
[docs] def add_buffer(min_max: tuple, buffer: float = 100): """ Add a buffer to a min/max tuple, with a hard lower limit. This function expands a range defined by a min/max tuple by adding a buffer. The minimum value is not allowed to go below -200. Parameters ---------- min_max : tuple A tuple containing the minimum and maximum values. buffer : float, optional The buffer to add to the max value and subtract from the min value. Defaults to 100. Returns ------- tuple[float, float] The buffered minimum and maximum values. """ if min_max[0] - buffer <= -200: minv = -200 else: minv = min_max[0] - buffer maxv = min_max[1] + buffer return minv, maxv
if __name__ == "__main__": # Example usage doy = 172 # summer solstice ~June 21 hour = 14 # 2 PM Rn_min, Rn_max = estimate_net_radiation_range(doy, hour) print(f"DOY: {doy}, Hour: {hour}") print(f"Estimated Net Radiation Range: {Rn_min:.1f} W/m² to {Rn_max:.1f} W/m²") def calc_diurnal_range(doy): """ Calculate the diurnal range of net radiation for a given day. This function calculates the estimated minimum and maximum net radiation for each hour of a given day of the year. Parameters ---------- doy : int The day of the year. Returns ------- tuple[list[float], list[float]] A tuple of two lists, where the first list contains the minimum net radiation for each hour and the second list contains the maximum. """ hours = np.arange(0, 24) rn_min_list, rn_max_list = zip( *[estimate_net_radiation_range(doy, h) for h in hours] ) return rn_min_list, rn_max_list # ----------------------------- Utilities ------------------------------------ # def _infer_freq_minutes(dt: pd.DatetimeIndex) -> int: """ Infer the sampling interval in whole minutes from a DatetimeIndex. This function calculates the median difference between consecutive timestamps in a DatetimeIndex and returns the result in whole minutes. Parameters ---------- dt : pd.DatetimeIndex The DatetimeIndex from which to infer the frequency. Returns ------- int The inferred sampling interval in minutes. Raises ------ ValueError If the index contains fewer than two timestamps or if the inferred interval is not positive. """ # Ensure we are working with nanoseconds for consistent calculation # pandas 3.0+ may use microseconds by default dt_ns = pd.to_datetime(dt).values.astype("datetime64[ns]").view("i8") diffs = np.diff(dt_ns) if len(diffs) == 0: raise ValueError("Need at least two timestamps to infer frequency.") med = np.median(diffs) minutes = int(round(med / 1e9 / 60.0)) if minutes <= 0: raise ValueError("Non-positive inferred sampling interval.") return minutes def _to_local_standard_time( dt: pd.DatetimeIndex, std_utc_offset_hours: float, assume_naive_is_local: bool = False, ) -> pd.DatetimeIndex: """ Convert timestamps to local standard time (no DST) using a fixed UTC offset. This function takes a DatetimeIndex and converts it to a fixed-offset timezone, effectively removing any daylight saving time adjustments. Parameters ---------- dt : pd.DatetimeIndex Input timestamps, which can be timezone-naive or timezone-aware. std_utc_offset_hours : float The UTC offset for the local standard time (e.g., -7 for MST). assume_naive_is_local : bool, optional If True and the input is timezone-naive, it is assumed to be in local standard time. If False, it is assumed to be in UTC. Defaults to False. Returns ------- pd.DatetimeIndex A DatetimeIndex localized to a fixed-offset timezone (no DST). """ offset = pd.Timedelta(hours=std_utc_offset_hours) fixed_tz = pytz.FixedOffset(int(offset.total_seconds() // 60)) if dt.tz is None: if assume_naive_is_local: return dt.tz_localize(fixed_tz) else: # Assume input is UTC, convert to fixed offset return dt.tz_localize("UTC").tz_convert(fixed_tz) else: return dt.tz_convert(fixed_tz) # ------------------------ Solar / SW_IN_POT model --------------------------- #
[docs] def sw_in_pot_noaa( dt_local_standard: pd.DatetimeIndex, lat_deg: float | None = 40.7607, lon_deg: float | None = -111.8939, std_utc_offset_hours: int | None = -7, ) -> pd.Series: """ Compute top-of-atmosphere shortwave irradiance using NOAA's approximations. This function calculates the potential incoming shortwave radiation at the top of the atmosphere on a horizontal surface, using solar position approximations from NOAA. Parameters ---------- dt_local_standard : pd.DatetimeIndex A DatetimeIndex in local standard time (fixed UTC offset, no DST). lat_deg : float Latitude in degrees (positive for Northern Hemisphere). lon_deg : float Longitude in degrees (positive for Eastern Hemisphere). std_utc_offset_hours : float The local standard time UTC offset (e.g., -7 for MST). Returns ------- pd.Series A Series of potential incoming shortwave radiation (W/m^2), indexed by the input DatetimeIndex. Raises ------ ValueError If the input DatetimeIndex is not timezone-aware. """ if dt_local_standard.tz is None: raise ValueError("dt_local_standard must be tz-aware fixed-offset (no DST).") # Constants G_SC = 1361.0 # W m^-2, solar constant # Vectorized time components ts = dt_local_standard.tz_convert(None) # drop tz for numeric ops doy = ts.dayofyear.values hour = ts.hour.values minute = ts.minute.values second = ts.second.values # Fractional year (radians), NOAA formulation gamma = 2.0 * np.pi * (doy - 1 + (hour - 12) / 24.0) / 365.0 # Equation of time (minutes) and solar declination (radians) eqtime = 229.18 * ( 0.000075 + 0.001868 * np.cos(gamma) - 0.032077 * np.sin(gamma) - 0.014615 * np.cos(2 * gamma) - 0.040849 * np.sin(2 * gamma) ) decl = ( 0.006918 - 0.399912 * np.cos(gamma) + 0.070257 * np.sin(gamma) - 0.006758 * np.cos(2 * gamma) + 0.000907 * np.sin(2 * gamma) - 0.002697 * np.cos(3 * gamma) + 0.00148 * np.sin(3 * gamma) ) # Time offset (minutes) if lon_deg is None or std_utc_offset_hours is None: raise ValueError("Must provide lon_deg and std_utc_offset_hours.") else: time_offset = eqtime + 4.0 * lon_deg - 60.0 * std_utc_offset_hours # True solar time (minutes) tst = hour * 60.0 + minute + second / 60.0 + time_offset # Hour angle (degrees -> radians) ha_deg = (tst / 4.0) - 180.0 ha = np.deg2rad(ha_deg) if lat_deg is None: raise ValueError("Must provide lat_deg.") else: lat = np.deg2rad(lat_deg) # Solar zenith cosine cos_zen = np.sin(lat) * np.sin(decl) + np.cos(lat) * np.cos(decl) * np.cos(ha) # Earth-Sun distance correction factor (Spencer) e0 = ( 1.00011 + 0.034221 * np.cos(gamma) + 0.00128 * np.sin(gamma) + 0.000719 * np.cos(2 * gamma) + 0.000077 * np.sin(2 * gamma) ) i0 = G_SC * e0 * np.maximum(0.0, cos_zen) # W m^-2, 0 at night return pd.Series(i0, index=dt_local_standard)
# ----------------- Maximum diurnal composite in 15-day windows -------------- #
[docs] @dataclass class WindowComposite: """ A container for the results of a 15-day window analysis. This dataclass holds the composite data and statistical results for a single 15-day analysis window. Attributes ---------- year : int The year of the analysis window. window_id : int The 15-day window number within the year. step_minutes : int The sampling interval in minutes. steps_per_day : int The number of time steps per day. comp_pot : np.ndarray The maximum diurnal composite for potential incoming shortwave radiation. comp_sw : np.ndarray, optional The maximum diurnal composite for observed shortwave radiation. comp_ppfd : np.ndarray, optional The maximum diurnal composite for observed PPFD. pct_exceed_sw : float, optional The percentage of time the observed SW composite exceeds the potential. pct_exceed_ppfd : float, optional The percentage of time the observed PPFD composite exceeds the potential. lag_sw : int, optional The lag (in time steps) at which the SW cross-correlation is maximized. corr_sw : float, optional The maximum cross-correlation for shortwave radiation. lag_ppfd : int, optional The lag (in time steps) at which the PPFD cross-correlation is maximized. corr_ppfd : float, optional The maximum cross-correlation for PPFD. """ year: int window_id: int step_minutes: int steps_per_day: int comp_pot: np.ndarray comp_sw: Optional[np.ndarray] comp_ppfd: Optional[np.ndarray] pct_exceed_sw: Optional[float] pct_exceed_ppfd: Optional[float] lag_sw: Optional[int] corr_sw: Optional[float] lag_ppfd: Optional[int] corr_ppfd: Optional[float]
def _max_diurnal_composite( s: pd.Series, step_minutes: int, ) -> np.ndarray: """ Build a 'maximum diurnal composite' over a 15-day window. This function calculates the maximum value for each time slot of the day over a 15-day period, creating a composite diurnal cycle. Parameters ---------- s : pd.Series A Series of values with a DatetimeIndex in local standard time. step_minutes : int The sampling interval in minutes. Returns ------- np.ndarray An array of length `steps_per_day` containing the maximum value for each time slot. """ steps_per_day = int(round(1440 / step_minutes)) # Slot index within day [0 .. steps_per_day-1] slot = (s.index.hour * 60 + s.index.minute) // step_minutes # type: ignore # Group by day + slot, take max by slot: df = pd.DataFrame({"slot": slot, "val": s.values}) grp = df.groupby("slot")["val"].max() comp = np.full(steps_per_day, np.nan, dtype=float) comp[grp.index.values] = grp.values return comp def _xcorr_best_lag( a: np.ndarray, b: np.ndarray, max_lag: int ) -> Tuple[Optional[int], Optional[float]]: """ Find the lag of maximum Pearson correlation between two arrays. This function calculates the Pearson correlation between two arrays at different lags (from -max_lag to +max_lag) and returns the lag that results in the highest correlation. Parameters ---------- a : np.ndarray The first array. b : np.ndarray The second array. max_lag : int The maximum lag (in steps) to check in each direction. Returns ------- tuple[Optional[int], Optional[float]] A tuple containing the best lag and the corresponding correlation, or (None, None) if there is insufficient data. """ best_lag, best_corr = None, None for lag in range(-max_lag, max_lag + 1): if lag < 0: x = a[-lag:] y = b[: len(b) + lag] elif lag > 0: x = a[: len(a) - lag] y = b[lag:] else: x = a y = b mask = np.isfinite(x) & np.isfinite(y) if mask.sum() >= 6: # need enough points r = np.corrcoef(x[mask], y[mask])[0, 1] if (best_corr is None) or (r > best_corr): best_corr = float(r) best_lag = int(lag) return best_lag, best_corr def _fifteen_day_window_id(doy: int) -> int: """ Calculate the 1-based 15-day window number for a given day of the year. Parameters ---------- doy : int The day of the year (1-365 or 1-366). Returns ------- int The 15-day window number (1-25). """ return 1 + (doy - 1) // 15
[docs] def analyze_timestamp_alignment( df: pd.DataFrame, *, lat: float | None = LATITUDE, lon: float | None = LONGITUDE, std_utc_offset_hours: int = -7, time_from: str = "CENTER", start_col: str = "TIMESTAMP_START", end_col: str = "TIMESTAMP_END", time_col: Optional[str] = None, sw_col: str = "SW_IN", ppfd_col: str = "PPFD_IN", assume_naive_is_local: bool = False, max_lag_steps: int = 6, ) -> Tuple[pd.DataFrame, Dict[Tuple[int, int], WindowComposite]]: """ Perform the main timestamp alignment analysis. This function analyzes the timestamp alignment of radiation data by comparing observed values against potential top-of-atmosphere radiation. It computes composites, cross-correlations, and other statistics for 15-day windows. Parameters ---------- df : pd.DataFrame The input DataFrame containing timestamp and radiation data. lat : float Latitude of the site. lon : float Longitude of the site. std_utc_offset_hours : float The UTC offset for local standard time. time_from : str, optional How to interpret the timestamp ('CENTER', 'START', 'END'). Defaults to 'CENTER'. start_col : str, optional Name of the start timestamp column. Defaults to 'TIMESTAMP_START'. end_col : str, optional Name of the end timestamp column. Defaults to 'TIMESTAMP_END'. time_col : str, optional Name of a single datetime column. Defaults to None. sw_col : str, optional Name of the shortwave radiation column. Defaults to 'SW_IN'. ppfd_col : str, optional Name of the PPFD column. Defaults to 'PPFD_IN'. assume_naive_is_local : bool, optional Whether to assume naive timestamps are in local time. Defaults to False. max_lag_steps : int, optional Maximum lag for cross-correlation. Defaults to 6. Returns ------- tuple[pd.DataFrame, dict] A tuple containing a summary DataFrame of the analysis and a dictionary of WindowComposite objects for each window. """ # 1) Build DateTimeIndex idx = None if time_col and time_col in df.columns: idx = pd.to_datetime(df[time_col], errors="coerce") else: if (start_col in df.columns) and (end_col in df.columns): start = pd.to_datetime(df[start_col].astype(str), errors="coerce") end = pd.to_datetime(df[end_col].astype(str), errors="coerce") if time_from.upper() == "CENTER": dt = start + (end - start) / 2 elif time_from.upper() == "START": dt = start elif time_from.upper() == "END": dt = end else: raise ValueError("time_from must be one of {'CENTER','START','END'}") idx = dt else: raise ValueError("Provide a time column or TIMESTAMP_START/END columns.") # Attach index & keep only needed columns work = df.copy() work.index = pd.DatetimeIndex(idx) cols = [c for c in [sw_col, ppfd_col] if c in work.columns] work = work[cols].sort_index() # 2) Convert to local standard time (fixed offset) dt_lst = _to_local_standard_time( work.index, # type: ignore std_utc_offset_hours=std_utc_offset_hours, assume_naive_is_local=assume_naive_is_local, ) work.index = dt_lst # 3) Infer sampling step and compute SW_IN_POT step_minutes = _infer_freq_minutes(work.index) pot = sw_in_pot_noaa( work.index, lat_deg=lat, lon_deg=lon, std_utc_offset_hours=std_utc_offset_hours ) # 4) Organize by year and 15-day window out_rows = [] composites: Dict[Tuple[int, int], WindowComposite] = {} steps_per_day = int(round(1440 / step_minutes)) df_all = pd.concat( {"SW_IN": work.get(sw_col), "PPFD_IN": work.get(ppfd_col), "SW_IN_POT": pot}, axis=1, ) # Year/window grouping years = work.index.year windows = _fifteen_day_window_id(work.index.dayofyear) # type: ignore gkey = pd.MultiIndex.from_arrays([years, windows], names=["year", "window_id"]) # type: ignore for (year, window_id), sub in df_all.groupby(gkey): # Build composites comp_pot = _max_diurnal_composite( sub["SW_IN_POT"].dropna(), step_minutes=step_minutes ) comp_sw = None if "SW_IN" in sub and sub["SW_IN"].notna().any(): comp_sw = _max_diurnal_composite( sub["SW_IN"].dropna(), step_minutes=step_minutes ) comp_ppfd = None if "PPFD_IN" in sub and sub["PPFD_IN"].notna().any(): comp_ppfd = _max_diurnal_composite( sub["PPFD_IN"].dropna(), step_minutes=step_minutes ) # Percent exceedance (observed > potential) using composites pct_exceed_sw = None lag_sw = None corr_sw = None if comp_sw is not None: mask = np.isfinite(comp_sw) & np.isfinite(comp_pot) if mask.any(): pct_exceed_sw = 100.0 * np.mean(comp_sw[mask] > comp_pot[mask]) lag_sw, corr_sw = _xcorr_best_lag(comp_pot, comp_sw, max_lag=max_lag_steps) pct_exceed_ppfd = None lag_ppfd = None corr_ppfd = None if comp_ppfd is not None: mask = np.isfinite(comp_ppfd) & np.isfinite(comp_pot) if mask.any(): pct_exceed_ppfd = 100.0 * np.mean(comp_ppfd[mask] > comp_pot[mask]) lag_ppfd, corr_ppfd = _xcorr_best_lag( comp_pot, comp_ppfd, max_lag=max_lag_steps ) composites[(year, window_id)] = WindowComposite( year=year, window_id=window_id, step_minutes=step_minutes, steps_per_day=steps_per_day, comp_pot=comp_pot, comp_sw=comp_sw, comp_ppfd=comp_ppfd, pct_exceed_sw=pct_exceed_sw, # type: ignore pct_exceed_ppfd=pct_exceed_ppfd, # type: ignore lag_sw=lag_sw, corr_sw=corr_sw, lag_ppfd=lag_ppfd, corr_ppfd=corr_ppfd, ) out_rows.append( { "year": year, "window_id": window_id, "step_minutes": step_minutes, "pct_exceed_sw": pct_exceed_sw, "pct_exceed_ppfd": pct_exceed_ppfd, "lag_steps_sw": lag_sw, "corr_sw": corr_sw, "lag_steps_ppfd": lag_ppfd, "corr_ppfd": corr_ppfd, } ) summary_df = ( pd.DataFrame(out_rows).sort_values(["year", "window_id"]).reset_index(drop=True) ) return summary_df, composites
# ---------------------------- Heuristic flags -------------------------------- #
[docs] def flag_issues(summary: pd.DataFrame) -> Dict[str, str]: """ Apply simple heuristics to flag likely timestamp and data quality issues. This function uses a set of heuristics to identify potential issues such as timezone mismatches, DST usage, and sensor problems based on the summary of the timestamp alignment analysis. Parameters ---------- summary : pd.DataFrame A DataFrame containing the summary of the timestamp alignment analysis. Returns ------- dict[str, str] A dictionary of flagged issues, where keys are issue types and values are descriptive messages. """ notes = {} # Overall stats across windows sw_lags = summary["lag_steps_sw"].dropna().astype(int) ppfd_lags = summary["lag_steps_ppfd"].dropna().astype(int) sw_ex = summary["pct_exceed_sw"].dropna() ppfd_ex = summary["pct_exceed_ppfd"].dropna() def majority_abs_ge(series: pd.Series, thr: int, frac=0.6) -> bool: return (series.abs() >= thr).mean() >= frac if len(series) else False # Time zone mismatch (≈ 1h) if many windows at |lag| >= 2 steps (30-min data) if majority_abs_ge(sw_lags, 2) or majority_abs_ge(ppfd_lags, 2): notes["timezone_or_dst"] = ( "Many 15-day windows show ~1-hour offset (|lag|≥2 steps for 30-min data): " "possible time-zone mismatch or DST usage." ) # START vs END mis-specification (~30 min) if majority_abs_ge(sw_lags, 1) and (sw_lags.abs() == 1).mean() > 0.5: notes["start_vs_end"] = ( "Cross-correlation peaks at ±1 step (~30 min) in most windows: " "possible START vs END timestamp mis-specification." ) # DST heuristic: look for lag jumping by ~2 steps around typical DST months if len(sw_lags): df = summary.dropna(subset=["lag_steps_sw"]).copy() # Approximate month from window center df["month_est"] = ( ((df["window_id"] - 0.5) * 15 / 30.5 + 1).clip(1, 12).round().astype(int) ) spring = df[df["month_est"].between(3, 4)]["lag_steps_sw"].astype(int) fall = df[df["month_est"].between(10, 11)]["lag_steps_sw"].astype(int) if len(spring) and len(fall) and (spring.mean() - fall.mean()) >= 2: notes["dst"] = ( "Lag changes by ~2 steps between spring and fall windows: " "data may include DST timestamps." ) # Stream desynchronization: SW vs PPFD lags disagree if len(sw_lags) and len(ppfd_lags): if (sw_lags - ppfd_lags).abs().median() >= 1: notes["desync"] = ( "SW_IN and PPFD_IN peak at different lags: streams may be desynchronized." ) # Radiation sensor anomalies: frequent exceedance of potential if (len(sw_ex) and (sw_ex > 5).mean() > 0.5) or ( len(ppfd_ex) and (ppfd_ex > 5).mean() > 0.5 ): notes["radiation_qc"] = ( "Observed radiation frequently exceeds potential (early/late-day exceedance): " "check leveling, shading, and calibration; also verify timestamp alignment." ) return notes
# ------------------------------- Plotting ------------------------------------ #
[docs] def plot_summary( summary: pd.DataFrame, composites: Dict[Tuple[int, int], WindowComposite], which_year: Optional[int] = None, outfile_prefix: Optional[str] = None, ): """ Create and display summary plots of the timestamp alignment analysis. This function generates a set of plots to visualize the results of the timestamp alignment analysis, including percent exceedance, lag times, and a composite overlay for the "worst" window. Parameters ---------- summary : pd.DataFrame A DataFrame containing the summary of the analysis. composites : dict A dictionary of WindowComposite objects for each analysis window. which_year : int, optional The year to plot. If None, all years are plotted. Defaults to None. outfile_prefix : str, optional A prefix for the output plot filenames. If provided, plots are saved to files. Defaults to None. Returns ------- dict A dictionary of matplotlib Figure handles for the generated plots. """ figs = {} data = summary.copy() if which_year is not None: data = data[data["year"] == which_year] if data.empty: raise ValueError(f"No data found for year={which_year}") # Panel A: exceedance bars figA, axA = plt.subplots(figsize=(10, 4)) x = np.arange(len(data)) axA.bar(x - 0.2, data["pct_exceed_sw"].values, width=0.4, label="% exceed SW_IN") # type: ignore if "pct_exceed_ppfd" in data: axA.bar( x + 0.2, data["pct_exceed_ppfd"].values, width=0.4, label="% exceed PPFD_IN" # type: ignore ) axA.set_xticks(x) axA.set_xticklabels([f"W{w}" for w in data["window_id"]], rotation=0) axA.set_ylabel("% of composite steps with obs > POT") axA.set_title("Percent exceedance by 15-day window") axA.legend() figs["exceedance"] = figA if outfile_prefix: figA.savefig(f"{outfile_prefix}_exceedance.png", dpi=150, bbox_inches="tight") # Panel B: lag bars figB, axB = plt.subplots(figsize=(10, 4)) axB.bar(x - 0.2, data["lag_steps_sw"].values, width=0.4, label="Lag steps (SW_IN)") # type: ignore if "lag_steps_ppfd" in data: axB.bar( x + 0.2, data["lag_steps_ppfd"].values, # type: ignore width=0.4, label="Lag steps (PPFD_IN)", ) axB.axhline(0, linewidth=1) axB.set_xticks(x) axB.set_xticklabels([f"W{w}" for w in data["window_id"]]) axB.set_ylabel("Lag (steps); for 30-min data, 2 steps ≈ 1 hour") axB.set_title("Best cross-correlation lag by 15-day window") axB.legend() figs["lags"] = figB if outfile_prefix: figB.savefig(f"{outfile_prefix}_lags.png", dpi=150, bbox_inches="tight") # Panel C: worst window overlay # Choose the window with max absolute lag (preferring SW), else max exceedance cand = data.copy() cand["_score"] = cand["lag_steps_sw"].abs().fillna(0) if cand["_score"].max() == 0: cand["_score"] = cand["pct_exceed_sw"].fillna(0) yr, w = int(cand.iloc[cand["_score"].idxmax()]["year"]), int( # type: ignore cand.iloc[cand["_score"].idxmax()]["window_id"] # type: ignore ) wc = composites[(yr, w)] t = np.arange(wc.steps_per_day) * wc.step_minutes / 60.0 # hours since midnight figC, axC = plt.subplots(figsize=(10, 4)) axC.plot(t, wc.comp_pot, label="SW_IN_POT (TOA)") if wc.comp_sw is not None: axC.plot(t, wc.comp_sw, label="SW_IN (max diurnal composite)") if wc.comp_ppfd is not None: axC.plot(t, wc.comp_ppfd, label="PPFD_IN (max diurnal composite)") axC.set_xlabel("Local standard time (hours)") axC.set_ylabel("Irradiance / PPFD (units as provided)") axC.set_title(f"Window W{w} in {yr}: composite overlay (worst window)") axC.legend() figs["overlay"] = figC if outfile_prefix: figC.savefig( f"{outfile_prefix}_overlay_W{w}_{yr}.png", dpi=150, bbox_inches="tight" ) plt.show() return figs