Strategies:
- POST /backtest/sma-crossover - SMA crossover (short/long window)
- POST /backtest/rsi - RSI oversold/overbought signals
- POST /backtest/buy-and-hold - passive benchmark
- POST /backtest/momentum - multi-symbol momentum rotation

Returns: total_return, annualized_return, sharpe_ratio, max_drawdown, win_rate, total_trades, equity_curve (last 20 points)

Implementation: pure pandas/numpy, no external backtesting libs. Shared _compute_metrics helper across all strategies.

79 new tests (46 service unit + 33 route integration). All 391 tests passing.
373 lines
12 KiB
Python
373 lines
12 KiB
Python
"""Backtesting engine using pure pandas/numpy - no external backtesting libraries."""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from obb_utils import fetch_historical
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Number of trailing equity points included in the "equity_curve" entry of a
# metrics payload (used to slice the tail of the equity series).
_EQUITY_CURVE_MAX_POINTS = 20
# Minimum number of close prices the buy-and-hold backtest accepts before it
# raises "Insufficient data".
_MIN_BARS_FOR_SINGLE_POINT = 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal signal computation helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _extract_closes(result: Any) -> pd.Series:
    """Collect the ``close`` value of every bar in ``result.results``.

    Bars that have no ``close`` attribute, or whose ``close`` is ``None``/NaN,
    are discarded. The surviving values are returned as a float Series
    re-indexed from 0.
    """
    raw_closes = []
    for record in result.results:
        raw_closes.append(getattr(record, "close", None))

    series = pd.Series(raw_closes, dtype=float)
    series = series.dropna()
    return series.reset_index(drop=True)
|
|
|
|
|
|
def _compute_sma_signals(
    prices: pd.Series, short_window: int, long_window: int
) -> pd.Series:
    """Build a 0/1 position series from two simple moving averages.

    The position is 1 (long) on every bar where the ``short_window`` rolling
    mean is strictly above the ``long_window`` rolling mean, and 0 (flat)
    otherwise. Bars before the long average has a full window are always flat.
    """
    fast_ma = prices.rolling(window=short_window).mean()
    slow_ma = prices.rolling(window=long_window).mean()

    # Comparisons against NaN are False, so warm-up bars already come out as 0;
    # the explicit reset below keeps that guarantee visible.
    positions = fast_ma.gt(slow_ma).astype(int)
    warmup_bars = long_window - 1
    positions.iloc[:warmup_bars] = 0
    return positions
|
|
|
|
|
|
def _compute_rsi(prices: pd.Series, period: int) -> pd.Series:
    """Compute RSI using exponential smoothing with ``alpha = 1 / period``.

    Gains and losses are the positive and negative parts of the bar-to-bar
    price change, each smoothed with a non-adjusted EWM (Wilder-style
    smoothing). Special cases:

    * average loss is 0 and average gain is 0  -> RSI = 50
    * average loss is 0 and average gain is not -> RSI = 100
    * bars where the smoothed gain is undefined (the first bar) -> NaN

    The result is a float Series on the same index as ``prices``.
    """
    change = prices.diff()
    ups = change.clip(lower=0)
    downs = (-change).clip(lower=0)

    smoothing = {"alpha": 1 / period, "adjust": False}
    avg_gain = ups.ewm(**smoothing).mean()
    avg_loss = downs.ewm(**smoothing).mean()

    # Regular formula; rows with a zero average loss are overridden below, so
    # the division by zero there never reaches the output.
    relative_strength = avg_gain / avg_loss
    regular_rsi = 100 - (100 / (1 + relative_strength))

    no_losses = avg_loss == 0
    no_gains = avg_gain == 0
    degenerate_rsi = np.where(no_gains, 50.0, 100.0)
    values = np.where(no_losses, degenerate_rsi, regular_rsi)

    rsi = pd.Series(values, index=prices.index, dtype=float)
    # Keep the warm-up bar(s) undefined rather than reporting a made-up value.
    rsi[avg_gain.isna()] = np.nan
    return rsi
