Files
openbb-invest-api/portfolio_service.py
Yaojia Wang 42ba359c48 feat: add portfolio optimization and congress tracking (TDD)
Portfolio optimization (3 endpoints):
- POST /portfolio/optimize - HRP optimal weights via scipy clustering
- POST /portfolio/correlation - pairwise correlation matrix
- POST /portfolio/risk-parity - inverse-volatility risk parity weights

Congress tracking (2 endpoints):
- GET /regulators/congress/trades - congress member stock trades
- GET /regulators/congress/bills?query= - search congress bills

Implementation:
- portfolio_service.py: HRP with scipy fallback to inverse-vol
- congress_service.py: multi-provider fallback pattern
- 51 new tests (14 portfolio unit, 20 portfolio route, 12 congress
  unit, 7 congress route)
- All 312 tests passing
2026-03-19 22:27:03 +01:00

223 lines
6.8 KiB
Python

"""Portfolio optimization: HRP, correlation matrix, risk parity."""
import asyncio
import logging
from typing import Any
import numpy as np
import pandas as pd
# Project-local helper; awaited once per symbol in fetch_historical_prices.
from obb_utils import fetch_historical
# Module-level logger; reports symbols skipped for missing data and the
# inverse-volatility fallback in _hrp_weights when scipy is not installed.
logger = logging.getLogger(__name__)
async def fetch_historical_prices(symbols: list[str], days: int = 365) -> pd.DataFrame:
    """Fetch closing prices for multiple symbols and return them as a DataFrame.

    All symbols are requested concurrently via ``fetch_historical``. For each
    result row, the ``date`` attribute (stringified) and ``close`` attribute
    (as float) are collected; rows missing either attribute are ignored.

    Args:
        symbols: Ticker symbols to fetch.
        days: Number of historical days to request per symbol.

    Returns:
        DataFrame whose columns are symbol names and whose index holds the
        string form of each row's date, sorted ascending. Symbols with no
        usable rows are skipped, so the columns may be a subset of
        ``symbols``. Rows where every symbol is NaN are dropped. An empty
        DataFrame is returned when no symbol has data.
    """
    results = await asyncio.gather(
        *(fetch_historical(sym, days=days) for sym in symbols)
    )

    price_series: dict[str, pd.Series] = {}
    for sym, result in zip(symbols, results):
        if result is None or result.results is None:
            logger.warning("No historical data for %s, skipping", sym)
            continue

        dates: list[str] = []
        closes: list[float] = []
        for row in result.results:
            d = getattr(row, "date", None)
            c = getattr(row, "close", None)
            if d is not None and c is not None:
                dates.append(str(d))
                closes.append(float(c))
        if not dates:
            continue

        series = pd.Series(closes, index=dates)
        # Duplicate date labels make the DataFrame constructor fail when it
        # has to align symbols with differing date sets; keep the last close.
        price_series[sym] = series[~series.index.duplicated(keep="last")]

    if not price_series:
        return pd.DataFrame()

    # Sort chronologically so downstream pct-change math compares each close
    # with the previous one regardless of the order the provider returned.
    # Assumes str(date) yields ISO-style strings (true for datetime.date,
    # datetime.datetime and pandas.Timestamp), which sort lexically by date.
    df = pd.DataFrame(price_series).sort_index()
    return df.dropna(how="all")
def _compute_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """Compute period-over-period simple returns from a price DataFrame.

    Each value is ``p_t / p_{t-1} - 1`` (arithmetic returns, not log
    returns). NaN gaps inside a column are forward-filled first. Any row
    that still contains a NaN is then dropped; this includes the first row
    and any dates before a symbol's history begins. As a result, a symbol
    with a short history truncates the sample for all symbols.

    Args:
        prices: Closing prices, one column per symbol, rows in chronological
            order.

    Returns:
        DataFrame of returns with the same columns as *prices*.
    """
    # This is exactly pandas' legacy ``pct_change()`` default
    # (fill_method="pad"). It is written out explicitly because relying on
    # that default is deprecated since pandas 2.1 and emits a FutureWarning
    # whenever the prices contain gaps.
    filled = prices.ffill()
    return (filled / filled.shift(1) - 1).dropna()
