"""Quantitative analysis: risk metrics, performance, CAPM, normality tests."""

import asyncio
import logging
from datetime import datetime, timezone, timedelta
from typing import Any

from openbb import obb

from obb_utils import extract_single, safe_last, fetch_historical, to_list

logger = logging.getLogger(__name__)

PROVIDER = "yfinance"

# Need 252+ trading days for default window; 730 calendar days is safe
PERF_DAYS = 730
TARGET = "close"

# Rolling statistics accepted by get_rolling_stat().
_ROLLING_STATS = frozenset(
    {"variance", "stdev", "mean", "skew", "kurtosis", "quantile"}
)


def _start_date(days: int) -> str:
    """Return the UTC calendar date *days* days ago, formatted ``YYYY-MM-DD``."""
    return (datetime.now(tz=timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")


async def _fetch_price_history(symbol: str, days: int) -> Any:
    """Fetch historical prices for *symbol* covering the last *days* calendar days.

    The blocking OpenBB call runs in a worker thread.

    Returns:
        The ``results`` attribute of the OpenBB response, or ``None`` when the
        response or its results are empty.

    Raises:
        Whatever the underlying OpenBB call raises; callers are expected to
        handle it.
    """
    hist = await asyncio.to_thread(
        obb.equity.price.historical,
        symbol,
        start_date=_start_date(days),
        provider=PROVIDER,
    )
    if not hist or not hist.results:
        return None
    return hist.results


async def get_performance_metrics(symbol: str, days: int = 365) -> dict[str, Any]:
    """Calculate Sharpe ratio, summary stats, and volatility for a symbol.

    Args:
        symbol: Ticker symbol to analyse.
        days: Requested look-back in calendar days. At least ``PERF_DAYS`` days
            are always fetched so the Sharpe window has enough data, so values
            below ``PERF_DAYS`` do not shorten the data that is analysed.

    Returns:
        A dict with ``symbol``, ``period_days`` (the requested *days*, echoed
        back), ``sharpe_ratio``, ``summary`` and ``stdev``. A metric whose
        computation fails is logged and reported as ``None`` (``{}`` for
        ``summary``). If the fetch fails or returns nothing, a dict with
        ``symbol`` and ``error`` is returned instead. Never raises ``Exception``.
    """
    try:
        # Need at least 252 trading days for Sharpe window
        data = await _fetch_price_history(symbol, max(days, PERF_DAYS))
        if data is None:
            return {"symbol": symbol, "error": "No historical data"}

        # The three metrics are independent, so compute them concurrently and
        # let each one fail on its own without discarding the others.
        sharpe_result, summary_result, stdev_result = await asyncio.gather(
            asyncio.to_thread(
                obb.quantitative.performance.sharpe_ratio,
                data=data,
                target=TARGET,
            ),
            asyncio.to_thread(obb.quantitative.summary, data=data, target=TARGET),
            asyncio.to_thread(obb.quantitative.stats.stdev, data=data, target=TARGET),
            return_exceptions=True,
        )

        # Surface partial failures in the log rather than dropping them silently.
        for label, outcome in (
            ("Sharpe ratio", sharpe_result),
            ("Summary", summary_result),
            ("Stdev", stdev_result),
        ):
            if isinstance(outcome, BaseException):
                logger.warning("%s failed for %s", label, symbol, exc_info=outcome)

        sharpe = (
            None if isinstance(sharpe_result, BaseException) else safe_last(sharpe_result)
        )
        summary = (
            {}
            if isinstance(summary_result, BaseException)
            else extract_single(summary_result)
        )
        stdev = (
            None if isinstance(stdev_result, BaseException) else safe_last(stdev_result)
        )

        return {
            "symbol": symbol,
            "period_days": days,
            "sharpe_ratio": sharpe,
            "summary": summary,
            "stdev": stdev,
        }
    except Exception:
        logger.warning("Performance metrics failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute performance metrics"}


async def get_capm(symbol: str) -> dict[str, Any]:
    """Calculate CAPM metrics: beta, alpha, systematic/idiosyncratic risk.

    Args:
        symbol: Ticker symbol to analyse; ``PERF_DAYS`` calendar days of history
            are fetched.

    Returns:
        ``{"symbol": symbol, **fields}`` where *fields* is whatever
        ``extract_single`` yields for the CAPM result, or a dict with
        ``symbol`` and ``error`` on failure. Never raises ``Exception``.
    """
    try:
        data = await _fetch_price_history(symbol, PERF_DAYS)
        if data is None:
            return {"symbol": symbol, "error": "No historical data"}
        capm = await asyncio.to_thread(obb.quantitative.capm, data=data, target=TARGET)
        return {"symbol": symbol, **extract_single(capm)}
    except Exception:
        logger.warning("CAPM failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute CAPM"}


async def get_normality_test(symbol: str, days: int = 365) -> dict[str, Any]:
    """Run normality tests (Jarque-Bera, Shapiro-Wilk, etc.) on returns.

    Args:
        symbol: Ticker symbol to analyse.
        days: Requested look-back in calendar days; at least ``PERF_DAYS`` days
            are always fetched.

    Returns:
        ``{"symbol": symbol, **fields}`` with the extracted test output, or a
        dict with ``symbol`` and ``error`` on failure. Never raises
        ``Exception``.
    """
    try:
        data = await _fetch_price_history(symbol, max(days, PERF_DAYS))
        if data is None:
            return {"symbol": symbol, "error": "No historical data"}
        norm = await asyncio.to_thread(
            obb.quantitative.normality, data=data, target=TARGET
        )
        return {"symbol": symbol, **extract_single(norm)}
    except Exception:
        logger.warning("Normality test failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute normality tests"}


async def get_unitroot_test(symbol: str, days: int = 365) -> dict[str, Any]:
    """Run unit root tests (ADF, KPSS) for stationarity.

    Args:
        symbol: Ticker symbol to analyse.
        days: Requested look-back in calendar days; at least ``PERF_DAYS`` days
            are always fetched.

    Returns:
        ``{"symbol": symbol, **fields}`` with the extracted test output, or a
        dict with ``symbol`` and ``error`` on failure. Never raises
        ``Exception``.
    """
    try:
        data = await _fetch_price_history(symbol, max(days, PERF_DAYS))
        if data is None:
            return {"symbol": symbol, "error": "No historical data"}
        ur = await asyncio.to_thread(
            obb.quantitative.unitroot_test, data=data, target=TARGET
        )
        return {"symbol": symbol, **extract_single(ur)}
    except Exception:
        logger.warning("Unit root test failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute unit root test"}


# --- Extended Quantitative (Phase 1, Group J) ---


async def get_sortino(symbol: str, days: int = 365) -> dict[str, Any]:
    """Sortino ratio -- risk-adjusted return penalizing only downside deviation.

    Args:
        symbol: Ticker symbol to analyse.
        days: Requested look-back in calendar days; at least ``PERF_DAYS`` days
            are always fetched.

    Returns:
        A dict with ``symbol``, ``period_days`` (the requested *days*) and
        ``sortino``, or a dict with ``symbol`` and ``error`` on failure.
        Never raises ``Exception``.
    """
    try:
        # Fetch inside the try so a failing fetch yields an error dict, matching
        # the other functions in this module.
        hist = await fetch_historical(symbol, max(days, PERF_DAYS))
        if hist is None:
            return {"symbol": symbol, "error": "No historical data"}
        result = await asyncio.to_thread(
            obb.quantitative.performance.sortino_ratio,
            data=hist.results,
            target=TARGET,
        )
        return {"symbol": symbol, "period_days": days, "sortino": safe_last(result)}
    except Exception:
        logger.warning("Sortino failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Sortino ratio"}


async def get_omega(symbol: str, days: int = 365) -> dict[str, Any]:
    """Omega ratio -- probability-weighted gain vs loss ratio.

    Args:
        symbol: Ticker symbol to analyse.
        days: Requested look-back in calendar days; at least ``PERF_DAYS`` days
            are always fetched.

    Returns:
        A dict with ``symbol``, ``period_days`` (the requested *days*) and
        ``omega``, or a dict with ``symbol`` and ``error`` on failure.
        Never raises ``Exception``.
    """
    try:
        # Fetch inside the try so a failing fetch yields an error dict, matching
        # the other functions in this module.
        hist = await fetch_historical(symbol, max(days, PERF_DAYS))
        if hist is None:
            return {"symbol": symbol, "error": "No historical data"}
        result = await asyncio.to_thread(
            obb.quantitative.performance.omega_ratio,
            data=hist.results,
            target=TARGET,
        )
        return {"symbol": symbol, "period_days": days, "omega": safe_last(result)}
    except Exception:
        logger.warning("Omega failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Omega ratio"}


async def get_rolling_stat(
    symbol: str,
    stat: str,
    days: int = 365,
    window: int = 30,
) -> dict[str, Any]:
    """Compute a rolling statistic (variance, stdev, mean, skew, kurtosis, quantile).

    Args:
        symbol: Ticker symbol to analyse.
        stat: Name of the rolling statistic; must be one of the names above.
        days: Requested look-back in calendar days; at least ``PERF_DAYS`` days
            are always fetched.
        window: Rolling window size; must be a positive integer. It also caps
            how many trailing items are returned.

    Returns:
        A dict with ``symbol``, ``stat``, ``window``, ``period_days`` and
        ``data`` (at most the last *window* items of the rolling series), or a
        dict with ``symbol`` and ``error`` when an argument is invalid, no data
        is available, or the computation fails. Never raises ``Exception``.
    """
    if stat not in _ROLLING_STATS:
        return {
            "symbol": symbol,
            "error": f"Invalid stat: {stat}. Use: {', '.join(sorted(_ROLLING_STATS))}",
        }
    # Reject unusable windows before paying for a network fetch; zero or
    # negative values would also make the tail slice below pick the wrong items.
    if not isinstance(window, int) or window < 1:
        return {
            "symbol": symbol,
            "error": f"Invalid window: {window}. Must be a positive integer",
        }

    try:
        hist = await fetch_historical(symbol, max(days, PERF_DAYS))
        if hist is None:
            return {"symbol": symbol, "error": "No historical data"}

        fn = getattr(obb.quantitative.rolling, stat, None)
        if fn is None or not callable(fn):
            return {"symbol": symbol, "error": f"Stat '{stat}' not available"}

        result = await asyncio.to_thread(
            fn,
            data=hist.results,
            target=TARGET,
            window=window,
        )
        items = to_list(result)
        # Return last N items matching the requested window; with window >= 1
        # this slice yields everything when fewer than N items exist.
        tail = items[-window:]
        return {
            "symbol": symbol,
            "stat": stat,
            "window": window,
            "period_days": days,
            "data": tail,
        }
    except Exception:
        logger.warning("Rolling %s failed for %s", stat, symbol, exc_info=True)
        return {"symbol": symbol, "error": f"Failed to compute rolling {stat}"}