
# ai_cdss/data_processor.py
from typing import List, Optional
from functools import reduce
from dataclasses import dataclass

import pandas as pd
from pandas import Timestamp
import pandera as pa
from pandera.typing import DataFrame

import os
import logging

import importlib.resources
from pathlib import Path

import numpy as np
from sklearn.linear_model import TheilSenRegressor
from scipy.signal import savgol_filter

from ai_cdss import config
from ai_cdss.utils import MultiKeyDict
from ai_cdss.constants import *
from ai_cdss.models import ScoringSchema, PPFSchema, SessionSchema, TimeseriesSchema


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#################################
# ------ Data Processing ------ #
#################################

# ---------------------------------------------------------------------
# Data Processor Class

@dataclass
class ProcessingContext:
    scoring_date: Optional[Timestamp] = None
    simulation_mode: bool = False
    additional_flags: Optional[dict] = None

class DataProcessor:
    """
    A class for processing data and computing a final weighted score.

    The final score is computed as:

    .. math::

        S = \\alpha \\cdot A + \\beta \\cdot DM + \\gamma \\cdot PPF

    where:

    - :math:`A` is Adherence
    - :math:`DM` is the Difficulty Modulator
    - :math:`PPF` is the Patient Prescription Factor

    Parameters
    ----------
    weights : List[float]
        List of weights :math:`\\alpha`, :math:`\\beta`, :math:`\\gamma` for computing the final score.
    alpha : float
        The smoothing factor for EWMA, controlling how much past values influence the trend.
    """

    def __init__(
        self,
        weights: List[float] = [1, 1, 1],
        alpha: float = 0.5,
        context: Optional[ProcessingContext] = None
    ):
        """
        Initialize the data processor with optional weights for scoring.
        """
        self.weights = weights
        self.alpha = alpha
        self.context = context or ProcessingContext()
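
    # Example usage (illustrative sketch; the session, timeseries, PPF and init frames
    # are produced by upstream loaders and are not defined in this module):
    #
    #   >>> processor = DataProcessor(weights=[1, 1, 1], alpha=0.5)
    #   >>> scores = processor.process_data(session_df, timeseries_df, ppf_df, init_df)
    #   >>> scores.head()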

    # @pa.check_types
    def process_data(
        self,
        session_data: DataFrame[SessionSchema],
        timeseries_data: DataFrame[TimeseriesSchema],
        ppf_data: DataFrame[PPFSchema],
        init_data: pd.DataFrame
    ) -> DataFrame[ScoringSchema]:
        """
        Process and score patient-protocol combinations using session, timeseries, and PPF data.

        When bootstrapping a patient, the score is based on PPF only. Otherwise a weighted sum
        of DELTA_DM, ADHERENCE_RECENT and PPF determines the score.

        Parameters
        ----------
        session_data : DataFrame[SessionSchema]
            Session-level data including adherence and scheduling information.
        timeseries_data : DataFrame[TimeseriesSchema]
            Timepoint-level data including DMs and performance metrics.
        ppf_data : DataFrame[PPFSchema]
            Patient-protocol fitness values and contributions.

        Returns
        -------
        DataFrame[ScoringSchema]
            Final scored dataframe with protocol recommendations.
        """
        # --- Log-Level Output ---
        ## PATIENT_ID + PROTOCOL_ID + SESSION_ID + ADHERENCE + RECENT_ADHERENCE + USAGE + DAYS
        ## + DM_VALUE + DELTA_DM_VALUE + PE_VALUE + PPF + SCORE
        scoring_date = self._get_scoring_date()
        logger.info(f"Scoring date: {scoring_date}")

        # Filter sessions in study range
        # --- Feature Building ---
        if not session_data.empty and not timeseries_data.empty:
            # Compute session features
            dm_df = self.build_delta_dm(timeseries_data)                                     # DELTA_DM
            adherence_df = self.build_recent_adherence(session_data)                         # ADHERENCE_RECENT
            usage_df = self.build_usage(session_data)                                        # USAGE
            usage_week_df = self.build_week_usage(session_data, scoring_date=scoring_date)   # USAGE_WEEK
            days_df = self.build_prescription_days(session_data, scoring_date=scoring_date)  # DAYS

            # Combine session features
            feat_pp_df = reduce(
                lambda l, r: pd.merge(l, r, on=BY_PP, how='left'),
                [ppf_data, usage_df, usage_week_df, days_df]
            )
            feat_pps_df = reduce(
                lambda l, r: pd.merge(l, r, on=BY_PPS, how='left'),
                [adherence_df, dm_df]
            )

            # Store the whole scored DataFrame as a csv
            # feat_df.to_csv(DEFAULT_LOG_DIR / "scored_features.csv", index=False)

            # Aggregate to patient-protocol level
            feat_agg = feat_pps_df.groupby(BY_PP).agg("last").reset_index()
            scoring_input = reduce(
                lambda l, r: pd.merge(l, r, on=BY_PP, how='left'),
                [feat_pp_df, feat_agg]
            )
        # If we are bootstrapping a study and there are no patient prescriptions yet
        else:
            # Initialize feature dataframe with the expected columns
            scoring_columns = BY_PP + [DELTA_DM, RECENT_ADHERENCE, USAGE, USAGE_WEEK, DAYS]
            feat_agg = pd.DataFrame(columns=scoring_columns)
            # Merge for scoring
            scoring_input = reduce(
                lambda l, r: safe_merge(l, r, on=BY_PP),
                [ppf_data, feat_agg]
            )

        # --- Scoring Data ---
        scored_df = self.compute_score(scoring_input, init_data)
        scored_df.attrs = ppf_data.attrs

        return scored_df[BY_PP + FINAL_METRICS]

    def compute_score(self, data, protocol_metrics):
        """
        Initialize metrics based on legacy data and compute the score.

        Parameters
        ----------
        data : pd.DataFrame
            Aggregated session and timeseries data per patient-protocol.
        protocol_metrics : pd.DataFrame
            Legacy protocol-level metrics used to initialize missing values.

        Returns
        -------
        pd.DataFrame
            Scored DataFrame sorted by patient and protocol.
        """
        # Initialize missing values
        data = self._init_metrics(data, protocol_metrics)
        # Compute objective function score: alpha*Adherence + beta*DM + gamma*PPF
        score = self._compute_score(data)
        # Sort the output dataframe
        score.sort_values(by=BY_PP, inplace=True)
        return score
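
    # Worked example of the scoring formula (illustrative numbers): with the default
    # weights [1, 1, 1], RECENT_ADHERENCE = 0.8, DELTA_DM = 0.05 and PPF = 0.6 give
    # SCORE = 1*0.8 + 1*0.05 + 1*0.6 = 1.45; missing factors are treated as 0.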

    def _init_metrics(self, data: pd.DataFrame, protocol_metrics: pd.DataFrame) -> pd.DataFrame:
        """
        Fill missing protocol-level metrics with neutral defaults.

        Parameters
        ----------
        data : pd.DataFrame
            Dataframe with potentially missing features.

        Returns
        -------
        pd.DataFrame
            Safe dataframe with no NaNs.
        """
        # When there are no prescriptions, use an empty list instead of NaN
        data[DAYS] = data[DAYS].apply(
            lambda x: [] if x is None or (not isinstance(x, list) and pd.isna(x)) else x
        )
        # When no sessions were performed, use 0
        data[USAGE] = data[USAGE].astype("Int64").fillna(0)
        data[USAGE_WEEK] = data[USAGE_WEEK].astype("Int64").fillna(0)
        # How to initialize DM and Adherence? -> Later weeks?
        return data

    def _get_scoring_date(self):
        return self.context.scoring_date or pd.Timestamp.today().normalize()

    def _compute_ewma(self, df, value_col, group_cols, suffix=""):
        """
        Compute an Exponentially Weighted Moving Average (EWMA) over grouped time series.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame containing time-series values.
        value_col : str
            The column for which the EWMA should be calculated.
        group_cols : list of str
            Columns defining the group over which to apply the EWMA.
        suffix : str, optional
            Suffix appended to the output column name.

        Returns
        -------
        pd.DataFrame
            DataFrame with the EWMA column added (replacing the original when no suffix is given).
        """
        return df.assign(
            **{
                f"{value_col}{suffix}": df.groupby(by=group_cols)[value_col].transform(
                    lambda x: x.ewm(alpha=self.alpha, adjust=True).mean()
                )
            }
        )

    def _compute_score(self, scoring: pd.DataFrame) -> pd.DataFrame:
        """
        Compute the final score based on adherence, DM, and PPF values.

        Applies the weighted scoring formula:

        .. math::

            S = \\alpha \\cdot A + \\beta \\cdot DM + \\gamma \\cdot PPF

        Parameters
        ----------
        scoring : pd.DataFrame
            DataFrame containing the RECENT_ADHERENCE, DELTA_DM and PPF columns.

        Returns
        -------
        pd.DataFrame
            DataFrame with an added SCORE column.
        """
        scoring_df = scoring.copy()
        # Weighted, NaN-compatible sum of factors
        scoring_df[SCORE] = (
            scoring_df[RECENT_ADHERENCE].astype("float64").fillna(0.) * self.weights[0]
            + scoring_df[DELTA_DM].astype("float64").fillna(0.) * self.weights[1]
            + scoring_df[PPF].astype("float64").fillna(0.) * self.weights[2]
        )
        return scoring_df

    def build_delta_dm(self, ts_df: DataFrame[TimeseriesSchema]) -> pd.DataFrame:
        grouped = ts_df.groupby(BY_PPS).agg({DM_VALUE: "mean"}).reset_index()
        # Session index per patient-protocol
        grouped['SESSION_INDEX'] = grouped.groupby(BY_PP).cumcount() + 1
        # Smooth the DM series
        grouped['DM_SMOOTH'] = grouped.groupby(BY_PP)[DM_VALUE].transform(
            apply_savgol_filter_groupwise, SAVGOL_WINDOW_SIZE, SAVGOL_POLY_ORDER
        )
        # Compute the rolling slope of the smoothed series
        grouped[DELTA_DM] = grouped.groupby(BY_PP)['DM_SMOOTH'].transform(
            lambda g: get_rolling_theilsen_slope(
                g, grouped.loc[g.index, 'SESSION_INDEX'], THEILSON_REGRESSION_WINDOW_SIZE
            )
        ).fillna(0)
        return grouped[BY_PPS + [DM_VALUE, DELTA_DM]]
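
    # Worked example for the DELTA_DM slope (illustrative sketch, not part of the pipeline):
    # on a steadily increasing DM series the rolling Theil-Sen slope is positive and roughly
    # constant. The real pipeline first smooths DM_VALUE with a Savitzky-Golay filter.
    #
    #   >>> y = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5]); x = pd.Series([1, 2, 3, 4, 5])
    #   >>> get_rolling_theilsen_slope(y, x, window_size=3).round(2).tolist()
    #   [0.0, 0.1, 0.1, 0.1, 0.1]   # approximately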

    def build_recent_adherence(self, session_df: DataFrame[SessionSchema]) -> pd.DataFrame:
        """
        Build adherence per patient-protocol, with the following considerations:

        - Adherence is computed as SESSION_TIME / PRESCRIBED_SESSION_TIME.
        - Adherence is 0 if a prescribed session was not performed.
        - BUT it is not counted if no session at all was performed that day.

        Additionally, a recency bias (EWMA) is applied.
        """
        df = session_df.copy()

        # Include prescribed but not performed sessions
        df = include_missing_sessions(df)

        # Same-day logic: do not penalize days where the patient skipped entirely
        def day_skip_to_nan(group):
            # Check whether all sessions of the day have STATUS == NOT_PERFORMED
            day_skipped = all(group['STATUS'] == 'NOT_PERFORMED')
            # EWMA ignores NaN values, so a fully skipped day does not lower adherence
            if day_skipped:
                group['ADHERENCE'] = np.nan
            return group

        df = df.groupby(by=[PATIENT_ID, 'SESSION_DATE'], group_keys=False).apply(day_skip_to_nan)
        df = df.sort_values(by=BY_PP + ['SESSION_DATE', 'WEEKDAY_INDEX'])
        df['SESSION_INDEX'] = df.groupby(BY_PP).cumcount() + 1

        # For a given patient-protocol, aggregate the adherence series with a recency bias
        df = self._compute_ewma(df, ADHERENCE, BY_PP, suffix="_RECENT")

        return df[BY_PPS + ['SESSION_DATE', 'STATUS', 'SESSION_INDEX', ADHERENCE, RECENT_ADHERENCE]]
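
    # Worked example for the recency bias (illustrative only): with alpha=0.5, an adherence
    # series [1.0, 0.5, 0.0] gets an adjusted-EWMA value of (0.25*1.0 + 0.5*0.5 + 1*0.0)/1.75,
    # i.e. about 0.29 instead of the plain mean 0.5, so a recent drop is reflected quickly.
    #
    #   >>> pd.Series([1.0, 0.5, 0.0]).ewm(alpha=0.5, adjust=True).mean().iloc[-1]   # ≈ 0.286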

    def build_usage(self, session_df: DataFrame[SessionSchema]) -> pd.DataFrame:
        """
        Build how many times each protocol has been used per patient, in this format:

            PATIENT_ID  PROTOCOL_ID  USAGE
            12          220          2
            12          231          1
            12          233          0

        Protocols that have not yet been prescribed for a patient are not returned.
        """
        df = session_df.copy()
        return (
            df.groupby([PATIENT_ID, PROTOCOL_ID], dropna=False)[SESSION_ID]
            .nunique()
            .reset_index(name=USAGE)
            .astype({USAGE: "Int64"})
        )

    def build_week_usage(self, session_df: DataFrame[SessionSchema], scoring_date: Optional[Timestamp] = None) -> pd.DataFrame:
        """
        Build how many times each protocol has been used during the current week, in this format:

            PATIENT_ID  PROTOCOL_ID  USAGE_WEEK
            12          220          2
            12          231          1
            12          233          0

        Protocols that have not yet been prescribed for a patient are not returned.
        """
        df = session_df.copy()

        # Compute the current week's boundaries: Monday 00:00 (inclusive) to next Monday 00:00 (exclusive)
        week_start = scoring_date - pd.Timedelta(days=scoring_date.weekday())  # Monday 00:00
        week_start = week_start.normalize()
        week_end = week_start + pd.Timedelta(days=7)  # Next Monday 00:00

        # Filter to sessions in the given week range
        df = df[(df[SESSION_DATE] >= week_start) & (df[SESSION_DATE] < week_end)]

        # Group by patient and protocol, count unique sessions
        return (
            df.groupby([PATIENT_ID, PROTOCOL_ID], dropna=False)[SESSION_ID]
            .nunique()
            .reset_index(name=USAGE_WEEK)
            .astype({USAGE_WEEK: "Int64"})
        )
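
    # Worked example for the week window (illustrative date): for a Thursday scoring date,
    # the window spans the preceding Monday 00:00 (inclusive) to the next Monday 00:00 (exclusive).
    #
    #   >>> scoring_date = pd.Timestamp("2024-05-16")  # a Thursday
    #   >>> week_start = (scoring_date - pd.Timedelta(days=scoring_date.weekday())).normalize()
    #   >>> week_start, week_start + pd.Timedelta(days=7)
    #   (Timestamp('2024-05-13 00:00:00'), Timestamp('2024-05-20 00:00:00'))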

    def build_prescription_days(self, session_df: DataFrame[SessionSchema], scoring_date: Optional[Timestamp] = None) -> pd.DataFrame:
        """
        Build the active prescription days per patient-protocol. Active prescriptions are signaled as:

            PRESCRIPTION_ENDING_DATE == 2100-01-01 00:00:00

        Returned in the following format (weekdays encoded 0-6):

            PATIENT_ID  PROTOCOL_ID  DAYS
            12          220          [2]
            12          233          [0]
        """
        # Compute the start of the scoring week (Monday 00:00)
        week_start = scoring_date - pd.Timedelta(days=scoring_date.weekday())  # Monday 00:00
        week_start = week_start.normalize()

        # Filter prescriptions that are still active in this week,
        # i.e. prescriptions whose ending date falls after the start of this week
        active_prescriptions = session_df[session_df[PRESCRIPTION_ENDING_DATE] > week_start]

        # Group by patient and protocol, collect all unique weekday indices (0-6) with active prescriptions
        prescribed_days = (
            active_prescriptions
            .groupby(BY_PP)[WEEKDAY_INDEX]
            .agg(lambda x: sorted(x.unique()))
            .rename(DAYS)
            .reset_index()
        )
        return prescribed_days

# ------------------------------
# Clinical Scores
# ------------------------------

class ClinicalSubscales:
    def __init__(self, scale_yaml_path: Optional[str] = None):
        """Initialize with an optional path to scales.yaml, defaulting to the internal package resource."""
        # Retrieve max values for clinical subscales from config/scales.yaml
        if scale_yaml_path:
            self.scales_path = Path(scale_yaml_path)
        else:
            self.scales_path = importlib.resources.files(config) / "scales.yaml"

        if not self.scales_path.exists():
            raise FileNotFoundError(f"Scale YAML file not found at {self.scales_path}")

        # Load scales maximum values
        self.scales_dict = MultiKeyDict.from_yaml(self.scales_path)

    def compute_deficit_matrix(self, patient_df: pd.DataFrame) -> pd.DataFrame:
        """Compute the deficit matrix given patient clinical scores."""
        # Retrieve max values using MultiKeyDict
        max_subscales = [self.scales_dict.get(scale, None) for scale in patient_df.columns]

        # Check for missing subscale values
        if None in max_subscales:
            missing_subscales = [
                scale for scale, max_val in zip(patient_df.columns, max_subscales) if max_val is None
            ]
            raise ValueError(f"Missing max values for subscales: {missing_subscales}")

        # Compute deficit matrix: 1 - score / max_score
        deficit_matrix = 1 - (patient_df / pd.Series(max_subscales, index=patient_df.columns))
        return deficit_matrix
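
    # Worked example (illustrative subscale name and maximum, not from the packaged scales.yaml):
    # with a maximum of 66, a raw score of 33 yields a deficit of 1 - 33/66 = 0.5; higher deficits
    # indicate more impaired functions.
    #
    #   >>> patient_df = pd.DataFrame({"SUBSCALE_A": [33]})
    #   >>> float((1 - patient_df / pd.Series({"SUBSCALE_A": 66})).iloc[0, 0])
    #   0.5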

# ------------------------------
# Protocol Attributes
# ------------------------------

class ProtocolToClinicalMapper:
    def __init__(self, mapping_yaml_path: Optional[str] = None):
        """Initialize with an optional path to mapping.yaml, defaulting to the internal package resource."""
        if mapping_yaml_path:
            self.mapping_path = Path(mapping_yaml_path)
        else:
            self.mapping_path = importlib.resources.files(config) / "mapping.yaml"

        if not self.mapping_path.exists():
            raise FileNotFoundError(f"Mapping YAML file not found at {self.mapping_path}")

        # logger.info(f"Loading subscale max values from: {self.scales_path}")
        self.mapping = MultiKeyDict.from_yaml(self.mapping_path)

    def map_protocol_features(self, protocol_df: pd.DataFrame, agg_func=np.mean) -> pd.DataFrame:
        """Map protocol-level features onto clinical scales using a predefined mapping."""
        df_clinical = pd.DataFrame(index=protocol_df.index)
        # Collapse the protocol latent attributes of each clinical scale using agg_func
        for clinical_scale, features in self.mapping.items():
            df_clinical[clinical_scale] = protocol_df[features].apply(agg_func, axis=1)
        df_clinical.index = protocol_df["PROTOCOL_ID"]
        return df_clinical
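
    # Example (illustrative attribute names and mapping; the real mapping comes from config/mapping.yaml):
    # each clinical scale is the agg_func (mean by default) of its mapped protocol attributes.
    #
    #   >>> mapper = ProtocolToClinicalMapper()
    #   >>> mapper.mapping = {"SCALE_X": ["ATTR_A", "ATTR_B"]}   # hypothetical mapping
    #   >>> protocol_df = pd.DataFrame({"PROTOCOL_ID": [1], "ATTR_A": [0.2], "ATTR_B": [0.6]})
    #   >>> mapper.map_protocol_features(protocol_df)["SCALE_X"].round(2).tolist()
    #   [0.4]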

# ---------------------------------------------------------------
# Adherence
# ---------------------------------------------------------------

def include_missing_sessions(session: pd.DataFrame):
    """
    For each prescription, generate expected sessions (based on weekday, start and end dates),
    and merge them with actual performed sessions.

    Sessions that were expected but not performed are marked as NOT_PERFORMED.
    All date comparisons are done at day-level (time is ignored).
    """
    # Normalize to date (drop time component)
    date_cols = ["SESSION_DATE", "PRESCRIPTION_STARTING_DATE", "PRESCRIPTION_ENDING_DATE"]
    for col in date_cols:
        session[col] = pd.to_datetime(session[col]).dt.normalize()

    # Get last performed session date per patient
    valid_sessions = session.dropna(subset=["SESSION_DATE"])
    last_session_per_patient = (
        valid_sessions.groupby("PATIENT_ID")["SESSION_DATE"]
        .max()
        .to_dict()
    )

    # Get existing prescriptions
    prescriptions = session.drop_duplicates(
        subset=[
            "PRESCRIPTION_ID", "PATIENT_ID", "PROTOCOL_ID",
            "PRESCRIPTION_STARTING_DATE", "PRESCRIPTION_ENDING_DATE", "WEEKDAY_INDEX"
        ]
    )

    # Generate expected session dates
    expected_session_rows = []
    for _, row in prescriptions.iterrows():
        patient_id = row['PATIENT_ID']
        start = row["PRESCRIPTION_STARTING_DATE"]
        end = row["PRESCRIPTION_ENDING_DATE"]
        weekday = row["WEEKDAY_INDEX"]

        # Safety
        if pd.isna(start) or pd.isna(end) or pd.isna(weekday):
            continue

        # Cap the end limit at the patient's last performed session (future sessions are not yet due);
        # if the patient has no performed sessions, cap at today
        last_session = last_session_per_patient.get(patient_id, pd.Timestamp.today().normalize())
        end = min(end, last_session)

        # Generate expected session dates for this prescription
        expected_dates = generate_expected_sessions(start, end, int(weekday))

        # Fill rows with NOT_PERFORMED status
        for session_date in expected_dates:
            row_dict = {
                **row.to_dict(),
                "SESSION_DATE": pd.to_datetime(session_date).normalize(),
                "STATUS": "NOT_PERFORMED",
                "ADHERENCE": 0.0,
                "SESSION_DURATION": 0,
                "REAL_SESSION_DURATION": 0,
            }
            # Overwrite session metric columns with NaN
            row_dict.update({col: np.nan for col in SESSION_COLUMNS})
            expected_session_rows.append(row_dict)

    expected_df = pd.DataFrame(expected_session_rows)
    if expected_df.empty:
        return session

    # Identify expected sessions that were not actually performed
    # (i.e., keep only those not present in the performed session index)
    performed_index = pd.MultiIndex.from_frame(valid_sessions[["PRESCRIPTION_ID", "SESSION_DATE"]])
    expected_index = pd.MultiIndex.from_frame(expected_df[["PRESCRIPTION_ID", "SESSION_DATE"]])
    mask = ~expected_index.isin(performed_index)
    expected_df = expected_df.loc[mask]

    # Merge performed sessions with expected sessions
    session_all = pd.concat([valid_sessions, expected_df], ignore_index=True)
    return session_all.sort_values(by=["PATIENT_ID", "PRESCRIPTION_ID", "SESSION_DATE"]).reset_index(drop=True)


def generate_expected_sessions(start: Timestamp, end: Timestamp, weekday: int) -> List[Timestamp]:
    """
    Generate session dates between start and end for the given weekday index.

    Weekday: 0=Monday, 1=Tuesday, ..., 6=Sunday
    """
    weekday_map = {
        0: 'W-MON',
        1: 'W-TUE',
        2: 'W-WED',
        3: 'W-THU',
        4: 'W-FRI',
        5: 'W-SAT',
        6: 'W-SUN',
    }
    freq = weekday_map.get(weekday)
    if freq is None:
        return []
    return list(pd.date_range(start=start, end=end, freq=freq))


# ---------------------------------------------------------------
# Delta DM
# ---------------------------------------------------------------

def apply_savgol_filter_groupwise(series, window_size, polyorder):
    series_len = len(series)
    if series_len < polyorder + 1:
        return series
    window = min(window_size, series_len)
    if window <= polyorder:
        window = polyorder + 1
    if window > series_len:
        return series
    if window % 2 == 0:
        window -= 1
    if window <= polyorder:
        return series
    try:
        return savgol_filter(series, window_length=window, polyorder=polyorder)
    except ValueError:
        return series


def get_rolling_theilsen_slope(series_y, series_x, window_size):
    slopes = pd.Series([np.nan] * len(series_y), index=series_y.index)
    if len(series_y) < 2:
        return slopes
    regressor = TheilSenRegressor(random_state=42, max_subpopulation=1000)
    for i in range(len(series_y)):
        start_index = max(0, i - window_size + 1)
        window_y = series_y.iloc[start_index: i + 1]
        window_x = series_x.iloc[start_index: i + 1]
        if len(window_y) < 2:
            slopes.iloc[i] = 0.0 if len(window_y) == 1 else np.nan
            continue
        if len(window_x.unique()) == 1 and len(window_y.unique()) > 1:
            slopes.iloc[i] = np.nan
            continue
        if len(window_y.unique()) == 1:
            slopes.iloc[i] = 0.0
            continue
        X_reshaped = window_x.values.reshape(-1, 1)
        try:
            regressor.fit(X_reshaped, window_y.values)
            slopes.iloc[i] = regressor.coef_[0]
        except Exception:
            slopes.iloc[i] = np.nan
    return slopes


# ---------------------------------------------------------------
# PPF
# ---------------------------------------------------------------

def feature_contributions(df_A, df_B):
    # Convert to numpy
    A = df_A.to_numpy()  # (patients, subscales)
    B = df_B.to_numpy()  # (protocols, subscales)

    # Compute row-wise norms
    A_norms = np.linalg.norm(A, axis=1, keepdims=True)  # (patients, 1)
    B_norms = np.linalg.norm(B, axis=1, keepdims=True)  # (protocols, 1)

    # Replace zero norms with a small value to avoid NaN (division by zero)
    A_norms[A_norms == 0] = 1e-10
    B_norms[B_norms == 0] = 1e-10

    # Normalize each row to unit vectors
    A_norm = A / A_norms  # (patients, subscales)
    B_norm = B / B_norms  # (protocols, subscales)

    # Compute per-subscale feature contributions for every patient-protocol pair
    contributions = A_norm[:, np.newaxis, :] * B_norm[np.newaxis, :, :]  # (patients, protocols, subscales)
    return contributions


def compute_ppf(patient_deficiency, protocol_mapped):
    """
    Compute the patient-protocol fit matrix (PPF) and feature contributions.
    """
    contributions = feature_contributions(patient_deficiency, protocol_mapped)
    # Summing the per-subscale contributions gives the cosine similarity per patient-protocol pair
    ppf = np.sum(contributions, axis=2)  # (patients, protocols)

    ppf = pd.DataFrame(ppf, index=patient_deficiency.index, columns=protocol_mapped.index)
    contributions = pd.DataFrame(contributions.tolist(), index=patient_deficiency.index, columns=protocol_mapped.index)

    ppf_long = ppf.stack().reset_index()
    ppf_long.columns = ["PATIENT_ID", "PROTOCOL_ID", "PPF"]

    contrib_long = contributions.stack().reset_index()
    contrib_long.columns = ["PATIENT_ID", "PROTOCOL_ID", "CONTRIB"]

    return ppf_long, contrib_long
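
# Worked example for the PPF (illustrative values): because both the patient deficit vector
# and the protocol vector are normalized to unit length, the summed contributions equal the
# cosine similarity between the two vectors.
#
#   >>> patient = pd.DataFrame([[1.0, 0.0]], index=[12], columns=["S1", "S2"])
#   >>> protocol = pd.DataFrame([[1.0, 1.0]], index=[220], columns=["S1", "S2"])
#   >>> ppf_long, contrib_long = compute_ppf(patient, protocol)
#   >>> float(ppf_long["PPF"].iloc[0])   # cos(45°) ≈ 0.707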
""" import gower protocol_attributes = protocol_mapped.copy() protocol_ids = protocol_attributes.PROTOCOL_ID protocol_attributes.drop(columns="PROTOCOL_ID", inplace=True) hot_encoded_cols = protocol_attributes.columns.str.startswith("BODY_PART") weights = np.ones(len(protocol_attributes.columns)) weights[hot_encoded_cols] = weights[hot_encoded_cols] / hot_encoded_cols.sum() protocol_attributes = protocol_attributes.astype(float) gower_sim_matrix = gower.gower_matrix(protocol_attributes, weight=weights) gower_sim_matrix = pd.DataFrame(1- gower_sim_matrix, index=protocol_ids, columns=protocol_ids) gower_sim_matrix.columns.name = "PROTOCOL_SIM" gower_sim_matrix = gower_sim_matrix.stack().reset_index() gower_sim_matrix.columns = ["PROTOCOL_A", "PROTOCOL_B", "SIMILARITY"] return gower_sim_matrix # --------------------------------------------------------------- # Utils # --------------------------------------------------------------- # def check_session(session: pd.DataFrame) -> pd.DataFrame: # """ # Check for data discrepancies in session DataFrame, export findings to ~/.ai_cdss/logs/, # log summary, and return cleaned DataFrame. # Parameters # ---------- # session : pd.DataFrame # Session DataFrame to check and clean. # Returns # ------- # pd.DataFrame # Cleaned session DataFrame. # """ # # Patient registered but no data yet (no prescription) # patients_no_data = session[session["PRESCRIPTION_ID"].isna()] # if not patients_no_data.empty: # patients_no_data[["PATIENT_ID", "PRESCRIPTION_ID", "SESSION_ID"]].to_csv(export_file, index=False) # logger.warning(f"{len(patients_no_data)} patients found without prescription. Check exported file: {export_file}") # else: # logger.info("No patients without prescription found.") # # Drop these rows # session = session.drop(patients_no_data.index) # # Sessions in session table but not in recording table (no adherence) # patient_session_discrepancy = session[session["ADHERENCE"].isna()] # if not patient_session_discrepancy.empty: # export_file = os.path.join(log_dir, f"patient_session_discrepancy_{timestamp}.csv") # patient_session_discrepancy[["PATIENT_ID", "PRESCRIPTION_ID", "SESSION_ID"]].to_csv(export_file, index=False) # logger.warning(f"{len(patient_session_discrepancy)} sessions found without adherence. Check exported file: {export_file}") # else: # logger.info("No sessions without adherence found.") # # Drop these rows # session = session.drop(patient_session_discrepancy.index) # # Final info # logger.info(f"Session data cleaned. Final shape: {session.shape}") # return session def safe_merge( left: pd.DataFrame, right: pd.DataFrame, on, how: str = "left", export_dir: str = "~/.ai_cdss/logs/", left_name: str = "left", right_name: str = "right", ) -> pd.DataFrame: """ Perform a safe merge and independently report unmatched rows from left and right DataFrames. Parameters ---------- left : pd.DataFrame Left DataFrame. right : pd.DataFrame Right DataFrame. on : str or list Column(s) to join on. how : str, optional Type of merge to be performed. Default is "left". export_dir : str, optional Directory to export discrepancy reports and logs. left_name : str, optional Friendly name for the left DataFrame, for logging. right_name : str, optional Friendly name for the right DataFrame, for logging. drop_inconsistencies : bool, optional If True, drop inconsistent rows (left-only). Default is False. Returns ------- pd.DataFrame Merged DataFrame. 
""" # Prepare export directory export_dir = os.path.expanduser(export_dir) os.makedirs(export_dir, exist_ok=True) timestamp = pd.Timestamp.now() log_file = os.path.join(export_dir, "data_check.log") # Setup logger logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Outer merge for discrepancy check discrepancy_check = left.merge(right, on=on, how="outer", indicator=True) left_only = discrepancy_check[discrepancy_check["_merge"] == "left_only"] right_only = discrepancy_check[discrepancy_check["_merge"] == "right_only"] # Export and log discrepancies if found if not left_only.empty: export_file = os.path.join(export_dir, f"{left_name}_only_{timestamp}.csv") try: left_only[BY_ID + ["SESSION_DURATION", "SCORE", "DM_VALUE", "PE_VALUE"]].to_csv(export_file, index=False) except KeyError as e: left_only.to_csv(export_file, index=False) logger.warning( f"{len(left_only)} rows found only in '{left_name}' DataFrame " f"(see export: {export_file})" ) if not right_only.empty: export_file = os.path.join(export_dir, f"{right_name}_only_{timestamp}.csv") try: right_only[BY_PPS + ["SESSION_DURATION", "SCORE", "DM_VALUE", "PE_VALUE"]].to_csv(export_file, index=False) except KeyError as e: right_only.to_csv(export_file, index=False) logger.warning( f"{len(right_only)} rows from '{right_name}' DataFrame did not match '{left_name}' DataFrame " f"(see export: {export_file})" ) # Step 3: Actual requested merge merged = left.merge(right, on=on, how=how) return merged