feat: add portfolio optimization and congress tracking (TDD)

Portfolio optimization (3 endpoints):
- POST /portfolio/optimize - HRP optimal weights via scipy clustering
- POST /portfolio/correlation - pairwise correlation matrix
- POST /portfolio/risk-parity - inverse-volatility risk parity weights

Congress tracking (2 endpoints):
- GET /regulators/congress/trades - congress member stock trades
- GET /regulators/congress/bills?query= - search congress bills

Implementation:
- portfolio_service.py: HRP with scipy fallback to inverse-vol
- congress_service.py: multi-provider fallback pattern
- 51 new tests (14 portfolio unit, 20 portfolio route, 12 congress
  unit, 7 congress route)
- All 312 tests passing
This commit is contained in:
Yaojia Wang
2026-03-19 22:27:03 +01:00
parent 27b131492f
commit 42ba359c48
9 changed files with 1140 additions and 1 deletions

68
congress_service.py Normal file
View File

@@ -0,0 +1,68 @@
"""Congress trading data: member trades and bill search."""
import asyncio
import logging
from typing import Any
from openbb import obb
from obb_utils import to_list
logger = logging.getLogger(__name__)
async def _try_obb_call(fn, *args, **kwargs) -> list[dict[str, Any]] | None:
"""Attempt a single OBB call and return to_list result, or None on failure."""
try:
result = await asyncio.to_thread(fn, *args, **kwargs)
return to_list(result)
except Exception as exc:
logger.debug("OBB call failed: %s", exc)
return None
def _get_congress_fn():
"""Resolve the congress trading OBB function safely."""
try:
return obb.regulators.government_us.congress_trading
except AttributeError:
logger.debug("obb.regulators.government_us.congress_trading not available")
return None
async def get_congress_trades() -> list[dict[str, Any]]:
"""Get recent US congress member stock trades.
Returns an empty list if the data provider is unavailable.
"""
fn = _get_congress_fn()
if fn is None:
return []
providers = ["quiverquant", "fmp"]
for provider in providers:
data = await _try_obb_call(fn, provider=provider)
if data is not None:
return data
logger.warning("All congress trades providers failed")
return []
async def search_congress_bills(query: str) -> list[dict[str, Any]]:
"""Search US congress bills by keyword.
Returns an empty list if the data provider is unavailable.
"""
fn = _get_congress_fn()
if fn is None:
return []
providers = ["quiverquant", "fmp"]
for provider in providers:
data = await _try_obb_call(fn, query, provider=provider)
if data is not None:
return data
logger.warning("All congress bills providers failed for query: %s", query)
return []

View File

