micromet package

Subpackages

Submodules

micromet.pipeline module

Complete pipeline for processing micrometeorological data with Micromet.

This module provides high-level orchestration for the complete data processing workflow, from raw data files to cleaned, validated, and analyzed datasets.

Classes

Pipeline : Main orchestration class for data processing
PipelineConfig : Configuration container for pipeline settings
ProcessingResult : Container for processing results and metadata

Functions

run_pipeline : Convenience function to run complete pipeline
process_station : Process a single station’s data
batch_process : Process multiple stations

Examples

Basic usage:

>>> from micromet.pipeline import Pipeline
>>>
>>> # Process a single file
>>> pipeline = Pipeline()
>>> result = pipeline.process_file(
...     'data/US-UTW_Flux.dat',
...     site_id='US-UTW'
... )
>>>
>>> # Batch process all stations
>>> results = pipeline.batch_process(
...     input_dir='./raw_data',
...     output_dir='./processed_data'
... )

Command-line usage:

$ python -m micromet.pipeline --site US-UTW --input data/ --output results/
$ python -m micromet.pipeline --batch --input data/ --output results/

class micromet.pipeline.Pipeline(config=None, logger=None)[source]

Bases: object

Main orchestration class for micrometeorological data processing.

This class coordinates the complete workflow from raw data files to cleaned, validated, and analyzed datasets.

Parameters:
config

Pipeline configuration.

Type:

PipelineConfig

logger

Logger instance.

Type:

logging.Logger

reader

Data reader instance.

Type:

AmerifluxDataProcessor

__init__(config=None, logger=None)[source]

Initialize the Pipeline.

batch_process(input_dir, output_dir, pattern='*Flux*.dat', data_type='eddy')[source]

Process multiple files in a directory.

Parameters:
  • input_dir (Union[str, Path]) – Directory containing input files.

  • output_dir (Union[str, Path]) – Directory for output files.

  • pattern (str) – Glob pattern for finding input files. Defaults to “*Flux*.dat”.

  • data_type (str) – Type of data (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

Results for all processed files.

Return type:

List[ProcessingResult]
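To illustrate how a glob pattern such as the default '*Flux*.dat' selects input files, the sketch below applies the standard-library fnmatch module to a list of file names. It is not the pipeline's own file-discovery code; the file names and the select_files helper are hypothetical.

```python
from fnmatch import fnmatchcase

# Hypothetical file names; only the pattern comes from the signature above.
candidates = [
    "US-UTW_Flux.dat",
    "US-UTW_Flux_AmeriFluxFormat_12.dat",
    "US-UTW_Statistics.dat",
    "notes.txt",
]


def select_files(names, pattern="*Flux*.dat"):
    """Return the names that match a shell-style glob pattern (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]


selected = select_files(candidates)
print(selected)
```

A narrower pattern, for example '*AmeriFluxFormat*.dat', would keep only the second file in this list.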

process_file(input_file, site_id=None, output_dir=None, data_type='eddy')[source]

Process a single data file through the complete pipeline.

Parameters:
  • input_file (Union[str, Path]) – Path to input data file.

  • site_id (Optional[str]) – Station identifier. If None, attempts to extract from filename.

  • output_dir (Union[str, Path, None]) – Directory for output files. If None, uses input file directory.

  • data_type (str) – Type of data (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

Container with processing results and metadata.

Return type:

ProcessingResult
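The documentation says that site_id is extracted from the file name when it is None, but does not say how. One plausible approach, shown here as a sketch only, is a regular expression for AmeriFlux-style identifiers. The pattern and the guess_site_id helper are assumptions, not the library's implementation.

```python
import re
from pathlib import Path

# Assumed convention: IDs look like "US-UTW" (two capital letters, a hyphen,
# then three letters or digits).
SITE_ID_RE = re.compile(r"[A-Z]{2}-[A-Za-z0-9]{3}")


def guess_site_id(input_file):
    """Return the first AmeriFlux-style ID found in the file name, or None."""
    match = SITE_ID_RE.search(Path(input_file).name)
    return match.group(0) if match else None


print(guess_site_id("data/US-UTW_Flux.dat"))
print(guess_site_id("data/flux.dat"))
```

Passing site_id explicitly avoids depending on any file-naming convention.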

process_station(site_id, input_dir, output_dir, data_types=['eddy', 'met'])[source]

Process all data types for a single station.

Parameters:
  • site_id (str) – Station identifier (e.g., ‘US-UTW’).

  • input_dir (Union[str, Path]) – Directory containing input files.

  • output_dir (Union[str, Path]) – Directory for output files.

  • data_types (List[str]) – Data types to process. Defaults to [‘eddy’, ‘met’].

Returns:

Dictionary mapping data_type to ProcessingResult.

Return type:

Dict[str, ProcessingResult]

class micromet.pipeline.PipelineConfig(check_timestamps=True, drop_soil=True, generate_reports=True, generate_plots=False, save_intermediate=False, var_limits_csv=None, expected_freq='30min', output_format='csv')[source]

Bases: object

Configuration settings for the data processing pipeline.

check_timestamps

Whether to perform timestamp alignment analysis (slower but thorough).

Type:

bool

drop_soil

Whether to drop extra soil sensor columns.

Type:

bool

generate_reports

Whether to generate validation and gap reports.

Type:

bool

generate_plots

Whether to generate diagnostic plots.

Type:

bool

save_intermediate

Whether to save intermediate processing steps.

Type:

bool

var_limits_csv

Path to custom variable limits CSV file.

Type:

Path or None

expected_freq

Expected data frequency (e.g., ‘30min’).

Type:

str

output_format

Output file format (‘csv’, ‘parquet’, ‘feather’).

Type:

str

check_timestamps: bool = True
drop_soil: bool = True
expected_freq: str = '30min'
generate_plots: bool = False
generate_reports: bool = True
output_format: str = 'csv'
save_intermediate: bool = False
to_dict()[source]

Convert configuration to dictionary.

Return type:

dict

var_limits_csv: Path | None = None
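The following sketch mirrors the documented fields and defaults in a stand-in dataclass, to show how a configuration might be overridden and serialized. ConfigSketch is hypothetical, and the use of dataclasses.asdict for to_dict() is an assumption about one reasonable implementation.

```python
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class ConfigSketch:
    """Stand-in with the same fields and defaults as documented above."""

    check_timestamps: bool = True
    drop_soil: bool = True
    generate_reports: bool = True
    generate_plots: bool = False
    save_intermediate: bool = False
    var_limits_csv: Optional[Path] = None
    expected_freq: str = "30min"
    output_format: str = "csv"

    def to_dict(self):
        # asdict() is one plausible implementation; the real method may differ.
        return asdict(self)


# Skip the slower timestamp analysis and write Parquet instead of CSV.
fast = ConfigSketch(check_timestamps=False, output_format="parquet")
print(fast.to_dict())
```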
class micromet.pipeline.ProcessingResult(site_id, success, input_file, output_file=None, n_records_input=0, n_records_output=0, n_flagged=0, processing_time=0.0, timestamp_issues=None, error_message=None, reports=<factory>)[source]

Bases: object

Container for processing results and metadata.

site_id

Station identifier.

Type:

str

success

Whether processing completed successfully.

Type:

bool

input_file

Path to input file.

Type:

Path

output_file

Path to output file (if saved).

Type:

Path or None

n_records_input

Number of records in input data.

Type:

int

n_records_output

Number of records in output data.

Type:

int

n_flagged

Number of records flagged during QA/QC.

Type:

int

processing_time

Processing time in seconds.

Type:

float

timestamp_issues

Detected timestamp alignment issues.

Type:

dict or None

error_message

Error message if processing failed.

Type:

str or None

reports

Dictionary of generated reports.

Type:

dict

error_message: str | None = None
input_file: Path
n_flagged: int = 0
n_records_input: int = 0
n_records_output: int = 0
output_file: Path | None = None
processing_time: float = 0.0
reports: Dict
site_id: str
success: bool
summary()[source]

Generate a human-readable summary.

Return type:

str

timestamp_issues: Dict | None = None
to_dict()[source]

Convert result to dictionary.

Return type:

dict
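As a rough illustration of how a list of results could be inspected after a batch run, the sketch below uses a stand-in dataclass with a subset of the documented fields. ResultSketch, tally, and the sample values are hypothetical. The default_factory field shows what "<factory>" in the signature above indicates.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ResultSketch:
    """Stand-in carrying a subset of the documented fields."""

    site_id: str
    success: bool
    n_records_input: int = 0
    n_records_output: int = 0
    n_flagged: int = 0
    error_message: Optional[str] = None
    # default_factory gives every instance its own dict instead of a shared one.
    reports: Dict = field(default_factory=dict)


def tally(results: List[ResultSketch]):
    """Count successes and collect error messages keyed by site."""
    ok = sum(1 for r in results if r.success)
    errors = {r.site_id: r.error_message for r in results if not r.success}
    return ok, errors


results = [
    ResultSketch("US-UTW", True, n_records_input=48, n_records_output=48, n_flagged=3),
    ResultSketch("US-CdM", False, error_message="no timestamp column"),
]
ok, errors = tally(results)
print(ok, errors)
```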

micromet.pipeline.batch_process(input_dir, output_dir, **kwargs)[source]

Convenience function for batch processing.

Parameters:
  • input_dir (Union[str, Path]) – Input directory.

  • output_dir (Union[str, Path]) – Output directory.

  • **kwargs – Additional arguments passed to Pipeline constructor.

Returns:

Results for all processed files.

Return type:

List[ProcessingResult]

micromet.pipeline.main()[source]

Command-line interface for the pipeline.

micromet.pipeline.process_station(site_id, input_dir, output_dir, **kwargs)[source]

Convenience function to process a single station.

Parameters:
  • site_id (str) – Station identifier.

  • input_dir (Union[str, Path]) – Input directory.

  • output_dir (Union[str, Path]) – Output directory.

  • **kwargs – Additional arguments passed to Pipeline constructor.

Returns:

Processing results for each data type.

Return type:

Dict[str, ProcessingResult]

micromet.reader module

This module provides the AmerifluxDataProcessor class for reading and parsing AmeriFlux-style CSV files (TOA5 or AmeriFlux output) into a pandas DataFrame.

class micromet.reader.AmerifluxDataProcessor(logger=None)[source]

Bases: object

A class for reading and parsing AmeriFlux-style CSV files.

This class is designed to handle Campbell Scientific TOA5 files or standard AmeriFlux output files, parsing them into a pandas DataFrame.

Parameters:

logger (Logger) – A logger for tracking the data processing. If not provided, a default logger is used.

logger

The logger used for logging messages.

Type:

logging.Logger

skip_rows

The number of rows to skip at the beginning of the file.

Type:

int or list of int

names

The column names for the DataFrame.

Type:

list of str

NA_VALUES = ['-9999', 'NAN', 'NaN', 'nan', nan, -9999.0]
__init__(logger=None)[source]

Initialize the AmerifluxDataProcessor.

Parameters:

logger (Logger) – A logger for tracking the data processing. If not provided, a default logger is used.

iterate_through_stations()[source]

Iterate through all stations and compile their data.

This method iterates through a predefined list of stations, compiles the data for each station, and returns a dictionary of DataFrames.

Returns:

A dictionary where keys are station IDs and values are DataFrames of the compiled data for each station.

Return type:

dict

raw_file_compile(main_dir, station_folder_name, search_str='*Flux_AmeriFluxFormat*.dat')[source]

Compile raw AmeriFlux datalogger files into a single DataFrame.

This method searches for files matching a given pattern within a station’s directory, processes each file, and concatenates them into a single DataFrame.

Parameters:
  • main_dir (Union[str, Path]) – The main directory containing the station folders.

  • station_folder_name (Union[str, Path]) – The name of the station folder.

  • search_str (str) – The search string (glob pattern) for finding files to compile. Defaults to “*Flux_AmeriFluxFormat*.dat”.

Returns:

A DataFrame containing the compiled data, or None if no valid files were found.

Return type:

Optional[DataFrame]

to_dataframe(file)[source]

Read an AmeriFlux-style CSV file and return it as a pandas DataFrame.

This method first determines the header structure of the file and then reads the data into a DataFrame, handling missing values.

Parameters:

file (Union[str, Path]) – The path to the CSV file to be read.

Returns:

A DataFrame containing the parsed data from the file.

Return type:

DataFrame
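The two steps described for to_dataframe (determine the header structure, then read the data while handling missing values) can be sketched with the standard library alone. This is not the class's implementation: read_rows is hypothetical, the sample text is made up, and values are left as strings rather than converted to a DataFrame. The sketch relies on the TOA5 layout of four header lines (environment, field names, units, processing type).

```python
import csv
import io
import math

NA_STRINGS = {"-9999", "NAN", "NaN", "nan"}  # string sentinels listed in NA_VALUES


def read_rows(text):
    """Parse TOA5 or single-header CSV text into a list of dicts."""
    rows = list(csv.reader(io.StringIO(text)))
    if rows[0][0] == "TOA5":
        # TOA5: line 1 = environment, 2 = names, 3 = units, 4 = processing type.
        names, data = rows[1], rows[4:]
    else:
        # Plain AmeriFlux-style output: one header line with the names.
        names, data = rows[0], rows[1:]
    return [
        {n: (math.nan if v in NA_STRINGS else v) for n, v in zip(names, r)}
        for r in data
    ]


# Made-up sample in TOA5 layout.
toa5 = (
    '"TOA5","US-UTW","CR6"\n'
    '"TIMESTAMP","RECORD","FC"\n'
    '"TS","RN","umol m-2 s-1"\n'
    '"","","Avg"\n'
    '"2024-06-01 00:30:00",1,-9999\n'
    '"2024-06-01 01:00:00",2,1.25\n'
)
records = read_rows(toa5)
print(records[1]["FC"])
```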

micromet.station_data_pull module

class micromet.station_data_pull.StationDataDownloader(config, logger=None)[source]

Bases: object

A class to manage downloading data from a station’s logger.

This class handles the connection and data download from a Campbell Scientific data logger via its web API.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

config

The configuration object.

Type:

configparser.ConfigParser or dict

logger

The logger instance.

Type:

logging.Logger

logger_credentials

The authentication credentials for the logger.

Type:

requests.auth.HTTPBasicAuth

__init__(config, logger=None)[source]

Initialize the StationDataDownloader.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

download_from_station(station, loggertype='eddy', mode='since-time', p1='0', p2='0')[source]

Download data from a station’s logger.

This method constructs a request to the station’s web API to download data based on the specified parameters.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • mode (str) – The data query mode (‘since-time’, ‘most-recent’, etc.). Defaults to ‘since-time’.

  • p1 (str) – The primary parameter for the query (e.g., start time). Defaults to “0”.

  • p2 (str) – The secondary parameter for the query (e.g., end time). Defaults to “0”.

Returns:

A tuple containing the downloaded data as a DataFrame, the size of the data packet in MB, and the HTTP status code.

Return type:

tuple[pd.DataFrame | None, float | None, int]

static get_station_id(stationid)[source]

Extract the station ID from a full station identifier string.

Parameters:

stationid (str) – The full station identifier (e.g., ‘US-ABC’).

Returns:

The extracted station ID (e.g., ‘ABC’).

Return type:

str
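Based only on the documented example ('US-ABC' gives 'ABC'), the behavior can be approximated by taking the text after the last hyphen. The station_suffix helper is a hypothetical stand-in; the real method may handle other input shapes differently.

```python
def station_suffix(stationid: str) -> str:
    """Return the part after the last hyphen, e.g. 'US-ABC' -> 'ABC'."""
    return stationid.split("-")[-1]


print(station_suffix("US-ABC"))
```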

get_times(station, loggertype='eddy')[source]

Retrieve the current time from the logger and the system.

This method queries a station’s logger for its current time and also gets the current system time for comparison.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A tuple containing the logger’s current time as a string and the system’s current time as a string.

Return type:

Tuple[Optional[str], str]
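Since both times are returned as strings, a caller that wants the clock drift has to parse them first. The sketch below assumes a 'YYYY-MM-DD HH:MM:SS' format, which the documentation does not confirm; clock_drift_seconds is a hypothetical helper.

```python
from datetime import datetime


def clock_drift_seconds(logger_time, system_time, fmt="%Y-%m-%d %H:%M:%S"):
    """Return logger minus system time in seconds, or None if no logger time."""
    if logger_time is None:
        return None
    delta = datetime.strptime(logger_time, fmt) - datetime.strptime(system_time, fmt)
    return delta.total_seconds()


print(clock_drift_seconds("2024-06-01 12:00:05", "2024-06-01 12:00:00"))
```

The None branch matters because the logger time is typed as Optional[str].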

class micromet.station_data_pull.StationDataProcessor(config, engine, logger=None)[source]

Bases: StationDataDownloader

A class for processing and managing station data.

This class extends StationDataDownloader to add functionality for reformatting data, interacting with a database, and managing the overall data processing workflow.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

engine

The SQLAlchemy engine instance.

Type:

sqlalchemy.engine.base.Engine

__init__(config, engine, logger=None)[source]

Initialize the StationDataProcessor.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

compare_sql_to_station(df, station, field='timestamp_end', loggertype='eddy')[source]

Compare station data with records in the database and filter new entries.

Parameters:
  • df (DataFrame) – The DataFrame containing the station data.

  • station (str) – The identifier for the station.

  • field (str) – The field to use for comparison. Defaults to “timestamp_end”.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A DataFrame containing only the new records.

Return type:

DataFrame

database_columns(dat)[source]

Get the list of column names for a given database table.

Parameters:

dat (str) – The type of data (‘eddy’ or ‘met’), which corresponds to the table name.

Returns:

A list of column names in the specified table.

Return type:

list

get_max_date(station, loggertype='eddy')[source]

Get the maximum timestamp from the station’s data in the database.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

The latest timestamp found in the database for the station.

Return type:

datetime

get_station_data(station, reformat=True, loggertype='eddy', config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv', drop_soil=False)[source]

Fetch and process data for a single station.

This method downloads data from a station, optionally reformats it, and returns the processed data.

Parameters:
  • station (str) – The identifier for the station.

  • reformat (bool) – Whether to reformat the downloaded data. Defaults to True.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • config_path (str) – The path to the reformatter configuration file.

  • var_limits_csv (str) – The path to the variable limits CSV file.

  • drop_soil (bool) – Whether to drop soil-related data. Defaults to False.

Returns:

A tuple containing the processed DataFrame and the size of the downloaded data packet in MB.

Return type:

Tuple[Optional[DataFrame], Optional[float]]

process_station_data(site_folders, config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv')[source]

Process and upload data for all specified stations.

This method iterates through a dictionary of site folders, fetches data for each station, processes it, and uploads it to the database.

Parameters:
  • site_folders (dict) – A dictionary mapping station IDs to folder names.

  • config_path (str) – The path to the reformatter configuration file. Defaults to “./data/reformatter_vars.yml”.

  • var_limits_csv (str) – The path to the variable limits CSV file. Defaults to “./data/extreme_values.csv”.

Return type:

None

static remove_existing_records(df, column_to_check, values_to_remove, logger=None)[source]

Remove rows from a DataFrame that already exist in the database.

Parameters:
  • df (DataFrame) – The input DataFrame.

  • column_to_check (str) – The name of the column to check for existing values.

  • values_to_remove (list) – A list of values to be removed from the DataFrame.

  • logger (Logger) – A logger for logging messages. Defaults to None.

Returns:

The DataFrame with existing records removed.

Return type:

DataFrame
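The filtering idea can be shown without a database or pandas by treating rows as dictionaries. This is a sketch of the concept, not the method's code; drop_existing and the sample rows are hypothetical.

```python
def drop_existing(rows, column_to_check, values_to_remove):
    """Keep only rows whose value in column_to_check is not already known."""
    known = set(values_to_remove)  # set lookup keeps the filter fast
    return [row for row in rows if row[column_to_check] not in known]


downloaded = [
    {"timestamp_end": "202406010030", "FC": 1.2},
    {"timestamp_end": "202406010100", "FC": 1.4},
    {"timestamp_end": "202406010130", "FC": 1.1},
]
already_stored = ["202406010030", "202406010100"]
new_rows = drop_existing(downloaded, "timestamp_end", already_stored)
print(new_rows)
```

With a pandas DataFrame, the same filter is commonly written as df[~df[column_to_check].isin(values_to_remove)].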

micromet.station_info module

This module contains station-specific information, such as site folders and logger IDs.

micromet.utils module

Utility functions for the micromet package.

micromet.utils.create_reformatter_from_site(site_id, config_dir='src/micromet/data', check_timestamps=True, **reformatter_kwargs)[source]

Create a Reformatter instance with site configuration loaded from .ini file.

This is a convenience factory function that reads the site configuration and creates a properly configured Reformatter instance.

Parameters:
  • site_id (str) – The site identifier (e.g., ‘US-CdM’, ‘US-UTW’).

  • config_dir (Path | str) – Directory containing the .ini files. Defaults to ‘src/micromet/data’.

  • check_timestamps (bool) – Whether to enable timestamp checking. Defaults to True.

  • **reformatter_kwargs – Additional keyword arguments passed to Reformatter (e.g., drop_soil, var_limits_csv).

Returns:

A configured Reformatter instance.

Return type:

Reformatter

Examples

>>> reformatter = create_reformatter_from_site('US-CdM')
>>> df_clean, report, ts_results = reformatter.process(raw_data)
>>> # With additional options
>>> reformatter = create_reformatter_from_site(
...     'US-UTW',
...     drop_soil=False,
...     check_timestamps=True
... )
>>> # Disable timestamp checking for speed
>>> reformatter = create_reformatter_from_site(
...     'US-UTB',
...     check_timestamps=False
... )
micromet.utils.extract_config_for_reformatter(site_id, config_dir='src/micromet/data')[source]

Extract only the values needed for Reformatter from a site config.

This is a convenience function that returns just the three values needed to initialize a Reformatter with timestamp checking.

Parameters:
  • site_id (str) – The site identifier (e.g., ‘US-CdM’).

  • config_dir (Path | str) – Directory containing the .ini files.

Returns:

A tuple of (site_lat, site_lon, site_utc_offset).

Return type:

Tuple[float, float, float]

Examples

>>> lat, lon, utc = extract_config_for_reformatter('US-CdM')
>>> lat, lon, utc
(37.5241, -109.7471, -7.0)
micromet.utils.get_all_site_configs(config_dir='src/micromet/data')[source]

Read all site configurations from .ini files in a directory.

Parameters:

config_dir (Path | str) – Directory containing the .ini files. Defaults to ‘src/micromet/data’.

Returns:

Dictionary mapping site_id to configuration dictionaries.

Return type:

Dict[str, Dict[str, float | str]]

Examples

>>> all_configs = get_all_site_configs()
>>> all_configs['US-CdM']['site_lat']
37.5241
>>> list(all_configs.keys())
['US-CdM', 'US-UTB', 'US-UTD', ...]
micromet.utils.load_yaml(path)[source]

Load a YAML file and return its contents as a dictionary.

Parameters:

path (Path | str) – The path to the YAML file.

Returns:

The contents of the YAML file as a dictionary.

Return type:

Dict

Raises:

FileNotFoundError – If the specified file does not exist.

micromet.utils.logger_check(logger)[source]

Initialize and return a logger instance if none is provided.

This function checks if a logger object is provided. If not, it creates and configures a new logger.

Parameters:

logger (Logger | None) – An existing logger instance.

Returns:

A configured logger instance.

Return type:

Logger
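A minimal sketch of the "use the given logger or create one" pattern, built on the standard logging module. The logger name, level, and format string are assumptions; the real function may configure its logger differently.

```python
import logging


def ensure_logger(logger=None, name="micromet_sketch"):
    """Return the given logger, or build a basic one if none was passed."""
    if logger is not None:
        return logger
    new_logger = logging.getLogger(name)
    if not new_logger.handlers:  # avoid duplicate handlers on repeat calls
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        new_logger.addHandler(handler)
    new_logger.setLevel(logging.INFO)
    return new_logger


log = ensure_logger()
log.info("logger ready")
```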

micromet.utils.read_site_config(site_id, config_dir='src/micromet/data')[source]

Read site configuration from an .ini file.

Parameters:
  • site_id (str) – The site identifier (e.g., ‘US-CdM’, ‘US-UTW’).

  • config_dir (Path | str) – Directory containing the .ini files. Defaults to ‘src/micromet/data’.

Returns:

Dictionary with keys:

  • ‘site_lat’ (float) – Station latitude

  • ‘site_lon’ (float) – Station longitude

  • ‘site_utc_offset’ (float) – UTC offset in hours

  • ‘site_elevation’ (float) – Station elevation in meters

  • ‘site_name’ (str) – Full station name

  • ‘site_id’ (str) – Station identifier

Return type:

Dict[str, float | str]

Raises:

Examples

>>> config = read_site_config('US-CdM')
>>> config['site_lat']
37.5241
>>> config['site_utc_offset']
-7.0
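The sketch below shows how such a dictionary could be built from .ini text with the standard configparser module. The section name, option names, station name, and elevation are placeholders, since the layout of the real files is not documented here; parse_site_config is hypothetical.

```python
import configparser

# Hypothetical .ini layout; the real section and option names may differ.
# site_name and site_elevation are placeholder values.
INI_TEXT = """
[site]
site_id = US-CdM
site_name = Example Station
site_lat = 37.5241
site_lon = -109.7471
site_utc_offset = -7
site_elevation = 1000
"""

FLOAT_KEYS = ("site_lat", "site_lon", "site_utc_offset", "site_elevation")


def parse_site_config(text):
    """Parse .ini text into a dict, converting the numeric keys to float."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["site"]
    config = {key: section[key] for key in ("site_id", "site_name")}
    config.update({key: section.getfloat(key) for key in FLOAT_KEYS})
    return config


config = parse_site_config(INI_TEXT)
print(config["site_lat"], config["site_utc_offset"])
```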

Module contents

Micromet: A package for processing and analyzing micrometeorological data.

This package provides a collection of tools for reading, reformatting, performing quality control, and generating reports from micrometeorological and flux data, particularly from AmeriFlux-style data sources.

The main components of the package are:

  • AmerifluxDataProcessor: For reading and parsing data files.

  • Reformatter: For cleaning and standardizing data.

  • tools: A collection of utility functions for analysis.

  • graphs: For creating various plots and visualizations.

  • StationDataDownloader: For downloading data from stations.

  • StationDataProcessor: For processing and managing station data.

class micromet.AmerifluxDataProcessor(logger=None)[source]

Bases: object

A class for reading and parsing AmeriFlux-style CSV files.

This class is designed to handle Campbell Scientific TOA5 files or standard AmeriFlux output files, parsing them into a pandas DataFrame.

Parameters:

logger (Logger) – A logger for tracking the data processing. If not provided, a default logger is used.

logger

The logger used for logging messages.

Type:

logging.Logger

skip_rows

The number of rows to skip at the beginning of the file.

Type:

int or list of int

names

The column names for the DataFrame.

Type:

list of str

NA_VALUES = ['-9999', 'NAN', 'NaN', 'nan', nan, -9999.0]
__init__(logger=None)[source]

Initialize the AmerifluxDataProcessor.

Parameters:

logger (Logger) – A logger for tracking the data processing. If not provided, a default logger is used.

iterate_through_stations()[source]

Iterate through all stations and compile their data.

This method iterates through a predefined list of stations, compiles the data for each station, and returns a dictionary of DataFrames.

Returns:

A dictionary where keys are station IDs and values are DataFrames of the compiled data for each station.

Return type:

dict

raw_file_compile(main_dir, station_folder_name, search_str='*Flux_AmeriFluxFormat*.dat')[source]

Compile raw AmeriFlux datalogger files into a single DataFrame.

This method searches for files matching a given pattern within a station’s directory, processes each file, and concatenates them into a single DataFrame.

Parameters:
  • main_dir (Union[str, Path]) – The main directory containing the station folders.

  • station_folder_name (Union[str, Path]) – The name of the station folder.

  • search_str (str) – The search string (glob pattern) for finding files to compile. Defaults to “*Flux_AmeriFluxFormat*.dat”.

Returns:

A DataFrame containing the compiled data, or None if no valid files were found.

Return type:

Optional[DataFrame]

to_dataframe(file)[source]

Read an AmeriFlux-style CSV file and return it as a pandas DataFrame.

This method first determines the header structure of the file and then reads the data into a DataFrame, handling missing values.

Parameters:

file (Union[str, Path]) – The path to the CSV file to be read.

Returns:

A DataFrame containing the parsed data from the file.

Return type:

DataFrame

class micromet.DateRangeDrop(column, start, end)[source]

Bases: object

A date range within which a column’s values should be set to NaN.

column: str
end: str
start: str
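To make the intent of a date-range drop concrete, the sketch below applies one to rows stored as dictionaries. DropSketch and apply_drop are hypothetical stand-ins; treating both bounds as inclusive and using ISO-style date strings are assumptions.

```python
import math
from dataclasses import dataclass
from datetime import datetime


@dataclass
class DropSketch:
    """Stand-in with the same three fields as DateRangeDrop."""

    column: str
    start: str
    end: str


def apply_drop(rows, drop):
    """Set drop.column to NaN for rows whose timestamp lies in [start, end]."""
    start = datetime.fromisoformat(drop.start)
    end = datetime.fromisoformat(drop.end)
    for row in rows:
        if start <= datetime.fromisoformat(row["timestamp"]) <= end:
            row[drop.column] = math.nan
    return rows


rows = [
    {"timestamp": "2024-06-01 00:30", "P": 0.0},
    {"timestamp": "2024-06-01 01:00", "P": 12.7},
    {"timestamp": "2024-06-01 01:30", "P": 0.2},
]
apply_drop(rows, DropSketch("P", "2024-06-01 01:00", "2024-06-01 01:00"))
print(rows)
```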
class micromet.FlagWindow(flag_columns, start, end, flag_value=2)[source]

Bases: object

A time window for applying a quality flag value to one or more columns.

end: str
flag_columns: List[str]
flag_value: int = 2
start: str
class micromet.Reformatter(var_limits_csv=None, drop_soil=True, check_timestamps=False, site_lat=None, site_lon=None, site_utc_offset=-7, logger=None)[source]

Bases: object

A class to clean and standardize station data for flux/met processing.

This class provides a pipeline for preparing raw station data by applying a series of transformations, including fixing timestamps, renaming columns, applying physical limits, and checking timestamp alignment.

Parameters:
  • var_limits_csv (str | Path | None) – Path to a CSV file containing variable limits. If not provided, default limits are used.

  • drop_soil (bool) – If True, extra soil-related columns are dropped. Defaults to True.

  • check_timestamps (bool) – If True, perform timestamp alignment analysis on radiation data. Defaults to False.

  • site_lat (float | None) – Latitude of the site (required if check_timestamps=True).

  • site_lon (float | None) – Longitude of the site (required if check_timestamps=True).

  • site_utc_offset (int) – UTC offset in hours for the site (required if check_timestamps=True).

  • logger (Logger | None) – A logger for tracking the reformatting process. If not provided, a default logger is used.

logger

The logger used for logging messages.

Type:

logging.Logger

config

A dictionary of configuration parameters for the reformatting process.

Type:

dict

varlimits

A DataFrame containing the physical limits for each variable.

Type:

pd.DataFrame

drop_soil

A flag indicating whether to drop extra soil columns.

Type:

bool

check_timestamps

A flag indicating whether to perform timestamp alignment checks.

Type:

bool

site_lat

The latitude of the site.

Type:

float

site_lon

The longitude of the site.

Type:

float

site_utc_offset

The UTC offset of the site in hours.

Type:

float

__init__(var_limits_csv=None, drop_soil=True, check_timestamps=False, site_lat=None, site_lon=None, site_utc_offset=-7, logger=None)[source]

Initialize the Reformatter.

Parameters:
  • var_limits_csv (str | Path | None) – Path to a CSV file containing variable limits.

  • drop_soil (bool) – If True, extra soil-related columns are dropped. Defaults to True.

  • check_timestamps (bool) – If True, perform timestamp alignment analysis. Defaults to False.

  • site_lat (float | None) – Latitude of the site (required if check_timestamps=True).

  • site_lon (float | None) – Longitude of the site (required if check_timestamps=True).

  • site_utc_offset (int) – UTC offset in hours (required if check_timestamps=True).

  • logger (Logger | None) – A logger for tracking the reformatting process.

finalize(df)[source]

Finalize the data by applying cleaning and standardization steps.

prepare(df, interval=30, data_type='eddy')[source]

Current method; kept for backward compatibility.

preprocess(df, data_type='eddy', interval=30)[source]

Preprocess the data by applying initial cleaning and standardization steps.

process(df, interval, data_type='eddy')[source]

Prepare the data by applying a series of cleaning and standardization steps.

This method takes a DataFrame of station data and applies a pipeline of transformations to clean and standardize it. The steps include fixing timestamps, renaming columns, setting numeric types, resampling, applying physical limits, and optionally checking timestamp alignment.

Parameters:
  • df (DataFrame) – The input DataFrame of station data.

  • data_type (str) – The type of data being processed (e.g., ‘eddy’, ‘met’). This is used to determine which column renaming map to use. Defaults to ‘eddy’.

  • interval (int) – The sampling interval of the data, in minutes; must be either 30 or 60.

Returns:

A tuple containing:

  • The prepared DataFrame with standardized and cleaned data.

  • A report DataFrame detailing the changes made during the application of physical limits.

  • A dictionary with timestamp alignment results (if check_timestamps=True), or None otherwise. Contains keys: ‘summary’, ‘composites’, ‘flags’.

Return type:

Tuple[DataFrame, DataFrame, Optional[Dict]]
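One of the steps named above, applying physical limits and reporting what changed, can be sketched on plain dictionaries. This is an illustration of the idea, not the Reformatter's code: apply_limits is hypothetical, the limit values are invented, and the report here is a simple count per variable rather than a DataFrame.

```python
import math

# Hypothetical limits; real values come from the variable-limits CSV.
LIMITS = {"TA": (-50.0, 60.0), "RH": (0.0, 100.0)}


def apply_limits(rows, limits):
    """Replace out-of-range values with NaN and count replacements per variable."""
    report = {name: 0 for name in limits}
    for row in rows:
        for name, (low, high) in limits.items():
            value = row.get(name)
            if value is None or math.isnan(value):
                continue  # already missing, nothing to check
            if not low <= value <= high:
                row[name] = math.nan
                report[name] += 1
    return rows, report


rows = [
    {"TA": 21.5, "RH": 45.0},
    {"TA": 999.0, "RH": 101.2},
    {"TA": -3.0, "RH": 88.0},
]
cleaned, report = apply_limits(rows, LIMITS)
print(report)
```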

class micromet.SiteCorrections(sg_correction_factor=None, sg_correction_vars=<factory>, sg_correction_end=None, precip_correction_factor=None, precip_correction_end=None, precip_bad_before=None, wind_direction_offset=None, wind_direction_change_date=None, date_range_drops=<factory>, h2o_flag_windows=<factory>, co2_flag_windows=<factory>, wind_flag_bad_range=None, wind_flag_marginal_ranges=<factory>, signal_strength_threshold=0.8, drop_precip_on_visits=True, csflux_join_cols=None, columns_to_drop_from_merge=None, soilvue_bad_ec_threshold=None, extra_drops=<factory>)[source]

Bases: object

Declarative specification of site-specific corrections applied during QC.

All fields are optional; only the corrections relevant to a given station need to be populated.

Parameters:
  • sg_correction_factor (Optional[float]) – Multiplicative factor for soil-heat-flux storage (SG) sensors.

  • sg_correction_vars (List[str]) – Columns to which sg_correction_factor applies.

  • sg_correction_end (Optional[str]) – Datetime string; correction is applied to data before this date.

  • precip_correction_factor (Optional[float]) – Multiplicative factor for precipitation before a program fix date.

  • precip_correction_end (Optional[str]) – Datetime string; precip correction is applied before this date.

  • precip_bad_before (Optional[str]) – Drop all precip data before this date (e.g. broken bucket).

  • wind_direction_offset (Optional[float]) – Degrees to subtract from WD_1_1_1 before the change date.

  • wind_direction_change_date (Optional[str]) – Datetime string when the IRGASON orientation changed.

  • date_range_drops (List[DateRangeDrop]) – Specific column/date-range pairs to null out (spikes, sensor issues).

  • h2o_flag_windows (List[FlagWindow]) – Windows to flag H2O signal-strength issues.

  • co2_flag_windows (List[FlagWindow]) – Windows to flag CO2 signal-strength issues.

  • wind_flag_bad_range (Optional[Tuple[float, float]]) – (start_deg, end_deg) range of wind directions flagged as 2 (bad).

  • wind_flag_marginal_ranges (List[Tuple[float, float]]) – List of (start_deg, end_deg) ranges flagged as 1 (marginal).

  • signal_strength_threshold (float) – Threshold below which signal-strength data is flagged.

  • drop_precip_on_visits (bool) – Whether to zero-out precipitation on station-visit days.

  • csflux_join_cols (Optional[List[str]]) – Subset of CSFlux columns to merge into the final eddy dataset. If None, a default set is used.

  • columns_to_drop_from_merge (Optional[List[str]]) – Columns to drop after the eddy/met merge (e.g. RECORD, G_1_1_A).

  • soilvue_bad_ec_threshold (Optional[float]) – Minimum EC_3_7_1 value; rows below are dropped for SoilVue columns.

  • extra_drops (List[DateRangeDrop]) – Additional ad-hoc date/column drops.

co2_flag_windows: List[FlagWindow]
columns_to_drop_from_merge: List[str] | None = None
csflux_join_cols: List[str] | None = None
date_range_drops: List[DateRangeDrop]
drop_precip_on_visits: bool = True
extra_drops: List[DateRangeDrop]
h2o_flag_windows: List[FlagWindow]
precip_bad_before: str | None = None
precip_correction_end: str | None = None
precip_correction_factor: float | None = None
sg_correction_end: str | None = None
sg_correction_factor: float | None = None
sg_correction_vars: List[str]
signal_strength_threshold: float = 0.8
soilvue_bad_ec_threshold: float | None = None
wind_direction_change_date: str | None = None
wind_direction_offset: float | None = None
wind_flag_bad_range: Tuple[float, float] | None = None
wind_flag_marginal_ranges: List[Tuple[float, float]]
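The wind-direction fields describe ranges flagged as 2 (bad) or 1 (marginal). A sketch of how such ranges might be evaluated follows. The in_range and wind_flag helpers are hypothetical, and three behaviors are assumptions: ranges may wrap through 360 degrees, a bad range takes precedence over a marginal one, and unflagged directions get 0.

```python
def in_range(direction, start_deg, end_deg):
    """True if direction lies in [start_deg, end_deg], allowing wrap through 360."""
    direction %= 360
    if start_deg <= end_deg:
        return start_deg <= direction <= end_deg
    return direction >= start_deg or direction <= end_deg


def wind_flag(direction, bad_range=None, marginal_ranges=()):
    """Return 2 (bad), 1 (marginal), or 0 (no flag) for a direction in degrees."""
    if bad_range is not None and in_range(direction, *bad_range):
        return 2
    if any(in_range(direction, start, end) for start, end in marginal_ranges):
        return 1
    return 0


# Example ranges chosen for illustration only.
flags = [
    wind_flag(d, bad_range=(350, 10), marginal_ranges=[(10, 30), (330, 350)])
    for d in (0, 20, 180, 340, 355)
]
print(flags)
```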
class micromet.StationDataDownloader(config, logger=None)[source]

Bases: object

A class to manage downloading data from a station’s logger.

This class handles the connection and data download from a Campbell Scientific data logger via its web API.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

config

The configuration object.

Type:

configparser.ConfigParser or dict

logger

The logger instance.

Type:

logging.Logger

logger_credentials

The authentication credentials for the logger.

Type:

requests.auth.HTTPBasicAuth

__init__(config, logger=None)[source]

Initialize the StationDataDownloader.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

download_from_station(station, loggertype='eddy', mode='since-time', p1='0', p2='0')[source]

Download data from a station’s logger.

This method constructs a request to the station’s web API to download data based on the specified parameters.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • mode (str) – The data query mode (‘since-time’, ‘most-recent’, etc.). Defaults to ‘since-time’.

  • p1 (str) – The primary parameter for the query (e.g., start time). Defaults to “0”.

  • p2 (str) – The secondary parameter for the query (e.g., end time). Defaults to “0”.

Returns:

A tuple containing the downloaded data as a DataFrame, the size of the data packet in MB, and the HTTP status code.

Return type:

tuple[pd.DataFrame | None, float | None, int]
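
As a rough illustration of how such a request might be assembled, the sketch below builds a data-query URL from the same mode, p1, and p2 arguments. The helper name build_dataquery_url, the host address, the table name, and the exact query keys are assumptions made for this example; the method's real request format is not shown in this reference.

```python
from urllib.parse import urlencode


def build_dataquery_url(host, table, mode="since-time", p1="0", p2="0", fmt="toa5"):
    """Build a data-query URL for a logger web API (hypothetical helper)."""
    query = {
        "command": "dataquery",
        "uri": f"dl:{table}",  # assumed addressing scheme for a logger table
        "format": fmt,
        "mode": mode,
        "p1": p1,
        "p2": p2,
    }
    # Keep ":" readable in the uri and timestamp values instead of encoding it.
    return f"http://{host}/?{urlencode(query, safe=':')}"


# Hypothetical host and table name, used only for illustration.
url = build_dataquery_url("192.0.2.10", "Flux_AmeriFluxFormat", p1="2024-06-01T00:00:00")
print(url)
```

Building the query from a dictionary keeps the parameter names in one place and leaves escaping to the standard library.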

static get_station_id(stationid)[source]

Extract the station ID from a full station identifier string.

Parameters:

stationid (str) – The full station identifier (e.g., ‘US-ABC’).

Returns:

The extracted station ID (e.g., ‘ABC’).

Return type:

str
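
The documented example ('US-ABC' becomes 'ABC') is consistent with taking the text after the hyphen. A minimal sketch under that assumption follows; extract_station_id is a stand-in name, and the library's own implementation may differ.

```python
def extract_station_id(stationid: str) -> str:
    """Return the part after the last hyphen (assumed behaviour)."""
    return stationid.split("-")[-1]


print(extract_station_id("US-ABC"))  # ABC
```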

get_times(station, loggertype='eddy')[source]

Retrieve the current time from the logger and the system.

This method queries a station’s logger for its current time and also gets the current system time for comparison.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A tuple containing the logger’s current time as a string and the system’s current time as a string.

Return type:

Tuple[Optional[str], str]
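
One possible use of the two returned strings is to estimate logger clock drift. The sketch below assumes both times are ISO 8601 strings, which this reference does not confirm, and treats a missing logger time as "unknown". The function name clock_drift_seconds is hypothetical.

```python
from datetime import datetime


def clock_drift_seconds(logger_time, system_time):
    """Return logger time minus system time in seconds, or None if unknown."""
    if logger_time is None:
        # The logger could not be reached, so no drift can be computed.
        return None
    logger_dt = datetime.fromisoformat(logger_time)
    system_dt = datetime.fromisoformat(system_time)
    return (logger_dt - system_dt).total_seconds()


drift = clock_drift_seconds("2024-06-01T12:00:05", "2024-06-01T12:00:00")
print(drift)  # 5.0
```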

class micromet.StationDataProcessor(config, engine, logger=None)[source]

Bases: StationDataDownloader

A class for processing and managing station data.

This class extends StationDataDownloader to add functionality for reformatting data, interacting with a database, and managing the overall data processing workflow.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

engine

The SQLAlchemy engine instance.

Type:

sqlalchemy.engine.base.Engine

__init__(config, engine, logger=None)[source]

Initialize the StationDataProcessor.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

compare_sql_to_station(df, station, field='timestamp_end', loggertype='eddy')[source]

Compare station data with records in the database and filter new entries.

Parameters:
  • df (DataFrame) – The DataFrame containing the station data.

  • station (str) – The identifier for the station.

  • field (str) – The field to use for comparison. Defaults to “timestamp_end”.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A DataFrame containing only the new records.

Return type:

DataFrame

database_columns(dat)[source]

Get the list of column names for a given database table.

Parameters:

dat (str) – The type of data (‘eddy’ or ‘met’), which corresponds to the table name.

Returns:

A list of column names in the specified table.

Return type:

list

get_max_date(station, loggertype='eddy')[source]

Get the maximum timestamp from the station’s data in the database.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

The latest timestamp found in the database for the station.

Return type:

datetime

get_station_data(station, reformat=True, loggertype='eddy', config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv', drop_soil=False)[source]

Fetch and process data for a single station.

This method downloads data from a station, optionally reformats it, and returns the processed data.

Parameters:
  • station (str) – The identifier for the station.

  • reformat (bool) – Whether to reformat the downloaded data. Defaults to True.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • config_path (str) – The path to the reformatter configuration file.

  • var_limits_csv (str) – The path to the variable limits CSV file.

  • drop_soil (bool) – Whether to drop soil-related data. Defaults to False.

Returns:

A tuple containing the processed DataFrame and the size of the downloaded data packet in MB.

Return type:

Tuple[Optional[DataFrame], Optional[float]]

process_station_data(site_folders, config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv')[source]

Process and upload data for all specified stations.

This method iterates through a dictionary of site folders, fetches data for each station, processes it, and uploads it to the database.

Parameters:
  • site_folders (dict) – A dictionary mapping station IDs to folder names.

  • config_path (str) – The path to the reformatter configuration file. Defaults to “./data/reformatter_vars.yml”.

  • var_limits_csv (str) – The path to the variable limits CSV file. Defaults to “./data/extreme_values.csv”.

Return type:

None

static remove_existing_records(df, column_to_check, values_to_remove, logger=None)[source]

Remove rows whose column_to_check value appears in values_to_remove, i.e. records that already exist in the database.

Parameters:
  • df (DataFrame) – The input DataFrame.

  • column_to_check (str) – The name of the column to check for existing values.

  • values_to_remove (list) – A list of values to be removed from the DataFrame.

  • logger (Logger) – A logger for logging messages. Defaults to None.

Returns:

The DataFrame with existing records removed.

Return type:

DataFrame
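
The filtering described above can be expressed with a boolean mask in pandas. This is a sketch of the idea rather than the method's actual code; drop_known_rows and the sample column values are made up for the example.

```python
import pandas as pd


def drop_known_rows(df, column_to_check, values_to_remove):
    """Keep only rows whose column value is not in values_to_remove."""
    already_stored = df[column_to_check].isin(values_to_remove)
    return df.loc[~already_stored]


# Sample data invented for illustration.
df = pd.DataFrame(
    {
        "timestamp_end": [202406010030, 202406010100, 202406010130],
        "co2": [410.1, 411.3, 409.8],
    }
)
new_rows = drop_known_rows(df, "timestamp_end", [202406010030, 202406010100])
print(new_rows)
```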

class micromet.WorkflowConfig(station='', interval=30, raw_data_root=PosixPath('.'), output_root=PosixPath('.'), amflux_var_file=None, preprocessed_dir=None, steps=<factory>, generate_plots=False, drop_soil=False, fetch_events_from_db=False, events_api_url='https://ugs-koop-umfdxaxiyq-wm.a.run.app', data_interval_label='HH', soilvue_g_calculation=False, soilvue_depths_cm=<factory>)[source]

Bases: object

Top-level configuration for the automated workflow.

Parameters:
  • station (str) – Station identifier (e.g. 'US-UTJ').

  • interval (int) – Data interval in minutes (30 or 60).

  • raw_data_root (Path) – Root folder containing compiled station data.

  • output_root (Path) – Root folder for processed outputs (raw/, qc/, ameriflux/ sub-dirs).

  • amflux_var_file (Optional[Path]) – Path to the AmeriFlux variable-name CSV. Used for column validation.

  • preprocessed_dir (Optional[Path]) – Directory for preprocessed parquet files. Defaults to raw_data_root / 'preprocessed_site_data'.

  • steps (List[int]) – Workflow steps to run (1-4). Defaults to all steps.

  • generate_plots (bool) – Whether to generate review plots (notebooks 3b/4b).

  • drop_soil (bool) – Whether to drop extra soil columns during the reformatter's finalize step.

  • fetch_events_from_db (bool) – Whether to pull station events from the UGS API.

  • events_api_url (str) – Base URL for the station events API.

  • data_interval_label (str) – AmeriFlux interval label ('HH' for half-hourly).

  • soilvue_g_calculation (bool) – Whether to calculate SoilVue G values using gradient+storage.

  • soilvue_depths_cm (List[float]) – SoilVue sensor depths in centimeters.

amflux_var_file: Path | None = None
data_interval_label: str = 'HH'
drop_soil: bool = False
events_api_url: str = 'https://ugs-koop-umfdxaxiyq-wm.a.run.app'
fetch_events_from_db: bool = False
generate_plots: bool = False
interval: int = 30
output_root: Path = PosixPath('.')
preprocessed_dir: Path | None = None
property preprocessed_path: Path
raw_data_root: Path = PosixPath('.')
soilvue_depths_cm: List[float]
soilvue_g_calculation: bool = False
station: str = ''
steps: List[int]
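
The relationship between preprocessed_dir, raw_data_root, and the preprocessed_path property can be sketched with a small dataclass. ConfigSketch is a simplified stand-in written for this example, not the real WorkflowConfig, and the default step list assumes that "all" means steps 1 through 4.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class ConfigSketch:
    """Reduced stand-in for a workflow configuration (illustrative only)."""

    station: str = ""
    raw_data_root: Path = Path(".")
    preprocessed_dir: Optional[Path] = None
    # Mutable defaults need a factory so instances do not share one list.
    steps: List[int] = field(default_factory=lambda: [1, 2, 3, 4])

    @property
    def preprocessed_path(self) -> Path:
        # Fall back to the documented default location when no directory is set.
        if self.preprocessed_dir is not None:
            return self.preprocessed_dir
        return self.raw_data_root / "preprocessed_site_data"


cfg = ConfigSketch(station="US-UTJ", raw_data_root=Path("/data/US-UTJ"))
print(cfg.preprocessed_path)
```

Resolving the fallback in a property means the default follows raw_data_root even if that attribute is changed after construction.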

class micromet.WorkflowResult(station, success, steps_completed=<factory>, output_files=<factory>, reports=<factory>, errors=<factory>, processing_time=0.0)[source]

Bases: object

Container for results of a workflow run.

errors: Dict[int, str]
output_files: Dict[str, Path]
processing_time: float = 0.0
reports: Dict[str, Any]
station: str
steps_completed: List[int]
success: bool
summary()[source]

Return type:

str

class micromet.WorkflowRunner(config, corrections=None, logger=None)[source]

Bases: object

Orchestrates the full numbered-notebook workflow for a single station.

generate_review_plots(context)[source]

Generate time-series plots for all variables in the QC dataset.

Return type:

None

run()[source]

Execute the configured workflow steps in sequence.

Return type:

WorkflowResult
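
To illustrate what running steps in sequence and recording the outcome could look like, the sketch below uses stand-in step functions that take and return a context dictionary, mirroring the step signatures listed in this class. ResultSketch, run_steps, and the stop-at-first-failure behaviour are assumptions for this example, not the documented implementation.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ResultSketch:
    """Reduced stand-in for a workflow result (illustrative only)."""

    station: str
    success: bool = True
    steps_completed: List[int] = field(default_factory=list)
    errors: Dict[int, str] = field(default_factory=dict)
    processing_time: float = 0.0


def run_steps(station, steps, step_funcs):
    """Run the numbered steps in ascending order, passing a shared context."""
    result = ResultSketch(station=station)
    context: Dict[str, Any] = {}
    start = time.perf_counter()
    for number in sorted(steps):
        try:
            context = step_funcs[number](context)
            result.steps_completed.append(number)
        except Exception as exc:
            # Assumed policy: record the error by step number and stop.
            result.errors[number] = str(exc)
            result.success = False
            break
    result.processing_time = time.perf_counter() - start
    return result


def compile_step(context):
    return {**context, "compiled": True}


def failing_step(context):
    raise ValueError("no met data found")


outcome = run_steps("US-UTJ", [1, 2], {1: compile_step, 2: failing_step})
print(outcome.steps_completed, outcome.errors, outcome.success)
# [1] {2: 'no met data found'} False
```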

step1_compile_and_preprocess(context)[source]

Compile raw files and preprocess into parquet datasets.

Return type:

Dict[str, Any]

step2_create_raw_data(context)[source]

Merge data sources and create the combined raw dataset.

Return type:

Dict[str, Any]

step3_qc_data(context)[source]

Apply corrections, physical limits, QC, and flagging.

Return type:

Dict[str, Any]

step4_export_ameriflux(context)[source]

Export AmeriFlux-formatted CSV from QC data.

Return type:

Dict[str, Any]

micromet.run_workflow(station, raw_data_root, output_root, corrections=None, **kwargs)[source]

Convenience function to run the complete workflow for a station.

Parameters:
  • station (str) – Station identifier (e.g. 'US-UTJ').

  • raw_data_root (Union[str, Path]) – Root folder containing compiled station data.

  • output_root (Union[str, Path]) – Root folder for processed outputs.

  • corrections (Optional[SiteCorrections]) – Site-specific corrections.

  • **kwargs – Additional arguments passed to WorkflowConfig.

Return type:

WorkflowResult