"""
Column cleanup and type conversion functions for the reformatter pipeline.
This module handles dropping unwanted columns, setting proper data types,
and filtering soil-related columns.
"""
import logging
from typing import List, Sequence, Union
import numpy as np
import pandas as pd
# Constants for soil column filtering
SOIL_SENSOR_SKIP_INDEX: int = 3
DEFAULT_SOIL_DROP_LIMIT: int = 4
[docs]
def drop_extra_soil_columns(
df: pd.DataFrame, config: dict, logger: logging.Logger
) -> pd.DataFrame:
"""
Drop redundant or unused soil-related columns from the DataFrame.
This function identifies and removes soil-related columns that are
considered extra or redundant based on the provided configuration.
Parameters
----------
df : pd.DataFrame
The input DataFrame with soil-related columns.
config : dict
The configuration dictionary containing lists of columns to drop.
logger : logging.Logger
The logger for tracking the column dropping process.
Returns
-------
pd.DataFrame
The DataFrame with extra soil columns removed.
"""
df = df.copy()
math_soils: Sequence[str] = config.get("math_soils_v2", [])
to_drop: List[str] = []
for col in df.columns:
parts = col.split("_")
if len(parts) >= 3 and parts[0] in {"SWC", "TS", "EC", "K"}:
try:
if int(parts[1]) >= SOIL_SENSOR_SKIP_INDEX:
to_drop.append(col)
continue
except ValueError:
pass
if col in math_soils[:-DEFAULT_SOIL_DROP_LIMIT]:
to_drop.append(col)
continue
if parts[0] in {"VWC", "Ka"} or col.endswith("cm_N") or col.endswith("cm_S"):
to_drop.append(col)
if to_drop:
logger.info("Dropping %d redundant soil columns", len(to_drop))
df = df.drop(columns=to_drop, errors="ignore")
return df
[docs]
def set_number_types(df: pd.DataFrame, logger: logging.Logger) -> pd.DataFrame:
"""
Convert columns in a DataFrame to the appropriate numeric types.
This function iterates through the columns of a DataFrame and converts
them to numeric types (integer or float) where appropriate. It handles
special cases for certain columns and logs warnings for duplicate columns.
Parameters
----------
df : pd.DataFrame
The input DataFrame.
logger : logging.Logger
The logger for tracking the type conversion process.
Returns
-------
pd.DataFrame
The DataFrame with columns converted to numeric types.
"""
logger.debug(f"Setting number types: {df.head(3)}")
dupes = pd.Series(df.columns).value_counts()
logger.debug(dupes[dupes > 1])
for col in df.columns:
logger.debug(f"Setting number types {col}")
pos = np.where(df.columns == col)[0]
if len(pos) == 1:
if col in ["MO_LENGTH", "RECORD", "FILE_NO", "DATALOGGER_NO"]:
df[col] = pd.to_numeric(df[col], downcast="integer", errors="coerce")
elif col in ["DATETIME_END"]:
df[col] = df[col]
elif col in ["TIMESTAMP_START", "TIMESTAMP_END", "SSITC"]:
df[col] = pd.to_numeric(df[col], downcast="integer", errors="coerce")
else:
df[col] = pd.to_numeric(df[col], errors="coerce")
else:
logger.warning(f"Column {col} appears multiple times in DataFrame")
for p in pos:
s = df.iloc[:, p]
if col in [
"MO_LENGTH",
"RECORD",
"FILE_NO",
"DATALOGGER_NO",
"TIMESTAMP_START",
"TIMESTAMP_END",
"SSITC",
]:
df.iloc[:, p] = pd.to_numeric(
s, downcast="integer", errors="coerce"
)
elif col == "DATETIME_END":
continue
else:
df.iloc[:, p] = pd.to_numeric(s, errors="coerce")
logger.debug(f"Set number types: {len(df)}")
return df
[docs]
def process_and_match_columns(
df_full: pd.DataFrame,
amflux: Union[pd.DataFrame, pd.Series]
) -> pd.DataFrame:
"""
Cleans column names of df_full by removing '_1', '_2', '_3', and '_4'
suffixes, compares the cleaned names against an 'amflux' variable list,
and returns a DataFrame of the results, along with printing the unmatched columns.
Args:
df_full: The DataFrame whose columns need to be cleaned and matched.
amflux: A DataFrame or Series that contains the 'Variable' column
or is the Series of variables to match against.
Returns:
A DataFrame containing the original columns, the cleaned columns,
and a boolean indicating if the cleaned column is in the amflux list.
"""
# 1. Column Cleaning Logic
clean_columns = list(df_full.columns)
# Iteratively remove suffixes: '_1', '_2', '_3', '_4'
# This loop is a condensed way to achieve the same result as the four
# separate list comprehensions in the original code.
suffixes_to_remove = ['_1', '_2', '_3', '_4']
for suffix in suffixes_to_remove:
clean_columns = [item.split(suffix)[0] for item in clean_columns]
clean_columns_series = pd.Series(clean_columns)
# 2. Determine the AMERIFLUX Variable List for Matching
# Handle both Series and DataFrame inputs for amflux
if isinstance(amflux, pd.DataFrame) and 'Variable' in amflux.columns:
amflux_variables = amflux['Variable']
elif isinstance(amflux, pd.Series):
amflux_variables = amflux
else:
raise ValueError("The 'amflux' argument must be a pandas Series or a DataFrame with a 'Variable' column.")
# 3. Matching
is_in_amflux = clean_columns_series.isin(amflux_variables)
# 4. Create Results DataFrame
results_df = pd.DataFrame({
'all_columns': df_full.columns,
'clean_columns': clean_columns,
'is_in_amflux': is_in_amflux
})
# 5. Print and Return
unmatched_df = results_df[results_df.is_in_amflux == False].sort_values('clean_columns')
print('COLUMNS NOT IN AMERIFLUX VARIABLE LIST\n')
print(unmatched_df)
return results_df
__all__ = [
"SOIL_SENSOR_SKIP_INDEX",
"DEFAULT_SOIL_DROP_LIMIT",
"drop_extra_soil_columns",
"set_number_types",
"drop_extras",
"process_and_match_columns",
]