"""
Timestamp transformation functions for the reformatter pipeline.
This module handles all datetime-related operations including timestamp
detection, conversion, resampling, and formatting.
"""
import logging
import pandas as pd
[docs]
def infer_datetime_col(df: pd.DataFrame, logger: logging.Logger) -> str | None:
"""
Infer the name of the timestamp column in a DataFrame.
This function searches for a timestamp column in the DataFrame by
checking a list of common names (e.g., 'TIMESTAMP_END'). If a
matching column is found, its name is returned. Otherwise, it logs
a warning and returns the name of the first column.
Parameters
----------
df : pd.DataFrame
The DataFrame to search for a timestamp column.
logger : logging.Logger
The logger to use for warning messages.
Returns
-------
str or None
The name of the timestamp column if found, otherwise the name of
the first column.
"""
datetime_col_options = ["TIMESTAMP_END", "TIMESTAMP_END_1"]
datetime_col_options += [col.lower() for col in datetime_col_options]
for cand in datetime_col_options:
if cand in df.columns:
return cand
logger.warning("No TIMESTAMP column in dataframe")
return df.iloc[:, 0].name # type: ignore
[docs]
def fix_timestamps(df: pd.DataFrame, logger: logging.Logger) -> pd.DataFrame:
    """
    Convert the timestamp column to datetime objects and handle missing values.

    Works on a copy of the input. A column named 'TIMESTAMP', if present,
    is dropped. The timestamp column is then located with
    ``infer_datetime_col``, parsed with the format ``%Y%m%d%H%M`` into a
    new 'DATETIME_END' column, and rows whose timestamp could not be
    parsed are removed. The source timestamp column itself is kept.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame with a timestamp column.
    logger : logging.Logger
        The logger for tracking progress and warnings.

    Returns
    -------
    pd.DataFrame
        The DataFrame with a 'DATETIME_END' column of datetime objects.
        If no timestamp column could be determined, the copy is returned
        without a 'DATETIME_END' column.
    """
    df = df.copy()
    if "TIMESTAMP" in df.columns:
        df = df.drop(["TIMESTAMP"], axis=1)

    ts_col = infer_datetime_col(df, logger)
    if ts_col is None:
        return df

    logger.debug(f"TS col {ts_col}")
    # Use positional access for the sample value: the index is not
    # guaranteed to contain the label 0, and the frame may have no rows.
    # A debug statement must never be the reason the pipeline fails.
    if len(df) > 0:
        logger.debug(f"TIMESTAMP_END col {df[ts_col].iloc[0]}")

    ts_format = "%Y%m%d%H%M"
    # errors="coerce" turns unparseable values into NaT so they can be
    # dropped below instead of raising.
    df["DATETIME_END"] = pd.to_datetime(df[ts_col], format=ts_format, errors="coerce")
    logger.debug(f"Len of unfixed timestamps {len(df)}")
    df = df.dropna(subset=["DATETIME_END"])
    logger.debug(f"Len of fixed timestamps {len(df)}")
    return df
[docs]
def resample_timestamps(df: pd.DataFrame, interval: int, logger: logging.Logger) -> pd.DataFrame:
    """
    Resample a DataFrame onto a regular 30- or 60-minute grid.

    Rows whose 'DATETIME_END' lies after midnight at the start of the
    current day are discarded first. Duplicate timestamps are then
    collapsed (the first occurrence wins), 'DATETIME_END' becomes the
    sorted index, and the frame is resampled at the requested frequency,
    taking the first value in each bin. Any interval other than 30 or 60
    falls back to 30 minutes.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame with a 'DATETIME_END' column.
    interval : int
        The resampling interval in minutes (30 or 60).
    logger : logging.Logger
        The logger for tracking progress.

    Returns
    -------
    pd.DataFrame
        The resampled DataFrame, indexed by 'DATETIME_END' at the chosen
        frequency, with a 'SAMPLING_INTERVAL' column holding the interval
        that was actually used.
    """
    cutoff = pd.Timestamp("today").floor("D")
    not_after_cutoff = df["DATETIME_END"] <= cutoff
    unique_rows = df[not_after_cutoff].drop_duplicates(subset=["DATETIME_END"])
    indexed = unique_rows.set_index("DATETIME_END").sort_index()

    if interval in (30, 60):
        interval_str = str(interval) + "min"
        logger.debug(f"Resampling at interval of {interval_str}")
    else:
        logger.debug("Interval not 30 or 60 minutes; resampling at default rate of 30 minutes")
        interval = 30
        interval_str = "30min"

    resampled = indexed.resample(interval_str).agg("first")
    resampled["SAMPLING_INTERVAL"] = interval
    logger.debug(f"Len of resampled timestamps {len(resampled)}")
    return resampled
[docs]
def timestamp_reset(df: pd.DataFrame, minutes: int = 30) -> pd.DataFrame:
    """
    Reset TIMESTAMP_START and TIMESTAMP_END columns based on the DataFrame index.

    The datetime index is taken as the end of each interval:
    'TIMESTAMP_END' is the index itself and 'TIMESTAMP_START' is the index
    minus ``minutes``. Both columns are written as integers in
    YYYYMMDDHHMM form (e.g. 202407301500). The columns are assigned on
    ``df`` itself, and that same object is returned.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame with a datetime index.
    minutes : int, optional
        The number of minutes subtracted from the index (the interval end)
        to obtain the interval start. Defaults to 30.

    Returns
    -------
    pd.DataFrame
        The DataFrame with updated 'TIMESTAMP_START' and 'TIMESTAMP_END' columns.
    """
    ts_format = "%Y%m%d%H%M"
    interval_start = df.index - pd.Timedelta(minutes=minutes)
    # Request int64 explicitly: the builtin ``int`` can resolve to a 32-bit
    # dtype on some platforms (e.g. Windows with NumPy 1.x), which cannot
    # hold a 12-digit YYYYMMDDHHMM value.
    df["TIMESTAMP_END"] = df.index.strftime(ts_format).astype("int64")
    df["TIMESTAMP_START"] = interval_start.strftime(ts_format).astype("int64")
    return df
[docs]
def add_ameriflux_timestamps(df, interval_minutes=30):
    """
    Add AmeriFlux-style TIMESTAMP_START and TIMESTAMP_END string columns.

    The DataFrame's DatetimeIndex is treated as the END of each interval;
    the start is the index shifted back by ``interval_minutes``. Both
    columns hold strings in YYYYMMDDHHmm form (e.g. '202407301500') with
    object dtype. The columns are assigned on ``df`` itself, and that
    same object is returned.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame with a DatetimeIndex.
    interval_minutes : int, optional
        Length of each interval in minutes. Defaults to 30.

    Returns
    -------
    pd.DataFrame
        The DataFrame with 'TIMESTAMP_START' and 'TIMESTAMP_END' columns.
    """
    stamp_format = '%Y%m%d%H%M'
    interval_length = pd.Timedelta(minutes=interval_minutes)

    # START is written before END so the resulting column order is kept.
    df['TIMESTAMP_START'] = (df.index - interval_length).strftime(stamp_format).astype(object)
    df['TIMESTAMP_END'] = df.index.strftime(stamp_format).astype(object)
    return df
# Names exported by ``from <module> import *``.
__all__ = [
    "infer_datetime_col",
    "fix_timestamps",
    "resample_timestamps",
    "timestamp_reset",
    "add_ameriflux_timestamps",
]