Source code for ds_utils.metrics.time_series

"""Time series and forecasting metrics.

This module provides scalar metric functions for time-series forecasting,
financial modeling, and other use cases where trend direction and bias
are of primary interest.
"""

from typing import Optional

import numpy as np


def directional_accuracy_score(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    baseline: Optional[np.ndarray] = None,
    sample_weight: Optional[np.ndarray] = None,
    handle_equal: str = "exclude",
) -> float:
    """Calculate the directional accuracy score.

    Directional accuracy (DA) measures the proportion of time steps for which
    the predicted direction of change matches the true direction of change,
    relative to a baseline. The formula is:

        DA = (1/n) * Σ I(sign(y_true - baseline) == sign(y_pred - baseline))

    :param y_true: Ground truth (correct) target values.
    :param y_pred: Estimated target values.
    :param baseline: Baseline values to compare against. If None, uses the
        previous value of y_true (time-series mode), which drops the first
        sample. If provided, must have the same shape as y_true.
    :param sample_weight: Sample weights, one per input sample. They are
        normalized to sum to 1 before being applied.
    :param handle_equal: How to treat samples where y_true == baseline.

        - 'exclude': Filter out these samples (default).
        - 'correct': Count as correct if y_pred == baseline, else incorrect.
        - 'incorrect': Always count as incorrect.
    :return: Directional accuracy score as a float.
    :raises ValueError: If handle_equal is invalid, if shapes mismatch, if
        insufficient samples are provided for time-series mode, if no samples
        remain to be scored, or if the sample weights sum to zero.

    Time series example:

    >>> import numpy as np
    >>> y_true = np.array([100, 102, 98, 101, 99])
    >>> y_pred = np.array([100.5, 103, 97, 102, 98])
    >>> directional_accuracy_score(y_true, y_pred)
    1.0

    Custom baseline example:

    >>> y_true = np.array([102, 98, 101, 99, 102])
    >>> baseline = np.array([100, 100, 100, 100, 100])
    >>> y_pred = np.array([101, 99, 99, 101, 99])
    >>> directional_accuracy_score(y_true, y_pred, baseline=baseline)
    0.4
    """
    if handle_equal not in ("exclude", "correct", "incorrect"):
        raise ValueError(f"handle_equal must be 'exclude', 'correct', or 'incorrect', got '{handle_equal}'")

    y_true = np.asarray(y_true).flatten()
    y_pred = np.asarray(y_pred).flatten()
    if y_true.shape != y_pred.shape:
        raise ValueError(f"Shape mismatch: y_true {y_true.shape} and y_pred {y_pred.shape} must match.")

    time_series_mode = baseline is None
    if time_series_mode:
        if len(y_true) < 2:
            raise ValueError("Time-series mode (baseline=None) requires at least 2 samples.")
    else:
        baseline = np.asarray(baseline).flatten()
        if baseline.shape != y_true.shape:
            raise ValueError(f"Shape mismatch: baseline {baseline.shape} and y_true {y_true.shape} must match.")

    # Weights are validated against the full-length input, before the
    # time-series shift below drops the first sample.
    if sample_weight is not None:
        sample_weight = np.asarray(sample_weight).flatten()
        if len(sample_weight) != len(y_true):
            raise ValueError(
                f"Sample weight length ({len(sample_weight)}) does not match sample count ({len(y_true)})"
            )

    if time_series_mode:
        # Each step is compared against the previous true value, so the first
        # sample has no baseline and is dropped from every aligned array.
        baseline = y_true[:-1]
        y_true = y_true[1:]
        y_pred = y_pred[1:]
        if sample_weight is not None:
            sample_weight = sample_weight[1:]

    true_direction = np.sign(y_true - baseline)
    pred_direction = np.sign(y_pred - baseline)
    correct_direction = true_direction == pred_direction
    no_true_change = true_direction == 0

    if handle_equal == "exclude":
        keep = ~no_true_change
        correct_direction = correct_direction[keep]
        if sample_weight is not None:
            sample_weight = sample_weight[keep]
    elif handle_equal == "incorrect":
        correct_direction = correct_direction & ~no_true_change
    # handle_equal == "correct" needs no adjustment: where the true direction
    # is 0, the plain equality above is True exactly when the predicted
    # direction is 0 as well.

    # Guard every mode, not only "exclude": a mean over zero samples would
    # otherwise silently yield NaN.
    if correct_direction.size == 0:
        raise ValueError("No valid samples remain after filtering")

    if sample_weight is None:
        return float(correct_direction.mean())

    total_weight = sample_weight.sum()
    if total_weight == 0:
        # Normalizing by a zero total would divide by zero and return NaN.
        raise ValueError("Sample weights sum to zero; cannot compute a weighted score")
    normalized_weight = sample_weight / total_weight
    return float((correct_direction * normalized_weight).sum())
