import pandas as pd
import numpy as np
[docs]
def correct_vars_by_factor(df, correction_factor=0.05/0.16,
                           vars_to_correct=['SG_1_1_1','SG_2_1_1'],
                           min_correction_date='2010-01-01',
                           max_correction_date='2030-01-01'):
    """
    Scale selected columns by a constant factor inside a date window.

    Rows whose index falls between ``min_correction_date`` and
    ``max_correction_date`` (both ends included) have the listed columns
    multiplied by ``correction_factor``; every other value is left as is.
    The default window is wide on purpose so that, unless narrowed, the
    whole record is corrected.

    Parameters
    ----------
    df : pd.DataFrame
        Input data. The index must be comparable with the parsed window
        bounds (i.e. a DatetimeIndex).
    correction_factor : float, optional
        Multiplier applied to the selected values. Default is 0.05 / 0.16.
    vars_to_correct : list of str, optional
        Names of the columns to scale.
    min_correction_date : str, optional
        First timestamp of the window; parsed with ``pd.to_datetime``.
    max_correction_date : str, optional
        Last timestamp of the window; parsed with ``pd.to_datetime``.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with the correction applied. The input frame is
        not modified.
    """
    corrected = df.copy()

    window_start = pd.to_datetime(min_correction_date)
    window_end = pd.to_datetime(max_correction_date)

    # Build the row selector from the two inclusive bounds.
    timestamps = corrected.index
    on_or_after_start = timestamps >= window_start
    on_or_before_end = timestamps <= window_end
    in_window = on_or_after_start & on_or_before_end

    # Compute the scaled values first, then write them back in one step.
    scaled = corrected.loc[in_window, vars_to_correct] * correction_factor
    corrected.loc[in_window, vars_to_correct] = scaled
    return corrected
[docs]
def apply_limits_to_vars(df, limit_check_vars, limits):
    """
    Blank out (set to NaN) values that lie outside a closed [min, max] range.

    Each listed column is checked independently. Values strictly below
    ``limits[0]`` or strictly above ``limits[1]`` are replaced with NaN, and
    the number of replaced values is printed per column.

    Parameters
    ----------
    df : pd.DataFrame
        Input data.
    limit_check_vars : list of str
        Names of the columns to screen.
    limits : list or tuple
        Sequence whose first element is the lowest accepted value and whose
        second element is the highest accepted value.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with out-of-range values replaced by NaN. The input
        frame is not modified.
    """
    screened = df.copy()
    for column in limit_check_vars:
        values = screened[column]
        below_min = values < limits[0]
        above_max = values > limits[1]
        # Existing NaNs compare False on both sides, so they are not counted.
        out_of_range = below_min | above_max
        screened.loc[out_of_range, column] = np.nan
        print(f'{out_of_range.sum()} values dropped from {column} b/c value out of range')
    return screened
[docs]
def calculate_new_g_value(df, plate_num):
    """
    Compute the G column for one plate as G_PLATE + SG.

    The result is written to ``G_{plate_num}_1_1`` and is the element-wise
    sum of ``G_PLATE_{plate_num}_1_1`` and ``SG_{plate_num}_1_1``. Because
    NaN propagates through addition, the result is NaN wherever either
    source value is NaN.

    Before overwriting, the function prints how many rows currently hold a
    G value but will become NaN because one of the two sources is missing.
    If the G column does not exist yet, that count is 0 and the column is
    simply created.

    Parameters
    ----------
    df : pd.DataFrame
        Input data containing the G_PLATE and SG columns for the plate.
    plate_num : str
        Plate identifier (e.g. '1' or '2') used to build the column names.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with the G column (re)calculated. The input frame
        is not modified.

    Raises
    ------
    KeyError
        If the G_PLATE or SG column for the plate is absent.
    """
    df_out = df.copy()
    col_g = f'G_{plate_num}_1_1'
    col_g_plate = f'G_PLATE_{plate_num}_1_1'
    col_sg = f'SG_{plate_num}_1_1'

    source_missing = df_out[col_g_plate].isna() | df_out[col_sg].isna()

    # The count below is informational only, so a not-yet-existing target
    # column must not stop the calculation: nothing can be lost in that case.
    if col_g in df_out.columns:
        n_lost = (source_missing & df_out[col_g].notna()).sum()
    else:
        n_lost = 0
    print(f'{n_lost} new records will having missing G values b/c SG or G_PLATE missing')

    # Plain addition; NaN in either operand yields NaN in the result.
    df_out[col_g] = df_out[col_g_plate] + df_out[col_sg]
    return df_out