def _inverse_volatility_weights(returns: pd.DataFrame) -> dict[str, float]:
    """Compute inverse-volatility weights ``w_i = (1/s_i) / sum_j (1/s_j)``.

    ``s_i`` is the sample standard deviation of column ``i`` of *returns*.

    Inverse volatility is undefined when any ``s_i`` is zero (constant
    price) or NaN (fewer than two return rows). It would yield inf/NaN
    weights, which are not valid JSON numbers. In that case equal weights
    are returned instead.

    Args:
        returns: Return series, one column per symbol.

    Returns:
        Mapping symbol -> weight, summing to 1. Empty when *returns* has no
        columns.
    """
    vols = returns.std()
    if vols.empty:
        return {}

    vol_values = vols.to_numpy(dtype=float)
    if not np.all(np.isfinite(vol_values) & (vol_values > 0)):
        equal = 1.0 / len(vols)
        return {sym: equal for sym in vols.index}

    inv_vols = 1.0 / vols
    weights = inv_vols / inv_vols.sum()
    return {sym: float(w) for sym, w in weights.items()}
def _hrp_weights(returns: pd.DataFrame) -> dict[str, float]:
    """Compute Hierarchical Risk Parity (HRP) weights via scipy clustering.

    1. Order the assets by the leaf order of a single-linkage dendrogram
       built on the correlation distance ``d = sqrt(0.5 * (1 - corr))``.
    2. Recursively bisect that ordered list. At each split the parent's
       weight is shared inversely to cluster variance: the left half gets
       ``alpha = 1 - var_left / (var_left + var_right)`` and the right half
       ``1 - alpha``. The less risky half therefore receives more weight.

    Cluster variance uses an equal-weighted sub-portfolio. This simplifies
    López de Prado's original, which uses inverse-variance weights inside
    each cluster. If the two variances do not sum to a positive number
    (e.g. NaN covariances), the split is 50/50.

    Falls back to inverse-volatility weights if scipy is unavailable.

    Args:
        returns: Return series, one column per symbol.

    Returns:
        Mapping symbol -> weight in dendrogram leaf order, summing to 1.
        Empty when *returns* has no columns.
    """
    symbols = list(returns.columns)
    n = len(symbols)
    if n == 0:
        return {}
    if n == 1:
        return {symbols[0]: 1.0}

    try:
        from scipy.cluster.hierarchy import leaves_list, linkage
        from scipy.spatial.distance import squareform
    except ImportError:
        logger.warning("scipy not available; using inverse-volatility for HRP")
        return _inverse_volatility_weights(returns)

    corr = returns.corr().fillna(0).values
    # Convert correlation to distance: d = sqrt(0.5 * (1 - corr))
    dist = np.sqrt(np.clip(0.5 * (1 - corr), 0, 1))
    np.fill_diagonal(dist, 0.0)
    link = linkage(squareform(dist), method="single")
    # Column positions of the assets in dendrogram leaf order.
    order = [int(i) for i in leaves_list(link)]

    cov = returns.cov().values
    # Weights indexed by original column position; each bisection scales
    # the members of both halves in place.
    weights = np.ones(n)

    def _cluster_var(idx: list[int]) -> float:
        """Variance of an equal-weighted portfolio over columns *idx*."""
        sub_cov = cov[np.ix_(idx, idx)]
        w = np.full(len(idx), 1.0 / len(idx))
        return float(w @ sub_cov @ w)

    def _bisect(cluster: list[int]) -> None:
        """Split *cluster* in half and apportion weight inversely to risk."""
        if len(cluster) < 2:
            return
        mid = len(cluster) // 2
        left, right = cluster[:mid], cluster[mid:]
        v_left = _cluster_var(left)
        v_right = _cluster_var(right)
        total = v_left + v_right
        alpha = 1.0 - v_left / total if total > 0 else 0.5
        weights[left] *= alpha
        weights[right] *= 1.0 - alpha
        _bisect(left)
        _bisect(right)

    _bisect(order)
    total_weight = weights.sum()
    return {symbols[i]: float(weights[i] / total_weight) for i in order}