|
|
|
|
|
|
def _compute_rsi_signals(
    prices: pd.Series, period: int, oversold: float, overbought: float
) -> pd.Series:
    """Build a 0/1 position series from RSI thresholds.

    While flat, a bar with RSI strictly below ``oversold`` opens a long
    position; while long, a bar with RSI strictly above ``overbought`` closes
    it. The position is held (1) on every bar in between. Bars whose RSI is
    NaN are reported as flat (0) and do not change the holding state.
    """
    rsi_values = _compute_rsi(prices, period)

    flags: list[int] = []
    holding = False
    for reading in rsi_values:
        if pd.isna(reading):
            # Undefined RSI: emit flat for this bar, keep the state untouched.
            flags.append(0)
            continue

        if holding:
            if reading > overbought:
                holding = False
        elif reading < oversold:
            holding = True

        flags.append(1 if holding else 0)

    return pd.Series(flags, index=prices.index, dtype=int)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared metrics computation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _compute_metrics(equity: pd.Series, trades: int) -> dict[str, Any]:
    """Compute standard backtest performance metrics from an equity curve.

    Parameters
    ----------
    equity:
        Portfolio value series, one point per bar, starting from the initial
        capital. Must contain at least one point.
    trades:
        Number of completed round-trip trades; echoed back as
        ``total_trades``.

    Returns
    -------
    dict with keys: total_return, annualized_return, sharpe_ratio,
    max_drawdown, win_rate, total_trades, equity_curve.

    * ``annualized_return`` compounds the total return over 252 bars per year.
      If the portfolio ends at or below zero it is reported as -1.0.
    * ``sharpe_ratio`` (risk-free rate 0) is ``None`` when there are fewer
      than two points or the per-bar returns have no positive standard
      deviation.
    * ``max_drawdown`` is the most negative peak-to-trough fraction (<= 0);
      bars whose running peak is 0 are ignored, and 0.0 is reported when no
      drawdown can be computed.
    * ``win_rate`` is always ``None`` here: this function only sees the
      equity curve, so callers that track individual trades fill it in.
    * ``equity_curve`` holds the last ``_EQUITY_CURVE_MAX_POINTS`` values.

    Raises
    ------
    IndexError
        If ``equity`` is empty.
    """
    n = len(equity)
    initial = float(equity.iloc[0])
    final = float(equity.iloc[-1])

    total_return = (final - initial) / initial if initial != 0 else 0.0

    trading_days = max(n - 1, 1)
    growth = 1 + total_return
    if growth > 0:
        annualized_return = growth ** (252 / trading_days) - 1
    else:
        # A negative base with a fractional exponent yields a complex number
        # in Python, which round() cannot handle; treat it as a total loss.
        annualized_return = -1.0

    # Sharpe ratio (annualized, risk-free rate = 0)
    sharpe_ratio: float | None = None
    if n > 1:
        daily_returns = equity.pct_change().dropna()
        std = float(daily_returns.std())
        if std > 0:  # also False when std is NaN (single return)
            sharpe_ratio = float(daily_returns.mean() / std * np.sqrt(252))

    # Maximum drawdown; mask zero peaks so 0/0 or x/0 cannot leak NaN/inf.
    running_peak = equity.cummax()
    drawdown = (equity - running_peak) / running_peak.where(running_peak != 0)
    worst = drawdown.min()
    max_drawdown = float(worst) if pd.notna(worst) else 0.0

    # Equity curve - last N points as plain Python floats
    tail = equity.iloc[-_EQUITY_CURVE_MAX_POINTS:]
    equity_curve = [round(float(v), 4) for v in tail]

    return {
        "total_return": round(total_return, 6),
        "annualized_return": round(annualized_return, 6),
        "sharpe_ratio": round(sharpe_ratio, 6) if sharpe_ratio is not None else None,
        "max_drawdown": round(max_drawdown, 6),
        "win_rate": None,
        "total_trades": trades,
        "equity_curve": equity_curve,
    }
