import requests
import datetime
import logging
from typing import Union, Tuple, Optional
from requests.auth import HTTPBasicAuth
import pandas as pd
from io import BytesIO
import configparser
import sqlalchemy
from micromet.format.reformatter import Reformatter
from micromet.utils import logger_check
# Version string of this package; it is stored with every upload-statistics
# record so each upload can be traced back to the code that produced it.
micromet_version = "0.2.1"
class StationDataDownloader:
    """
    Manage downloads from a station's data logger.

    The class talks to a Campbell Scientific data logger through its
    HTTP web API: it can query the logger clock and pull table data.

    Parameters
    ----------
    config : configparser.ConfigParser or dict
        Configuration mapping. It must provide a ``"LOGGER"`` section
        with ``"login"`` and ``"pw"`` entries, plus one section per
        station holding at least ``"ip"`` (and optionally
        ``"<loggertype>_port"``).
    logger : logging.Logger, optional
        A logger for logging messages. If None, a new logger is created.

    Attributes
    ----------
    config : configparser.ConfigParser or dict
        The configuration object.
    logger : logging.Logger
        The logger instance.
    logger_credentials : requests.auth.HTTPBasicAuth
        The authentication credentials for the logger.
    request_timeout : float
        Seconds to wait for the logger to answer an HTTP request.
        Override on the class or on an instance to change it.
    default_lookback_days : int
        Number of days to look back when a ``since-time`` query is
        made without an explicit start.
    """

    # Without a timeout ``requests`` waits forever on an unreachable logger.
    request_timeout = 60.0
    default_lookback_days = 10

    def __init__(
        self,
        config: Union[configparser.ConfigParser, dict],
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the StationDataDownloader.

        Parameters
        ----------
        config : configparser.ConfigParser or dict
            A configuration object containing station details and
            credentials.
        logger : logging.Logger, optional
            A logger for logging messages. If None, a new logger is
            created.
        """
        self.config = config
        self.logger = logger_check(logger)
        self.logger_credentials = HTTPBasicAuth(
            config["LOGGER"]["login"], config["LOGGER"]["pw"]
        )

    def _get_port(self, station: str, loggertype: str = "eddy") -> int:
        """
        Get the port number for a given station and logger type.

        Parameters
        ----------
        station : str
            The identifier for the station.
        loggertype : str, optional
            The type of logger ('eddy' or 'met'). Defaults to 'eddy'.

        Returns
        -------
        int
            The value of ``"<loggertype>_port"`` in the station's
            configuration, or 80 when that entry is absent.
        """
        port_key = f"{loggertype}_port"
        return int(self.config[station].get(port_key, 80))

    def get_times(
        self, station: str, loggertype: str = "eddy"
    ) -> Tuple[Optional[str], str]:
        """
        Retrieve the current time from the logger and the system.

        Parameters
        ----------
        station : str
            The identifier for the station.
        loggertype : str, optional
            The type of logger ('eddy' or 'met'). Defaults to 'eddy'.

        Returns
        -------
        tuple[str | None, str]
            The ``"time"`` entry of the logger's ClockCheck response
            (None when the response has no such entry) and the local
            system time formatted as ``YYYY-MM-DD HH:MM:SS``.

        Raises
        ------
        requests.exceptions.RequestException
            If the logger cannot be reached within ``request_timeout``
            seconds or the response body is not valid JSON.
        """
        ip = self.config[station]["ip"]
        port = self._get_port(station, loggertype)
        clk_url = f"http://{ip}:{port}/?"
        clk_args = {
            "command": "ClockCheck",
            "uri": "dl",
            "format": "json",
        }
        response = requests.get(
            clk_url,
            params=clk_args,
            auth=self.logger_credentials,
            timeout=self.request_timeout,
        )
        clktime = response.json().get("time")
        comptime = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
        return clktime, comptime

    @staticmethod
    def get_station_id(stationid: str) -> str:
        """
        Extract the station ID from a full station identifier string.

        Parameters
        ----------
        stationid : str
            The full station identifier (e.g., 'US-ABC').

        Returns
        -------
        str
            The text after the last hyphen (e.g., 'ABC'); the input is
            returned unchanged when it contains no hyphen.
        """
        return stationid.split("-")[-1]

    @staticmethod
    def _build_query_params(
        tabletype: str,
        mode: str = "since-time",
        p1="0",
        p2="0",
        now: Optional[datetime.datetime] = None,
        lookback_days: int = 10,
    ) -> dict:
        """
        Build the query-string parameters for a DataQuery request.

        Parameters
        ----------
        tabletype : str
            Name of the logger table to query.
        mode : str, optional
            The data query mode. Defaults to 'since-time'.
        p1, p2 : str, optional
            Primary and secondary query parameters. ``"0"``, ``0`` and
            None all mean "not supplied".
        now : datetime.datetime, optional
            Reference time for the default look-back; the current
            system time is used when omitted.
        lookback_days : int, optional
            Days before ``now`` used as the start of a 'since-time'
            query that has no ``p1``. Defaults to 10.

        Returns
        -------
        dict
            Parameters ready to be passed to ``requests.get``.
        """
        params = {
            "command": "DataQuery",
            "mode": f"{mode}",
            "format": "toA5",
            "uri": f"dl:{tabletype}",
        }
        p1_missing = p1 is None or p1 == "0" or p1 == 0
        p2_missing = p2 is None or p2 == "0" or p2 == 0

        if not p1_missing:
            # An explicit start always wins; it must never be replaced by
            # the default look-back window.
            params["p1"] = p1
        elif mode == "since-time":
            reference = datetime.datetime.now() if now is None else now
            start = reference - datetime.timedelta(days=lookback_days)
            params["p1"] = f"{start:%Y-%m-%d}"
        else:
            params["p1"] = "0"

        if not p2_missing:
            params["p2"] = p2
        return params

    def download_from_station(
        self,
        station: str,
        loggertype: str = "eddy",
        mode: str = "since-time",
        p1: str = "0",
        p2: str = "0",
    ) -> Tuple[Optional[pd.DataFrame], Optional[float], int]:
        """
        Download data from a station's logger.

        Parameters
        ----------
        station : str
            The identifier for the station.
        loggertype : str, optional
            The type of logger ('eddy' or 'met'). Defaults to 'eddy'.
            'eddy' selects the ``Flux_AmeriFluxFormat`` table, anything
            else selects ``Statistics_AmeriFlux``.
        mode : str, optional
            The data query mode ('since-time', 'most-recent', etc.).
            Defaults to 'since-time'.
        p1 : str, optional
            The primary parameter for the query (e.g., start time).
            When left at "0" in 'since-time' mode, the start defaults
            to ``default_lookback_days`` days before now.
        p2 : str, optional
            The secondary parameter for the query (e.g., end time).
            Omitted from the request when left at "0".

        Returns
        -------
        tuple[pd.DataFrame | None, float | None, int]
            The downloaded data, the size of the response body in MB,
            and the HTTP status code. The first two items are None
            when the status code is not 200.

        Raises
        ------
        requests.exceptions.RequestException
            If the logger cannot be reached within ``request_timeout``
            seconds.
        """
        ip = self.config[station]["ip"]
        port = self._get_port(station, loggertype)
        tabletype = (
            "Flux_AmeriFluxFormat" if loggertype == "eddy" else "Statistics_AmeriFlux"
        )
        url = f"http://{ip}:{port}/tables.html?"
        params = self._build_query_params(
            tabletype,
            mode=mode,
            p1=p1,
            p2=p2,
            lookback_days=self.default_lookback_days,
        )
        response = requests.get(
            url,
            params=params,
            auth=self.logger_credentials,
            timeout=self.request_timeout,
        )
        if response.status_code != 200:
            self.logger.error(f"Error downloading from station: {response.status_code}")
            return None, None, response.status_code

        # Row 1 holds the column names; rows 0, 2 and 3 are extra header
        # lines of the logger's text export and are skipped.
        raw_data = pd.read_csv(BytesIO(response.content), skiprows=[0, 2, 3])
        pack_size = len(response.content) * 1e-6
        return raw_data, pack_size, response.status_code
class StationDataProcessor(StationDataDownloader):
    """
    Process station data and load it into a database.

    This class extends `StationDataDownloader` with reformatting of the
    downloaded data, comparison against records already stored in the
    database, and the upload of new records together with a row of
    upload statistics.

    Parameters
    ----------
    config : configparser.ConfigParser or dict
        A configuration object with station details.
    engine : sqlalchemy.engine.base.Engine
        A SQLAlchemy engine for database connections.
    logger : logging.Logger, optional
        A logger for logging messages.

    Attributes
    ----------
    engine : sqlalchemy.engine.base.Engine
        The SQLAlchemy engine instance.
    """

    def __init__(
        self,
        config: Union[configparser.ConfigParser, dict],
        engine: sqlalchemy.engine.base.Engine,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the StationDataProcessor.

        Parameters
        ----------
        config : configparser.ConfigParser or dict
            A configuration object with station details.
        engine : sqlalchemy.engine.base.Engine
            A SQLAlchemy engine for database connections.
        logger : logging.Logger, optional
            A logger for logging messages.
        """
        # The parent initialiser already stores ``config`` and resolves the
        # logger; repeating that here would call ``logger_check`` twice.
        super().__init__(config, logger)
        self.engine = engine

    def get_station_data(
        self,
        station: str,
        reformat: bool = True,
        loggertype: str = "eddy",
        config_path: str = "./data/reformatter_vars.yml",
        var_limits_csv: str = "./data/extreme_values.csv",
        drop_soil: bool = False,
    ) -> Tuple[Optional[pd.DataFrame], Optional[float]]:
        """
        Fetch and process data for a single station.

        The download starts at the latest ``timestamp_end`` stored in
        the database for the station. When the database holds no
        record yet, "0" is sent as the start so that
        `download_from_station` falls back to its default start.

        Parameters
        ----------
        station : str
            The identifier for the station.
        reformat : bool, optional
            Whether to reformat the downloaded data. Defaults to True.
        loggertype : str, optional
            The type of logger ('eddy' or 'met'). Defaults to 'eddy'.
        config_path : str, optional
            The path to the reformatter configuration file.
        var_limits_csv : str, optional
            The path to the variable limits CSV file.
        drop_soil : bool, optional
            Whether to drop soil-related data. Defaults to False.

        Returns
        -------
        tuple[pd.DataFrame | None, float | None]
            The (optionally reformatted) data and the size of the
            downloaded packet in MB, or ``(None, None)`` when the
            download did not return HTTP 200.
        """
        last_date = self.get_max_date(station, loggertype)
        if last_date is None or pd.isna(last_date):
            # Empty table for this station: there is no date to format.
            start = "0"
        else:
            start = f"{last_date:%Y-%m-%d}"

        raw_data, pack_size, status_code = self.download_from_station(
            station,
            loggertype=loggertype,
            mode="since-time",
            p1=start,
        )
        if status_code != 200:
            self.logger.error(f"Error fetching station data: {status_code}")
            return None, None

        if raw_data is None or not reformat:
            return raw_data, pack_size

        reformatter = Reformatter(
            config_path=config_path,
            var_limits_csv=var_limits_csv,
            drop_soil=drop_soil,
        )
        return reformatter.prepare(raw_data), pack_size

    @staticmethod
    def remove_existing_records(
        df: pd.DataFrame,
        column_to_check: str,
        values_to_remove: list,
        logger: Optional[logging.Logger] = None,
    ) -> pd.DataFrame:
        """
        Remove rows whose key value is already known.

        The column is looked up under the given name first, then under
        its upper-case and lower-case spellings.

        Parameters
        ----------
        df : pd.DataFrame
            The input DataFrame.
        column_to_check : str
            The name of the column to check for existing values.
        values_to_remove : list
            Values whose rows are dropped from the DataFrame.
        logger : logging.Logger, optional
            A logger for logging messages. Defaults to None.

        Returns
        -------
        pd.DataFrame
            The rows of ``df`` whose value in the matched column is not
            in ``values_to_remove``.

        Raises
        ------
        ValueError
            If none of the spellings of ``column_to_check`` is a column
            of ``df``.
        """
        logger = logger_check(logger)
        candidates = (
            column_to_check,
            column_to_check.upper(),
            column_to_check.lower(),
        )
        for col in candidates:
            if col not in df.columns:
                continue
            logger.info(f"Column '{col}' found in DataFrame")
            remaining = df[~df[col].isin(values_to_remove)]
            logger.info(f"{len(remaining)} records remaining after filtering")
            logger.info(f"Removing {len(df) - len(remaining)} records")
            return remaining
        raise ValueError(f"Column '{column_to_check}' not found in DataFrame")

    def compare_sql_to_station(
        self,
        df: pd.DataFrame,
        station: str,
        field: str = "timestamp_end",
        loggertype: str = "eddy",
    ) -> pd.DataFrame:
        """
        Compare station data with records in the database and filter new entries.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing the station data.
        station : str
            The identifier for the station.
        field : str, optional
            The field to use for comparison. Defaults to "timestamp_end".
        loggertype : str, optional
            The type of logger ('eddy' or 'met'). Defaults to 'eddy'.
            The table queried is ``amflux<loggertype>``.

        Returns
        -------
        pd.DataFrame
            A DataFrame containing only the new records.
        """
        table = f"amflux{loggertype}"
        # Identifiers cannot be bound, but the station value can; binding it
        # keeps quoting correct whatever the identifier contains.
        query = sqlalchemy.text(
            f"SELECT {field} FROM {table} WHERE stationid = :station"
        )
        exist = pd.read_sql(query, con=self.engine, params={"station": station})
        # Exactly one column is selected; read it by position so the result
        # does not depend on how the database spells the column name.
        existing = exist.iloc[:, 0].values
        return self.remove_existing_records(df, field, existing, self.logger)

    def get_max_date(
        self, station: str, loggertype: str = "eddy"
    ) -> Optional[datetime.datetime]:
        """
        Get the maximum timestamp from the station's data in the database.

        Parameters
        ----------
        station : str
            The identifier for the station.
        loggertype : str, optional
            The type of logger ('eddy' or 'met'). Defaults to 'eddy'.

        Returns
        -------
        datetime.datetime or None
            The result of ``MAX(timestamp_end)`` for the station. The
            value is a null (None/NaT) when the table has no row for
            the station.
        """
        table = f"amflux{loggertype}"
        query = sqlalchemy.text(
            f"SELECT MAX(timestamp_end) AS max_value FROM {table} "
            "WHERE stationid = :station"
        )
        df = pd.read_sql(query, con=self.engine, params={"station": station})
        return df["max_value"].iloc[0]

    def database_columns(self, dat: str) -> list:
        """
        Get the list of column names for a given database table.

        Parameters
        ----------
        dat : str
            The type of data ('eddy' or 'met'); the table inspected is
            ``amflux<dat>``.

        Returns
        -------
        list
            A list of column names in the specified table.
        """
        table = f"amflux{dat}"
        # ``LIMIT 0`` returns the column layout without transferring rows.
        query = f"SELECT * FROM {table} LIMIT 0;"
        df = pd.read_sql(query, con=self.engine)
        return df.columns.tolist()

    def process_station_data(
        self,
        site_folders: dict,
        config_path: str = "./data/reformatter_vars.yml",
        var_limits_csv: str = "./data/extreme_values.csv",
    ) -> None:
        """
        Process and upload data for all specified stations.

        For every station key of ``site_folders`` and for each of the
        'eddy' and 'met' logger types present in the station's
        configuration, the data are downloaded, reduced to the records
        not yet in the database, and uploaded together with a row of
        upload statistics.

        Parameters
        ----------
        site_folders : dict
            A dictionary mapping station IDs to folder names. Only the
            keys are used.
        config_path : str, optional
            The path to the reformatter configuration file.
            Defaults to "./data/reformatter_vars.yml".
        var_limits_csv : str, optional
            The path to the variable limits CSV file.
            Defaults to "./data/extreme_values.csv".
        """
        for stationid in site_folders:
            station = self.get_station_id(stationid)
            self.logger.info(f"Processing station: {stationid}")
            for dat in ("eddy", "met"):
                if dat not in self.config[station]:
                    continue
                try:
                    stationtime, comptime = self.get_times(station, loggertype=dat)
                    am_df, pack_size = self.get_station_data(
                        station,
                        loggertype=dat,
                        config_path=config_path,
                        var_limits_csv=var_limits_csv,
                    )
                except Exception as e:
                    # One unreachable logger must not stop the other
                    # stations from being processed.
                    self.logger.error(f"Error fetching data for {stationid}: {e}")
                    continue
                if am_df is None:
                    self.logger.warning(f"No data for {stationid}")
                    continue

                am_cols = set(self.database_columns(dat))
                am_df_filt = self.compare_sql_to_station(am_df, station, loggertype=dat)
                self.logger.info(f"Filtered {len(am_df_filt)} records")
                stats = self._prepare_upload_stats(
                    am_df_filt,
                    stationid,
                    dat,
                    pack_size,
                    len(am_df),
                    len(am_df_filt),
                    stationtime,
                    comptime,
                )

                # Upload only the columns that exist in the database table.
                am_df_filt = am_df_filt.rename(columns=str.lower)
                upload_cols = [col for col in am_df_filt.columns if col in am_cols]
                self._upload_to_database(am_df_filt[upload_cols], stats, dat)
                self._print_processing_summary(station, stats, self.logger)

    def _prepare_upload_stats(
        self,
        df: pd.DataFrame,
        stationid: str,
        tabletype: str,
        pack_size: float,
        raw_len: int,
        filtered_len: int,
        stationtime: str,
        comptime: str,
    ) -> dict:
        """
        Prepare a dictionary of statistics about the data upload.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame being uploaded. It must have a
            ``TIMESTAMP_START`` (or ``timestamp_start``) column.
        stationid : str
            The identifier for the station.
        tabletype : str
            The type of data table.
        pack_size : float
            The size of the data packet in MB.
        raw_len : int
            The number of rows in the raw data.
        filtered_len : int
            The number of rows after filtering.
        stationtime : str
            The timestamp from the station's logger.
        comptime : str
            The timestamp from the system running the script.

        Returns
        -------
        dict
            A dictionary of upload statistics.

        Raises
        ------
        KeyError
            If ``df`` has no start-timestamp column.
        """
        # Accept the lower-case spelling too; fall through to the upper-case
        # name so a missing column still raises the usual KeyError.
        start_col = (
            "timestamp_start"
            if "TIMESTAMP_START" not in df.columns and "timestamp_start" in df.columns
            else "TIMESTAMP_START"
        )
        starts = df[start_col]
        return {
            "stationid": stationid,
            # NOTE(review): "talbetype" looks like a typo for "tabletype", but
            # the key becomes a column of the "uploadstats" table on upload;
            # confirm the database schema before renaming it.
            "talbetype": tabletype,
            "mindate": starts.min(),
            "maxdate": starts.max(),
            "datasize_mb": pack_size,
            "stationdf_len": raw_len,
            "uploaddf_len": filtered_len,
            "stationtime": stationtime,
            "comptime": comptime,
            "micromet_version": micromet_version,
        }

    def _upload_to_database(self, df: pd.DataFrame, stats: dict, dat: str) -> None:
        """
        Upload data and statistics to the database.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame appended to the ``amflux<dat>`` table.
        stats : dict
            A dictionary of statistics appended as one row to the
            ``uploadstats`` table.
        dat : str
            The type of data ('eddy' or 'met'), used to determine the
            table name.
        """
        df.to_sql(f"amflux{dat}", con=self.engine, if_exists="append", index=False)
        pd.DataFrame([stats]).to_sql(
            "uploadstats", con=self.engine, if_exists="append", index=False
        )

    @staticmethod
    def _print_processing_summary(
        station: str, stats: dict, logger: Optional[logging.Logger] = None
    ) -> None:
        """
        Log a summary of the data processing.

        Parameters
        ----------
        station : str
            The identifier for the station.
        stats : dict
            A dictionary of statistics from the processing; the keys
            'mindate', 'maxdate', 'datasize_mb', 'uploaddf_len' and
            'stationdf_len' are read.
        logger : logging.Logger, optional
            A logger for outputting the summary. Defaults to None.
        """
        logger = logger_check(logger)
        logger.info(f"Station {station}")
        logger.info(f"Mindate {stats['mindate']} Maxdate {stats['maxdate']}")
        logger.info(f"data size = {stats['datasize_mb']}")
        logger.info(f"{stats['uploaddf_len']} vs {stats['stationdf_len']} rows")