async def optimize_hrp(symbols: list[str], days: int = 365) -> dict[str, Any]:
    """Compute Hierarchical Risk Parity portfolio weights.

    Args:
        symbols: List of ticker symbols (the API documents 1-50; the count is
            not enforced here).
        days: Number of historical days to use.

    Returns:
        Dict with keys ``weights`` (symbol -> float) and ``method`` (always
        ``"hrp"``). Symbols with no price data are omitted from ``weights``.

    Raises:
        ValueError: If symbols is empty, no price data is available, or the
            price history is too short to compute any returns.
    """
    if not symbols:
        raise ValueError("symbols must not be empty")
    prices = await fetch_historical_prices(symbols, days=days)
    if prices.empty:
        raise ValueError("No price data available for the given symbols")
    returns = _compute_returns(prices)
    # With fewer than two usable overlapping rows no returns survive, and the
    # downstream variance/correlation statistics would all be NaN.
    if returns.empty:
        raise ValueError("Not enough overlapping price history to compute returns")
    weights = _hrp_weights(returns)
    return {"weights": weights, "method": "hrp"}
async def compute_correlation(
    symbols: list[str], days: int = 365
) -> dict[str, Any]:
    """Compute the pairwise correlation matrix of returns for a list of symbols.

    Undefined correlations (e.g. involving a constant-price asset) are
    reported as 0. The diagonal is always 1.0.

    Args:
        symbols: List of ticker symbols (the API documents 1-50; the count is
            not enforced here).
        days: Number of historical days to use.

    Returns:
        Dict with keys ``symbols`` (list of symbols that had data, in matrix
        order) and ``matrix`` (list of lists of floats).

    Raises:
        ValueError: If symbols is empty, no price data is available, or the
            price history is too short to compute any returns.
    """
    if not symbols:
        raise ValueError("symbols must not be empty")
    prices = await fetch_historical_prices(symbols, days=days)
    if prices.empty:
        raise ValueError("No price data available for the given symbols")
    returns = _compute_returns(prices)
    # Without any returns every correlation would be NaN -> 0, producing a
    # meaningless all-zero matrix instead of an error.
    if returns.empty:
        raise ValueError("Not enough overlapping price history to compute returns")
    available = list(returns.columns)
    matrix = returns.corr().fillna(0).to_numpy(dtype=float, copy=True)
    # fillna(0) would put 0 on the diagonal of a zero-variance asset; an
    # asset's correlation with itself is reported as 1 by convention.
    np.fill_diagonal(matrix, 1.0)
    return {"symbols": available, "matrix": matrix.tolist()}
async def compute_risk_parity(
    symbols: list[str], days: int = 365
) -> dict[str, Any]:
    """Compute inverse-volatility ("naive risk parity") portfolio weights.

    Inverse-volatility weights coincide with true equal-risk-contribution
    weights only when all pairwise correlations are equal. Correlations are
    not used here.

    Args:
        symbols: List of ticker symbols (the API documents 1-50; the count is
            not enforced here).
        days: Number of historical days to use.

    Returns:
        Dict with keys ``weights`` (symbol -> float), ``risk_contributions``
        (symbol -> share of ``sum_j w_j * sigma_j``) and ``method`` (always
        ``"risk_parity"``).

    Raises:
        ValueError: If symbols is empty, no price data is available, or the
            price history is too short to compute any returns.
    """
    if not symbols:
        raise ValueError("symbols must not be empty")
    prices = await fetch_historical_prices(symbols, days=days)
    if prices.empty:
        raise ValueError("No price data available for the given symbols")
    returns = _compute_returns(prices)
    # Without any returns every volatility is NaN, which would yield NaN
    # weights; fail explicitly instead.
    if returns.empty:
        raise ValueError("Not enough overlapping price history to compute returns")
    weights = _inverse_volatility_weights(returns)

    # Standalone-risk share: w_i * sigma_i / sum_j(w_j * sigma_j). This ignores
    # correlations, so with inverse-volatility weights each share is 1/n
    # whenever all volatilities are positive and finite.
    vols = returns.std()
    weighted_risk = {sym: weights[sym] * float(vols[sym]) for sym in weights}
    total_risk = sum(weighted_risk.values())
    if np.isfinite(total_risk) and total_risk > 0:
        risk_contributions = {sym: v / total_risk for sym, v in weighted_risk.items()}
    else:
        # Degenerate volatilities (all zero, NaN or inf): report equal shares.
        n = len(weights)
        risk_contributions = {sym: 1.0 / n for sym in weights}
    return {
        "weights": weights,
        "risk_contributions": risk_contributions,
        "method": "risk_parity",
    }