Files
openbb-invest-api/quantitative_service.py
Yaojia Wang 89bdc6c552 refactor: address python review findings
- Move FRED credential registration to FastAPI lifespan (was fragile
  import-order-dependent side-effect)
- Add noqa E402 annotations for imports after curl_cffi patch
- Fix all return type hints: bare dict -> dict[str, Any]
- Move yfinance import to module level (was inline in functions)
- Fix datetime.now() -> datetime.now(tz=timezone.utc) in openbb_service
- Add try/except error handling to Group B service functions
- Fix dict mutation in relative_rotation (immutable pattern)
- Extract _classify_rrg_quadrant helper function
- Fix type builtin shadow in routes_economy (type -> gdp_type)
- Fix falsy int guard (if year: -> if year is not None:)
- Remove user input echo from error messages
2026-03-19 17:40:47 +01:00

196 lines
7.7 KiB
Python

"""Quantitative analysis: risk metrics, performance, CAPM, normality tests."""
import asyncio
import logging
from datetime import datetime, timezone, timedelta
from typing import Any
from openbb import obb
from obb_utils import extract_single, safe_last, fetch_historical, to_list
logger = logging.getLogger(__name__)
PROVIDER = "yfinance"
# Need 252+ trading days for default window; 730 calendar days is safe
PERF_DAYS = 730
TARGET = "close"
async def get_performance_metrics(symbol: str, days: int = 365) -> dict[str, Any]:
"""Calculate Sharpe ratio, summary stats, and volatility for a symbol."""
# Need at least 252 trading days for Sharpe window
fetch_days = max(days, PERF_DAYS)
start = (datetime.now(tz=timezone.utc) - timedelta(days=fetch_days)).strftime("%Y-%m-%d")
try:
hist = await asyncio.to_thread(
obb.equity.price.historical, symbol, start_date=start, provider=PROVIDER
)
if not hist or not hist.results:
return {"symbol": symbol, "error": "No historical data"}
results: tuple[Any, ...] = await asyncio.gather(
asyncio.to_thread(
obb.quantitative.performance.sharpe_ratio,
data=hist.results, target=TARGET,
),
asyncio.to_thread(
obb.quantitative.summary, data=hist.results, target=TARGET
),
asyncio.to_thread(
obb.quantitative.stats.stdev, data=hist.results, target=TARGET
),
return_exceptions=True,
)
sharpe_result, summary_result, stdev_result = results
sharpe = safe_last(sharpe_result) if not isinstance(sharpe_result, BaseException) else None
summary = extract_single(summary_result) if not isinstance(summary_result, BaseException) else {}
stdev = safe_last(stdev_result) if not isinstance(stdev_result, BaseException) else None
return {
"symbol": symbol,
"period_days": days,
"sharpe_ratio": sharpe,
"summary": summary,
"stdev": stdev,
}
except Exception:
logger.warning("Performance metrics failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to compute performance metrics"}
async def get_capm(symbol: str) -> dict[str, Any]:
"""Calculate CAPM metrics: beta, alpha, systematic/idiosyncratic risk."""
start = (datetime.now(tz=timezone.utc) - timedelta(days=PERF_DAYS)).strftime("%Y-%m-%d")
try:
hist = await asyncio.to_thread(
obb.equity.price.historical, symbol, start_date=start, provider=PROVIDER
)
if not hist or not hist.results:
return {"symbol": symbol, "error": "No historical data"}
capm = await asyncio.to_thread(
obb.quantitative.capm, data=hist.results, target=TARGET
)
return {"symbol": symbol, **extract_single(capm)}
except Exception:
logger.warning("CAPM failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to compute CAPM"}
async def get_normality_test(symbol: str, days: int = 365) -> dict[str, Any]:
"""Run normality tests (Jarque-Bera, Shapiro-Wilk, etc.) on returns."""
fetch_days = max(days, PERF_DAYS)
start = (datetime.now(tz=timezone.utc) - timedelta(days=fetch_days)).strftime("%Y-%m-%d")
try:
hist = await asyncio.to_thread(
obb.equity.price.historical, symbol, start_date=start, provider=PROVIDER
)
if not hist or not hist.results:
return {"symbol": symbol, "error": "No historical data"}
norm = await asyncio.to_thread(
obb.quantitative.normality, data=hist.results, target=TARGET
)
return {"symbol": symbol, **extract_single(norm)}
except Exception:
logger.warning("Normality test failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to compute normality tests"}
async def get_unitroot_test(symbol: str, days: int = 365) -> dict[str, Any]:
"""Run unit root tests (ADF, KPSS) for stationarity."""
fetch_days = max(days, PERF_DAYS)
start = (datetime.now(tz=timezone.utc) - timedelta(days=fetch_days)).strftime("%Y-%m-%d")
try:
hist = await asyncio.to_thread(
obb.equity.price.historical, symbol, start_date=start, provider=PROVIDER
)
if not hist or not hist.results:
return {"symbol": symbol, "error": "No historical data"}
ur = await asyncio.to_thread(
obb.quantitative.unitroot_test, data=hist.results, target=TARGET
)
return {"symbol": symbol, **extract_single(ur)}
except Exception:
logger.warning("Unit root test failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to compute unit root test"}
# --- Extended Quantitative (Phase 1, Group J) ---
async def get_sortino(symbol: str, days: int = 365) -> dict[str, Any]:
"""Sortino ratio -- risk-adjusted return penalizing only downside deviation."""
fetch_days = max(days, PERF_DAYS)
hist = await fetch_historical(symbol, fetch_days)
if hist is None:
return {"symbol": symbol, "error": "No historical data"}
try:
result = await asyncio.to_thread(
obb.quantitative.performance.sortino_ratio,
data=hist.results, target=TARGET,
)
return {"symbol": symbol, "period_days": days, "sortino": safe_last(result)}
except Exception:
logger.warning("Sortino failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to compute Sortino ratio"}
async def get_omega(symbol: str, days: int = 365) -> dict[str, Any]:
"""Omega ratio -- probability-weighted gain vs loss ratio."""
fetch_days = max(days, PERF_DAYS)
hist = await fetch_historical(symbol, fetch_days)
if hist is None:
return {"symbol": symbol, "error": "No historical data"}
try:
result = await asyncio.to_thread(
obb.quantitative.performance.omega_ratio,
data=hist.results, target=TARGET,
)
return {"symbol": symbol, "period_days": days, "omega": safe_last(result)}
except Exception:
logger.warning("Omega failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to compute Omega ratio"}
async def get_rolling_stat(
symbol: str, stat: str, days: int = 365, window: int = 30,
) -> dict[str, Any]:
"""Compute a rolling statistic (variance, stdev, mean, skew, kurtosis, quantile)."""
valid_stats = {"variance", "stdev", "mean", "skew", "kurtosis", "quantile"}
if stat not in valid_stats:
return {"symbol": symbol, "error": f"Invalid stat. Valid options: {', '.join(sorted(valid_stats))}"}
fetch_days = max(days, PERF_DAYS)
hist = await fetch_historical(symbol, fetch_days)
if hist is None:
return {"symbol": symbol, "error": "No historical data"}
try:
fn = getattr(obb.quantitative.rolling, stat, None)
if fn is None or not callable(fn):
return {"symbol": symbol, "error": f"Stat '{stat}' not available"}
result = await asyncio.to_thread(
fn, data=hist.results, target=TARGET, window=window,
)
items = to_list(result)
# Return last N items matching the requested window
tail = items[-window:] if len(items) > window else items
return {
"symbol": symbol,
"stat": stat,
"window": window,
"period_days": days,
"data": tail,
}
except Exception:
logger.warning("Rolling %s failed for %s", stat, symbol, exc_info=True)
return {"symbol": symbol, "error": f"Failed to compute rolling {stat}"}