Source code for micromet.report.gap_summary

import pandas as pd
from pandas.tseries.frequencies import to_offset


[docs] def summarize_gaps( df: pd.DataFrame, station_level: str = "STATIONID", time_level: str = "DATETIME_END", expected_freq: str = "30min", columns: list | None = None, ) -> pd.DataFrame: """ Summarize runs of missing data (NaNs) per column for each station in a MultiIndex DataFrame indexed by (station, datetime). Parameters ---------- df : pd.DataFrame Input DataFrame with a MultiIndex (station_level, time_level). station_level : str, default "STATIONID" Name of the station level in the index. time_level : str, default "DATETIME_START" Name of the datetime level in the index. expected_freq : str, default "30min" The expected sampling frequency. Used to build a complete timeline per station so that missing timestamps become explicit NaNs. columns : list[str] | None Subset of columns to analyze. Defaults to all columns. Returns ------- pd.DataFrame Columns: - STATIONID - COLUMN - GAP_START - GAP_END - N_STEPS_MISSING - HOURS_MISSING - GAP_KIND ("MissingTimestamp", "NaN", or "Mixed") """ if not isinstance(df.index, pd.MultiIndex): raise TypeError("df must have a MultiIndex (station, datetime).") if station_level not in df.index.names or time_level not in df.index.names: raise KeyError( "MultiIndex must contain the specified station_level and time_level." 
) # Work on a sorted copy df = df.copy() df = df.sort_index() if columns is None: columns = list(df.columns) # Frequency as a Timedelta (e.g., 30 minutes) freq_td = pd.Timedelta(to_offset(expected_freq)) hours_per_step = freq_td / pd.Timedelta(hours=1) records = [] # Iterate station by station stations = df.index.get_level_values(station_level).unique() for stn in stations: # Slice one station: index becomes time_level dfx = df.xs(stn, level=station_level) # Ensure the time index is datetime and sorted time_idx = pd.to_datetime(dfx.index) dfx = dfx.set_index(time_idx).sort_index() original_idx = dfx.index # Build a complete timeline so *missing timestamps* are turned into NaNs full_idx = pd.date_range( start=original_idx.min(), end=original_idx.max(), freq=expected_freq ) # Mask telling which timestamps were missing in the original index missing_row_mask = pd.Series( ~pd.Index(full_idx).isin(original_idx), index=full_idx ) # Reindex to full timeline dfr = dfx.reindex(full_idx) for col in columns: col_na = dfr[col].isna() if not col_na.any(): continue # no gaps for this column # Label contiguous runs (True/False) and keep only True-runs (gaps) run_id = (col_na != col_na.shift(1)).cumsum() for rid, run_mask in col_na.groupby(run_id): if not run_mask.iloc[0]: continue # this run is of non-NaNs run_times = run_mask.index gap_start = run_times[0] gap_end = run_times[-1] n_steps = int(run_mask.sum()) # Determine the kind of gap: missing timestamps vs NaNs vs mixed row_missing_in_run = missing_row_mask.loc[run_times] if row_missing_in_run.all(): kind = "MissingTimestamp" elif not row_missing_in_run.any(): kind = "NaN" else: kind = "Mixed" records.append( { "STATIONID": stn, "COLUMN": col, "GAP_START": gap_start, "GAP_END": gap_end, "N_STEPS_MISSING": n_steps, "HOURS_MISSING": n_steps * hours_per_step, "GAP_KIND": kind, } ) out = pd.DataFrame.from_records(records) if not out.empty: out = out.sort_values(["STATIONID", "COLUMN", "GAP_START"]).reset_index( drop=True ) else: # 
Ensure expected columns even when no gaps out = pd.DataFrame( columns=[ "STATIONID", "COLUMN", "GAP_START", "GAP_END", "N_STEPS_MISSING", "HOURS_MISSING", "GAP_KIND", ] ) return out
def compare_gap_summaries(
    gaps_a: pd.DataFrame,
    gaps_b: pd.DataFrame,
    expected_freq: str = "30min",
    min_steps: int = 1,
) -> pd.DataFrame:
    """
    Compare two gap-summary DataFrames (from `summarize_gaps`) and highlight
    where one dataset has coverage that could fill the other's gaps.

    For every gap in the target dataset, the gaps of the source dataset for
    the same (station, column) are subtracted from it. What remains are the
    steps the target is missing but the source is not reported as missing.

    Parameters
    ----------
    gaps_a, gaps_b : pd.DataFrame
        DataFrames returned by `summarize_gaps`. Must include the columns
        ['STATIONID', 'COLUMN', 'GAP_START', 'GAP_END', 'N_STEPS_MISSING'].
        'GAP_KIND' is optional and defaults to "Unknown" when absent.
    expected_freq : str, default "30min"
        Sampling frequency. Used to compute discrete step counts and to
        treat intervals on the expected time grid.
    min_steps : int, default 1
        Only report fillable segments with at least this many steps.

    Returns
    -------
    pd.DataFrame
        One row per *fillable segment*. Columns:
        - TARGET_DATASET ("A" or "B")
        - SOURCE_DATASET ("B" or "A")
        - STATIONID
        - COLUMN
        - TARGET_GAP_START
        - TARGET_GAP_END
        - FILLABLE_START
        - FILLABLE_END
        - N_STEPS_FILLABLE
        - HOURS_FILLABLE
        - TARGET_N_STEPS_MISSING
        - COVERAGE_RATIO (steps_fillable / TARGET_N_STEPS_MISSING)
        - TARGET_GAP_KIND
        An empty frame with these columns is returned when nothing is
        fillable in either direction.

    Raises
    ------
    KeyError
        If either input lacks one of the required columns.

    Notes
    -----
    A (station, column) pair with no gap rows in the source is treated as
    fully covered by the source; this function has no way to tell whether
    the source actually spans the target gap's time range.
    """
    required = {"STATIONID", "COLUMN", "GAP_START", "GAP_END", "N_STEPS_MISSING"}
    for name, gaps in (("gaps_a", gaps_a), ("gaps_b", gaps_b)):
        missing = required - set(gaps.columns)
        if missing:
            raise KeyError(f"{name} missing required columns: {missing}")

    out_columns = [
        "TARGET_DATASET",
        "SOURCE_DATASET",
        "STATIONID",
        "COLUMN",
        "TARGET_GAP_START",
        "TARGET_GAP_END",
        "FILLABLE_START",
        "FILLABLE_END",
        "N_STEPS_FILLABLE",
        "HOURS_FILLABLE",
        "TARGET_N_STEPS_MISSING",
        "COVERAGE_RATIO",
        "TARGET_GAP_KIND",
    ]

    def _prep(gaps):
        """Return a sorted copy with datetime bounds and a GAP_KIND column."""
        gaps = gaps.copy()
        gaps["GAP_START"] = pd.to_datetime(gaps["GAP_START"])
        gaps["GAP_END"] = pd.to_datetime(gaps["GAP_END"])
        if "GAP_KIND" not in gaps.columns:
            gaps["GAP_KIND"] = "Unknown"
        return gaps.sort_values(
            ["STATIONID", "COLUMN", "GAP_START", "GAP_END"]
        ).reset_index(drop=True)

    gaps_a = _prep(gaps_a)
    gaps_b = _prep(gaps_b)

    freq_td = pd.Timedelta(to_offset(expected_freq))
    hours_per_step = freq_td / pd.Timedelta(hours=1)

    def _build_lookup(gaps):
        """Map (station, column) -> list of inclusive (start, end) gaps."""
        return {
            key: list(zip(sub["GAP_START"], sub["GAP_END"]))
            for key, sub in gaps.groupby(["STATIONID", "COLUMN"], sort=False)
        }

    def _steps_inclusive(start, end):
        """Number of grid samples from start to end, both inclusive."""
        return int(((end - start) // freq_td) + 1)

    def _subtract_interval(base, subtracts):
        """Remove inclusive `subtracts` intervals from the inclusive `base`
        interval and return the remaining inclusive intervals on the grid."""
        base_start, base_end = base
        if base_start > base_end:
            return []

        # Clip every subtract interval to the base; drop those outside it
        clipped = []
        for s, e in subtracts:
            s_clip = max(s, base_start)
            e_clip = min(e, base_end)
            if s_clip <= e_clip:
                clipped.append((s_clip, e_clip))
        if not clipped:
            return [(base_start, base_end)]

        clipped.sort(key=lambda iv: iv[0])

        remaining = []
        cursor = base_start  # first step not yet classified
        for s, e in clipped:
            # Steps strictly before this subtract interval stay fillable
            before_end = s - freq_td
            if cursor <= before_end:
                remaining.append((cursor, before_end))
            # Never move the cursor backwards: an interval nested inside (or
            # overlapping) an earlier one must not re-expose removed steps.
            cursor = max(cursor, e + freq_td)
            if cursor > base_end:
                break
        if cursor <= base_end:
            remaining.append((cursor, base_end))
        return remaining

    def _fill_rows(target_gaps, source_lookup, target_label, source_label):
        """Build output rows for segments where source can fill target."""
        rows = []
        # Column-wise zip avoids constructing a Series per row (iterrows)
        for stn, col, gap_start, gap_end, n_missing, kind in zip(
            target_gaps["STATIONID"],
            target_gaps["COLUMN"],
            target_gaps["GAP_START"],
            target_gaps["GAP_END"],
            target_gaps["N_STEPS_MISSING"],
            target_gaps["GAP_KIND"],
        ):
            n_missing = int(n_missing)
            source_gaps = source_lookup.get((stn, col), [])
            for fill_start, fill_end in _subtract_interval(
                (gap_start, gap_end), source_gaps
            ):
                steps = _steps_inclusive(fill_start, fill_end)
                if steps < min_steps:
                    continue
                rows.append(
                    {
                        "TARGET_DATASET": target_label,
                        "SOURCE_DATASET": source_label,
                        "STATIONID": stn,
                        "COLUMN": col,
                        "TARGET_GAP_START": gap_start,
                        "TARGET_GAP_END": gap_end,
                        "FILLABLE_START": fill_start,
                        "FILLABLE_END": fill_end,
                        "N_STEPS_FILLABLE": steps,
                        "HOURS_FILLABLE": steps * hours_per_step,
                        "TARGET_N_STEPS_MISSING": n_missing,
                        "COVERAGE_RATIO": steps / n_missing,
                        "TARGET_GAP_KIND": kind,
                    }
                )
        return rows

    # B can fill A (A's gaps minus B's gaps), then A can fill B
    rows = _fill_rows(gaps_a, _build_lookup(gaps_b), "A", "B")
    rows += _fill_rows(gaps_b, _build_lookup(gaps_a), "B", "A")

    # Build the frame once from real rows. Concatenating an empty
    # object-dtype frame with a populated one is deprecated in pandas and
    # would make the result dtypes depend on the pandas version.
    if not rows:
        return pd.DataFrame(columns=out_columns)

    return (
        pd.DataFrame(rows, columns=out_columns)
        .sort_values(
            [
                "STATIONID",
                "COLUMN",
                "TARGET_DATASET",
                "TARGET_GAP_START",
                "FILLABLE_START",
            ]
        )
        .reset_index(drop=True)
    )