[docs]
def calc_mean_value_for_soil(df, var='G'):
    """
    Average two replicate columns into a single mean column.

    The mean of ``{var}_1_1_1`` and ``{var}_2_1_1`` is written to
    ``{var}_1_1_A``, replacing any values already stored there. A row gets a
    mean only when both inputs are present; if either input is NaN the mean
    is NaN (NaN propagates through the arithmetic, so no explicit masking is
    needed).

    Before overwriting, the function prints how many rows currently hold a
    mean value but will become NaN because an input is missing. If the mean
    column does not exist yet, that count is 0 and the column is created.

    Parameters
    ----------
    df : pd.DataFrame
        Input data containing the two replicate columns.
    var : str, optional
        Variable prefix used to build the column names (e.g. 'G', 'SG').
        Default is 'G'.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with the ``{var}_1_1_A`` column recalculated. The
        input frame is not modified.

    Raises
    ------
    KeyError
        If either replicate column is absent.
    """
    df_out = df.copy()
    col1 = f'{var}_1_1_1'
    col2 = f'{var}_2_1_1'
    col_mean = f'{var}_1_1_A'

    input_missing = df_out[col1].isna() | df_out[col2].isna()

    # The count is informational only; a missing target column means there
    # is nothing to lose, so report 0 instead of failing.
    if col_mean in df_out.columns:
        n_lost = (input_missing & df_out[col_mean].notna()).sum()
    else:
        n_lost = 0
    print(f'{n_lost} new records will having missing Mean {var} b/c {var}1 or {var}2 is missing')

    # Assign the whole column at once. Both operands share the frame's index,
    # so no label re-alignment is triggered; this keeps the function working
    # on frames with duplicate index labels, where a masked .loc assignment
    # of a full-length Series would have to reindex and fail.
    df_out[col_mean] = (df_out[col1] + df_out[col2]) / 2
    return df_out
[docs]
def run_soil_data_pipeline(df_input,
                           sg_correction_factor=0.05/0.16,
                           sg_limits=[-100, 250],
                           g_limits=[-250, 400]):
    """
    Run the full seven-step soil heat flux (G) processing chain.

    Steps, in order:

    1. Multiply the SG columns (SG_1_1_1, SG_2_1_1, SG_1_1_A) by the
       correction factor.
    2. Set out-of-range SG values to NaN.
    3. Recalculate G for plate 1 as G_PLATE + SG.
    4. Recalculate G for plate 2 as G_PLATE + SG.
    5. Set out-of-range plate G values (G_1_1_1, G_2_1_1) to NaN.
    6. Recalculate the mean G (G_1_1_A) from the two plate values.
    7. Set out-of-range mean G values to NaN.

    Progress messages are printed before each stage.

    Parameters
    ----------
    df_input : pd.DataFrame
        The initial input DataFrame (e.g., 'final_eddy').
    sg_correction_factor : float
        Multiplier applied to the SG columns in step 1.
    sg_limits : list or tuple
        [min, max] accepted range for the SG columns.
    g_limits : list or tuple
        [min, max] accepted range for G_1_1_1, G_2_1_1 and G_1_1_A.

    Returns
    -------
    pd.DataFrame
        The processed DataFrame after all seven steps.
    """
    sg_columns = ['SG_1_1_1', 'SG_2_1_1', 'SG_1_1_A']
    plate_g_columns = ['G_1_1_1', 'G_2_1_1']
    mean_g_columns = ['G_1_1_A']

    print("--- Starting G Fix Data Pipeline ---")

    # Step 1: scale the SG columns.
    print("Step 1: SG correction applied.")
    frame = correct_vars_by_factor(
        df_input,
        correction_factor=sg_correction_factor,
        vars_to_correct=sg_columns,
    )

    # Step 2: range check on the scaled SG columns.
    print('\n')
    print("Step 2: SG limits applied.")
    frame = apply_limits_to_vars(
        frame,
        limit_check_vars=sg_columns,
        limits=sg_limits,
    )

    # Steps 3 and 4: rebuild G for each plate in turn.
    print('\n')
    print("Step 3 & 4: G values calculated (G_1_1_1 and G_2_1_1).")
    for plate in ('1', '2'):
        frame = calculate_new_g_value(frame, plate)

    # Step 5: range check on the per-plate G columns.
    print('\n')
    print("Step 5: G limits applied to individual plates.")
    frame = apply_limits_to_vars(
        frame,
        limit_check_vars=plate_g_columns,
        limits=g_limits,
    )

    # Step 6: average the two plates.
    print('\n')
    print("Step 6: Mean G value (G_1_1_A) calculated.")
    frame = calc_mean_value_for_soil(frame, 'G')

    # Step 7: range check on the averaged G column.
    print('\n')
    print("Step 7: Final mean G limits applied.")
    frame = apply_limits_to_vars(
        frame,
        limit_check_vars=mean_g_columns,
        limits=g_limits,
    )

    print("--- Pipeline Finished ---")
    return frame