def directional_bias_score(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    sample_weight: Optional[np.ndarray] = None,
    handle_equal: str = "exclude",
) -> float:
    """Calculate the directional bias score.

    Directional bias (DB) measures the systematic tendency of a model to
    over-predict or under-predict the target values. The formula is:

        DB = (n_over - n_under) / n

    :param y_true: Ground truth (correct) target values.
    :param y_pred: Estimated target values.
    :param sample_weight: Sample weights, one per input sample. They are
        normalized to sum to 1 before being applied.
    :param handle_equal: How to treat samples where y_pred == y_true.

        - 'exclude': Filter out these samples (default).
        - 'neutral': Keep samples; they contribute 0 to the bias.
    :return: Directional bias score as a float in [-1, 1].
        1.0 = complete over-prediction, -1.0 = complete under-prediction,
        0.0 = balanced predictions.
    :raises ValueError: If handle_equal is invalid, if shapes mismatch, if no
        samples remain to be scored, or if the sample weights sum to zero.

    Complete over-prediction example:

    >>> import numpy as np
    >>> y_true = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
    >>> y_pred = np.array([1.1, 2.1, 3.1, 4.1, 5.1])
    >>> directional_bias_score(y_true, y_pred)
    1.0

    Complete under-prediction example:

    >>> y_pred = np.array([0.9, 1.9, 2.9, 3.9, 4.9])
    >>> directional_bias_score(y_true, y_pred)
    -1.0

    Balanced example:

    >>> y_pred = np.array([1.1, 1.9, 3.1, 3.9, 5.0])
    >>> directional_bias_score(y_true, y_pred)
    0.0
    """
    if handle_equal not in ("exclude", "neutral"):
        raise ValueError(f"handle_equal must be 'exclude' or 'neutral', got '{handle_equal}'")

    y_true = np.asarray(y_true).flatten()
    y_pred = np.asarray(y_pred).flatten()
    if y_true.shape != y_pred.shape:
        raise ValueError(f"Shape mismatch: y_true {y_true.shape} and y_pred {y_pred.shape} must match.")

    if sample_weight is not None:
        sample_weight = np.asarray(sample_weight).flatten()
        if len(sample_weight) != len(y_true):
            raise ValueError(f"Sample weight length ({len(sample_weight)}) does not match sample count ({len(y_true)})")

    errors = y_pred - y_true

    if handle_equal == "exclude":
        has_error = errors != 0
        errors = errors[has_error]
        if sample_weight is not None:
            sample_weight = sample_weight[has_error]

    # Guard both modes: a mean over zero samples would otherwise silently
    # yield NaN when empty input is passed with handle_equal="neutral".
    if errors.size == 0:
        raise ValueError("No valid samples remain after filtering")

    over_predictions = errors > 0
    under_predictions = errors < 0

    if sample_weight is None:
        prop_over = over_predictions.mean()
        prop_under = under_predictions.mean()
    else:
        total_weight = sample_weight.sum()
        if total_weight == 0:
            # Normalizing by a zero total would divide by zero and return NaN.
            raise ValueError("Sample weights sum to zero; cannot compute a weighted score")
        normalized_weight = sample_weight / total_weight
        prop_over = (over_predictions * normalized_weight).sum()
        prop_under = (under_predictions * normalized_weight).sum()

    return float(prop_over - prop_under)