"""Backtesting engine using pure pandas/numpy - no external backtesting libraries.""" import logging from typing import Any import numpy as np import pandas as pd from obb_utils import fetch_historical logger = logging.getLogger(__name__) _EQUITY_CURVE_MAX_POINTS = 20 _MIN_BARS_FOR_SINGLE_POINT = 2 # --------------------------------------------------------------------------- # Internal signal computation helpers # --------------------------------------------------------------------------- def _extract_closes(result: Any) -> pd.Series: """Pull close prices from an OBBject result into a float Series.""" bars = result.results closes = [getattr(bar, "close", None) for bar in bars] return pd.Series(closes, dtype=float).dropna().reset_index(drop=True) def _compute_sma_signals( prices: pd.Series, short_window: int, long_window: int ) -> pd.Series: """Return position series (1=long, 0=flat) from SMA crossover strategy. Buy when short SMA crosses above long SMA; sell when it crosses below. """ short_ma = prices.rolling(short_window).mean() long_ma = prices.rolling(long_window).mean() # 1 where short > long, else 0; NaN before long_window filled with 0 signal = (short_ma > long_ma).astype(int) signal.iloc[: long_window - 1] = 0 return signal def _compute_rsi(prices: pd.Series, period: int) -> pd.Series: """Compute Wilder RSI for a price series.""" delta = prices.diff() gain = delta.clip(lower=0) loss = (-delta).clip(lower=0) avg_gain = gain.ewm(alpha=1 / period, adjust=False).mean() avg_loss = loss.ewm(alpha=1 / period, adjust=False).mean() # When avg_loss == 0 and avg_gain > 0, RSI = 100; avoid division by zero. rsi = pd.Series(np.where( avg_loss == 0, np.where(avg_gain == 0, 50.0, 100.0), 100 - (100 / (1 + avg_gain / avg_loss)), ), index=prices.index, dtype=float) # Preserve NaN for the initial diff period rsi[avg_gain.isna()] = np.nan return rsi def _compute_rsi_signals( prices: pd.Series, period: int, oversold: float, overbought: float ) -> pd.Series: """Return position series (1=long, 0=flat) from RSI strategy. Buy when RSI < oversold; sell when RSI > overbought. """ rsi = _compute_rsi(prices, period) position = pd.Series(0, index=prices.index, dtype=int) in_trade = False for i in range(len(prices)): rsi_val = rsi.iloc[i] if pd.isna(rsi_val): continue if not in_trade and rsi_val < oversold: in_trade = True elif in_trade and rsi_val > overbought: in_trade = False if in_trade: position.iloc[i] = 1 return position # --------------------------------------------------------------------------- # Shared metrics computation # --------------------------------------------------------------------------- def _compute_metrics(equity: pd.Series, trades: int) -> dict[str, Any]: """Compute standard backtest performance metrics from an equity curve. Parameters ---------- equity: Daily portfolio value series starting from initial_capital. trades: Number of completed round-trip trades. Returns ------- dict with keys: total_return, annualized_return, sharpe_ratio, max_drawdown, win_rate, total_trades, equity_curve. """ n = len(equity) initial = float(equity.iloc[0]) final = float(equity.iloc[-1]) total_return = (final - initial) / initial if initial != 0 else 0.0 trading_days = max(n - 1, 1) annualized_return = (1 + total_return) ** (252 / trading_days) - 1 # Sharpe ratio (annualized, risk-free rate = 0) sharpe_ratio: float | None = None if n > 1: daily_returns = equity.pct_change().dropna() std = float(daily_returns.std()) if std > 0: sharpe_ratio = float(daily_returns.mean() / std * np.sqrt(252)) # Maximum drawdown rolling_max = equity.cummax() drawdown = (equity - rolling_max) / rolling_max max_drawdown = float(drawdown.min()) # Win rate: undefined when no trades win_rate: float | None = None if trades > 0: # Approximate: compare each trade entry/exit pair captured in equity win_rate = None # will be overridden by callers that track trades # Equity curve - last N points as plain Python floats last_n = equity.iloc[-_EQUITY_CURVE_MAX_POINTS:] equity_curve = [round(float(v), 4) for v in last_n] return { "total_return": round(total_return, 6), "annualized_return": round(annualized_return, 6), "sharpe_ratio": round(sharpe_ratio, 6) if sharpe_ratio is not None else None, "max_drawdown": round(max_drawdown, 6), "win_rate": win_rate, "total_trades": trades, "equity_curve": equity_curve, } def _simulate_positions( prices: pd.Series, positions: pd.Series, initial_capital: float, ) -> tuple[pd.Series, int, int]: """Simulate portfolio equity given a position series and prices. Returns (equity_curve, total_trades, winning_trades). A trade is a complete buy->sell round-trip. """ # Daily returns when in position price_returns = prices.pct_change().fillna(0.0) strategy_returns = positions.shift(1).fillna(0).astype(float) * price_returns equity = initial_capital * (1 + strategy_returns).cumprod() equity.iloc[0] = initial_capital # Count round trips trade_changes = positions.diff().abs() entries = int((trade_changes == 1).sum()) exits = int((trade_changes == -1).sum()) total_trades = min(entries, exits) # only completed round trips # Count wins: each completed trade where exit value > entry value winning_trades = 0 in_trade = False entry_price = 0.0 for i in range(len(positions)): pos = int(positions.iloc[i]) price = float(prices.iloc[i]) if not in_trade and pos == 1: in_trade = True entry_price = price elif in_trade and pos == 0: in_trade = False if price > entry_price: winning_trades += 1 return equity, total_trades, winning_trades # --------------------------------------------------------------------------- # Public strategy functions # --------------------------------------------------------------------------- async def backtest_sma_crossover( symbol: str, short_window: int, long_window: int, days: int, initial_capital: float, ) -> dict[str, Any]: """Run SMA crossover backtest for a single symbol.""" hist = await fetch_historical(symbol, days) if hist is None: raise ValueError(f"No historical data available for {symbol}") prices = _extract_closes(hist) if len(prices) <= long_window: raise ValueError( f"Insufficient data: need >{long_window} bars, got {len(prices)}" ) positions = _compute_sma_signals(prices, short_window, long_window) equity, total_trades, winning_trades = _simulate_positions( prices, positions, initial_capital ) result = _compute_metrics(equity, total_trades) if total_trades > 0: result["win_rate"] = round(winning_trades / total_trades, 6) return result async def backtest_rsi( symbol: str, period: int, oversold: float, overbought: float, days: int, initial_capital: float, ) -> dict[str, Any]: """Run RSI-based backtest for a single symbol.""" hist = await fetch_historical(symbol, days) if hist is None: raise ValueError(f"No historical data available for {symbol}") prices = _extract_closes(hist) if len(prices) <= period: raise ValueError( f"Insufficient data: need >{period} bars, got {len(prices)}" ) positions = _compute_rsi_signals(prices, period, oversold, overbought) equity, total_trades, winning_trades = _simulate_positions( prices, positions, initial_capital ) result = _compute_metrics(equity, total_trades) if total_trades > 0: result["win_rate"] = round(winning_trades / total_trades, 6) return result async def backtest_buy_and_hold( symbol: str, days: int, initial_capital: float, ) -> dict[str, Any]: """Run a simple buy-and-hold backtest as a benchmark.""" hist = await fetch_historical(symbol, days) if hist is None: raise ValueError(f"No historical data available for {symbol}") prices = _extract_closes(hist) if len(prices) < _MIN_BARS_FOR_SINGLE_POINT: raise ValueError( f"Insufficient data: need at least 2 bars, got {len(prices)}" ) # Always fully invested - position is 1 from day 0 positions = pd.Series(1, index=prices.index, dtype=int) equity, _, _ = _simulate_positions(prices, positions, initial_capital) result = _compute_metrics(equity, trades=1) # Buy-and-hold: 1 trade, win_rate is whether final > initial result["win_rate"] = 1.0 if result["total_return"] > 0 else 0.0 result["total_trades"] = 1 return result async def backtest_momentum( symbols: list[str], lookback: int, top_n: int, rebalance_days: int, days: int, initial_capital: float, ) -> dict[str, Any]: """Run momentum strategy: every rebalance_days pick top_n symbols by lookback return.""" # Fetch all price series price_map: dict[str, pd.Series] = {} for sym in symbols: hist = await fetch_historical(sym, days) if hist is not None: closes = _extract_closes(hist) if len(closes) > lookback: price_map[sym] = closes if not price_map: raise ValueError("No price data available for any of the requested symbols") # Align all price series to the same length (min across symbols) min_len = min(len(v) for v in price_map.values()) aligned = {sym: s.iloc[:min_len].reset_index(drop=True) for sym, s in price_map.items()} n_bars = min_len portfolio_value = initial_capital equity_values: list[float] = [initial_capital] allocation_history: list[dict[str, Any]] = [] total_trades = 0 current_symbols: list[str] = [] current_weights: list[float] = [] entry_prices: dict[str, float] = {} winning_trades = 0 for bar in range(1, n_bars): # Rebalance check if bar % rebalance_days == 0 and bar >= lookback: # Rank symbols by lookback-period return returns: dict[str, float] = {} for sym, prices in aligned.items(): if bar >= lookback: ret = (prices.iloc[bar] / prices.iloc[bar - lookback]) - 1 returns[sym] = ret sorted_syms = sorted(returns, key=returns.get, reverse=True) # type: ignore[arg-type] selected = sorted_syms[:top_n] weight = 1.0 / len(selected) if selected else 0.0 # Count closed positions as trades for sym in current_symbols: if sym in aligned: exit_price = float(aligned[sym].iloc[bar]) entry_price = entry_prices.get(sym, exit_price) total_trades += 1 if exit_price > entry_price: winning_trades += 1 current_symbols = selected current_weights = [weight] * len(selected) entry_prices = {sym: float(aligned[sym].iloc[bar]) for sym in selected} allocation_history.append({ "bar": bar, "symbols": selected, "weights": current_weights, }) # Compute portfolio daily return if current_symbols: daily_ret = 0.0 for sym, w in zip(current_symbols, current_weights): prev_bar = bar - 1 prev_price = float(aligned[sym].iloc[prev_bar]) curr_price = float(aligned[sym].iloc[bar]) if prev_price != 0: daily_ret += w * (curr_price / prev_price - 1) portfolio_value = portfolio_value * (1 + daily_ret) equity_values.append(portfolio_value) equity = pd.Series(equity_values, dtype=float) result = _compute_metrics(equity, total_trades) if total_trades > 0: result["win_rate"] = round(winning_trades / total_trades, 6) result["allocation_history"] = allocation_history return result