"""
Complete pipeline for processing micrometeorological data with Micromet.
This module provides high-level orchestration for the complete data processing
workflow, from raw data files to cleaned, validated, and analyzed datasets.
Classes
-------
Pipeline : Main orchestration class for data processing
PipelineConfig : Configuration container for pipeline settings
ProcessingResult : Container for processing results and metadata
Functions
---------
process_station : Process all data types for a single station
batch_process : Process all matching files in a directory
main : Command-line interface entry point
Examples
--------
Basic usage:
>>> from micromet.pipeline import Pipeline
>>>
>>> # Process a single file
>>> pipeline = Pipeline()
>>> result = pipeline.process_file(
... 'data/US-UTW_Flux.dat',
... site_id='US-UTW'
... )
>>>
>>> # Batch process all stations
>>> results = pipeline.batch_process(
... input_dir='./raw_data',
... output_dir='./processed_data'
... )
Command-line usage:
$ python -m micromet.pipeline --site US-UTW --input data/ --output results/
$ python -m micromet.pipeline --batch --input data/ --output results/
"""
from __future__ import annotations
import argparse
import json
import logging
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import pandas as pd
import numpy as np
from micromet.reader import AmerifluxDataProcessor
from micromet.format.reformatter import Reformatter
from micromet.qaqc import variable_limits
from micromet.report import gap_summary, validate, graphs
from micromet.utils import (
logger_check,
read_site_config,
get_all_site_configs,
create_reformatter_from_site
)
# =============================================================================
# Configuration and Result Containers
# =============================================================================
@dataclass
class PipelineConfig:
    """
    Settings that control how the processing pipeline behaves.

    Attributes
    ----------
    check_timestamps : bool
        Run the (slower but more thorough) timestamp alignment analysis.
    drop_soil : bool
        Remove the extra soil sensor columns during reformatting.
    generate_reports : bool
        Produce validation and gap reports.
    generate_plots : bool
        Produce diagnostic plots.
    save_intermediate : bool
        Persist intermediate processing steps.
        NOTE: not referenced by ``Pipeline`` in this module.
    var_limits_csv : Path or None
        Optional path to a custom variable-limits CSV file.
    expected_freq : str
        Expected sampling frequency of the data, e.g. ``'30min'``.
    output_format : str
        File format for processed output: ``'csv'``, ``'parquet'`` or
        ``'feather'``.
    """

    check_timestamps: bool = True
    drop_soil: bool = True
    generate_reports: bool = True
    generate_plots: bool = False
    save_intermediate: bool = False
    var_limits_csv: Optional[Path] = None
    expected_freq: str = '30min'
    output_format: str = 'csv'

    def to_dict(self) -> dict:
        """Return the settings as a plain dict, with ``var_limits_csv`` as text."""
        settings = asdict(self)
        limits_path = settings['var_limits_csv']
        # Path objects are not JSON-serializable, so keep their string form.
        settings['var_limits_csv'] = None if limits_path is None else str(limits_path)
        return settings
@dataclass
class ProcessingResult:
    """
    Outcome and bookkeeping for one input file run through the pipeline.

    Attributes
    ----------
    site_id : str
        Identifier of the station the file belongs to.
    success : bool
        True when processing finished without an exception.
    input_file : Path
        File that was processed.
    output_file : Path or None
        Where the processed data was written, if it was saved.
    n_records_input : int
        Row count of the raw input data.
    n_records_output : int
        Row count of the cleaned output data.
    n_flagged : int
        Number of values flagged during QA/QC.
    processing_time : float
        Wall-clock duration of the run, in seconds.
    timestamp_issues : dict or None
        Timestamp alignment flags, when any were detected.
    error_message : str or None
        Text of the exception that caused a failure.
    reports : dict
        Report name -> generated report object.
    """

    site_id: str
    success: bool
    input_file: Path
    output_file: Optional[Path] = None
    n_records_input: int = 0
    n_records_output: int = 0
    n_flagged: int = 0
    processing_time: float = 0.0
    timestamp_issues: Optional[Dict] = None
    error_message: Optional[str] = None
    reports: Dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """
        Return the result as a dict, with the path fields converted to strings.

        Other values (e.g. report DataFrames) are copied by ``asdict`` and left
        unconverted.
        """
        record = asdict(self)
        record['input_file'] = str(record['input_file'])
        out_path = record['output_file']
        if out_path is not None:
            record['output_file'] = str(out_path)
        return record

    def summary(self) -> str:
        """Return a multi-line, human-readable description of this result."""
        outcome = "FAILED"
        if self.success:
            outcome = "SUCCESS"
        header = (
            f"Processing Result: {outcome}\n"
            f"Site: {self.site_id}\n"
            f"Input: {self.input_file}\n"
            f"Records: {self.n_records_input} → {self.n_records_output}\n"
            f"Flagged: {self.n_flagged}\n"
            f"Time: {self.processing_time:.2f}s"
        )
        # Optional trailing lines only appear when there is something to say.
        extras = []
        if self.timestamp_issues:
            extras.append(f"Timestamp Issues: {len(self.timestamp_issues)}")
        if self.error_message:
            extras.append(f"Error: {self.error_message}")
        return "\n".join([header, *extras])
