Source code for micromet.format.transformers.corrections

"""
Data correction functions for the reformatter pipeline.

This module contains variable-specific corrections and data value fixes,
including handling special values, unit conversions, and merging duplicate columns.
"""

import logging
import re

import numpy as np
import pandas as pd


[docs] def apply_fixes(df: pd.DataFrame, logger: logging.Logger) -> pd.DataFrame: """ Apply a set of minor, variable-specific data corrections. This function serves as a pipeline for applying several small, targeted fixes to the data, such as correcting 'TAU' values, converting soil water content to percent, and scaling SSITC test values. Parameters ---------- df : pd.DataFrame The input DataFrame to be fixed. logger : logging.Logger The logger for tracking the fixes being applied. Returns ------- pd.DataFrame The DataFrame with all fixes applied. """ df = tau_fixer(df, threshold=0.5, logger=logger) df = fix_swc_percent(df, logger) df = ssitc_scale(df, logger) return df
[docs] def tau_fixer(df: pd.DataFrame, threshold: float = 0.5, logger: logging.Logger = None) -> pd.DataFrame: """ Replace zero values in the 'TAU' column with NaN and flips sign if needed. Loops through all columns with TAU in the name that don't also have SSITC or QC in the name. This function checks for zero values or negative infinity values in the 'TAU' column and replaces them with NaN. This is often done to handle cases where zero represents a missing or invalid measurement. The function also determines whether to reverse the sign of TAU. If more than the specified threshold of TAU values are positive, it flips the sign of all TAU values. Parameters ---------- df : pd.DataFrame The input DataFrame with a 'TAU' column. Returns ------- pd.DataFrame The DataFrame with zero values in 'TAU' replaced by NaN. """ df2 = df.copy() tau_mask = df2.columns.str.contains('TAU') ssitc_mask = df2.columns.str.contains('SSITC') qc_mask = df2.columns.str.contains('QC') tau_col = df2.columns[tau_mask & ~ssitc_mask & ~qc_mask] logger.debug(f"TAU columns identified for fixing: {tau_col.tolist()}") for col in tau_col: bad_idx = (df2[col] == 0) | (df2[col] == -np.inf) df2.loc[bad_idx, col] = np.nan positive_mask = (df2[col] > 0) negative_mask = (df2[col] < 0) percent_positive = positive_mask.sum()/(positive_mask.sum() + negative_mask.sum()) if percent_positive > threshold: df2[col] *= -1 logger.debug(f"{col} values were flipped for station due to {percent_positive:.2%} of values being positive.") else: logger.debug(f"{col} values were not flipped for station due to only {percent_positive:.2%} of values being positive.") return df2
[docs] def fix_swc_percent(df: pd.DataFrame, logger: logging.Logger) -> pd.DataFrame: """ Convert fractional soil water content (SWC) values to percentages. This function checks soil water content columns (those starting with 'SWC_') and, if the values appear to be fractional (<= 1.5), multiplies them by 100 to convert them to percentages. Parameters ---------- df : pd.DataFrame The input DataFrame with SWC columns. logger : logging.Logger The logger for tracking the conversion process. Returns ------- pd.DataFrame The DataFrame with SWC values converted to percentages where applicable. """ df = df.copy() def _fix_one(s: pd.Series) -> pd.Series: s = pd.to_numeric(s, errors="coerce") m = s.max(skipna=True) if pd.notna(m) and m <= 1.5: s = s * 100.0 logger.debug(f"Converted {s.name} from fraction to percent") return s for name in [c for c in df.columns if str(c).startswith("SWC_")]: obj = df.loc[:, name] if isinstance(obj, pd.DataFrame): for sub in obj.columns: df[sub] = _fix_one(df[sub]) else: df[name] = _fix_one(obj) return df
[docs] def ssitc_scale(df: pd.DataFrame, logger: logging.Logger) -> pd.DataFrame: """ Scale SSITC (Signal Strength and Integrity Test) columns. This function checks specific SSITC columns and, if their values exceed a certain threshold (3), applies a scaling and rating transformation to them. Parameters ---------- df : pd.DataFrame The input DataFrame with SSITC columns. logger : logging.Logger The logger for tracking the scaling process. Returns ------- pd.DataFrame The DataFrame with SSITC columns scaled where applicable. """ ssitc_bases = [ "FC_SSITC_TEST", "LE_SSITC_TEST", "ET_SSITC_TEST", "H_SSITC_TEST", "TAU_SSITC_TEST", ] ssitc_columns = [ col for col in df.columns if any(col.startswith(base) for base in ssitc_bases) ] for column in ssitc_columns: if column in df.columns: if df[column].max() > 3: df[column] = scale_and_convert(df[column]) logger.debug(f"Scaled SSITC {column}") logger.debug(f"Scaled SSITC len: {len(df)}") return df
[docs] def scale_and_convert(column: pd.Series) -> pd.Series: """ Apply a rating transformation and convert the column to float type. This function applies a 'rating' function to each element of the Series and then converts the entire Series to float. Parameters ---------- column : pd.Series The input Series to be transformed. Returns ------- pd.Series The transformed and converted Series. """ column = column.apply(rating) return column
[docs] def rating(x): """ Categorize a numeric value into a discrete rating level (0, 1, or 2). This function categorizes a numeric value into one of three levels: - 0 for values between 0 and 3. - 1 for values between 4 and 6. - 2 for all other values. Parameters ---------- x : numeric or None The input value to be rated. Returns ------- int The rating level (0, 1, or 2). """ if x is None or np.isnan(x): x = 0 else: if 0 <= x <= 3: x = 0 elif 4 <= x <= 6: x = 1 else: x = 2 return x
[docs] def fill_na_drop_dups(df: pd.DataFrame) -> pd.DataFrame: """ Merge any number of duplicate columns with numeric suffixes (``.1``, ``.2``, ...), treating ``-9999`` as missing, and drop redundant duplicates. This function groups columns by their base name (the part before a trailing ``.<number>`` suffix). For each group, it merges values across the base column (if present) and all suffixed duplicates by preferring the first non-missing value at each row. During merging, the sentinel value ``-9999`` is treated as missing (converted to ``NaN``). After merging, remaining missing values are filled back with ``-9999`` and all duplicate suffixed columns are dropped, preserving the base column as the canonical result. Parameters ---------- df : pandas.DataFrame Input DataFrame that may contain duplicate columns named with numeric suffixes (e.g., ``"A.1"``, ``"A.2"``, ...). The unsuffixed base column (e.g., ``"A"``) is optional. Sentinel missing values are expected to be encoded as ``-9999``. Returns ------- pandas.DataFrame A new DataFrame where, for each base column, all suffixed duplicates have been merged into the base column and the duplicates removed. Any remaining missing values are filled with ``-9999``. Notes ----- - Columns are grouped by the regex pattern ``r"^(?P<base>.+?)\\.(?P<idx>\\d+)$"``. Columns not matching this pattern are treated as base columns. - Merge precedence follows ascending numeric suffix order, with the base column (if present) considered first. - The input DataFrame is not modified in place; a copy is returned. Examples -------- >>> import pandas as pd >>> import numpy as np >>> df = pd.DataFrame({ ... "A": [1, -9999, 3, -9999], ... "A.1": [np.nan, 2, -9999, 4], ... "A.2": [-9999, 9, np.nan, -9999], ... "B.1": [10, -9999, np.nan, 13], # no base 'B' column present ... "B.3": [np.nan, 11, 12, -9999] ... }) >>> fill_na_drop_dups(df) A B 0 1 10.0 1 2 11.0 2 3 12.0 3 4 13.0 """ df_out = df.copy() pattern = re.compile(r"^(?P<base>.+?)\.(?P<idx>\d+)$") # Group columns by base name with numeric suffixes collected and sorted groups: dict[str, list[tuple[int, str]]] = {} for col in df_out.columns: m = pattern.match(col) if m: base = m.group("base") idx = int(m.group("idx")) groups.setdefault(base, []).append((idx, col)) else: # Ensure singleton group for base-only column groups.setdefault(col, []).append((0, col)) to_drop: list[str] = [] for base, items in groups.items(): # Sort by numeric suffix (base column, if present, has idx==0) items_sorted = sorted(items, key=lambda t: t[0]) merged = None for _, col in items_sorted: s = df_out[col].replace(-9999, np.nan) merged = s if merged is None else merged.combine_first(s) # Re-impose sentinel for any remaining NaNs merged = merged.fillna(-9999) # Write back to base column (create if it didn't exist) df_out[base] = merged # Drop all duplicates except the base for _, col in items_sorted: if col != base: to_drop.append(col) if to_drop: # Deduplicate in case of overlap df_out = df_out.drop(columns=list(dict.fromkeys(to_drop))) return df_out
__all__ = [ "apply_fixes", "tau_fixer", "fix_swc_percent", "ssitc_scale", "scale_and_convert", "rating", "fill_na_drop_dups", ]