Source code for micromet.report.tools

from scipy.signal import find_peaks
import pandas as pd
import numpy as np

import plotly.graph_objects as go
from typing import Union, List, Dict

import matplotlib.pyplot as plt


[docs] def find_irr_dates( df, swc_col="SWC_1_1_1", do_plot=False, dist=20, height=30, prom=0.6 ): """ Detect irrigation events from a soil water content time series. This function identifies peaks in soil water content (SWC) data that are likely to be irrigation events, based on their prominence, height, and the distance between them. Parameters ---------- df : pd.DataFrame A DataFrame with a DatetimeIndex containing the SWC data. swc_col : str, optional The name of the column containing SWC data. Defaults to 'SWC_1_1_1'. do_plot : bool, optional If True, a plot of the SWC time series with the detected irrigation events will be displayed. Defaults to False. dist : int, optional The minimum number of time steps between detected peaks. Defaults to 20. height : float, optional The minimum height of the peaks to be considered irrigation events. Defaults to 30. prom : float, optional The minimum prominence of the peaks. Defaults to 0.6. Returns ------- tuple[pd.DatetimeIndex, pd.Series] A tuple containing the dates of the detected irrigation events and the corresponding SWC values. """ df_irr_season = df[df.index.month.isin([4, 5, 6, 7, 8, 9, 10])] peaks, _ = find_peaks( df_irr_season[swc_col], distance=dist, height=height, prominence=(prom, None) ) dates_of_irr = df_irr_season.iloc[peaks].index swc_during_irr = df_irr_season[swc_col].iloc[peaks] if do_plot: plt.plot(df.index, df[swc_col]) plt.plot(dates_of_irr, swc_during_irr, "x") plt.show() return dates_of_irr, swc_during_irr
[docs] def find_gaps(df, columns, missing_value=-9999, min_gap_periods=1): """ Find and report gaps in time series data. This function identifies continuous periods of missing data (either NaN or a specified missing value) in one or more columns of a DataFrame. Parameters ---------- df : pd.DataFrame A DataFrame with a regular time series index. columns : str or list of str The column(s) to check for gaps. missing_value : numeric, optional A specific value to be treated as missing. Defaults to -9999. min_gap_periods : int, optional The minimum number of consecutive missing periods to be considered a gap. Defaults to 1. Returns ------- pd.DataFrame A DataFrame with information about each detected gap, including start and end times, duration, and the number of missing records. """ if isinstance(columns, str): columns = [columns] # Initialize list to store gap information gaps = [] for col in columns: # Create boolean mask for missing values is_missing = df[col].isna() | (df[col] == missing_value) # Get the frequency of the time series as a pandas timedelta freq = pd.tseries.frequencies.to_offset(pd.infer_freq(df.index)) # Find runs of missing values missing_runs = (is_missing != is_missing.shift()).cumsum()[is_missing] if len(missing_runs) == 0: continue # Group consecutive missing values for run_id in missing_runs.unique(): run_mask = missing_runs == run_id run_indices = missing_runs[run_mask].index # Only consider runs longer than min_gap_periods if len(run_indices) > min_gap_periods: gap_start = run_indices[0] gap_end = run_indices[-1] # Calculate duration in hours duration_hours = (gap_end - gap_start).total_seconds() / 3600 gaps.append( { "gap_start": gap_start, "gap_end": gap_end, "duration_hours": duration_hours, "missing_records": len(run_indices), "column": col, } ) if not gaps: return pd.DataFrame( columns=[ "gap_start", "gap_end", "duration_hours", "missing_records", "column", ] ) return pd.DataFrame(gaps).sort_values("gap_start").reset_index(drop=True)
[docs] def plot_gaps(gaps_df, title="Time Series Data Gaps"): """ Create a Gantt chart visualization of gaps in time series data. This function takes a DataFrame of gap information and creates a Gantt chart to visualize the duration and timing of data gaps for different variables. Parameters ---------- gaps_df : pd.DataFrame A DataFrame containing gap information, as returned by the `find_gaps` function. title : str, optional The title for the plot. Defaults to "Time Series Data Gaps". Returns ------- go.Figure or None An interactive Plotly figure showing the data gaps, or None if there are no gaps to plot. """ if len(gaps_df) == 0: print("No gaps found to plot.") return None # Create figure fig = go.Figure() # Get unique columns and assign colors unique_columns = gaps_df["column"].unique() # Define a set of colors manually colors = [ "rgb(166,206,227)", "rgb(31,120,180)", "rgb(178,223,138)", "rgb(51,160,44)", "rgb(251,154,153)", "rgb(227,26,28)", "rgb(253,191,111)", "rgb(255,127,0)", "rgb(202,178,214)", ] # Cycle through colors if more variables than colors color_map = dict( zip( unique_columns, [colors[i % len(colors)] for i in range(len(unique_columns))], ) ) # Add gaps as horizontal bars for idx, row in gaps_df.iterrows(): fig.add_trace( go.Bar( x=[row["duration_hours"]], y=[row["column"]], orientation="h", base=[(row["gap_start"] - pd.Timestamp.min).total_seconds() / 3600], marker_color=color_map[row["column"]], name=row["column"], showlegend=False, hovertemplate=( f"Column: {row['column']}<br>" + f"Start: {row['gap_start']}<br>" + f"End: {row['gap_end']}<br>" + f"Duration: {row['duration_hours']:.1f} hours<br>" + f"Missing Records: {row['missing_records']}" ), ) ) # Update layout fig.update_layout( title=title, xaxis_title="Time", yaxis_title="Variables", barmode="overlay", height=max(200, 100 * len(unique_columns)), showlegend=False, xaxis=dict(tickformat="%Y-%m-%d %H:%M", type="date"), ) return fig
[docs] def detect_extreme_variations( df: pd.DataFrame, fields: Union[str, List[str]] = None, frequency: str = "D", variation_threshold: float = 3.0, null_value: Union[float, int] = -9999, min_periods: int = 2, ) -> Dict[str, pd.DataFrame]: """ Detect extreme variations in time series data. This function analyzes one or more fields in a DataFrame to identify points that deviate significantly from the mean within a given time frequency. Parameters ---------- df : pd.DataFrame A DataFrame with a DatetimeIndex. fields : str or list of str, optional The column(s) to analyze. If None, all numeric columns are used. Defaults to None. frequency : str, optional The frequency for grouping data (e.g., 'D' for daily). Defaults to 'D'. variation_threshold : float, optional The number of standard deviations from the mean to be considered an extreme variation. Defaults to 3.0. null_value : float or int, optional A value to be treated as null. Defaults to -9999. min_periods : int, optional The minimum number of valid observations required to calculate variation. Defaults to 2. Returns ------- dict A dictionary containing the calculated variations, a boolean DataFrame of extreme points, and a summary of the analysis. """ # Validate input DataFrame if not isinstance(df.index, pd.DatetimeIndex): raise ValueError("DataFrame must have a datetime index") # Create copy of DataFrame df_copy = df.copy() # Replace null values df_copy = df_copy.replace(null_value, np.nan) # Select fields to analyze if fields is None: fields = df_copy.select_dtypes(include=[np.number]).columns.tolist() elif isinstance(fields, str): fields = [fields] # Initialize results variations = pd.DataFrame(index=df_copy.index) extreme_points = pd.DataFrame(index=df_copy.index) summary_stats = [] # Calculate variations and detect extremes for each field for field in fields: # Group by frequency and calculate statistics grouped = df_copy[field].groupby(pd.Grouper(freq=frequency)) # Calculate variation metrics field_var = f"{field}_variation" variations[field_var] = grouped.transform( lambda x: ( np.abs(x - x.mean()) / x.std() if len(x.dropna()) >= min_periods else np.nan ) ) # Flag extreme variations extreme_points[f"{field}_extreme"] = variations[field_var] > variation_threshold # Calculate summary statistics field_summary = { "field": field, "total_observations": len(df_copy[field].dropna()), "extreme_variations": extreme_points[f"{field}_extreme"].sum(), "mean_variation": variations[field_var].mean(), "max_variation": variations[field_var].max(), "std_variation": variations[field_var].std(), } summary_stats.append(field_summary) # Create summary DataFrame summary_df = pd.DataFrame(summary_stats) return { "variations": variations, "extreme_points": extreme_points, "summary": summary_df, }
[docs] def clean_extreme_variations( df: pd.DataFrame, fields: Union[str, List[str]] = None, frequency: str = "D", variation_threshold: float = 3.0, null_value: Union[float, int] = -9999, min_periods: int = 2, replacement_method: str = "nan", ) -> Dict[str, Union[pd.DataFrame, pd.DataFrame]]: """ Clean extreme variations from time series data. This function identifies and replaces extreme values in a DataFrame using one of several methods. Parameters ---------- df : pd.DataFrame A DataFrame with a DatetimeIndex. fields : str or list of str, optional The column(s) to clean. If None, all numeric columns are used. Defaults to None. frequency : str, optional The frequency for analyzing variations. Defaults to 'D'. variation_threshold : float, optional The threshold for detecting extreme variations. Defaults to 3.0. null_value : float or int, optional A value to treat as null. Defaults to -9999. min_periods : int, optional The minimum number of observations for calculation. Defaults to 2. replacement_method : str, optional The method for replacing extreme values ('nan', 'interpolate', 'mean', 'median'). Defaults to 'nan'. Returns ------- dict A dictionary containing the cleaned data, a summary of the cleaning process, and the points that were removed. """ # Validate replacement method valid_methods = ["nan", "interpolate", "mean", "median"] if replacement_method not in valid_methods: raise ValueError(f"replacement_method must be one of {valid_methods}") # Detect extreme variations variation_results = detect_extreme_variations( df=df, fields=fields, frequency=frequency, variation_threshold=variation_threshold, null_value=null_value, min_periods=min_periods, ) # Create copy of input DataFrame cleaned_df = df.copy() # Initialize summary statistics cleaning_summary = [] removed_points = pd.DataFrame(index=df.index) # Process each field if fields is None: fields = df.select_dtypes(include=[np.number]).columns.tolist() elif isinstance(fields, str): fields = [fields] for field in fields: # Get extreme points for this field extreme_mask = variation_results["extreme_points"][f"{field}_extreme"] # Store removed values removed_points[field] = np.where(extreme_mask, cleaned_df[field], np.nan) # Apply replacement method if replacement_method == "nan": cleaned_df.loc[extreme_mask, field] = np.nan elif replacement_method == "interpolate": temp_series = cleaned_df[field].copy() temp_series[extreme_mask] = np.nan cleaned_df[field] = temp_series.interpolate(method="time") elif replacement_method in ["mean", "median"]: grouped = cleaned_df[field].groupby(pd.Grouper(freq=frequency)) if replacement_method == "mean": replacements = grouped.transform("mean") else: replacements = grouped.transform("median") cleaned_df.loc[extreme_mask, field] = replacements[extreme_mask] # Calculate cleaning summary cleaning_stats = { "field": field, "points_removed": extreme_mask.sum(), "percent_removed": (extreme_mask.sum() / len(df)) * 100, "replacement_method": replacement_method, } cleaning_summary.append(cleaning_stats) return { "cleaned_data": cleaned_df, "cleaning_summary": pd.DataFrame(cleaning_summary), "removed_points": removed_points, }
[docs] def polar_to_cartesian_dataframe(df, wd_column="WD", dist_column="Dist"): """ Convert polar coordinates in a DataFrame to Cartesian coordinates. Parameters ---------- df : pd.DataFrame A DataFrame containing the polar coordinate data. wd_column : str, optional The name of the column with the wind direction in degrees. Defaults to "WD". dist_column : str, optional The name of the column with the distance. Defaults to "Dist". Returns ------- pd.DataFrame The DataFrame with added 'X' and 'Y' columns. """ # Create copies of the input columns to avoid modifying original data wd = df[wd_column].copy() dist = df[dist_column].copy() # Identify invalid values (-9999 or NaN) invalid_mask = (wd == -9999) | (dist == -9999) | wd.isna() | dist.isna() # Convert degrees from north to standard polar angle (radians) where valid theta_radians = np.radians(90 - wd) # Calculate Cartesian coordinates, setting invalid values to NaN df[f"X_{dist_column}"] = np.where( invalid_mask, np.nan, dist * np.cos(theta_radians) ) df[f"Y_{dist_column}"] = np.where( invalid_mask, np.nan, dist * np.sin(theta_radians) ) return df
[docs] def aggregate_to_daily_centroid( df, date_column="Timestamp", x_column="X", y_column="Y", weighted=True, ): """ Aggregate half-hourly coordinate data to daily centroids. This function calculates the daily centroid of a set of coordinates, with an option to weight the calculation by another variable. Parameters ---------- df : pd.DataFrame A DataFrame with timestamp and coordinate data. date_column : str, optional The name of the column containing the timestamps. Defaults to "Timestamp". x_column : str, optional The name of the column with the X coordinates. Defaults to "X". y_column : str, optional The name of the column with the Y coordinates. Defaults to "Y". weighted : bool, optional If True, the centroid calculation is weighted by the 'ET' column. Defaults to True. Returns ------- pd.DataFrame A DataFrame with the aggregated daily centroids. """ # Define a lambda function to compute the weighted mean: wm = lambda x: np.average(x, weights=df.loc[x.index, "ET"]) # Ensure datetime format df[date_column] = pd.to_datetime(df[date_column]) # Group by date (ignoring time component) df["Date"] = df[date_column].dt.date # Calculate centroid (mean of X and Y) if weighted: # Compute weighted average using ET as weights daily_centroids = ( df.groupby("Date") .apply( lambda g: pd.Series( { x_column: (g[x_column] * g["ET"]).sum() / g["ET"].sum(), y_column: (g[y_column] * g["ET"]).sum() / g["ET"].sum(), } ) ) .reset_index() ) else: daily_centroids = ( df.groupby("Date").agg({x_column: "mean", y_column: "mean"}).reset_index() ) # Groupby and aggregate with namedAgg [1]: return daily_centroids
[docs] def compute_Cw(sigma_w, u_star, target=1.25): """ Compute the vertical velocity correction factor, Cw. This function calculates Cw based on the ratio of the standard deviation of vertical velocity (sigma_w) to the friction velocity (u_star). Parameters ---------- sigma_w : float The standard deviation of the vertical velocity. u_star : float The friction velocity. target : float, optional The target ratio for the correction. Defaults to 1.25. Returns ------- float The calculated correction factor, Cw. Returns 1.0 if no correction is needed, or NaN if the ratio is invalid. """ ratio = sigma_w / u_star if np.isnan(ratio) or np.isclose(u_star, 0): return np.nan # Only apply correction if measured ratio < target if ratio < target: return (target / ratio) ** 2 else: return 1.0
[docs] def filter_near_neutral(z_over_L, lower=-0.1, upper=0.0): """ Filter for near-neutral atmospheric stability conditions. This function returns a boolean mask indicating where the stability parameter z/L falls within a specified range for near-neutral conditions. Parameters ---------- z_over_L : array_like An array or Series of z/L values. lower : float, optional The lower bound for near-neutral stability. Defaults to -0.1. upper : float, optional The upper bound for near-neutral stability. Defaults to 0.0. Returns ------- np.ndarray A boolean mask that is True for near-neutral conditions. """ z_over_L = np.asarray(z_over_L, dtype=float) mask = (z_over_L >= lower) & (z_over_L < upper) return mask
# Example usage: if __name__ == "__main__": # Create sample data dates = pd.date_range(start="2024-01-01", end="2024-01-02", freq="30min") df = pd.DataFrame(index=dates) # Create multiple columns with gaps df["temperature"] = 20 + np.random.randn(len(dates)) df["humidity"] = 60 + np.random.randn(len(dates)) df["pressure"] = 1013 + np.random.randn(len(dates)) # Insert some gaps df.loc["2024-01-01 10:00":"2024-01-01 12:00", "temperature"] = -9999 df.loc["2024-01-01 15:00":"2024-01-01 16:00", "humidity"] = np.nan df.loc["2024-01-01 18:00":"2024-01-01 20:00", "pressure"] = -9999 # Find gaps gaps_df = find_gaps(df, ["temperature", "humidity", "pressure"], min_gap_periods=1) # Create and show plot fig = plot_gaps(gaps_df, "Sample Data Gaps") fig.show()
[docs] def subset_year(df, year, subset_type='growing_season', gs_start='04-01', gs_end='10-31'): """ Subsets a DataFrame based on a seasonal window for a specific year. This function filters data based on a DatetimeIndex. It can extract either the "growing season" within a single year or the "winter" period that spans from the end of the specified year to the start of the next. Args: df (pd.DataFrame): The input DataFrame. Must have a DatetimeIndex. year (int): The year to perform the subsetting for. subset_type (str, optional): The type of subset to return. Options are 'growing_season' or 'winter'. Defaults to 'growing_season'. gs_start (str, optional): The start date of the growing season in 'MM-DD' format. Defaults to '04-01'. gs_end (str, optional): The end date of the growing season in 'MM-DD' format. Defaults to '10-31'. Returns: pd.DataFrame: A subsetted DataFrame if data is found; otherwise, returns None. Raises: AttributeError: If the DataFrame index is not a DatetimeIndex. """ if subset_type == 'growing_season': mask = (df.index >= f'{year}-{gs_start} 0:00') & (df.index <= f'{year}-{gs_end} 23:30') output = df[mask] if subset_type == 'winter': mask = (df.index > f'{year}-{gs_end} 23:30') & (df.index < f'{year+1}-{gs_start} 0:00') output = df[mask] if output.empty: print(f"No data in {subset_type} for year {year}") else: print(f'Data subset from {output.index.min()} to {output.index.max()}') return output