# =============================================================================
# Main Pipeline Class
# =============================================================================
class Pipeline:
    """
    Main orchestration class for micrometeorological data processing.

    This class coordinates the complete workflow from raw data files to
    cleaned, validated, and analyzed datasets: read -> reformat/clean ->
    QA/QC reports -> plots -> save.

    Parameters
    ----------
    config : PipelineConfig, optional
        Configuration settings for the pipeline. A default
        ``PipelineConfig()`` is created when omitted.
    logger : logging.Logger, optional
        Logger instance for tracking progress (normalised via
        ``logger_check``).

    Attributes
    ----------
    config : PipelineConfig
        Pipeline configuration.
    logger : logging.Logger
        Logger instance.
    reader : AmerifluxDataProcessor
        Data reader instance.
    """

    def __init__(
        self,
        config: Optional[PipelineConfig] = None,
        logger: Optional[logging.Logger] = None
    ):
        """Initialize the Pipeline."""
        self.config = config or PipelineConfig()
        self.logger = logger_check(logger)
        self.reader = AmerifluxDataProcessor(logger=self.logger)
        self.logger.info("Pipeline initialized")
        self.logger.debug(f"Configuration: {self.config.to_dict()}")

    def process_file(
        self,
        input_file: Union[str, Path],
        site_id: Optional[str] = None,
        output_dir: Optional[Union[str, Path]] = None,
        data_type: str = 'eddy',
    ) -> ProcessingResult:
        """
        Process a single data file through the complete pipeline.

        Parameters
        ----------
        input_file : str or Path
            Path to input data file.
        site_id : str, optional
            Station identifier. If None, it is extracted from the filename
            with ``_extract_site_id`` (which falls back to ``'UNKNOWN'``).
        output_dir : str or Path, optional
            Directory for output files. If None, nothing is written to disk:
            plots, processed data and reports are all skipped.
        data_type : str, optional
            Type of data ('eddy' or 'met'). Defaults to 'eddy'.

        Returns
        -------
        ProcessingResult
            Container with processing results and metadata. Any exception
            raised while processing is caught and logged with its traceback.
            It is then reported as ``success=False`` with ``error_message``
            set, not re-raised.
        """
        start_time = datetime.now()
        input_file = Path(input_file)

        # Extract site_id if not provided
        if site_id is None:
            site_id = self._extract_site_id(input_file)

        self.logger.info(f"Processing {site_id}: {input_file.name}")

        try:
            # Step 1: Read raw data
            self.logger.info("Step 1/5: Reading raw data...")
            df_raw = self.reader.to_dataframe(input_file)
            n_input = len(df_raw)
            self.logger.info(f" Read {n_input:,} records")

            # Step 2: Reformat and clean
            self.logger.info("Step 2/5: Reformatting and cleaning...")
            df_clean, limits_report, ts_results = self._reformat_data(
                df_raw, site_id, data_type
            )
            n_output = len(df_clean)
            self.logger.info(f" Output: {n_output:,} records")

            # Step 3: Generate reports
            reports = {}
            if self.config.generate_reports:
                self.logger.info("Step 3/5: Generating QA/QC reports...")
                reports = self._generate_reports(
                    df_clean, limits_report, ts_results, site_id
                )
            else:
                self.logger.info("Step 3/5: Skipping reports (disabled)")

            # Step 4: Generate plots (needs somewhere to write them)
            if self.config.generate_plots and output_dir:
                self.logger.info("Step 4/5: Generating diagnostic plots...")
                self._generate_plots(df_clean, site_id, Path(output_dir))
            else:
                self.logger.info("Step 4/5: Skipping plots")

            # Step 5: Save output
            output_file = None
            if output_dir:
                self.logger.info("Step 5/5: Saving output...")
                output_file = self._save_output(
                    df_clean, site_id, Path(output_dir), data_type
                )
                # Save reports
                if reports and self.config.generate_reports:
                    self._save_reports(reports, site_id, Path(output_dir))
            else:
                self.logger.info("Step 5/5: Skipping save (no output_dir)")

            # Calculate metrics. Tolerate a missing limits report so that
            # bookkeeping cannot turn an already-saved run into a failure.
            if limits_report is not None and 'n_flagged' in limits_report:
                n_flagged = limits_report['n_flagged'].sum()
            else:
                n_flagged = 0
            processing_time = (datetime.now() - start_time).total_seconds()

            # Extract timestamp issues
            ts_issues = None
            if ts_results and 'flags' in ts_results:
                ts_issues = ts_results['flags']

            result = ProcessingResult(
                site_id=site_id,
                success=True,
                input_file=input_file,
                output_file=output_file,
                n_records_input=n_input,
                n_records_output=n_output,
                n_flagged=int(n_flagged),
                processing_time=processing_time,
                timestamp_issues=ts_issues,
                reports=reports
            )
            self.logger.info(f"✓ Processing complete: {processing_time:.2f}s")
            return result

        except Exception as e:
            # Top-level boundary: a single bad file must not abort a batch run.
            processing_time = (datetime.now() - start_time).total_seconds()
            self.logger.error(f"✗ Processing failed: {e}", exc_info=True)
            return ProcessingResult(
                site_id=site_id or "UNKNOWN",
                success=False,
                input_file=input_file,
                processing_time=processing_time,
                error_message=str(e)
            )

    def batch_process(
        self,
        input_dir: Union[str, Path],
        output_dir: Union[str, Path],
        pattern: str = "*Flux*.dat",
        data_type: str = 'eddy',
    ) -> List[ProcessingResult]:
        """
        Process multiple files in a directory.

        Files are found recursively (``Path.rglob``) and processed in sorted
        order. The site ID of each file is extracted from its filename.
        A summary is logged and written to ``output_dir/batch_summary.json``.

        Parameters
        ----------
        input_dir : str or Path
            Directory containing input files.
        output_dir : str or Path
            Directory for output files (created if missing).
        pattern : str, optional
            Glob pattern for finding input files. Defaults to "*Flux*.dat".
        data_type : str, optional
            Type of data ('eddy' or 'met'). Defaults to 'eddy'.

        Returns
        -------
        list of ProcessingResult
            Results for all processed files (empty if nothing matched).
        """
        input_dir = Path(input_dir)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Find all matching files
        files = sorted(input_dir.rglob(pattern))
        if not files:
            self.logger.warning(f"No files found matching '{pattern}' in {input_dir}")
            return []

        self.logger.info(f"Found {len(files)} files to process")

        results = []
        for i, file in enumerate(files, 1):
            self.logger.info(f"\n{'='*60}")
            self.logger.info(f"File {i}/{len(files)}: {file.name}")
            self.logger.info(f"{'='*60}")
            result = self.process_file(
                input_file=file,
                output_dir=output_dir,
                data_type=data_type
            )
            results.append(result)

        # Generate batch summary
        self._log_batch_summary(results)
        self._save_batch_summary(results, output_dir)
        return results

    def process_station(
        self,
        site_id: str,
        input_dir: Union[str, Path],
        output_dir: Union[str, Path],
        data_types: Optional[List[str]] = None,
    ) -> Dict[str, ProcessingResult]:
        """
        Process all data types for a single station.

        For each data type, files are searched recursively with the pattern
        ``*{site_id}*AmeriFlux*.dat`` ('eddy') or ``*{site_id}*Statistics*.dat``
        (any other type). Only the most recently modified match is processed.

        Parameters
        ----------
        site_id : str
            Station identifier (e.g., 'US-UTW').
        input_dir : str or Path
            Directory containing input files.
        output_dir : str or Path
            Directory for output files.
        data_types : list of str, optional
            Data types to process. Defaults to ['eddy', 'met'].

        Returns
        -------
        dict
            Dictionary mapping data_type to ProcessingResult. Data types with
            no matching file are omitted.
        """
        # None sentinel instead of a shared mutable default list.
        if data_types is None:
            data_types = ['eddy', 'met']

        input_dir = Path(input_dir)
        output_dir = Path(output_dir)

        self.logger.info(f"\n{'#'*60}")
        self.logger.info(f"Processing Station: {site_id}")
        self.logger.info(f"{'#'*60}")

        results = {}
        for data_type in data_types:
            # Find files for this data type
            if data_type == 'eddy':
                pattern = f"*{site_id}*AmeriFlux*.dat"
            else:
                pattern = f"*{site_id}*Statistics*.dat"

            files = list(input_dir.rglob(pattern))
            if not files:
                self.logger.warning(f"No {data_type} files found for {site_id}")
                continue

            # Process the most recent file (or combine if needed)
            file = max(files, key=lambda p: p.stat().st_mtime)
            result = self.process_file(
                input_file=file,
                site_id=site_id,
                output_dir=output_dir,
                data_type=data_type
            )
            results[data_type] = result

        return results

    # -------------------------------------------------------------------------
    # Private Helper Methods
    # -------------------------------------------------------------------------

    def _reformat_data(
        self,
        df: pd.DataFrame,
        site_id: str,
        data_type: str
    ) -> Tuple[pd.DataFrame, pd.DataFrame, Optional[Dict]]:
        """
        Reformat and clean data using Reformatter.

        Uses the site's configuration when it can be loaded. Otherwise
        (FileNotFoundError / KeyError) it falls back to a default Reformatter
        with timestamp checks disabled.

        Returns
        -------
        tuple
            ``(df_clean, limits_report, ts_results)`` as returned by
            ``Reformatter.process``.
        """
        # Try to load site configuration
        try:
            reformatter = create_reformatter_from_site(
                site_id=site_id,
                check_timestamps=self.config.check_timestamps,
                drop_soil=self.config.drop_soil,
                var_limits_csv=self.config.var_limits_csv
            )
        except (FileNotFoundError, KeyError):
            self.logger.warning(
                f"Could not load config for {site_id}, using defaults"
            )
            reformatter = Reformatter(
                check_timestamps=False,  # Can't check without site config
                drop_soil=self.config.drop_soil,
                var_limits_csv=self.config.var_limits_csv
            )

        # Process the data
        df_clean, limits_report, ts_results = reformatter.process(
            df, data_type=data_type
        )
        return df_clean, limits_report, ts_results

    def _generate_reports(
        self,
        df: pd.DataFrame,
        limits_report: pd.DataFrame,
        ts_results: Optional[Dict],
        site_id: str
    ) -> Dict[str, pd.DataFrame]:
        """
        Generate QA/QC and validation reports.

        Each individual check is best-effort. A failing check is logged as a
        warning and its report is omitted.

        Returns
        -------
        dict
            Report name -> report. Keys may include 'limits', 'gaps',
            'invalid_flags', 'timestamp_inconsistencies' and
            'timestamp_alignment' (the latter is whatever
            ``ts_results['summary']`` holds, not necessarily a DataFrame).
        """
        reports = {}

        # Limits report (already generated)
        reports['limits'] = limits_report

        # Gap summary (needs a datetime index)
        if isinstance(df.index, pd.DatetimeIndex):
            try:
                # Create temporary multi-index for gap_summary
                df_temp = df.copy()
                df_temp['STATIONID'] = site_id
                df_temp['DATETIME_END'] = df_temp.index
                df_temp = df_temp.set_index(['STATIONID', 'DATETIME_END'])

                gaps = gap_summary.summarize_gaps(
                    df_temp,
                    expected_freq=self.config.expected_freq
                )
                reports['gaps'] = gaps
                self.logger.info(f" Found {len(gaps)} gap periods")
            except Exception as e:
                self.logger.warning(f" Gap analysis failed: {e}")

        # Flag validation
        try:
            flag_cols = [c for c in df.columns if c.endswith('_SSITC_TEST')]
            if flag_cols:
                invalid_flags = validate.validate_flags(df, flag_cols)
                if invalid_flags:
                    reports['invalid_flags'] = pd.DataFrame([
                        {'column': k, 'invalid_values': str(v)}
                        for k, v in invalid_flags.items()
                    ])
                    self.logger.warning(f" Found invalid flags in {len(invalid_flags)} columns")
        except Exception as e:
            self.logger.warning(f" Flag validation failed: {e}")

        # Timestamp consistency
        try:
            ts_inconsistencies = validate.validate_timestamp_consistency(df)
            if not ts_inconsistencies.empty:
                reports['timestamp_inconsistencies'] = ts_inconsistencies
                self.logger.warning(f" Found {len(ts_inconsistencies)} timestamp inconsistencies")
        except Exception as e:
            self.logger.warning(f" Timestamp validation failed: {e}")

        # Timestamp alignment (if checked)
        if ts_results:
            if 'summary' in ts_results:
                reports['timestamp_alignment'] = ts_results['summary']
            if 'flags' in ts_results and ts_results['flags']:
                self.logger.warning(f" Timestamp alignment issues detected: {list(ts_results['flags'].keys())}")

        return reports

    def _generate_plots(
        self,
        df: pd.DataFrame,
        site_id: str,
        output_dir: Path
    ) -> None:
        """Generate diagnostic plots (placeholder: only creates the plot directory)."""
        plot_dir = output_dir / 'plots' / site_id
        plot_dir.mkdir(parents=True, exist_ok=True)

        # Add plot generation here as needed
        # Example: energy balance, time series, etc.
        self.logger.debug(f" Plots would be saved to {plot_dir}")

    def _save_output(
        self,
        df: pd.DataFrame,
        site_id: str,
        output_dir: Path,
        data_type: str
    ) -> Path:
        """
        Save processed data to a timestamped file in ``output_dir``.

        Raises
        ------
        ValueError
            If ``config.output_format`` is not 'csv', 'parquet' or 'feather'.
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{site_id}_{data_type}_processed_{timestamp}"

        if self.config.output_format == 'csv':
            output_file = output_dir / f"{filename}.csv"
            df.to_csv(output_file, index=True)
        elif self.config.output_format == 'parquet':
            output_file = output_dir / f"{filename}.parquet"
            df.to_parquet(output_file, index=True)
        elif self.config.output_format == 'feather':
            output_file = output_dir / f"{filename}.feather"
            # Feather cannot store a non-default index, so move it to a column.
            df.reset_index().to_feather(output_file)
        else:
            raise ValueError(f"Unknown output format: {self.config.output_format}")

        self.logger.info(f" Saved to {output_file}")
        return output_file

    def _save_reports(
        self,
        reports: Dict[str, pd.DataFrame],
        site_id: str,
        output_dir: Path
    ) -> None:
        """
        Save non-empty DataFrame reports as CSV files.

        Files are written to ``output_dir/reports/<site_id>/<name>_<timestamp>.csv``.
        Non-DataFrame and empty reports are skipped.
        """
        report_dir = output_dir / 'reports' / site_id
        report_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        for name, report_df in reports.items():
            if isinstance(report_df, pd.DataFrame) and not report_df.empty:
                filename = report_dir / f"{name}_{timestamp}.csv"
                # NOTE(review): index is dropped; confirm no report keeps
                # meaningful labels (e.g. variable names) in its index.
                report_df.to_csv(filename, index=False)
                self.logger.debug(f" Saved {name} report to {filename}")

    def _extract_site_id(self, filepath: Path) -> str:
        """Return the first underscore-separated stem token starting with 'US-', else 'UNKNOWN'."""
        # Look for US-XXX pattern
        name = filepath.stem
        for part in name.split('_'):
            if part.startswith('US-'):
                return part

        # Fallback
        self.logger.warning(f"Could not extract site_id from {filepath.name}, using 'UNKNOWN'")
        return 'UNKNOWN'

    def _log_batch_summary(self, results: List[ProcessingResult]) -> None:
        """Log summary statistics for batch processing (safe for an empty list)."""
        n_total = len(results)
        n_success = sum(1 for r in results if r.success)
        n_failed = n_total - n_success

        total_input = sum(r.n_records_input for r in results)
        total_output = sum(r.n_records_output for r in results)
        total_time = sum(r.processing_time for r in results)

        # Avoid ZeroDivisionError when nothing was processed.
        pct_success = (n_success / n_total * 100) if n_total else 0.0

        self.logger.info(f"\n{'='*60}")
        self.logger.info("BATCH PROCESSING SUMMARY")
        self.logger.info(f"{'='*60}")
        self.logger.info(f"Total files: {n_total}")
        self.logger.info(f"Successful: {n_success} ({pct_success:.1f}%)")
        self.logger.info(f"Failed: {n_failed}")
        self.logger.info(f"Total records: {total_input:,} → {total_output:,}")
        self.logger.info(f"Total time: {total_time:.2f}s")
        self.logger.info(f"{'='*60}")

        if n_failed > 0:
            self.logger.warning("\nFailed files:")
            for r in results:
                if not r.success:
                    self.logger.warning(f" {r.site_id}: {r.error_message}")

    def _save_batch_summary(
        self,
        results: List[ProcessingResult],
        output_dir: Path
    ) -> None:
        """
        Save batch processing summary to ``output_dir/batch_summary.json``.

        Report DataFrames are not JSON-serializable and are already written
        as CSV by ``_save_reports``. Each result's ``reports`` entry is
        therefore reduced to ``{report_name: n_rows}`` for DataFrame reports.
        Other non-JSON values (numpy scalars, paths, timestamps, ...) are
        converted by ``_json_default``.
        """
        summary_file = output_dir / 'batch_summary.json'

        result_dicts = []
        for r in results:
            record = r.to_dict()
            record['reports'] = {
                name: len(rep) if isinstance(rep, pd.DataFrame) else rep
                for name, rep in (record.get('reports') or {}).items()
            }
            result_dicts.append(record)

        summary = {
            'timestamp': datetime.now().isoformat(),
            'config': self.config.to_dict(),
            'n_total': len(results),
            'n_success': sum(1 for r in results if r.success),
            'n_failed': sum(1 for r in results if not r.success),
            'results': result_dicts
        }

        # Serialize fully before opening the file so that an encoding error
        # cannot leave a truncated, invalid JSON file behind.
        text = json.dumps(summary, indent=2, default=self._json_default)
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(text)

        self.logger.info(f"\nBatch summary saved to {summary_file}")

    @staticmethod
    def _json_default(obj):
        """
        Convert the non-JSON types that can appear in results to JSON values.

        Used as ``json.dumps(default=...)``. Unsupported types raise
        ``TypeError`` exactly like the standard encoder does.
        """
        if isinstance(obj, Path):
            return str(obj)
        if isinstance(obj, np.datetime64):
            return pd.Timestamp(obj).isoformat()
        if isinstance(obj, np.timedelta64):
            return pd.Timedelta(obj).isoformat()
        if isinstance(obj, np.generic):
            # numpy scalars (np.bool_, np.int64, np.float64, ...)
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if hasattr(obj, 'isoformat'):
            # datetime/date/time, pd.Timestamp, pd.Timedelta, pd.NaT
            return obj.isoformat()
        if isinstance(obj, pd.Series):
            return obj.tolist()
        if isinstance(obj, pd.DataFrame):
            return obj.to_dict(orient='records')
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        raise TypeError(
            f"Object of type {type(obj).__name__} is not JSON serializable"
        )
# =============================================================================
# Convenience Functions
# =============================================================================
def process_station(
    site_id: str,
    input_dir: Union[str, Path],
    output_dir: Union[str, Path],
    *,
    data_types: Optional[List[str]] = None,
    **kwargs
) -> Dict[str, ProcessingResult]:
    """
    Convenience function to process a single station.

    Parameters
    ----------
    site_id : str
        Station identifier.
    input_dir : str or Path
        Input directory.
    output_dir : str or Path
        Output directory.
    data_types : list of str, optional
        Keyword-only. Data types to process. When omitted, the default of
        ``Pipeline.process_station`` is used (``['eddy', 'met']``).
    **kwargs
        Additional arguments passed to the ``Pipeline`` constructor
        (``config`` and/or ``logger``).

    Returns
    -------
    dict
        Processing results for each data type.
    """
    pipeline = Pipeline(**kwargs)
    if data_types is None:
        # Defer to the method's own default rather than duplicating it here.
        return pipeline.process_station(site_id, input_dir, output_dir)
    return pipeline.process_station(
        site_id, input_dir, output_dir, data_types=data_types
    )
def batch_process(
    input_dir: Union[str, Path],
    output_dir: Union[str, Path],
    *,
    pattern: str = "*Flux*.dat",
    data_type: str = 'eddy',
    **kwargs
) -> List[ProcessingResult]:
    """
    Convenience function for batch processing.

    Parameters
    ----------
    input_dir : str or Path
        Input directory.
    output_dir : str or Path
        Output directory.
    pattern : str, optional
        Keyword-only. Glob pattern for finding input files.
        Defaults to "*Flux*.dat".
    data_type : str, optional
        Keyword-only. Type of data ('eddy' or 'met'). Defaults to 'eddy'.
    **kwargs
        Additional arguments passed to the ``Pipeline`` constructor
        (``config`` and/or ``logger``).

    Returns
    -------
    list of ProcessingResult
        Results for all processed files.
    """
    pipeline = Pipeline(**kwargs)
    return pipeline.batch_process(
        input_dir, output_dir, pattern=pattern, data_type=data_type
    )
# =============================================================================
# Command-Line Interface
# =============================================================================
def main() -> int:
    """
    Command-line interface for the pipeline.

    Parses arguments, configures logging, builds a ``PipelineConfig`` and a
    ``Pipeline``, then runs one of three modes:

    * ``--site``: ``Pipeline.process_station`` on the input directory;
    * ``--batch``, or an input directory without ``--site``:
      ``Pipeline.batch_process``;
    * otherwise: ``Pipeline.process_file`` on the single input file.

    A per-result summary is printed in site and single-file modes. Batch
    mode logs its own summary.

    Returns
    -------
    int
        Process exit status: 0 when at least one file was processed and
        every result succeeded, otherwise 1 (a failure, or no matching files).
    """
    parser = argparse.ArgumentParser(
        description='Process micrometeorological data with Micromet',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Process a single file
python -m micromet.pipeline --input data/US-UTW_Flux.dat --output results/
# Process a single station (all data types)
python -m micromet.pipeline --site US-UTW --input data/ --output results/
# Batch process all files
python -m micromet.pipeline --batch --input data/ --output results/
# With custom settings
python -m micromet.pipeline --site US-UTW --input data/ --output results/ \\
--no-timestamp-check --keep-soil --format parquet
"""
    )

    # Input/output
    parser.add_argument('--input', '-i', required=True,
                        help='Input file or directory')
    parser.add_argument('--output', '-o', required=True,
                        help='Output directory')

    # Processing mode
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument('--site', '-s',
                            help='Process specific station (e.g., US-UTW)')
    mode_group.add_argument('--batch', '-b', action='store_true',
                            help='Batch process all files in directory')

    # Data type
    parser.add_argument('--data-type', '-t', choices=['eddy', 'met'], default='eddy',
                        help='Data type to process (default: eddy)')

    # Configuration options
    parser.add_argument('--no-timestamp-check', action='store_true',
                        help='Disable timestamp alignment analysis (faster)')
    parser.add_argument('--keep-soil', action='store_true',
                        help='Keep all soil sensor columns')
    parser.add_argument('--no-reports', action='store_true',
                        help='Skip generating QA/QC reports')
    parser.add_argument('--plots', action='store_true',
                        help='Generate diagnostic plots')
    parser.add_argument('--format', choices=['csv', 'parquet', 'feather'],
                        default='csv', help='Output file format (default: csv)')

    # Logging
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose output (DEBUG level)')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Quiet output (WARNING level only)')

    args = parser.parse_args()

    # Set up logging (--verbose wins if both flags are given)
    level = logging.INFO
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.WARNING

    logging.basicConfig(
        level=level,
        format='%(levelname)s [%(asctime)s] %(name)s – %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Create pipeline configuration
    config = PipelineConfig(
        check_timestamps=not args.no_timestamp_check,
        drop_soil=not args.keep_soil,
        generate_reports=not args.no_reports,
        generate_plots=args.plots,
        output_format=args.format
    )

    # Create pipeline
    pipeline = Pipeline(config=config)

    # Run appropriate processing mode
    input_path = Path(args.input)
    output_path = Path(args.output)

    if args.site:
        # Process single station
        station_results = pipeline.process_station(
            site_id=args.site,
            input_dir=input_path,
            output_dir=output_path
        )
        results = list(station_results.values())
        for result in results:
            print("\n" + result.summary())
    elif args.batch or input_path.is_dir():
        # Batch process (summary is logged by batch_process itself)
        results = pipeline.batch_process(
            input_dir=input_path,
            output_dir=output_path,
            data_type=args.data_type
        )
    else:
        # Process single file
        result = pipeline.process_file(
            input_file=input_path,
            output_dir=output_path,
            data_type=args.data_type
        )
        print("\n" + result.summary())
        results = [result]

    # Non-zero exit status lets shell scripts / schedulers detect failures.
    return 0 if results and all(r.success for r in results) else 1


if __name__ == '__main__':
    raise SystemExit(main())