|
|
|
|
|
|
def _simulate_positions(
    prices: pd.Series,
    positions: pd.Series,
    initial_capital: float,
) -> tuple[pd.Series, int, int]:
    """Simulate portfolio equity given a position series and prices.

    Parameters
    ----------
    prices:
        Close prices, one per bar.
    positions:
        Position per bar (1 = long, 0 = flat), aligned with ``prices``.
    initial_capital:
        Starting portfolio value; also the first point of the equity curve.

    Returns
    -------
    (equity_curve, total_trades, winning_trades)

    A position decided on bar ``i`` earns the price return of bar ``i + 1``
    (positions are shifted by one bar), so no bar's return is earned with
    knowledge of that bar's close.

    A trade is a complete buy->sell round-trip: it opens on the first bar
    where the position is 1 and closes on the next bar where it is 0. A trade
    wins when the close on the exit bar is strictly above the close on the
    entry bar. A position still open on the last bar is not counted.
    """
    # Per-bar returns, earned only when the previous bar's position was long.
    price_returns = prices.pct_change().fillna(0.0)
    held = positions.shift(1).fillna(0).astype(float)
    strategy_returns = held * price_returns

    equity = initial_capital * (1 + strategy_returns).cumprod()
    equity.iloc[0] = initial_capital

    # Count round trips and wins in a single pass so both numbers come from
    # the same entry/exit definition (including an entry on the very first
    # bar, which a diff-based count cannot see).
    total_trades = 0
    winning_trades = 0
    in_trade = False
    entry_price = 0.0
    for raw_pos, raw_price in zip(positions.to_numpy(), prices.to_numpy()):
        pos = int(raw_pos)
        price = float(raw_price)
        if not in_trade and pos == 1:
            in_trade = True
            entry_price = price
        elif in_trade and pos == 0:
            in_trade = False
            total_trades += 1
            if price > entry_price:
                winning_trades += 1

    return equity, total_trades, winning_trades
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public strategy functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def backtest_sma_crossover(
    symbol: str,
    short_window: int,
    long_window: int,
    days: int,
    initial_capital: float,
) -> dict[str, Any]:
    """Backtest an SMA-crossover strategy on one symbol.

    Fetches history for ``symbol`` over ``days``, derives long/flat positions
    from the short/long moving averages, simulates the equity curve and
    returns the metrics dict. ``win_rate`` is filled in only when at least one
    trade was counted.

    Raises
    ------
    ValueError
        If no history is returned, or if there are not more than
        ``long_window`` usable close prices.
    """
    history = await fetch_historical(symbol, days)
    if history is None:
        raise ValueError(f"No historical data available for {symbol}")

    closes = _extract_closes(history)
    bar_count = len(closes)
    if bar_count <= long_window:
        raise ValueError(
            f"Insufficient data: need >{long_window} bars, got {bar_count}"
        )

    signal = _compute_sma_signals(closes, short_window, long_window)
    simulation = _simulate_positions(closes, signal, initial_capital)
    equity_curve, n_trades, n_wins = simulation

    metrics = _compute_metrics(equity_curve, n_trades)
    if n_trades > 0:
        metrics["win_rate"] = round(n_wins / n_trades, 6)
    return metrics
|
|
|
|
|
|
async def backtest_rsi(
    symbol: str,
    period: int,
    oversold: float,
    overbought: float,
    days: int,
    initial_capital: float,
) -> dict[str, Any]:
    """Backtest an RSI threshold strategy on one symbol.

    Fetches history for ``symbol`` over ``days``, derives long/flat positions
    from the RSI ``oversold``/``overbought`` thresholds, simulates the equity
    curve and returns the metrics dict. ``win_rate`` is filled in only when at
    least one trade was counted.

    Raises
    ------
    ValueError
        If no history is returned, or if there are not more than ``period``
        usable close prices.
    """
    history = await fetch_historical(symbol, days)
    if history is None:
        raise ValueError(f"No historical data available for {symbol}")

    closes = _extract_closes(history)
    bar_count = len(closes)
    if bar_count <= period:
        raise ValueError(
            f"Insufficient data: need >{period} bars, got {bar_count}"
        )

    signal = _compute_rsi_signals(closes, period, oversold, overbought)
    simulation = _simulate_positions(closes, signal, initial_capital)
    equity_curve, n_trades, n_wins = simulation

    metrics = _compute_metrics(equity_curve, n_trades)
    if n_trades > 0:
        metrics["win_rate"] = round(n_wins / n_trades, 6)
    return metrics
|
|
|
|
|
|
async def backtest_buy_and_hold(
    symbol: str,
    days: int,
    initial_capital: float,
) -> dict[str, Any]:
    """Backtest a passive buy-and-hold benchmark on one symbol.

    The position is long on every bar. The result always reports exactly one
    trade, with ``win_rate`` 1.0 when the total return is positive and 0.0
    otherwise.

    Raises
    ------
    ValueError
        If no history is returned, or if fewer than
        ``_MIN_BARS_FOR_SINGLE_POINT`` usable close prices are available.
    """
    history = await fetch_historical(symbol, days)
    if history is None:
        raise ValueError(f"No historical data available for {symbol}")

    closes = _extract_closes(history)
    if len(closes) < _MIN_BARS_FOR_SINGLE_POINT:
        raise ValueError(
            f"Insufficient data: need at least 2 bars, got {len(closes)}"
        )

    # Fully invested from the first bar onward.
    always_long = pd.Series(1, index=closes.index, dtype=int)
    equity_curve = _simulate_positions(closes, always_long, initial_capital)[0]

    metrics = _compute_metrics(equity_curve, trades=1)
    # The single "trade" wins exactly when the whole holding period gained.
    gained = metrics["total_return"] > 0
    metrics["win_rate"] = 1.0 if gained else 0.0
    metrics["total_trades"] = 1
    return metrics
