Source code for micromet.format.reformatter

"""
This module provides the Reformatter class for cleaning and standardizing
station data for flux/met processing, with integrated timestamp alignment checks.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union

import pandas as pd
import numpy as np

import micromet.format.reformatter_vars as reformatter_vars
import micromet.qaqc.variable_limits as variable_limits
from micromet.utils import logger_check
from micromet.format import transformers
from micromet.qaqc.netrad_limits import analyze_timestamp_alignment, flag_issues


[docs] class Reformatter: """ A class to clean and standardize station data for flux/met processing. This class provides a pipeline for preparing raw station data by applying a series of transformations, including fixing timestamps, renaming columns, applying physical limits, and checking timestamp alignment. Parameters ---------- var_limits_csv : str or Path, optional Path to a CSV file containing variable limits. If not provided, default limits are used. drop_soil : bool, optional If True, extra soil-related columns are dropped. Defaults to True. check_timestamps : bool, optional If True, perform timestamp alignment analysis on radiation data. Defaults to False. site_lat : float, optional Latitude of the site (required if check_timestamps=True). site_lon : float, optional Longitude of the site (required if check_timestamps=True). site_utc_offset : float, optional UTC offset in hours for the site (required if check_timestamps=True). logger : logging.Logger, optional A logger for tracking the reformatting process. If not provided, a default logger is used. Attributes ---------- logger : logging.Logger The logger used for logging messages. config : dict A dictionary of configuration parameters for the reformatting process. varlimits : pd.DataFrame A DataFrame containing the physical limits for each variable. drop_soil : bool A flag indicating whether to drop extra soil columns. check_timestamps : bool A flag indicating whether to perform timestamp alignment checks. site_lat : float The latitude of the site. site_lon : float The longitude of the site. site_utc_offset : float The UTC offset of the site in hours. """
[docs] def __init__( self, var_limits_csv: str | Path | None = None, drop_soil: bool = True, check_timestamps: bool = False, site_lat: float | None = None, site_lon: float | None = None, site_utc_offset: int = -7, logger: logging.Logger | None = None, ): """ Initialize the Reformatter. Parameters ---------- var_limits_csv : str or Path, optional Path to a CSV file containing variable limits. drop_soil : bool, optional If True, extra soil-related columns are dropped. Defaults to True. check_timestamps : bool, optional If True, perform timestamp alignment analysis. Defaults to False. site_lat : float, optional Latitude of the site (required if check_timestamps=True). site_lon : float, optional Longitude of the site (required if check_timestamps=True). site_utc_offset : float, optional UTC offset in hours (required if check_timestamps=True). logger : logging.Logger, optional A logger for tracking the reformatting process. """ self.logger = logger_check(logger) self.config = reformatter_vars.config if var_limits_csv is None: self.varlimits = variable_limits.limits else: if isinstance(var_limits_csv, str): var_limits_csv = Path(var_limits_csv) self.varlimits = pd.read_csv( var_limits_csv, index_col=0, na_values=["-9999", "NAN", "NaN", "nan"] ) self.logger.debug(f"Loaded variable limits from {var_limits_csv}") self.drop_soil = drop_soil self.check_timestamps = check_timestamps self.site_lat = site_lat self.site_lon = site_lon self.site_utc_offset = site_utc_offset # Validate timestamp check parameters if self.check_timestamps: if any(x is None for x in [site_lat, site_lon, site_utc_offset]): raise ValueError( "site_lat, site_lon, and site_utc_offset are required when " "check_timestamps=True" )
[docs] def prepare(self, df, interval=30, data_type="eddy"): """Current method - keep for backward compatibility""" df, report, _ = self.process(df, interval=interval, data_type=data_type) return df, report
[docs] def preprocess(self, df, data_type="eddy", interval=30): """ Preprocess the data by applying initial cleaning and standardization steps. """ self.logger.info("Starting reformat (%s rows)", len(df)) # Standard pipeline df = df.pipe(transformers.fix_timestamps, logger=self.logger) df = df.pipe( transformers.rename_columns, data_type=data_type, config=self.config, logger=self.logger, ) df = df.pipe(transformers.make_unique_cols) df = df.pipe(transformers.set_number_types, logger=self.logger) df = df.pipe( transformers.resample_timestamps, interval=interval, logger=self.logger ) df = df.pipe(transformers.timestamp_reset, minutes=interval) return df
[docs] def finalize(self, df): """ Finalize the data by applying cleaning and standardization steps. """ df = df.pipe(transformers.fill_na_drop_dups) df = df.pipe(transformers.apply_fixes, logger=self.logger) # note that important to apply_fixes and rename_columns before running physical limits! df, mask, report = transformers.apply_physical_limits(df) # Timestamp alignment check (if enabled and radiation data available) timestamp_results = None if self.check_timestamps: timestamp_results = self._check_timestamp_alignment(df) if self.drop_soil: df = df.pipe( transformers.drop_extra_soil_columns, config=self.config, logger=self.logger, ) df = df.pipe(transformers.drop_extras, config=self.config).fillna( transformers.MISSING_VALUE ) df = df.pipe(transformers.col_order, logger=self.logger) self.logger.info("Done; final shape: %s", df.shape) return df, report, timestamp_results
[docs] def process( self, df: pd.DataFrame, interval: int, data_type: str = "eddy" ) -> Tuple[pd.DataFrame, pd.DataFrame, Optional[Dict]]: """ Prepare the data by applying a series of cleaning and standardization steps. This method takes a DataFrame of station data and applies a pipeline of transformations to clean and standardize it. The steps include fixing timestamps, renaming columns, setting numeric types, resampling, applying physical limits, and optionally checking timestamp alignment. Parameters ---------- df : pd.DataFrame The input DataFrame of station data. data_type : str, optional The type of data being processed (e.g., 'eddy', 'met'). This is used to determine which column renaming map to use. Defaults to 'eddy'. interval: int The sampling interval used with the data; must be either 30 or 60 minutes Returns ------- tuple[pd.DataFrame, pd.DataFrame, dict | None] A tuple containing: - The prepared DataFrame with standardized and cleaned data. - A report DataFrame detailing the changes made during the application of physical limits. - A dictionary with timestamp alignment results (if check_timestamps=True), or None otherwise. Contains keys: 'summary', 'composites', 'flags'. """ self.logger.info("Starting reformat (%s rows)", len(df)) # Standard pipeline df = df.pipe(transformers.fix_timestamps, logger=self.logger) df = df.pipe( transformers.rename_columns, data_type=data_type, config=self.config, logger=self.logger, ) df = df.pipe(transformers.make_unique_cols) df = df.pipe(transformers.set_number_types, logger=self.logger) df = df.pipe( transformers.resample_timestamps, interval=interval, logger=self.logger ) df = df.pipe(transformers.timestamp_reset, minutes=interval) df = df.pipe(transformers.fill_na_drop_dups) df = df.pipe(transformers.apply_fixes, logger=self.logger) # note that important to apply_fixes and rename_columns before running physical limits! df, mask, report = transformers.apply_physical_limits(df) # Timestamp alignment check (if enabled and radiation data available) timestamp_results = None if self.check_timestamps: timestamp_results = self._check_timestamp_alignment(df) if self.drop_soil: df = df.pipe( transformers.drop_extra_soil_columns, config=self.config, logger=self.logger, ) df = df.pipe(transformers.drop_extras, config=self.config).fillna( transformers.MISSING_VALUE ) df = df.pipe(transformers.col_order, logger=self.logger) self.logger.info("Done; final shape: %s", df.shape) return df, report, timestamp_results
def _check_timestamp_alignment(self, df: pd.DataFrame) -> Dict | None: """ Perform timestamp alignment analysis on radiation data. This method analyzes SW_IN and/or PPFD_IN against theoretical top-of-atmosphere radiation to detect potential timestamp issues such as timezone errors, DST problems, or sensor issues. Parameters ---------- df : pd.DataFrame The DataFrame with processed data. Returns ------- dict | None A dictionary containing: - 'summary': DataFrame with window-by-window analysis results - 'composites': Dictionary of WindowComposite objects - 'flags': Dictionary of detected issues Returns None if radiation columns are not available. """ # Check if we have the necessary radiation columns has_sw = "SW_IN" in df.columns or "SW_IN_1_1_1" in df.columns has_ppfd = "PPFD_IN" in df.columns or "PPFD_IN_1_1_1" in df.columns if not (has_sw or has_ppfd): self.logger.warning( "SW_IN and PPFD_IN not found in data; " "skipping timestamp alignment check" ) return None self.logger.info("Performing timestamp alignment analysis...") try: # Run the analysis summary, composites = analyze_timestamp_alignment( df, lat=self.site_lat, lon=self.site_lon, std_utc_offset_hours=self.site_utc_offset, time_from="END", # Since we use TIMESTAMP_END sw_col="SW_IN" if "SW_IN" in df.columns else "SW_IN_1_1_1", ppfd_col="PPFD_IN" if "PPFD_IN" in df.columns else "PPFD_IN_1_1_1", assume_naive_is_local=False, ) # Flag potential issues flags = flag_issues(summary) # Log any detected issues if flags: self.logger.warning("Timestamp alignment issues detected:") for issue_type, message in flags.items(): self.logger.warning(f" {issue_type}: {message}") else: self.logger.info("No significant timestamp alignment issues detected") return {"summary": summary, "composites": composites, "flags": flags} except Exception as e: self.logger.error(f"Error during timestamp alignment check: {e}") return None