"""
Data validation and quality control functions for the reformatter pipeline.
This module handles applying physical limits to data values and detecting
stuck or anomalous sensor readings.
"""
import math
from typing import Iterable, Optional, Tuple, Union
import numpy as np
import pandas as pd
import micromet.qaqc.variable_limits as variable_limits
[docs]
def apply_physical_limits(
df: pd.DataFrame,
how: str = "mask",
inplace: bool = False,
prefer_longest_key: bool = True,
return_mask: bool = False,
round_et: bool=True
) -> tuple[pd.DataFrame, pd.DataFrame | None, pd.DataFrame]:
"""
Apply physical Min/Max bounds to columns in a DataFrame.
This function applies physical limits (minimum and maximum) to the columns
of a DataFrame. It can either mask out-of-bounds values with NaN or clip
them to the limits.
Parameters
----------
df : pd.DataFrame
The input DataFrame to which the limits will be applied.
how : str, optional
The method to use for applying limits: 'mask' (default) or 'clip'.
inplace : bool, optional
If True, modify the DataFrame in place. Defaults to False.
prefer_longest_key : bool, optional
If True, prefer longer matching keys from the limits dictionary.
Defaults to True.
return_mask : bool, optional
If True, return a boolean mask of the values that were flagged.
Defaults to False.
round_et : bool, optoinal
If True, ET values below 0 will be rounded to 1 digit before applying variable limits.
Defaults to False
Returns
-------
tuple[pd.DataFrame, pd.DataFrame | None, pd.DataFrame]
A tuple containing:
- The DataFrame with physical limits applied.
- A boolean mask of flagged values (if `return_mask` is True).
- A report summarizing the number of flagged values for each column.
"""
if how not in {"mask", "clip"}:
raise ValueError("how must be 'mask' or 'clip'")
limits_dict = variable_limits.limits
out = df if inplace else df.copy()
no_limits = ['CO2_DENSITY_SIGMA_1_1_1', 'FC_SAMPLES_1_1_1',
'H_SAMPLES_1_1_1','LE_SAMPLES_1_1_1', 'RECORD',
'TAU_QC_1_1_1', 'FC_QC_1_1_1','LE_QC_1_1_1' ]
col_list = [i for i in out.columns if i not in no_limits]
keys = list(limits_dict.keys())
if prefer_longest_key:
keys.sort(key=len, reverse=True)
round_cols = ['ET_1_1_1', 'ET_1_1_2']
if round_et:
for col in round_cols:
if col in out.columns:
mask = df[col] < 0
out.loc[mask, col] = out.loc[mask, col].round(1)
col_map = {}
for key in keys:
matching_cols = [c for c in col_list if str(c).startswith(key)]
if not matching_cols:
continue
lim = limits_dict[key]
mn = lim.get("Min", np.nan)
mx = lim.get("Max", np.nan)
for col in matching_cols:
if col not in col_map or (
prefer_longest_key and len(key) > len(col_map[col]["key"])
):
col_map[col] = {"key": key, "Min": mn, "Max": mx}
mask_df = pd.DataFrame(False, index=out.index, columns=out.columns)
records = []
NA_PLACEHOLDER = -9999
PLACEHOLDER2 = -999900
for col, info in col_map.items():
key = info["key"]
mn = info["Min"]
mx = info["Max"]
ser = pd.to_numeric(out[col], errors="coerce")
is_na_placeholder = ser == NA_PLACEHOLDER
ser = ser.mask(is_na_placeholder, np.nan)
is_na_placeholder2 = ser == PLACEHOLDER2
ser = ser.mask(is_na_placeholder2, np.nan)
lower_ok = (
ser >= mn
if not (pd.isna(mn) or (isinstance(mn, float) and math.isnan(mn)))
else pd.Series(True, index=ser.index)
)
upper_ok = (
ser <= mx
if not (pd.isna(mx) or (isinstance(mx, float) and math.isnan(mx)))
else pd.Series(True, index=ser.index)
)
ok = lower_ok & upper_ok
oor = ~ok
n_below = int((~lower_ok & ser.notna()).sum())
n_above = int((~upper_ok & ser.notna()).sum())
n_oor = int((oor & ser.notna()).sum())
if how == "mask":
ser_out = ser.where(ok)
else:
ser_out = ser
if not pd.isna(mn):
ser_out = ser_out.clip(lower=mn)
if not pd.isna(mx):
ser_out = ser_out.clip(upper=mx)
out[col] = ser_out.astype(float) if ser_out.isna().any() else ser_out
mask_df[col] = oor
records.append(
{
"column": col,
"matched_key": key,
"min": mn,
"max": mx,
"n_below": n_below,
"n_above": n_above,
"n_flagged": n_oor,
"pct_flagged": (n_oor / ser.notna().sum() * 100.0) if ser.notna().sum() else 0.0,
}
)
report = pd.DataFrame.from_records(records).sort_values(
["n_flagged", "column"], ascending=[False, True]
)
return (out, (mask_df if return_mask else None), report)
[docs]
def mask_stuck_values(
df: pd.DataFrame,
threshold: Union[int, str, pd.Timedelta],
columns: Optional[Iterable[str]] = None,
tolerance: Optional[float] = None,
mask_value=np.nan,
return_mask: bool = False,
) -> Union[
Tuple[pd.DataFrame, pd.DataFrame], Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
]:
"""
Detect and mask 'stuck' values in a datetime-indexed DataFrame.
A run is considered 'stuck' when the series does not change (within an optional
numeric tolerance) for at least `threshold`. Threshold can be a count of rows
(int) or a time duration (str like '30min' / '2H' or pd.Timedelta).
Parameters
----------
df : pd.DataFrame
DataFrame with a DatetimeIndex (required).
threshold : int | str | pd.Timedelta
Minimum length of a non-changing run to be masked.
- If int: count of consecutive rows (e.g., 5).
- If str or Timedelta: minimum duration (e.g., '30min', pd.Timedelta('2H')).
columns : iterable[str], optional
Subset of columns to check. Defaults to all columns.
tolerance : float, optional
For numeric columns only: treat changes with absolute difference <= tolerance
as 'no change'. If None, exact equality is used.
mask_value : any, default np.nan
Value to assign to masked entries.
return_mask : bool, default False
If True, also return a boolean DataFrame mask where True marks masked cells.
Returns
-------
masked_df : pd.DataFrame
Copy of `df` with stuck runs masked.
report : pd.DataFrame
Tidy report with one row per masked run, columns:
['column','value','start','end','n_rows','duration','threshold_type','threshold_value']
mask_df : pd.DataFrame (optional)
Boolean DataFrame (same shape as `df[columns]`) with True where values were masked.
Notes
-----
- NaNs act as boundaries and are never considered part of a 'stuck' run.
- For irregular time steps and time-based thresholds, the run 'duration'
is computed as end_time - start_time (inclusive of row timestamps).
- Entire runs that meet/exceed the threshold are masked (not just the tail beyond threshold).
"""
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("df must have a DatetimeIndex.")
# Normalize inputs
cols = list(columns) if columns is not None else list(df.columns)
if isinstance(threshold, int):
thresh_type = "count"
thresh_count = threshold
thresh_delta = None
else:
thresh_type = "time"
thresh_delta = pd.to_timedelta(threshold)
thresh_count = None
# Prepare mask and report accumulator
mask_df = pd.DataFrame(False, index=df.index, columns=cols)
report_rows = []
for col in cols:
s = df[col]
# Boundaries: treat NaNs as breaking runs
notna = s.notna()
# Determine "change points"
if pd.api.types.is_numeric_dtype(s) and tolerance is not None:
# consider 'no change' if difference <= tolerance
# mark a change when |diff| > tol
diff = s.diff().abs()
changed = (diff > tolerance) | (~notna) | (~notna.shift(1, fill_value=False)) # type: ignore
else:
# exact equality
# change occurs when current != previous OR either is NaN
prev = s.shift(1)
changed = (s != prev) | (~notna) | (~prev.notna())
# Group by segments of constant value (between change points)
group_id = changed.cumsum()
# Iterate groups that are non-NaN and constant
for gid, idx in s.groupby(group_id).groups.items():
# idx is an index of row positions (labels)
block = s.loc[idx]
if block.isna().any():
# skip blocks with NaN; we don't mask NaNs and they break runs
continue
# For safety, verify constancy within tolerance/equality
if pd.api.types.is_numeric_dtype(block) and tolerance is not None:
is_const = (block.max() - block.min()) <= tolerance
else:
is_const = block.nunique(dropna=False) == 1
if not is_const:
continue # shouldn't happen often, but keep it robust
# Compute run stats
start_time = block.index[0]
end_time = block.index[-1]
n_rows = block.size
duration = end_time - start_time # timedelta
meets = False
if thresh_type == "count":
meets = n_rows >= thresh_count # type: ignore
else:
# For single-row runs, duration == 0; interpret as < threshold
meets = duration >= thresh_delta
if meets:
# Mask the entire run
mask_df.loc[block.index, col] = True
# Stuck value for report (representative)
val = block.iloc[0]
report_rows.append(
{
"column": col,
"value": val,
"start": start_time,
"end": end_time,
"n_rows": n_rows,
"duration": duration,
"threshold_type": thresh_type,
"threshold_value": (
thresh_count if thresh_type == "count" else thresh_delta
),
}
)
# Build outputs
masked_df = df.copy()
for col in cols:
masked_df.loc[mask_df[col], col] = mask_value
report = (
pd.DataFrame(report_rows)
.sort_values(["column", "start"])
.reset_index(drop=True)
)
return (masked_df, report, mask_df) if return_mask else (masked_df, report)
__all__ = [
"apply_physical_limits",
"mask_stuck_values",
]