@@ -37,6 +37,7 @@ from routes_sentiment import router as sentiment_router # noqa: E402
from routes_shorts import router as shorts_router # noqa: E402 from routes_shorts import router as shorts_router # noqa: E402
from routes_surveys import router as surveys_router # noqa: E402 from routes_surveys import router as surveys_router # noqa: E402
from routes_technical import router as technical_router # noqa: E402 from routes_technical import router as technical_router # noqa: E402
from routes_portfolio import router as portfolio_router # noqa: E402
logging.basicConfig( logging.basicConfig(
level=settings.log_level.upper(), level=settings.log_level.upper(),
@@ -81,6 +82,7 @@ app.include_router(fixed_income_router)
app.include_router(economy_router) app.include_router(economy_router)
app.include_router(surveys_router) app.include_router(surveys_router)
app.include_router(regulators_router) app.include_router(regulators_router)
app.include_router(portfolio_router)
@app.get("/health", response_model=dict[str, str]) @app.get("/health", response_model=dict[str, str])

222
portfolio_service.py Normal file
View File

@@ -0,0 +1,222 @@
"""Portfolio optimization: HRP, correlation matrix, risk parity."""
import asyncio
import logging
from typing import Any
import numpy as np
import pandas as pd
from obb_utils import fetch_historical
logger = logging.getLogger(__name__)
async def fetch_historical_prices(symbols: list[str], days: int = 365) -> pd.DataFrame:
"""Fetch closing prices for multiple symbols and return as a DataFrame.
Columns are symbol names; rows are dates. Symbols with no data are skipped.
"""
tasks = [fetch_historical(sym, days=days) for sym in symbols]
results = await asyncio.gather(*tasks)
price_series: dict[str, pd.Series] = {}
for sym, result in zip(symbols, results):
if result is None or result.results is None:
logger.warning("No historical data for %s, skipping", sym)
continue
rows = result.results
if not rows:
continue
dates = []
closes = []
for row in rows:
d = getattr(row, "date", None)
c = getattr(row, "close", None)
if d is not None and c is not None:
dates.append(str(d))
closes.append(float(c))
if dates:
price_series[sym] = pd.Series(closes, index=dates)
if not price_series:
return pd.DataFrame()
df = pd.DataFrame(price_series)
df = df.dropna(how="all")
return df
def _compute_returns(prices: pd.DataFrame) -> pd.DataFrame:
"""Compute daily log returns from a price DataFrame."""
return prices.pct_change().dropna()
def _inverse_volatility_weights(returns: pd.DataFrame) -> dict[str, float]:
"""Compute inverse-volatility weights."""
vols = returns.std()
inv_vols = 1.0 / vols
weights = inv_vols / inv_vols.sum()
return {sym: float(w) for sym, w in weights.items()}
def _hrp_weights(returns: pd.DataFrame) -> dict[str, float]:
"""Compute Hierarchical Risk Parity weights via scipy clustering.
Falls back to inverse-volatility if scipy is unavailable.
"""
symbols = list(returns.columns)
n = len(symbols)
if n == 1:
return {symbols[0]: 1.0}
try:
from scipy.cluster.hierarchy import linkage, leaves_list
from scipy.spatial.distance import squareform
corr = returns.corr().fillna(0).values
# Convert correlation to distance: d = sqrt(0.5 * (1 - corr))
dist = np.sqrt(np.clip(0.5 * (1 - corr), 0, 1))
np.fill_diagonal(dist, 0.0)
condensed = squareform(dist)
link = linkage(condensed, method="single")
order = leaves_list(link)
sorted_symbols = [symbols[i] for i in order]
except ImportError:
logger.warning("scipy not available; using inverse-volatility for HRP")
return _inverse_volatility_weights(returns)
cov = returns.cov().values
def _bisect_weights(items: list[str]) -> dict[str, float]:
if len(items) == 1:
return {items[0]: 1.0}
mid = len(items) // 2
left_items = items[:mid]
right_items = items[mid:]
left_idx = [sorted_symbols.index(s) for s in left_items]
right_idx = [sorted_symbols.index(s) for s in right_items]
def _cluster_var(idx: list[int]) -> float:
sub_cov = cov[np.ix_(idx, idx)]
w = np.ones(len(idx)) / len(idx)
return float(w @ sub_cov @ w)
v_left = _cluster_var(left_idx)
v_right = _cluster_var(right_idx)
total = v_left + v_right
alpha = 1.0 - v_left / total if total > 0 else 0.5
w_left = _bisect_weights(left_items)
w_right = _bisect_weights(right_items)
result = {}
for sym, w in w_left.items():
result[sym] = w * (1.0 - alpha)
for sym, w in w_right.items():
result[sym] = w * alpha
return result
raw = _bisect_weights(sorted_symbols)
total = sum(raw.values())
return {sym: float(w / total) for sym, w in raw.items()}
async def optimize_hrp(symbols: list[str], days: int = 365) -> dict[str, Any]:
"""Compute Hierarchical Risk Parity portfolio weights.
Args:
symbols: List of ticker symbols (1-50).
days: Number of historical days to use.
Returns:
Dict with keys ``weights`` (symbol -> float) and ``method``.
Raises:
ValueError: If symbols is empty or no price data is available.
"""
if not symbols:
raise ValueError("symbols must not be empty")
prices = await fetch_historical_prices(symbols, days=days)
if prices.empty:
raise ValueError("No price data available for the given symbols")
returns = _compute_returns(prices)
weights = _hrp_weights(returns)
return {"weights": weights, "method": "hrp"}
async def compute_correlation(
symbols: list[str], days: int = 365
) -> dict[str, Any]:
"""Compute correlation matrix for a list of symbols.
Args:
symbols: List of ticker symbols (1-50).
days: Number of historical days to use.
Returns:
Dict with keys ``symbols`` (list) and ``matrix`` (list of lists).
Raises:
ValueError: If symbols is empty or no price data is available.
"""
if not symbols:
raise ValueError("symbols must not be empty")
prices = await fetch_historical_prices(symbols, days=days)
if prices.empty:
raise ValueError("No price data available for the given symbols")
returns = _compute_returns(prices)
available = list(returns.columns)
corr = returns.corr().fillna(0)
matrix = corr.values.tolist()
return {"symbols": available, "matrix": matrix}
async def compute_risk_parity(
symbols: list[str], days: int = 365
) -> dict[str, Any]:
"""Compute equal risk contribution (inverse-volatility) weights.
Args:
symbols: List of ticker symbols (1-50).
days: Number of historical days to use.
Returns:
Dict with keys ``weights``, ``risk_contributions``, and ``method``.
Raises:
ValueError: If symbols is empty or no price data is available.
"""
if not symbols:
raise ValueError("symbols must not be empty")
prices = await fetch_historical_prices(symbols, days=days)
if prices.empty:
raise ValueError("No price data available for the given symbols")
returns = _compute_returns(prices)
weights = _inverse_volatility_weights(returns)
# Risk contributions: w_i * sigma_i / sum(w_j * sigma_j)
vols = returns.std()
weighted_risk = {sym: weights[sym] * float(vols[sym]) for sym in weights}
total_risk = sum(weighted_risk.values())
if total_risk > 0:
risk_contributions = {sym: v / total_risk for sym, v in weighted_risk.items()}
else:
n = len(weights)
risk_contributions = {sym: 1.0 / n for sym in weights}
return {
"weights": weights,
"risk_contributions": risk_contributions,
"method": "risk_parity",
}

54
routes_portfolio.py Normal file
View File

@@ -0,0 +1,54 @@
"""Routes for portfolio optimization (HRP, correlation, risk parity)."""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from models import ApiResponse
from route_utils import safe
import portfolio_service
router = APIRouter(prefix="/api/v1/portfolio")
class PortfolioOptimizeRequest(BaseModel):
symbols: list[str] = Field(..., min_length=1, max_length=50)
days: int = Field(default=365, ge=1, le=3650)
@router.post("/optimize", response_model=ApiResponse)
@safe
async def portfolio_optimize(request: PortfolioOptimizeRequest):
"""Compute HRP optimal weights for a list of symbols."""
try:
result = await portfolio_service.optimize_hrp(
request.symbols, days=request.days
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return ApiResponse(data=result)
@router.post("/correlation", response_model=ApiResponse)
@safe
async def portfolio_correlation(request: PortfolioOptimizeRequest):
"""Compute correlation matrix for a list of symbols."""
try:
result = await portfolio_service.compute_correlation(
request.symbols, days=request.days
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return ApiResponse(data=result)
@router.post("/risk-parity", response_model=ApiResponse)
@safe
async def portfolio_risk_parity(request: PortfolioOptimizeRequest):
"""Compute equal risk contribution weights for a list of symbols."""
try:
result = await portfolio_service.compute_risk_parity(
request.symbols, days=request.days
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return ApiResponse(data=result)

View File

@@ -1,10 +1,11 @@
"""Routes for regulatory data (CFTC, SEC).""" """Routes for regulatory data (CFTC, SEC, Congress)."""
from fastapi import APIRouter, Path, Query from fastapi import APIRouter, Path, Query
from models import ApiResponse from models import ApiResponse
from route_utils import safe, validate_symbol from route_utils import safe, validate_symbol
import regulators_service import regulators_service
import congress_service
router = APIRouter(prefix="/api/v1/regulators") router = APIRouter(prefix="/api/v1/regulators")
@@ -49,3 +50,22 @@ async def sec_cik_map(symbol: str = Path(..., min_length=1, max_length=20)):
symbol = validate_symbol(symbol) symbol = validate_symbol(symbol)
data = await regulators_service.get_cik_map(symbol) data = await regulators_service.get_cik_map(symbol)
return ApiResponse(data=data) return ApiResponse(data=data)
# --- Congress Trading ---
@router.get("/congress/trades", response_model=ApiResponse)
@safe
async def congress_trades():
"""Recent US congress member stock trades."""
data = await congress_service.get_congress_trades()
return ApiResponse(data=data)
@router.get("/congress/bills", response_model=ApiResponse)
@safe
async def congress_bills(query: str = Query(..., min_length=1, max_length=200)):
"""Search US congress bills by keyword."""
data = await congress_service.search_congress_bills(query)
return ApiResponse(data=data)

View File

@@ -0,0 +1,187 @@
"""Tests for congress trading service (TDD - RED phase first)."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# --- get_congress_trades ---
@pytest.mark.asyncio
async def test_get_congress_trades_happy_path():
"""Returns list of trade dicts when OBB call succeeds."""
expected = [
{
"representative": "Nancy Pelosi",
"ticker": "NVDA",
"transaction_date": "2024-01-15",
"transaction_type": "Purchase",
"amount": "$1,000,001-$5,000,000",
}
]
import congress_service
mock_fn = MagicMock()
with patch.object(congress_service, "_get_congress_fn", return_value=mock_fn), \
patch.object(congress_service, "_try_obb_call", new_callable=AsyncMock, return_value=expected):
result = await congress_service.get_congress_trades()
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["representative"] == "Nancy Pelosi"
@pytest.mark.asyncio
async def test_get_congress_trades_returns_empty_when_fn_not_available():
"""Returns empty list when OBB congress function is not available."""
import congress_service
with patch.object(congress_service, "_get_congress_fn", return_value=None):
result = await congress_service.get_congress_trades()
assert result == []
@pytest.mark.asyncio
async def test_get_congress_trades_returns_empty_on_all_provider_failures():
"""Returns empty list when all providers fail (_try_obb_call returns None)."""
import congress_service
mock_fn = MagicMock()
with patch.object(congress_service, "_get_congress_fn", return_value=mock_fn), \
patch.object(congress_service, "_try_obb_call", new_callable=AsyncMock, return_value=None):
result = await congress_service.get_congress_trades()
assert result == []
@pytest.mark.asyncio
async def test_get_congress_trades_empty_list_result():
"""Returns empty list when _try_obb_call returns empty list."""
import congress_service
mock_fn = MagicMock()
with patch.object(congress_service, "_get_congress_fn", return_value=mock_fn), \
patch.object(congress_service, "_try_obb_call", new_callable=AsyncMock, return_value=[]):
result = await congress_service.get_congress_trades()
assert result == []
# --- _get_congress_fn ---
def test_get_congress_fn_returns_none_when_attribute_missing():
"""Returns None gracefully when obb.regulators.government_us is absent."""
import congress_service
mock_obb = MagicMock(spec=[]) # spec with no attributes
with patch.object(congress_service, "obb", mock_obb):
result = congress_service._get_congress_fn()
assert result is None
def test_get_congress_fn_returns_callable_when_available():
"""Returns the congress_trading callable when attribute exists."""
import congress_service
mock_fn = MagicMock()
mock_obb = MagicMock()
mock_obb.regulators.government_us.congress_trading = mock_fn
with patch.object(congress_service, "obb", mock_obb):
result = congress_service._get_congress_fn()
assert result is mock_fn
# --- _try_obb_call ---
@pytest.mark.asyncio
async def test_try_obb_call_returns_list_on_success():
"""_try_obb_call converts OBBject result to list via to_list."""
import congress_service
mock_result = MagicMock()
expected = [{"ticker": "AAPL"}]
with patch.object(congress_service, "to_list", return_value=expected), \
patch("congress_service.asyncio.to_thread", new_callable=AsyncMock, return_value=mock_result):
result = await congress_service._try_obb_call(MagicMock())
assert result == expected
@pytest.mark.asyncio
async def test_try_obb_call_returns_none_on_exception():
"""_try_obb_call returns None when asyncio.to_thread raises."""
import congress_service
with patch("congress_service.asyncio.to_thread", new_callable=AsyncMock, side_effect=Exception("fail")):
result = await congress_service._try_obb_call(MagicMock())
assert result is None
# --- search_congress_bills ---
@pytest.mark.asyncio
async def test_search_congress_bills_happy_path():
"""Returns list of bill dicts when OBB call succeeds."""
expected = [
{"title": "Infrastructure Investment and Jobs Act", "bill_id": "HR3684"},
{"title": "Inflation Reduction Act", "bill_id": "HR5376"},
]
import congress_service
mock_fn = MagicMock()
with patch.object(congress_service, "_get_congress_fn", return_value=mock_fn), \
patch.object(congress_service, "_try_obb_call", new_callable=AsyncMock, return_value=expected):
result = await congress_service.search_congress_bills("infrastructure")
assert isinstance(result, list)
assert len(result) == 2
assert result[0]["bill_id"] == "HR3684"
@pytest.mark.asyncio
async def test_search_congress_bills_returns_empty_when_fn_not_available():
"""Returns empty list when OBB function is not available."""
import congress_service
with patch.object(congress_service, "_get_congress_fn", return_value=None):
result = await congress_service.search_congress_bills("taxes")
assert result == []
@pytest.mark.asyncio
async def test_search_congress_bills_returns_empty_on_failure():
"""Returns empty list when all providers fail."""
import congress_service
mock_fn = MagicMock()
with patch.object(congress_service, "_get_congress_fn", return_value=mock_fn), \
patch.object(congress_service, "_try_obb_call", new_callable=AsyncMock, return_value=None):
result = await congress_service.search_congress_bills("taxes")
assert result == []
@pytest.mark.asyncio
async def test_search_congress_bills_empty_results():
"""Returns empty list when _try_obb_call returns empty list."""
import congress_service
mock_fn = MagicMock()
with patch.object(congress_service, "_get_congress_fn", return_value=mock_fn), \
patch.object(congress_service, "_try_obb_call", new_callable=AsyncMock, return_value=[]):
result = await congress_service.search_congress_bills("nonexistent")
assert result == []

View File

@@ -0,0 +1,263 @@
"""Tests for portfolio optimization service (TDD - RED phase first)."""
from unittest.mock import AsyncMock, patch
import pytest
# --- HRP Optimization ---
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_hrp_optimize_happy_path(mock_fetch):
"""HRP returns weights that sum to ~1.0 for valid symbols."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame(
{
"AAPL": [150.0, 151.0, 149.0, 152.0, 153.0],
"MSFT": [300.0, 302.0, 298.0, 305.0, 307.0],
"GOOGL": [2800.0, 2820.0, 2790.0, 2830.0, 2850.0],
}
)
import portfolio_service
result = await portfolio_service.optimize_hrp(
["AAPL", "MSFT", "GOOGL"], days=365
)
assert result["method"] == "hrp"
assert set(result["weights"].keys()) == {"AAPL", "MSFT", "GOOGL"}
total = sum(result["weights"].values())
assert abs(total - 1.0) < 0.01
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_hrp_optimize_single_symbol(mock_fetch):
"""Single symbol gets weight of 1.0."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame(
{"AAPL": [150.0, 151.0, 149.0, 152.0, 153.0]}
)
import portfolio_service
result = await portfolio_service.optimize_hrp(["AAPL"], days=365)
assert result["weights"]["AAPL"] == pytest.approx(1.0, abs=0.01)
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_hrp_optimize_no_data_raises(mock_fetch):
"""Raises ValueError when no price data is available."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame()
import portfolio_service
with pytest.raises(ValueError, match="No price data"):
await portfolio_service.optimize_hrp(["AAPL", "MSFT"], days=365)
@pytest.mark.asyncio
async def test_hrp_optimize_empty_symbols_raises():
"""Raises ValueError for empty symbol list."""
import portfolio_service
with pytest.raises(ValueError, match="symbols"):
await portfolio_service.optimize_hrp([], days=365)
# --- Correlation Matrix ---
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_correlation_matrix_happy_path(mock_fetch):
"""Correlation matrix has 1.0 on diagonal and valid shape."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame(
{
"AAPL": [150.0, 151.0, 149.0, 152.0, 153.0],
"MSFT": [300.0, 302.0, 298.0, 305.0, 307.0],
"GOOGL": [2800.0, 2820.0, 2790.0, 2830.0, 2850.0],
}
)
import portfolio_service
result = await portfolio_service.compute_correlation(
["AAPL", "MSFT", "GOOGL"], days=365
)
assert result["symbols"] == ["AAPL", "MSFT", "GOOGL"]
matrix = result["matrix"]
assert len(matrix) == 3
assert len(matrix[0]) == 3
# Diagonal should be 1.0
for i in range(3):
assert abs(matrix[i][i] - 1.0) < 0.01
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_correlation_matrix_two_symbols(mock_fetch):
"""Two-symbol correlation is symmetric."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame(
{
"AAPL": [150.0, 151.0, 149.0, 152.0, 153.0],
"MSFT": [300.0, 302.0, 298.0, 305.0, 307.0],
}
)
import portfolio_service
result = await portfolio_service.compute_correlation(["AAPL", "MSFT"], days=365)
matrix = result["matrix"]
# Symmetric: matrix[0][1] == matrix[1][0]
assert abs(matrix[0][1] - matrix[1][0]) < 0.001
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_correlation_no_data_raises(mock_fetch):
"""Raises ValueError when no data is returned."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame()
import portfolio_service
with pytest.raises(ValueError, match="No price data"):
await portfolio_service.compute_correlation(["AAPL", "MSFT"], days=365)
@pytest.mark.asyncio
async def test_correlation_empty_symbols_raises():
"""Raises ValueError for empty symbol list."""
import portfolio_service
with pytest.raises(ValueError, match="symbols"):
await portfolio_service.compute_correlation([], days=365)
# --- Risk Parity ---
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_risk_parity_happy_path(mock_fetch):
"""Risk parity returns weights and risk_contributions summing to ~1.0."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame(
{
"AAPL": [150.0, 151.0, 149.0, 152.0, 153.0],
"MSFT": [300.0, 302.0, 298.0, 305.0, 307.0],
"GOOGL": [2800.0, 2820.0, 2790.0, 2830.0, 2850.0],
}
)
import portfolio_service
result = await portfolio_service.compute_risk_parity(
["AAPL", "MSFT", "GOOGL"], days=365
)
assert result["method"] == "risk_parity"
assert set(result["weights"].keys()) == {"AAPL", "MSFT", "GOOGL"}
assert set(result["risk_contributions"].keys()) == {"AAPL", "MSFT", "GOOGL"}
total_w = sum(result["weights"].values())
assert abs(total_w - 1.0) < 0.01
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_risk_parity_single_symbol(mock_fetch):
"""Single symbol gets weight 1.0 and risk_contribution 1.0."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame(
{"AAPL": [150.0, 151.0, 149.0, 152.0, 153.0]}
)
import portfolio_service
result = await portfolio_service.compute_risk_parity(["AAPL"], days=365)
assert result["weights"]["AAPL"] == pytest.approx(1.0, abs=0.01)
assert result["risk_contributions"]["AAPL"] == pytest.approx(1.0, abs=0.01)
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical_prices", new_callable=AsyncMock)
async def test_risk_parity_no_data_raises(mock_fetch):
"""Raises ValueError when no price data is available."""
import pandas as pd
mock_fetch.return_value = pd.DataFrame()
import portfolio_service
with pytest.raises(ValueError, match="No price data"):
await portfolio_service.compute_risk_parity(["AAPL", "MSFT"], days=365)
@pytest.mark.asyncio
async def test_risk_parity_empty_symbols_raises():
"""Raises ValueError for empty symbol list."""
import portfolio_service
with pytest.raises(ValueError, match="symbols"):
await portfolio_service.compute_risk_parity([], days=365)
# --- fetch_historical_prices helper ---
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical")
async def test_fetch_historical_prices_returns_dataframe(mock_fetch_hist):
"""fetch_historical_prices assembles a price DataFrame from OBBject results."""
import pandas as pd
from unittest.mock import MagicMock
mock_result = MagicMock()
mock_result.results = [
MagicMock(date="2024-01-01", close=150.0),
MagicMock(date="2024-01-02", close=151.0),
]
mock_fetch_hist.return_value = mock_result
import portfolio_service
df = await portfolio_service.fetch_historical_prices(["AAPL"], days=30)
assert isinstance(df, pd.DataFrame)
assert "AAPL" in df.columns
@pytest.mark.asyncio
@patch("portfolio_service.fetch_historical")
async def test_fetch_historical_prices_skips_none(mock_fetch_hist):
"""fetch_historical_prices returns empty DataFrame when all fetches fail."""
import pandas as pd
mock_fetch_hist.return_value = None
import portfolio_service
df = await portfolio_service.fetch_historical_prices(["AAPL", "MSFT"], days=30)
assert isinstance(df, pd.DataFrame)
assert df.empty

View File

@@ -0,0 +1,98 @@
"""Tests for congress trading routes (TDD - RED phase first)."""
from unittest.mock import patch, AsyncMock
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
# --- GET /api/v1/regulators/congress/trades ---
@pytest.mark.asyncio
@patch("routes_regulators.congress_service.get_congress_trades", new_callable=AsyncMock)
async def test_congress_trades_happy_path(mock_fn, client):
mock_fn.return_value = [
{
"representative": "Nancy Pelosi",
"ticker": "NVDA",
"transaction_date": "2024-01-15",
"transaction_type": "Purchase",
"amount": "$1,000,001-$5,000,000",
}
]
resp = await client.get("/api/v1/regulators/congress/trades")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert len(data["data"]) == 1
assert data["data"][0]["representative"] == "Nancy Pelosi"
mock_fn.assert_called_once()
@pytest.mark.asyncio
@patch("routes_regulators.congress_service.get_congress_trades", new_callable=AsyncMock)
async def test_congress_trades_empty(mock_fn, client):
mock_fn.return_value = []
resp = await client.get("/api/v1/regulators/congress/trades")
assert resp.status_code == 200
assert resp.json()["data"] == []
@pytest.mark.asyncio
@patch("routes_regulators.congress_service.get_congress_trades", new_callable=AsyncMock)
async def test_congress_trades_service_error_returns_502(mock_fn, client):
mock_fn.side_effect = RuntimeError("Data provider unavailable")
resp = await client.get("/api/v1/regulators/congress/trades")
assert resp.status_code == 502
# --- GET /api/v1/regulators/congress/bills ---
@pytest.mark.asyncio
@patch("routes_regulators.congress_service.search_congress_bills", new_callable=AsyncMock)
async def test_congress_bills_happy_path(mock_fn, client):
mock_fn.return_value = [
{"title": "Infrastructure Investment and Jobs Act", "bill_id": "HR3684"},
{"title": "Inflation Reduction Act", "bill_id": "HR5376"},
]
resp = await client.get("/api/v1/regulators/congress/bills?query=infrastructure")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert len(data["data"]) == 2
assert data["data"][0]["bill_id"] == "HR3684"
mock_fn.assert_called_once_with("infrastructure")
@pytest.mark.asyncio
async def test_congress_bills_missing_query(client):
resp = await client.get("/api/v1/regulators/congress/bills")
assert resp.status_code == 422
@pytest.mark.asyncio
@patch("routes_regulators.congress_service.search_congress_bills", new_callable=AsyncMock)
async def test_congress_bills_empty(mock_fn, client):
mock_fn.return_value = []
resp = await client.get("/api/v1/regulators/congress/bills?query=nonexistent")
assert resp.status_code == 200
assert resp.json()["data"] == []
@pytest.mark.asyncio
@patch("routes_regulators.congress_service.search_congress_bills", new_callable=AsyncMock)
async def test_congress_bills_service_error_returns_502(mock_fn, client):
mock_fn.side_effect = RuntimeError("Congress API unavailable")
resp = await client.get("/api/v1/regulators/congress/bills?query=tax")
assert resp.status_code == 502

View File

@@ -0,0 +1,225 @@
"""Tests for portfolio optimization routes (TDD - RED phase first)."""
from unittest.mock import patch, AsyncMock
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
# --- POST /api/v1/portfolio/optimize ---
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.optimize_hrp", new_callable=AsyncMock)
async def test_portfolio_optimize_happy_path(mock_fn, client):
mock_fn.return_value = {
"weights": {"AAPL": 0.35, "MSFT": 0.32, "GOOGL": 0.33},
"method": "hrp",
}
resp = await client.post(
"/api/v1/portfolio/optimize",
json={"symbols": ["AAPL", "MSFT", "GOOGL"], "days": 365},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["data"]["method"] == "hrp"
assert "AAPL" in data["data"]["weights"]
mock_fn.assert_called_once_with(["AAPL", "MSFT", "GOOGL"], days=365)
@pytest.mark.asyncio
async def test_portfolio_optimize_missing_symbols(client):
resp = await client.post("/api/v1/portfolio/optimize", json={"days": 365})
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_portfolio_optimize_empty_symbols(client):
resp = await client.post(
"/api/v1/portfolio/optimize", json={"symbols": [], "days": 365}
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_portfolio_optimize_too_many_symbols(client):
symbols = [f"SYM{i}" for i in range(51)]
resp = await client.post(
"/api/v1/portfolio/optimize", json={"symbols": symbols, "days": 365}
)
assert resp.status_code == 422
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.optimize_hrp", new_callable=AsyncMock)
async def test_portfolio_optimize_service_error_returns_502(mock_fn, client):
mock_fn.side_effect = RuntimeError("Computation failed")
resp = await client.post(
"/api/v1/portfolio/optimize",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 502
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.optimize_hrp", new_callable=AsyncMock)
async def test_portfolio_optimize_value_error_returns_400(mock_fn, client):
mock_fn.side_effect = ValueError("No price data available")
resp = await client.post(
"/api/v1/portfolio/optimize",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 400
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.optimize_hrp", new_callable=AsyncMock)
async def test_portfolio_optimize_default_days(mock_fn, client):
mock_fn.return_value = {"weights": {"AAPL": 1.0}, "method": "hrp"}
resp = await client.post(
"/api/v1/portfolio/optimize", json={"symbols": ["AAPL"]}
)
assert resp.status_code == 200
mock_fn.assert_called_once_with(["AAPL"], days=365)
# --- POST /api/v1/portfolio/correlation ---
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_correlation", new_callable=AsyncMock)
async def test_portfolio_correlation_happy_path(mock_fn, client):
mock_fn.return_value = {
"symbols": ["AAPL", "MSFT"],
"matrix": [[1.0, 0.85], [0.85, 1.0]],
}
resp = await client.post(
"/api/v1/portfolio/correlation",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["data"]["symbols"] == ["AAPL", "MSFT"]
assert data["data"]["matrix"][0][0] == pytest.approx(1.0)
mock_fn.assert_called_once_with(["AAPL", "MSFT"], days=365)
@pytest.mark.asyncio
async def test_portfolio_correlation_missing_symbols(client):
resp = await client.post("/api/v1/portfolio/correlation", json={"days": 365})
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_portfolio_correlation_empty_symbols(client):
resp = await client.post(
"/api/v1/portfolio/correlation", json={"symbols": [], "days": 365}
)
assert resp.status_code == 422
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_correlation", new_callable=AsyncMock)
async def test_portfolio_correlation_service_error_returns_502(mock_fn, client):
mock_fn.side_effect = RuntimeError("Failed")
resp = await client.post(
"/api/v1/portfolio/correlation",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 502
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_correlation", new_callable=AsyncMock)
async def test_portfolio_correlation_value_error_returns_400(mock_fn, client):
mock_fn.side_effect = ValueError("No price data available")
resp = await client.post(
"/api/v1/portfolio/correlation",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 400
# --- POST /api/v1/portfolio/risk-parity ---
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_risk_parity", new_callable=AsyncMock)
async def test_portfolio_risk_parity_happy_path(mock_fn, client):
mock_fn.return_value = {
"weights": {"AAPL": 0.35, "MSFT": 0.33, "GOOGL": 0.32},
"risk_contributions": {"AAPL": 0.34, "MSFT": 0.33, "GOOGL": 0.33},
"method": "risk_parity",
}
resp = await client.post(
"/api/v1/portfolio/risk-parity",
json={"symbols": ["AAPL", "MSFT", "GOOGL"], "days": 365},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["data"]["method"] == "risk_parity"
assert "risk_contributions" in data["data"]
mock_fn.assert_called_once_with(["AAPL", "MSFT", "GOOGL"], days=365)
@pytest.mark.asyncio
async def test_portfolio_risk_parity_missing_symbols(client):
resp = await client.post("/api/v1/portfolio/risk-parity", json={"days": 365})
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_portfolio_risk_parity_empty_symbols(client):
resp = await client.post(
"/api/v1/portfolio/risk-parity", json={"symbols": [], "days": 365}
)
assert resp.status_code == 422
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_risk_parity", new_callable=AsyncMock)
async def test_portfolio_risk_parity_service_error_returns_502(mock_fn, client):
mock_fn.side_effect = RuntimeError("Failed")
resp = await client.post(
"/api/v1/portfolio/risk-parity",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 502
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_risk_parity", new_callable=AsyncMock)
async def test_portfolio_risk_parity_value_error_returns_400(mock_fn, client):
mock_fn.side_effect = ValueError("No price data available")
resp = await client.post(
"/api/v1/portfolio/risk-parity",
json={"symbols": ["AAPL", "MSFT"], "days": 365},
)
assert resp.status_code == 400
@pytest.mark.asyncio
@patch("routes_portfolio.portfolio_service.compute_risk_parity", new_callable=AsyncMock)
async def test_portfolio_risk_parity_default_days(mock_fn, client):
mock_fn.return_value = {
"weights": {"AAPL": 1.0},
"risk_contributions": {"AAPL": 1.0},
"method": "risk_parity",
}
resp = await client.post(
"/api/v1/portfolio/risk-parity", json={"symbols": ["AAPL"]}
)
assert resp.status_code == 200
mock_fn.assert_called_once_with(["AAPL"], days=365)