|
|
|
|
|
|
async def backtest_momentum(
    symbols: list[str],
    lookback: int,
    top_n: int,
    rebalance_days: int,
    days: int,
    initial_capital: float,
) -> dict[str, Any]:
    """Run a momentum-rotation backtest over several symbols.

    Every ``rebalance_days`` bars (once at least ``lookback`` bars have
    elapsed) the symbols are ranked by their ``lookback``-bar return and the
    ``top_n`` best are held at equal weight until the next rebalance.

    Each bar's portfolio return is booked with the holdings chosen *before*
    that bar; the rebalance then happens at the bar's close. A basket selected
    using a bar's close therefore starts earning on the following bar, which
    avoids look-ahead bias.

    Every symbol held going into a rebalance is counted as one closed trade
    (won if its close rose since it was selected), even when it is selected
    again. Holdings still open after the last bar are not counted.

    Returns
    -------
    The metrics dict from ``_compute_metrics`` with ``win_rate`` filled in
    when at least one trade closed, plus ``allocation_history``: a list of
    ``{"bar", "symbols", "weights"}`` entries, one per rebalance.

    Raises
    ------
    ValueError
        If ``rebalance_days`` or ``top_n`` is below 1, or if no symbol has
        more than ``lookback`` usable close prices.
    """
    if rebalance_days < 1:
        # bar % 0 would otherwise surface as an opaque ZeroDivisionError.
        raise ValueError(f"rebalance_days must be >= 1, got {rebalance_days}")
    if top_n < 1:
        raise ValueError(f"top_n must be >= 1, got {top_n}")

    # Fetch all price series, keeping only symbols with enough history.
    price_map: dict[str, pd.Series] = {}
    for sym in symbols:
        hist = await fetch_historical(sym, days)
        if hist is None:
            continue
        closes = _extract_closes(hist)
        if len(closes) > lookback:
            price_map[sym] = closes

    if not price_map:
        raise ValueError("No price data available for any of the requested symbols")

    # Align all price series to the same length (min across symbols).
    # NOTE(review): this keeps the FIRST min_len bars of each series. If the
    # series end on the same date but start on different ones, the bars are
    # misaligned in time - confirm against what fetch_historical returns.
    min_len = min(len(series) for series in price_map.values())
    aligned = {
        sym: series.iloc[:min_len].reset_index(drop=True)
        for sym, series in price_map.items()
    }

    portfolio_value = initial_capital
    equity_values: list[float] = [initial_capital]
    allocation_history: list[dict[str, Any]] = []
    total_trades = 0
    winning_trades = 0

    current_symbols: list[str] = []
    current_weights: list[float] = []
    entry_prices: dict[str, float] = {}

    for bar in range(1, min_len):
        # 1) Book this bar's return using the holdings carried into the bar.
        if current_symbols:
            daily_ret = 0.0
            for sym, weight in zip(current_symbols, current_weights):
                prev_price = float(aligned[sym].iloc[bar - 1])
                curr_price = float(aligned[sym].iloc[bar])
                if prev_price != 0:
                    daily_ret += weight * (curr_price / prev_price - 1)
            portfolio_value = portfolio_value * (1 + daily_ret)
        equity_values.append(portfolio_value)

        # 2) Rebalance at this bar's close, if it is a rebalance bar.
        if bar % rebalance_days != 0 or bar < lookback:
            continue

        # Rank symbols by lookback-period return.
        returns = {
            sym: (prices.iloc[bar] / prices.iloc[bar - lookback]) - 1
            for sym, prices in aligned.items()
        }
        ranked = sorted(returns, key=returns.__getitem__, reverse=True)
        selected = ranked[:top_n]

        # Close out the outgoing basket and score each position.
        for sym in current_symbols:
            exit_price = float(aligned[sym].iloc[bar])
            total_trades += 1
            if exit_price > entry_prices[sym]:
                winning_trades += 1

        current_symbols = selected
        current_weights = [1.0 / len(selected)] * len(selected)
        entry_prices = {sym: float(aligned[sym].iloc[bar]) for sym in selected}

        allocation_history.append({
            "bar": bar,
            "symbols": selected,
            "weights": current_weights,
        })

    equity = pd.Series(equity_values, dtype=float)
    result = _compute_metrics(equity, total_trades)
    if total_trades > 0:
        result["win_rate"] = round(winning_trades / total_trades, 6)
    result["allocation_history"] = allocation_history
    return result
|