micromet package

Submodules

micromet.converter module

micromet.graphs module

micromet.headers module

Header detection and repair utilities for delimited text files.

This module provides functions to detect missing headers in data files and repair them by borrowing headers from peer files. It supports both single-file processing and batch operations across directories.

Key Features

  • Automatic delimiter detection using csv.Sniffer with fallback heuristics

  • Header presence detection with multiple strategies

  • Peer file matching based on filename similarity and column count

  • Directory-based batch processing for duplicate files

  • Support for UTF-8, UTF-8-sig, and Latin-1 encodings

micromet.headers.apply_header(header_file, target_file, *, inplace=False)[source]

Apply a header from a reference file to a data file and return a DataFrame.

This function reads column names from header_file and applies them to target_file, which is assumed to lack a header row. The result is returned as a pandas DataFrame. Optionally, the function can overwrite target_file with the updated version, keeping a backup as *.bak.

Parameters:
  • header_file (Path) – Path to the file containing the correct column headers.

  • target_file (Path) – Path to the file that is missing column headers.

  • inplace (bool) – If True, the modified DataFrame is written back to target_file, and a backup of the original file is saved with a .bak extension. Default is False.

Returns:

DataFrame containing the contents of target_file with headers applied from header_file.

Return type:

DataFrame

Notes

The delimiter is inferred using a sniffing function to ensure consistent parsing between the header and target files.

micromet.headers.count_columns(path, delimiter)[source]

Count the number of columns in the first non-empty row of a file.

Parameters:
  • path (Path) – The path to the file.

  • delimiter (str) – The delimiter character to use for splitting rows into columns.

Returns:

The number of columns detected in the first non-empty row. Returns 0 if the file is empty or contains only empty rows.

Return type:

int
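The counting rule above can be sketched with the standard csv module. This is only an illustration, not the package's implementation: the helper name count_columns_sketch is hypothetical, it takes the file contents as a string rather than a path, and it assumes a row counts as empty when every field is blank.

```python
import csv
import io


def count_columns_sketch(text: str, delimiter: str) -> int:
    """Return the column count of the first non-empty row, or 0 if none exists."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    for row in reader:
        # csv.reader yields [] for blank lines; rows of blank fields are skipped too.
        if any(field.strip() for field in row):
            return len(row)
    return 0
```

Leading blank lines are ignored, so a file that starts with two empty lines followed by "1,2,3" would report three columns.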

micromet.headers.detect_delimiter_and_header(path, sample_size=64000)[source]

Detect the delimiter and presence of a header in a text file.

Uses csv.Sniffer to determine the delimiter and whether a header row exists. Includes fallbacks for both detection steps if the sniffer fails.

Parameters:
  • path (Path) – The path to the file to inspect.

  • sample_size (int) – The number of bytes to read from the beginning of the file to use for detection. Defaults to 64,000.

Returns:

A tuple containing:

  • The detected delimiter character (e.g., ‘,’).

  • A boolean that is True if a header is detected, False otherwise.

Return type:

Tuple[str, bool]
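A minimal sketch of sniffing with fallbacks, assuming the sample has already been read into a string. The function name detect_sketch, the candidate delimiter set, and both fallback rules (most frequent candidate in the first line; any letter in the first line) are assumptions made for illustration and may differ from what the module does.

```python
import csv


def detect_sketch(sample: str, candidates: str = ",\t;|") -> tuple[str, bool]:
    """Guess (delimiter, has_header) from a text sample."""
    sniffer = csv.Sniffer()
    first_line = sample.splitlines()[0] if sample else ""
    try:
        delimiter = sniffer.sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        # Fallback: pick the candidate that occurs most often in the first line.
        delimiter = max(candidates, key=first_line.count)
    try:
        has_header = sniffer.has_header(sample)
    except csv.Error:
        # Fallback: treat the first line as a header if it contains letters.
        has_header = any(ch.isalpha() for ch in first_line)
    return delimiter, has_header
```

Both Sniffer calls can raise csv.Error on ambiguous input, which is why each step is wrapped separately.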

micromet.headers.find_header_donor(target, delimiter, expected_cols, min_name_sim=0.4)[source]

Find a peer file to serve as a header “donor”.

Searches the same directory as the target file for a suitable file to borrow a header from. A donor is considered suitable if it:

  • Is a file with a common text extension.

  • Has a detectable header and the same delimiter.

  • Has the same number of columns as the target.

  • Has a filename similarity above min_name_sim.

Among candidates, the one with the closest modification time to the target is chosen. Ties are broken by selecting the one with the highest name similarity.

Parameters:
  • target (Path) – The path to the file that needs a header.

  • delimiter (str) – The delimiter used in the target file.

  • expected_cols (int) – The number of columns in the target file.

  • min_name_sim (float) – The minimum name similarity ratio (0.0 to 1.0) required for a file to be considered a potential donor. Defaults to 0.4.

Returns:

A tuple containing the path to the donor file and its raw header line, or None if no suitable donor is found.

Return type:

Optional[Tuple[Path, str]]
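The ranking rule (closest modification time first, ties broken by highest name similarity) can be sketched as follows. The function pick_donor is hypothetical and works on plain (name, mtime) pairs that are assumed to have already passed the extension, header, delimiter, and column-count checks; the real function works on paths and also returns the header line.

```python
from difflib import SequenceMatcher


def pick_donor(target_name, target_mtime, candidates, min_name_sim=0.4):
    """Pick the best donor name from (name, mtime) pairs, or None."""
    scored = []
    for name, mtime in candidates:
        sim = SequenceMatcher(None, target_name, name).ratio()
        if sim > min_name_sim:
            # Sort key: smallest mtime gap first, then highest similarity.
            scored.append((abs(mtime - target_mtime), -sim, name))
    return min(scored)[2] if scored else None
```

Negating the similarity lets a single min() call express "smallest gap, then largest similarity" without a custom comparator.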

micromet.headers.fix_all_in_parent(parent, searchstr='*_AmeriFluxFormat_*.dat')[source]

Recursively scan a parent directory for files with duplicate names and fix missing headers.

This function searches parent for files matching a given pattern. If duplicate filenames are found such that one version has a header and another does not, the header is copied from the former to the latter. The target files are overwritten in-place, and a .bak backup is created for each.

Parameters:
  • parent (Path) – Root directory to scan for matching files. All subdirectories are included recursively.

  • searchstr (str) – Glob-style pattern to match filenames (default is “*_AmeriFluxFormat_*.dat”).

Returns:

A dictionary mapping filenames to lists of paths where they were found.

Return type:

dict

Notes

  • Files are grouped by basename and inspected line-by-line to determine whether they contain a header.

  • If multiple files have headers, only the first one is used as the donor.

  • Files with no header and no matching header source are skipped.
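The grouping step described in the notes can be sketched with Path.rglob. The helper group_by_basename is hypothetical, and returning only names that occur more than once is an assumption; the documented return value may include every matched filename.

```python
import tempfile
from collections import defaultdict
from pathlib import Path


def group_by_basename(parent: Path, pattern: str) -> dict[str, list[Path]]:
    """Map each duplicated filename to every path under parent that carries it."""
    groups: dict[str, list[Path]] = defaultdict(list)
    for path in sorted(parent.rglob(pattern)):
        if path.is_file():
            groups[path.name].append(path)
    # Keep only names found in more than one location.
    return {name: paths for name, paths in groups.items() if len(paths) > 1}


# Demo on a throwaway directory tree (file names are made up).
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for sub in ("a", "b"):
        (root / sub).mkdir()
    (root / "a" / "x_AmeriFluxFormat_1.dat").write_text("1,2\n")
    (root / "b" / "x_AmeriFluxFormat_1.dat").write_text("c1,c2\n1,2\n")
    (root / "b" / "y_AmeriFluxFormat_2.dat").write_text("1\n")
    duplicates = group_by_basename(root, "*_AmeriFluxFormat_*.dat")
    duplicate_names = sorted(duplicates)
    duplicate_count = len(duplicates["x_AmeriFluxFormat_1.dat"])
```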

micromet.headers.fix_directory_pairs(dir_with_headers, dir_without_headers)[source]

Apply headers from a directory of correctly formatted files to a directory of files missing headers.

This function loops through all files in dir_without_headers. For each file that lacks a header, it attempts to find a matching file by name in dir_with_headers and uses it to patch the missing header. The original file is overwritten, and a .bak backup is created.

Parameters:
  • dir_with_headers (Path) – Directory containing files with valid headers.

  • dir_without_headers (Path) – Directory containing files that may be missing headers.

Return type:

None

Notes

This function assumes that files in both directories are named identically, and that headers can be determined by inspecting the first line of each file.

micromet.headers.get_first_line_raw(path)[source]

Return the first line of a file as raw text, without trailing newlines.

Parameters:

path (Path) – The path to the file.

Returns:

The content of the first line.

Return type:

str

micromet.headers.header_line_is_valid(header_line, delimiter, expected_cols)[source]

Check if a header line has the expected number of columns.

This function properly handles quoted fields.

Parameters:
  • header_line (str) – The raw header line text.

  • delimiter (str) – The delimiter character.

  • expected_cols (int) – The number of columns the header should have.

Returns:

True if the parsed header has the correct number of columns, False otherwise.

Return type:

bool
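Quoted fields matter because a naive str.split would count a delimiter inside quotes as a column boundary. A short sketch of a quote-aware check using csv.reader follows; the helper name header_has_n_columns is hypothetical and the package's own logic may differ in detail.

```python
import csv


def header_has_n_columns(header_line: str, delimiter: str, expected_cols: int) -> bool:
    # csv.reader respects quoting, so a delimiter inside quotes does not split.
    fields = next(csv.reader([header_line], delimiter=delimiter), [])
    return len(fields) == expected_cols
```

For the line TIMESTAMP,"Temp, air",RH this reports three columns, whereas splitting on commas would report four.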

micromet.headers.looks_like_header(line, alpha_thresh=0.2)[source]

Heuristically determine if a line appears to be a header.

This function checks if a line from a text file is likely to be a header row by checking for the presence of alphabetic characters.

Parameters:
  • line (str) – A single line of text from a file.

  • alpha_thresh (float) – The minimum fraction of fields that must contain alphabetic characters to be considered a header. Defaults to 0.2 (20%).

Returns:

True if the line is likely a header, False otherwise.

Return type:

bool
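A rough sketch of the alphabetic-fraction heuristic. Because the documented signature takes no delimiter, the way the line is split into fields here (on commas, tabs, semicolons, or pipes) is an assumption, as is the helper name looks_like_header_sketch.

```python
import re


def looks_like_header_sketch(line: str, alpha_thresh: float = 0.2) -> bool:
    # Assumption: fields are separated by commas, tabs, semicolons, or pipes.
    fields = [f for f in re.split(r"[,\t;|]", line.strip()) if f.strip()]
    if not fields:
        return False
    # Count the fields that contain at least one alphabetic character.
    with_alpha = sum(1 for f in fields if any(ch.isalpha() for ch in f))
    return with_alpha / len(fields) >= alpha_thresh
```

Using a fraction rather than "any letter" keeps a data row with an occasional NAN entry from being mistaken for a header: one alphabetic field out of six falls below the 0.2 default.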

micromet.headers.name_similarity(a, b)[source]

Calculate the similarity ratio between two strings.

Uses difflib.SequenceMatcher for the comparison.

Parameters:
  • a (str) – The first string.

  • b (str) – The second string.

Returns:

A similarity score between 0.0 and 1.0.

Return type:

float
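For reference, SequenceMatcher.ratio() returns 2*M/T, where M is the number of matched characters and T is the combined length of both strings. The sketch below assumes the function simply returns that ratio; any normalisation such as lowercasing is not documented and is not shown.

```python
from difflib import SequenceMatcher


def similarity(a: str, b: str) -> float:
    """Similarity in [0.0, 1.0] based on matching character blocks."""
    return SequenceMatcher(None, a, b).ratio()


# "abcd" and "bcde" share the block "bcd": 2 * 3 / (4 + 4) = 0.75
example_score = similarity("abcd", "bcde")
```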

micromet.headers.open_text(path, encodings=None)[source]

Open a text file, trying a list of encodings until one succeeds.

Parameters:
  • path (Path) – The path to the text file.

  • encodings (list[str] | None) – A list of character encodings to try, in order. Defaults to [“utf-8-sig”, “utf-8”, “latin-1”].

Returns:

An open file object.

Return type:

TextIOWrapper

Raises:

Exception – If all attempted encodings fail, the last exception is re-raised.
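The try-each-encoding pattern might look like the sketch below. It is an illustration under assumptions: open_text_sketch is a hypothetical name, it probes by decoding the whole file before reopening it, and it only treats UnicodeDecodeError as an encoding failure.

```python
import tempfile
from pathlib import Path

DEFAULT_ENCODINGS = ["utf-8-sig", "utf-8", "latin-1"]


def open_text_sketch(path, encodings=None):
    """Return an open text handle using the first encoding that decodes the file."""
    last_error = None
    for enc in encodings or DEFAULT_ENCODINGS:
        try:
            # Probe: decoding the file surfaces a bad encoding immediately.
            with open(path, encoding=enc) as probe:
                probe.read()
        except UnicodeDecodeError as err:
            last_error = err
            continue
        return open(path, encoding=enc)
    raise last_error


# Demo: a Latin-1 file is rejected by both UTF-8 variants, then accepted.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "latin1.csv"
    sample.write_bytes("café,1\n".encode("latin-1"))
    with open_text_sketch(sample) as fh:
        decoded = fh.read()
```

Placing utf-8-sig first means a byte-order mark, if present, is stripped rather than ending up in the first column name.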

micromet.headers.patch_file(donor, target)[source]

Apply a header from a donor file to a target file.

This function reads the header from a donor file and applies it to a target file that is assumed to be missing a header. The modified data is returned as a DataFrame and written back to the target file.

Parameters:
  • donor (Path) – The path to the file with the correct header.

  • target (Path) – The path to the file that needs a header.

Returns:

A DataFrame containing the data from the target file with the new header.

Return type:

DataFrame

micromet.headers.prepend_header_in_place(path, header_line)[source]

Insert a header line at the top of a file.

This function reads the entire file, then writes it back with the provided header line at the beginning. It attempts to preserve the original newline style.

Parameters:
  • path (Path) – The path to the file to be modified.

  • header_line (str) – The header line to prepend to the file.

Return type:

None
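One way to preserve the newline style is to work on bytes and reuse whichever line ending the file already contains. The sketch below assumes a UTF-8 header and falls back to "\n" when no "\r\n" is found; prepend_header_sketch is a hypothetical name, not the module's function.

```python
import tempfile
from pathlib import Path


def prepend_header_sketch(path: Path, header_line: str) -> None:
    raw = path.read_bytes()
    # Reuse the file's own line ending; default to "\n" otherwise.
    newline = b"\r\n" if b"\r\n" in raw else b"\n"
    header = header_line.rstrip("\r\n").encode("utf-8")
    path.write_bytes(header + newline + raw)


# Demo: a CRLF file keeps CRLF after the header is inserted.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "headerless.dat"
    target.write_bytes(b"1,2\r\n3,4\r\n")
    prepend_header_sketch(target, "a,b")
    patched = target.read_bytes()
```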

micromet.headers.process_file(path, min_sim, make_backup)[source]

Detect and repair a headerless delimited text file in place.

The function inspects path to determine its delimiter and whether the file already contains a header row. If a header is missing, it searches for a “donor” file in the same directory with a compatible delimiter and column count, and with column-name similarity above min_sim. When a donor is found, its header is prepended to path (optionally creating a .bak backup first). Progress is reported via print messages.

Parameters:
  • path (Path) – Path to the target text file to check and possibly fix.

  • min_sim (float) – Minimum similarity threshold (0–1) for column-name matching when selecting a donor header. Higher values are stricter.

  • make_backup (bool) – If True, write a byte-for-byte backup alongside the file at path.with_suffix(path.suffix + ".bak") before modifying the file.

Returns:

The file at path may be modified in place as a side effect.

Return type:

None

Raises:
  • OSError – If reading or writing the file fails.

  • Exception – Any error originating from helper functions may propagate.
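The backup path expression quoted above extends the existing suffix instead of replacing it. A small sketch follows; the helper names are hypothetical, and shutil.copy2 is assumed here as one reasonable way to make a byte-for-byte copy.

```python
import shutil
import tempfile
from pathlib import Path


def backup_path(path: Path) -> Path:
    # "data.dat" -> "data.dat.bak": the suffix is extended, not replaced.
    return path.with_suffix(path.suffix + ".bak")


def make_backup(path: Path) -> Path:
    dest = backup_path(path)
    shutil.copy2(path, dest)  # copies the bytes and, where possible, the metadata
    return dest


# Demo on a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    original = Path(tmp) / "data.dat"
    original.write_bytes(b"1,2\n")
    copy = make_backup(original)
    backup_name = copy.name
    same_bytes = copy.read_bytes() == original.read_bytes()
```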

micromet.headers.read_colnames(path)[source]

Read column names from the first line of a file.

This function infers the delimiter, reads the first line of the file, and returns the column names.

Parameters:

path (Path) – The path to the file.

Returns:

A list of column names.

Return type:

list[str]

micromet.headers.scan(root, min_sim=0.5, backup=False)[source]

Recursively scan a directory tree and fix headerless text files.

Walks root with Path.rglob("*") and applies process_file() to every file whose extension is in {".dat"}. Exceptions raised by process_file() are caught and reported, allowing the scan to continue.

Parameters:
  • root (Path) – Directory to search recursively for candidate text files.

  • min_sim (float) – Minimum column-name similarity (0–1) when selecting a donor header; passed through to process_file().

  • backup (bool) – If True, create a .bak file for each modified file; passed through to process_file() as make_backup.

Return type:

None

Side Effects

  • May modify files in place by inserting a header line.

  • May create .bak files adjacent to modified files when backup=True.

  • Prints progress, skip, and error messages to standard output.
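The walk-filter-and-continue pattern can be sketched as below. The names scan_sketch and demo_handler are hypothetical; the handler stands in for process_file(), and returning the list of failed paths is an addition for demonstration only, since the documented function returns None.

```python
import tempfile
from pathlib import Path


def scan_sketch(root: Path, handler, extensions=frozenset({".dat"})) -> list[Path]:
    """Apply handler to every matching file; report errors and keep going."""
    failed = []
    for path in sorted(root.rglob("*")):
        if not path.is_file() or path.suffix not in extensions:
            continue
        try:
            handler(path)
        except Exception as exc:  # one bad file must not stop the scan
            print(f"[error] {path}: {exc}")
            failed.append(path)
    return failed


visited = []


def demo_handler(path: Path) -> None:
    visited.append(path.name)
    if path.name == "b.dat":
        raise ValueError("no donor found")


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "a.dat").write_text("1,2\n")
    (root / "sub" / "b.dat").write_text("3,4\n")
    (root / "notes.txt").write_text("ignored\n")
    failed_names = [p.name for p in scan_sketch(root, demo_handler)]
```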

micromet.headers.sniff_delimiter(path, sample_bytes=2048, default=',')[source]

Infer the most likely delimiter used in a text file.

This function reads a sample from the beginning of a file and uses csv.Sniffer to detect the delimiter.

Parameters:
  • path (Path) – The path to the file.

  • sample_bytes (int) – The number of bytes to read for the sample. Defaults to 2048.

  • default (str) – The delimiter to return if detection fails. Defaults to “,”.

Returns:

The detected or default delimiter.

Return type:

str

micromet.station_data_pull module

class micromet.station_data_pull.StationDataDownloader(config, logger=None)[source]

Bases: object

A class to manage downloading data from a station’s logger.

This class handles the connection and data download from a Campbell Scientific data logger via its web API.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

config

The configuration object.

Type:

configparser.ConfigParser or dict

logger

The logger instance.

Type:

logging.Logger

logger_credentials

The authentication credentials for the logger.

Type:

requests.auth.HTTPBasicAuth

__init__(config, logger=None)[source]

Initialize the StationDataDownloader.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

download_from_station(station, loggertype='eddy', mode='since-time', p1='0', p2='0')[source]

Download data from a station’s logger.

This method constructs a request to the station’s web API to download data based on the specified parameters.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • mode (str) – The data query mode (‘since-time’, ‘most-recent’, etc.). Defaults to ‘since-time’.

  • p1 (str) – The primary parameter for the query (e.g., start time). Defaults to “0”.

  • p2 (str) – The secondary parameter for the query (e.g., end time). Defaults to “0”.

Returns:

A tuple containing the downloaded data as a DataFrame, the size of the data packet in MB, and the HTTP status code.

Return type:

tuple[pd.DataFrame | None, float | None, int]

static get_station_id(stationid)[source]

Extract the station ID from a full station identifier string.

Parameters:

stationid (str) – The full station identifier (e.g., ‘US-ABC’).

Returns:

The extracted station ID (e.g., ‘ABC’).

Return type:

str

get_times(station, loggertype='eddy')[source]

Retrieve the current time from the logger and the system.

This method queries a station’s logger for its current time and also gets the current system time for comparison.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A tuple containing the logger’s current time as a string and the system’s current time as a string.

Return type:

Tuple[Optional[str], str]

class micromet.station_data_pull.StationDataProcessor(config, engine, logger=None)[source]

Bases: StationDataDownloader

A class for processing and managing station data.

This class extends StationDataDownloader to add functionality for reformatting data, interacting with a database, and managing the overall data processing workflow.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

engine

The SQLAlchemy engine instance.

Type:

sqlalchemy.engine.base.Engine

__init__(config, engine, logger=None)[source]

Initialize the StationDataProcessor.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

compare_sql_to_station(df, station, field='timestamp_end', loggertype='eddy')[source]

Compare station data with records in the database and filter new entries.

Parameters:
  • df (DataFrame) – The DataFrame containing the station data.

  • station (str) – The identifier for the station.

  • field (str) – The field to use for comparison. Defaults to “timestamp_end”.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A DataFrame containing only the new records.

Return type:

DataFrame

database_columns(dat)[source]

Get the list of column names for a given database table.

Parameters:

dat (str) – The type of data (‘eddy’ or ‘met’), which corresponds to the table name.

Returns:

A list of column names in the specified table.

Return type:

list

get_max_date(station, loggertype='eddy')[source]

Get the maximum timestamp from the station’s data in the database.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

The latest timestamp found in the database for the station.

Return type:

datetime

get_station_data(station, reformat=True, loggertype='eddy', config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv', drop_soil=False)[source]

Fetch and process data for a single station.

This method downloads data from a station, optionally reformats it, and returns the processed data.

Parameters:
  • station (str) – The identifier for the station.

  • reformat (bool) – Whether to reformat the downloaded data. Defaults to True.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • config_path (str) – The path to the reformatter configuration file.

  • var_limits_csv (str) – The path to the variable limits CSV file.

  • drop_soil (bool) – Whether to drop soil-related data. Defaults to False.

Returns:

A tuple containing the processed DataFrame and the size of the downloaded data packet in MB.

Return type:

Tuple[Optional[DataFrame], Optional[float]]

process_station_data(site_folders, config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv')[source]

Process and upload data for all specified stations.

This method iterates through a dictionary of site folders, fetches data for each station, processes it, and uploads it to the database.

Parameters:
  • site_folders (dict) – A dictionary mapping station IDs to folder names.

  • config_path (str) – The path to the reformatter configuration file. Defaults to “./data/reformatter_vars.yml”.

  • var_limits_csv (str) – The path to the variable limits CSV file. Defaults to “./data/extreme_values.csv”.

Return type:

None

static remove_existing_records(df, column_to_check, values_to_remove, logger=None)[source]

Remove rows from a DataFrame that already exist in the database.

Parameters:
  • df (DataFrame) – The input DataFrame.

  • column_to_check (str) – The name of the column to check for existing values.

  • values_to_remove (list) – A list of values to be removed from the DataFrame.

  • logger (Logger) – A logger for logging messages. Defaults to None.

Returns:

The DataFrame with existing records removed.

Return type:

DataFrame

micromet.tools module

Module contents

Micromet: A package for processing and analyzing micrometeorological data.

This package provides a collection of tools for reading, reformatting, performing quality control, and generating reports from micrometeorological and flux data, particularly from AmeriFlux-style data sources.

The main components of the package are:

  • AmerifluxDataProcessor: For reading and parsing data files.

  • Reformatter: For cleaning and standardizing data.

  • tools: A collection of utility functions for analysis.

  • graphs: For creating various plots and visualizations.

  • StationDataDownloader: For downloading data from stations.

  • StationDataProcessor: For processing and managing station data.

class micromet.AmerifluxDataProcessor(logger=None)[source]

Bases: object

A class for reading and parsing AmeriFlux-style CSV files.

This class is designed to handle Campbell Scientific TOA5 files or standard AmeriFlux output files, parsing them into a pandas DataFrame.

Parameters:

logger (Logger) – A logger for tracking the data processing. If not provided, a default logger is used.

logger

The logger used for logging messages.

Type:

logging.Logger

skip_rows

The number of rows to skip at the beginning of the file.

Type:

int or list of int

names

The column names for the DataFrame.

Type:

list of str

NA_VALUES = ['-9999', 'NAN', 'NaN', 'nan', nan, -9999.0]

__init__(logger=None)[source]

Initialize the AmerifluxDataProcessor.

Parameters:

logger (Logger) – A logger for tracking the data processing. If not provided, a default logger is used.

iterate_through_stations()[source]

Iterate through all stations and compile their data.

This method iterates through a predefined list of stations, compiles the data for each station, and returns a dictionary of DataFrames.

Returns:

A dictionary where keys are station IDs and values are DataFrames of the compiled data for each station.

Return type:

dict

raw_file_compile(main_dir, station_folder_name, search_str='*Flux_AmeriFluxFormat*.dat')[source]

Compile raw AmeriFlux datalogger files into a single DataFrame.

This method searches for files matching a given pattern within a station’s directory, processes each file, and concatenates them into a single DataFrame.

Parameters:
  • main_dir (Union[str, Path]) – The main directory containing the station folders.

  • station_folder_name (Union[str, Path]) – The name of the station folder.

  • search_str (str) – The search string (glob pattern) for finding files to compile. Defaults to “*Flux_AmeriFluxFormat*.dat”.

Returns:

A DataFrame containing the compiled data, or None if no valid files were found.

Return type:

Optional[DataFrame]

to_dataframe(file)[source]

Read an AmeriFlux-style CSV file and return it as a pandas DataFrame.

This method first determines the header structure of the file and then reads the data into a DataFrame, handling missing values.

Parameters:

file (Union[str, Path]) – The path to the CSV file to be read.

Returns:

A DataFrame containing the parsed data from the file.

Return type:

DataFrame

class micromet.DateRangeDrop(column, start, end)[source]

Bases: object

A date range within which a column’s values should be set to NaN.

column: str
end: str
start: str

class micromet.FlagWindow(flag_columns, start, end, flag_value=2)[source]

Bases: object

A time window for applying a quality flag value to one or more columns.

end: str
flag_columns: List[str]
flag_value: int = 2
start: str

class micromet.Reformatter(var_limits_csv=None, drop_soil=True, check_timestamps=False, site_lat=None, site_lon=None, site_utc_offset=-7, logger=None)[source]

Bases: object

A class to clean and standardize station data for flux/met processing.

This class provides a pipeline for preparing raw station data by applying a series of transformations, including fixing timestamps, renaming columns, applying physical limits, and checking timestamp alignment.

Parameters:
  • var_limits_csv (str | Path | None) – Path to a CSV file containing variable limits. If not provided, default limits are used.

  • drop_soil (bool) – If True, extra soil-related columns are dropped. Defaults to True.

  • check_timestamps (bool) – If True, perform timestamp alignment analysis on radiation data. Defaults to False.

  • site_lat (float | None) – Latitude of the site (required if check_timestamps=True).

  • site_lon (float | None) – Longitude of the site (required if check_timestamps=True).

  • site_utc_offset (int) – UTC offset in hours for the site (required if check_timestamps=True).

  • logger (Logger | None) – A logger for tracking the reformatting process. If not provided, a default logger is used.

logger

The logger used for logging messages.

Type:

logging.Logger

config

A dictionary of configuration parameters for the reformatting process.

Type:

dict

varlimits

A DataFrame containing the physical limits for each variable.

Type:

pd.DataFrame

drop_soil

A flag indicating whether to drop extra soil columns.

Type:

bool

check_timestamps

A flag indicating whether to perform timestamp alignment checks.

Type:

bool

site_lat

The latitude of the site.

Type:

float

site_lon

The longitude of the site.

Type:

float

site_utc_offset

The UTC offset of the site in hours.

Type:

float

__init__(var_limits_csv=None, drop_soil=True, check_timestamps=False, site_lat=None, site_lon=None, site_utc_offset=-7, logger=None)[source]

Initialize the Reformatter.

Parameters:
  • var_limits_csv (str | Path | None) – Path to a CSV file containing variable limits.

  • drop_soil (bool) – If True, extra soil-related columns are dropped. Defaults to True.

  • check_timestamps (bool) – If True, perform timestamp alignment analysis. Defaults to False.

  • site_lat (float | None) – Latitude of the site (required if check_timestamps=True).

  • site_lon (float | None) – Longitude of the site (required if check_timestamps=True).

  • site_utc_offset (int) – UTC offset in hours (required if check_timestamps=True).

  • logger (Logger | None) – A logger for tracking the reformatting process.

finalize(df)[source]

Finalize the data by applying cleaning and standardization steps.

prepare(df, interval=30, data_type='eddy')[source]

Current method, kept for backward compatibility.

preprocess(df, data_type='eddy', interval=30)[source]

Preprocess the data by applying initial cleaning and standardization steps.

process(df, interval, data_type='eddy')[source]

Prepare the data by applying a series of cleaning and standardization steps.

This method takes a DataFrame of station data and applies a pipeline of transformations to clean and standardize it. The steps include fixing timestamps, renaming columns, setting numeric types, resampling, applying physical limits, and optionally checking timestamp alignment.

Parameters:
  • df (DataFrame) – The input DataFrame of station data.

  • data_type (str) – The type of data being processed (e.g., ‘eddy’, ‘met’). This is used to determine which column renaming map to use. Defaults to ‘eddy’.

  • interval (int) – The sampling interval of the data, in minutes; must be either 30 or 60.

Returns:

A tuple containing:

  • The prepared DataFrame with standardized and cleaned data.

  • A report DataFrame detailing the changes made during the application of physical limits.

  • A dictionary with timestamp alignment results (if check_timestamps=True), or None otherwise. Contains keys: ‘summary’, ‘composites’, ‘flags’.

Return type:

Tuple[DataFrame, DataFrame, Optional[Dict]]

class micromet.SiteCorrections(sg_correction_factor=None, sg_correction_vars=<factory>, sg_correction_end=None, precip_correction_factor=None, precip_correction_end=None, precip_bad_before=None, wind_direction_offset=None, wind_direction_change_date=None, date_range_drops=<factory>, h2o_flag_windows=<factory>, co2_flag_windows=<factory>, wind_flag_bad_range=None, wind_flag_marginal_ranges=<factory>, signal_strength_threshold=0.8, drop_precip_on_visits=True, csflux_join_cols=None, columns_to_drop_from_merge=None, soilvue_bad_ec_threshold=None, extra_drops=<factory>)[source]

Bases: object

Declarative specification of site-specific corrections applied during QC.

All fields are optional; only the corrections relevant to a given station need to be populated.

Parameters:
  • sg_correction_factor (Optional[float]) – Multiplicative factor for soil-heat-flux storage (SG) sensors.

  • sg_correction_vars (List[str]) – Columns to which sg_correction_factor applies.

  • sg_correction_end (Optional[str]) – Datetime string; correction is applied to data before this date.

  • precip_correction_factor (Optional[float]) – Multiplicative factor for precipitation before a program fix date.

  • precip_correction_end (Optional[str]) – Datetime string; precip correction is applied before this date.

  • precip_bad_before (Optional[str]) – Drop all precip data before this date (e.g. broken bucket).

  • wind_direction_offset (Optional[float]) – Degrees to subtract from WD_1_1_1 before the change date.

  • wind_direction_change_date (Optional[str]) – Datetime string when the IRGASON orientation changed.

  • date_range_drops (List[DateRangeDrop]) – Specific column/date-range pairs to null out (spikes, sensor issues).

  • h2o_flag_windows (List[FlagWindow]) – Windows to flag H2O signal-strength issues.

  • co2_flag_windows (List[FlagWindow]) – Windows to flag CO2 signal-strength issues.

  • wind_flag_bad_range (Optional[Tuple[float, float]]) – (start_deg, end_deg) range of wind directions flagged as 2 (bad).

  • wind_flag_marginal_ranges (List[Tuple[float, float]]) – List of (start_deg, end_deg) ranges flagged as 1 (marginal).

  • signal_strength_threshold (float) – Threshold below which signal-strength data is flagged.

  • drop_precip_on_visits (bool) – Whether to zero-out precipitation on station-visit days.

  • csflux_join_cols (Optional[List[str]]) – Subset of CSFlux columns to merge into the final eddy dataset. If None, a default set is used.

  • columns_to_drop_from_merge (Optional[List[str]]) – Columns to drop after the eddy/met merge (e.g. RECORD, G_1_1_A).

  • soilvue_bad_ec_threshold (Optional[float]) – Minimum EC_3_7_1 value; rows below are dropped for SoilVue columns.

  • extra_drops (List[DateRangeDrop]) – Additional ad-hoc date/column drops.
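The <factory> markers in the signature suggest dataclass-style fields built with a default factory, so each instance gets its own empty list. The sketch below uses reduced stand-in classes (DateRangeDropSketch and SiteCorrectionsSketch are hypothetical and carry only a few of the documented fields) to show how a sparse, per-station specification could be declared; the column names and dates are invented.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DateRangeDropSketch:
    column: str
    start: str
    end: str


@dataclass
class SiteCorrectionsSketch:
    # Scalars default to None; list fields use a factory so that instances
    # never share one mutable list.
    wind_direction_offset: Optional[float] = None
    wind_direction_change_date: Optional[str] = None
    signal_strength_threshold: float = 0.8
    date_range_drops: List[DateRangeDropSketch] = field(default_factory=list)


# Only the corrections relevant to this (made-up) station are populated.
station_a = SiteCorrectionsSketch(
    wind_direction_offset=10.0,
    wind_direction_change_date="2023-06-01",
    date_range_drops=[DateRangeDropSketch("TA_1_1_1", "2022-01-01", "2022-01-05")],
)
station_b = SiteCorrectionsSketch()
station_b.date_range_drops.append(
    DateRangeDropSketch("RH_1_1_1", "2021-03-01", "2021-03-02")
)
```

Appending to one instance's list leaves a freshly created instance untouched, which is the reason for using a factory instead of a shared default list.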

co2_flag_windows: List[FlagWindow]
columns_to_drop_from_merge: List[str] | None = None
csflux_join_cols: List[str] | None = None
date_range_drops: List[DateRangeDrop]
drop_precip_on_visits: bool = True
extra_drops: List[DateRangeDrop]
h2o_flag_windows: List[FlagWindow]
precip_bad_before: str | None = None
precip_correction_end: str | None = None
precip_correction_factor: float | None = None
sg_correction_end: str | None = None
sg_correction_factor: float | None = None
sg_correction_vars: List[str]
signal_strength_threshold: float = 0.8
soilvue_bad_ec_threshold: float | None = None
wind_direction_change_date: str | None = None
wind_direction_offset: float | None = None
wind_flag_bad_range: Tuple[float, float] | None = None
wind_flag_marginal_ranges: List[Tuple[float, float]]

class micromet.StationDataDownloader(config, logger=None)[source]

Bases: object

A class to manage downloading data from a station’s logger.

This class handles the connection and data download from a Campbell Scientific data logger via its web API.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

config

The configuration object.

Type:

configparser.ConfigParser or dict

logger

The logger instance.

Type:

logging.Logger

logger_credentials

The authentication credentials for the logger.

Type:

requests.auth.HTTPBasicAuth

__init__(config, logger=None)[source]

Initialize the StationDataDownloader.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object containing station details and credentials.

  • logger (Logger) – A logger for logging messages. If None, a new logger is created.

download_from_station(station, loggertype='eddy', mode='since-time', p1='0', p2='0')[source]

Download data from a station’s logger.

This method constructs a request to the station’s web API to download data based on the specified parameters.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • mode (str) – The data query mode (‘since-time’, ‘most-recent’, etc.). Defaults to ‘since-time’.

  • p1 (str) – The primary parameter for the query (e.g., start time). Defaults to “0”.

  • p2 (str) – The secondary parameter for the query (e.g., end time). Defaults to “0”.

Returns:

A tuple containing the downloaded data as a DataFrame, the size of the data packet in MB, and the HTTP status code.

Return type:

tuple[pd.DataFrame | None, float | None, int]

static get_station_id(stationid)[source]

Extract the station ID from a full station identifier string.

Parameters:

stationid (str) – The full station identifier (e.g., ‘US-ABC’).

Returns:

The extracted station ID (e.g., ‘ABC’).

Return type:

str

get_times(station, loggertype='eddy')[source]

Retrieve the current time from the logger and the system.

This method queries a station’s logger for its current time and also gets the current system time for comparison.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A tuple containing the logger’s current time as a string and the system’s current time as a string. As the return type indicates, the logger time may be None; the system time is always returned.

Return type:

Tuple[Optional[str], str]

class micromet.StationDataProcessor(config, engine, logger=None)[source]

Bases: StationDataDownloader

A class for processing and managing station data.

This class extends StationDataDownloader to add functionality for reformatting data, interacting with a database, and managing the overall data processing workflow.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

engine

The SQLAlchemy engine instance.

Type:

sqlalchemy.engine.base.Engine

__init__(config, engine, logger=None)[source]

Initialize the StationDataProcessor.

Parameters:
  • config (Union[ConfigParser, dict]) – A configuration object with station details.

  • engine (Engine) – A SQLAlchemy engine for database connections.

  • logger (Logger) – A logger for logging messages.

compare_sql_to_station(df, station, field='timestamp_end', loggertype='eddy')[source]

Compare station data with records in the database and keep only the new entries.

Parameters:
  • df (DataFrame) – The DataFrame containing the station data.

  • station (str) – The identifier for the station.

  • field (str) – The field to use for comparison. Defaults to “timestamp_end”.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

A DataFrame containing only the new records.

Return type:

DataFrame

database_columns(dat)[source]

Get the list of column names for a given database table.

Parameters:

dat (str) – The type of data (‘eddy’ or ‘met’), which corresponds to the table name.

Returns:

A list of column names in the specified table.

Return type:

list

get_max_date(station, loggertype='eddy')[source]

Get the maximum timestamp from the station’s data in the database.

Parameters:
  • station (str) – The identifier for the station.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

Returns:

The latest timestamp found in the database for the station.

Return type:

datetime

get_station_data(station, reformat=True, loggertype='eddy', config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv', drop_soil=False)[source]

Fetch and process data for a single station.

This method downloads data from a station, optionally reformats it, and returns the processed data.

Parameters:
  • station (str) – The identifier for the station.

  • reformat (bool) – Whether to reformat the downloaded data. Defaults to True.

  • loggertype (str) – The type of logger (‘eddy’ or ‘met’). Defaults to ‘eddy’.

  • config_path (str) – The path to the reformatter configuration file.

  • var_limits_csv (str) – The path to the variable limits CSV file.

  • drop_soil (bool) – Whether to drop soil-related data. Defaults to False.

Returns:

A tuple containing the processed DataFrame and the size of the downloaded data packet in MB.

Return type:

Tuple[Optional[DataFrame], Optional[float]]

process_station_data(site_folders, config_path='./data/reformatter_vars.yml', var_limits_csv='./data/extreme_values.csv')[source]

Process and upload data for all specified stations.

This method iterates through a dictionary of site folders, fetches data for each station, processes it, and uploads it to the database.

Parameters:
  • site_folders (dict) – A dictionary mapping station IDs to folder names.

  • config_path (str) – The path to the reformatter configuration file. Defaults to “./data/reformatter_vars.yml”.

  • var_limits_csv (str) – The path to the variable limits CSV file. Defaults to “./data/extreme_values.csv”.

Return type:

None

static remove_existing_records(df, column_to_check, values_to_remove, logger=None)[source]

Remove rows from a DataFrame whose values in column_to_check already exist in the database.

Parameters:
  • df (DataFrame) – The input DataFrame.

  • column_to_check (str) – The name of the column to check for existing values.

  • values_to_remove (list) – A list of values to be removed from the DataFrame.

  • logger (Logger) – A logger for logging messages. Defaults to None.

Returns:

The DataFrame with existing records removed.

Return type:

DataFrame
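
The filtering that remove_existing_records describes amounts to a membership test against the known values. The sketch below shows the idea on plain dictionaries as a stand-in for DataFrame rows, so it runs without pandas; the function name drop_known_rows and the sample data are hypothetical. With a DataFrame, the same effect is commonly obtained with df[~df[column_to_check].isin(values_to_remove)], though the source does not confirm that this is how the method is implemented.

```python
from typing import Any, Dict, Iterable, List


def drop_known_rows(rows: List[Dict[str, Any]], column_to_check: str,
                    values_to_remove: Iterable[Any]) -> List[Dict[str, Any]]:
    """Keep only rows whose value in column_to_check is not already known."""
    known = set(values_to_remove)  # set membership is cheap per row
    return [row for row in rows if row[column_to_check] not in known]


# Hypothetical sample: two half-hourly records, one already stored.
rows = [
    {"timestamp_end": "202401010030", "TA": 1.2},
    {"timestamp_end": "202401010100", "TA": 1.4},
]
new_rows = drop_known_rows(rows, "timestamp_end", ["202401010030"])
print(new_rows)
```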

class micromet.WorkflowConfig(station='', interval=30, raw_data_root=PosixPath('.'), output_root=PosixPath('.'), amflux_var_file=None, preprocessed_dir=None, steps=<factory>, generate_plots=False, drop_soil=False, fetch_events_from_db=False, events_api_url='https://ugs-koop-umfdxaxiyq-wm.a.run.app', data_interval_label='HH', soilvue_g_calculation=False, soilvue_depths_cm=<factory>)[source]

Bases: object

Top-level configuration for the automated workflow.

Parameters:
  • station (str) – Station identifier (e.g. 'US-UTJ').

  • interval (int) – Data interval in minutes (30 or 60).

  • raw_data_root (Path) – Root folder containing compiled station data.

  • output_root (Path) – Root folder for processed outputs (raw/, qc/, ameriflux/ sub-dirs).

  • amflux_var_file (Optional[Path]) – Path to the AmeriFlux variable-name CSV. Used for column validation.

  • preprocessed_dir (Optional[Path]) – Directory for preprocessed parquet files. Defaults to raw_data_root / 'preprocessed_site_data'.

  • steps (List[int]) – Which workflow steps to run (1-4). Default is all.

  • generate_plots (bool) – Whether to generate review plots (notebooks 3b/4b).

  • drop_soil (bool) – Whether to drop extra soil columns during reformatter finalize.

  • fetch_events_from_db (bool) – Whether to pull station events from the UGS API.

  • events_api_url (str) – Base URL for the station events API.

  • data_interval_label (str) – AmeriFlux interval label ('HH' for half-hourly).

  • soilvue_g_calculation (bool) – Whether to calculate SoilVue G values using gradient+storage.

  • soilvue_depths_cm (List[float]) – SoilVue sensor depths in centimeters.
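
The <factory> defaults in the signature and the documented fallback for preprocessed_dir suggest a dataclass with default factories and a derived path property. The following trimmed-down sketch illustrates that pattern; ConfigSketch is a hypothetical stand-in, not the real WorkflowConfig, and the default step list [1, 2, 3, 4] is inferred from "Default is all".

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class ConfigSketch:
    """Hypothetical, trimmed-down stand-in for WorkflowConfig."""

    station: str = ""
    interval: int = 30
    raw_data_root: Path = Path(".")
    preprocessed_dir: Optional[Path] = None
    # Mutable defaults need a factory so instances do not share one list.
    steps: List[int] = field(default_factory=lambda: [1, 2, 3, 4])

    @property
    def preprocessed_path(self) -> Path:
        """Use preprocessed_dir if set, else the documented default location."""
        if self.preprocessed_dir is not None:
            return self.preprocessed_dir
        return self.raw_data_root / "preprocessed_site_data"


cfg = ConfigSketch(station="US-UTJ", raw_data_root=Path("data"))
print(cfg.preprocessed_path)
print(cfg.steps)  # [1, 2, 3, 4]
```

Deriving the path in a property rather than in the constructor means a later change to raw_data_root is reflected automatically when preprocessed_dir is left unset.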

amflux_var_file: Path | None = None
data_interval_label: str = 'HH'
drop_soil: bool = False
events_api_url: str = 'https://ugs-koop-umfdxaxiyq-wm.a.run.app'
fetch_events_from_db: bool = False
generate_plots: bool = False
interval: int = 30
output_root: Path = PosixPath('.')
preprocessed_dir: Path | None = None
property preprocessed_path: Path
raw_data_root: Path = PosixPath('.')
soilvue_depths_cm: List[float]
soilvue_g_calculation: bool = False
station: str = ''
steps: List[int]

class micromet.WorkflowResult(station, success, steps_completed=<factory>, output_files=<factory>, reports=<factory>, errors=<factory>, processing_time=0.0)[source]

Bases: object

Container for results of a workflow run.

errors: Dict[int, str]
output_files: Dict[str, Path]
processing_time: float = 0.0
reports: Dict[str, Any]
station: str
steps_completed: List[int]
success: bool
summary()[source]

Return type:

str

class micromet.WorkflowRunner(config, corrections=None, logger=None)[source]

Bases: object

Orchestrates the full numbered-notebook workflow for a single station.

generate_review_plots(context)[source]

Generate time-series plots for all variables in the QC dataset.

Return type:

None

run()[source]

Execute the configured workflow steps in sequence.

Return type:

WorkflowResult

step1_compile_and_preprocess(context)[source]

Compile raw files and preprocess into parquet datasets.

Return type:

Dict[str, Any]

step2_create_raw_data(context)[source]

Merge data sources and create the combined raw dataset.

Return type:

Dict[str, Any]

step3_qc_data(context)[source]

Apply corrections, physical limits, QC, and flagging.

Return type:

Dict[str, Any]

step4_export_ameriflux(context)[source]

Export AmeriFlux-formatted CSV from QC data.

Return type:

Dict[str, Any]
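
Running numbered steps in sequence while collecting completed steps and per-step errors can be sketched as follows. This is an illustration of the general pattern only: ResultSketch, run_steps, the handlers mapping, and the stop-on-first-error behaviour are all assumptions made for this sketch, not the WorkflowRunner implementation.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ResultSketch:
    """Hypothetical stand-in mirroring a few WorkflowResult fields."""

    station: str
    success: bool = True
    steps_completed: List[int] = field(default_factory=list)
    errors: Dict[int, str] = field(default_factory=dict)
    processing_time: float = 0.0


def run_steps(station: str, steps: List[int],
              handlers: Dict[int, Callable[[Dict[str, Any]], Dict[str, Any]]]) -> ResultSketch:
    """Run the requested steps in ascending order, sharing one context dict."""
    result = ResultSketch(station=station)
    context: Dict[str, Any] = {}
    start = time.perf_counter()
    for step in sorted(steps):
        try:
            # Each handler returns a dict that is merged into the context.
            context.update(handlers[step](context))
            result.steps_completed.append(step)
        except Exception as exc:
            # Assumption: record the failure and stop at the first error.
            result.errors[step] = str(exc)
            result.success = False
            break
    result.processing_time = time.perf_counter() - start
    return result


def failing_step(context: Dict[str, Any]) -> Dict[str, Any]:
    raise ValueError("no QC input")


handlers = {
    1: lambda ctx: {"compiled": True},
    2: lambda ctx: {"raw": "raw.parquet"},
    3: failing_step,
}
outcome = run_steps("US-UTJ", [1, 2, 3], handlers)
print(outcome.steps_completed, outcome.errors)  # [1, 2] {3: 'no QC input'}
```

Keying errors by step number, as the errors: Dict[int, str] field of WorkflowResult suggests, makes it easy to see which stage failed without parsing log output.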

micromet.run_workflow(station, raw_data_root, output_root, corrections=None, **kwargs)[source]

Convenience function to run the complete workflow for a station.

Parameters:
  • station (str) – Station identifier (e.g. 'US-UTJ').

  • raw_data_root (Union[str, Path]) – Root folder containing compiled station data.

  • output_root (Union[str, Path]) – Root folder for processed outputs.

  • corrections (Optional[SiteCorrections]) – Site-specific corrections.

  • **kwargs – Additional arguments passed to WorkflowConfig.

Return type:

WorkflowResult