"""Technical analysis indicators via openbb-technical (local computation)."""

import asyncio
import logging
from typing import Any

from openbb import obb

from obb_utils import fetch_historical, to_list

logger = logging.getLogger(__name__)


async def get_technical_indicators(
    symbol: str, days: int = 400
) -> dict[str, Any]:
    """Compute key technical indicators for a symbol.

    Fetches ``days`` of history once and then computes RSI(14),
    MACD(12, 26, 9), SMA(20/50/200), EMA(12/26) and Bollinger Bands(20, 2)
    from it. Each indicator is computed in a worker thread and is
    best-effort: if one fails, the failure is logged and its entry in the
    result is set to ``None`` while the remaining indicators still run.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "error"}`` when no history is available; otherwise a
        dict with the keys ``symbol``, ``rsi_14``, ``macd``, ``sma_20``,
        ``sma_50``, ``sma_200``, ``ema_12``, ``ema_26``,
        ``bollinger_bands`` and ``signals``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data available"}

    result: dict[str, Any] = {"symbol": symbol}

    # RSI (14-period)
    try:
        rsi = await asyncio.to_thread(
            obb.technical.rsi, data=hist.results, length=14
        )
        rsi_items = _extract_latest(rsi)
        result["rsi_14"] = rsi_items.get("close_RSI_14")
    except Exception:
        logger.warning("RSI calculation failed for %s", symbol, exc_info=True)
        result["rsi_14"] = None

    # MACD (12, 26, 9)
    try:
        macd = await asyncio.to_thread(
            obb.technical.macd, data=hist.results, fast=12, slow=26, signal=9
        )
        macd_items = _extract_latest(macd)
        result["macd"] = {
            "macd": macd_items.get("close_MACD_12_26_9"),
            "signal": macd_items.get("close_MACDs_12_26_9"),
            "histogram": macd_items.get("close_MACDh_12_26_9"),
        }
    except Exception:
        logger.warning("MACD calculation failed for %s", symbol, exc_info=True)
        result["macd"] = None

    # SMA (20, 50, 200)
    for period in (20, 50, 200):
        try:
            sma = await asyncio.to_thread(
                obb.technical.sma, data=hist.results, length=period
            )
            sma_items = _extract_latest(sma)
            result[f"sma_{period}"] = sma_items.get(f"close_SMA_{period}")
        except Exception:
            logger.warning(
                "SMA_%d failed for %s", period, symbol, exc_info=True
            )
            result[f"sma_{period}"] = None

    # EMA (12, 26)
    for period in (12, 26):
        try:
            ema = await asyncio.to_thread(
                obb.technical.ema, data=hist.results, length=period
            )
            ema_items = _extract_latest(ema)
            result[f"ema_{period}"] = ema_items.get(f"close_EMA_{period}")
        except Exception:
            logger.warning(
                "EMA_%d failed for %s", period, symbol, exc_info=True
            )
            result[f"ema_{period}"] = None

    # Bollinger Bands (20, 2)
    try:
        bbands = await asyncio.to_thread(
            obb.technical.bbands, data=hist.results, length=20, std=2
        )
        bb_items = _extract_latest(bbands)
        result["bollinger_bands"] = {
            "upper": bb_items.get("close_BBU_20_2.0"),
            "middle": bb_items.get("close_BBM_20_2.0"),
            "lower": bb_items.get("close_BBL_20_2.0"),
        }
    except Exception:
        logger.warning("Bollinger Bands failed for %s", symbol, exc_info=True)
        result["bollinger_bands"] = None

    # Add interpretation
    result["signals"] = _interpret_signals(result)
    return result


def _extract_latest(result: Any) -> dict[str, Any]:
    """Get the last row from a technical indicator result as a dict.

    Args:
        result: Object exposing a ``results`` attribute, or ``None``.

    Returns:
        The last element of ``result.results`` converted to a dict, or an
        empty dict when ``result``/``result.results`` is ``None``, when
        ``results`` is not a non-empty list, or when the last row carries no
        attributes.
    """
    if result is None or result.results is None:
        return {}

    items = result.results
    if not isinstance(items, list) or not items:
        return {}

    last = items[-1]
    if hasattr(last, "model_dump"):
        return last.model_dump()
    if isinstance(last, dict):
        # Rows that are already mappings have no __dict__; vars() would
        # raise TypeError on them.
        return dict(last)
    # getattr instead of vars(): objects without __dict__ (e.g. slotted
    # classes) yield {} rather than raising TypeError.
    return dict(getattr(last, "__dict__", None) or {})


def _interpret_signals(data: dict[str, Any]) -> list[str]:
    """Generate simple text signals from technical indicators.

    Reads ``rsi_14``, ``macd`` (its ``histogram`` entry), ``sma_50`` and
    ``sma_200`` from ``data``; any that are missing or ``None`` are skipped.

    Args:
        data: Indicator dict as built by ``get_technical_indicators``.

    Returns:
        Human-readable signal strings, in the order RSI, MACD, SMA trend.
    """
    signals: list[str] = []

    rsi = data.get("rsi_14")
    if rsi is not None:
        if rsi > 70:
            signals.append(f"RSI {rsi:.1f}: Overbought (bearish signal)")
        elif rsi < 30:
            signals.append(f"RSI {rsi:.1f}: Oversold (bullish signal)")
        else:
            signals.append(f"RSI {rsi:.1f}: Neutral")

    macd = data.get("macd")
    if macd and macd.get("histogram") is not None:
        histogram = macd["histogram"]
        if histogram > 0:
            signals.append("MACD histogram positive (bullish momentum)")
        else:
            signals.append("MACD histogram negative (bearish momentum)")

    sma_50 = data.get("sma_50")
    sma_200 = data.get("sma_200")
    if sma_50 is not None and sma_200 is not None:
        if sma_50 > sma_200:
            signals.append("Golden cross: SMA50 above SMA200 (bullish trend)")
        else:
            signals.append("Death cross: SMA50 below SMA200 (bearish trend)")

    return signals


# --- Individual Indicator Functions (Phase 1, Group I) ---


async def get_atr(
    symbol: str, length: int = 14, days: int = 400
) -> dict[str, Any]:
    """Average True Range -- volatility measurement for position sizing.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        length: ATR period forwarded to ``obb.technical.atr``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "length", "atr"}`` on success, otherwise
        ``{"symbol", "error"}``. Failures are logged, never raised.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.atr, data=hist.results, length=length
        )
        latest = _extract_latest(result)
        return {
            "symbol": symbol,
            "length": length,
            "atr": latest.get(f"ATRr_{length}"),
        }
    except Exception:
        logger.warning("ATR failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute ATR"}


async def get_adx(
    symbol: str, length: int = 14, days: int = 400
) -> dict[str, Any]:
    """Average Directional Index -- trend strength (>25 strong, <20 range-bound).

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        length: ADX period forwarded to ``obb.technical.adx``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "length", "adx", "dmp", "dmn", "signal"}`` on success,
        otherwise ``{"symbol", "error"}``. ``signal`` is ``"strong trend"``
        when ADX > 25, ``"range-bound"`` for any other numeric value, and
        ``"unknown"`` when no ADX value was found in the latest row.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.adx, data=hist.results, length=length
        )
        latest = _extract_latest(result)
        adx_val = latest.get(f"ADX_{length}")
        if adx_val is None:
            # A missing value must not be reported as a market regime.
            signal = "unknown"
        elif adx_val > 25:
            signal = "strong trend"
        else:
            signal = "range-bound"
        return {
            "symbol": symbol,
            "length": length,
            "adx": adx_val,
            "dmp": latest.get(f"DMP_{length}"),
            "dmn": latest.get(f"DMN_{length}"),
            "signal": signal,
        }
    except Exception:
        logger.warning("ADX failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute ADX"}


async def get_stoch(
    symbol: str,
    fast_k: int = 14,
    slow_d: int = 3,
    slow_k: int = 3,
    days: int = 400,
) -> dict[str, Any]:
    """Stochastic Oscillator -- overbought/oversold momentum signal.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        fast_k: Forwarded to ``obb.technical.stoch`` as ``fast_k``.
        slow_d: Forwarded to ``obb.technical.stoch`` as ``slow_d``.
        slow_k: Forwarded to ``obb.technical.stoch`` as ``slow_k``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "stoch_k", "stoch_d", "signal"}`` on success, otherwise
        ``{"symbol", "error"}``. ``signal`` is ``"overbought"`` when %K > 80,
        ``"oversold"`` when %K < 20, and ``"neutral"`` otherwise (including
        when %K is missing).
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.stoch,
            data=hist.results,
            fast_k=fast_k,
            slow_d=slow_d,
            slow_k=slow_k,
        )
        latest = _extract_latest(result)
        k_val = latest.get(f"STOCHk_{fast_k}_{slow_d}_{slow_k}")
        d_val = latest.get(f"STOCHd_{fast_k}_{slow_d}_{slow_k}")
        signal = "neutral"
        if k_val is not None:
            if k_val > 80:
                signal = "overbought"
            elif k_val < 20:
                signal = "oversold"
        return {
            "symbol": symbol,
            "stoch_k": k_val,
            "stoch_d": d_val,
            "signal": signal,
        }
    except Exception:
        logger.warning("Stochastic failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Stochastic"}


async def get_obv(symbol: str, days: int = 400) -> dict[str, Any]:
    """On-Balance Volume -- cumulative volume indicator for divergence detection.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "obv"}`` on success, otherwise ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(obb.technical.obv, data=hist.results)
        latest = _extract_latest(result)
        return {
            "symbol": symbol,
            "obv": latest.get("OBV"),
        }
    except Exception:
        logger.warning("OBV failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute OBV"}


async def get_ichimoku(symbol: str, days: int = 400) -> dict[str, Any]:
    """Ichimoku Cloud -- comprehensive trend system.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "tenkan_sen", "kijun_sen", "senkou_span_a",
        "senkou_span_b", "chikou_span"}`` on success, otherwise
        ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.ichimoku, data=hist.results
        )
        latest = _extract_latest(result)
        return {
            "symbol": symbol,
            "tenkan_sen": latest.get("ITS_9"),
            "kijun_sen": latest.get("IKS_26"),
            "senkou_span_a": latest.get("ISA_9"),
            "senkou_span_b": latest.get("ISB_26"),
            "chikou_span": latest.get("ICS_26"),
        }
    except Exception:
        logger.warning("Ichimoku failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Ichimoku"}


async def get_donchian(
    symbol: str, length: int = 20, days: int = 400
) -> dict[str, Any]:
    """Donchian Channels -- breakout detection system.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        length: Used for both ``lower_length`` and ``upper_length``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "length", "upper", "middle", "lower"}`` on success,
        otherwise ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.donchian,
            data=hist.results,
            lower_length=length,
            upper_length=length,
        )
        latest = _extract_latest(result)
        return {
            "symbol": symbol,
            "length": length,
            "upper": latest.get(f"DCU_{length}_{length}"),
            "middle": latest.get(f"DCM_{length}_{length}"),
            "lower": latest.get(f"DCL_{length}_{length}"),
        }
    except Exception:
        logger.warning("Donchian failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Donchian"}


async def get_aroon(
    symbol: str, length: int = 25, days: int = 400
) -> dict[str, Any]:
    """Aroon Indicator -- trend direction and strength.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        length: Aroon period forwarded to ``obb.technical.aroon``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "length", "aroon_up", "aroon_down",
        "aroon_oscillator"}`` on success, otherwise ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.aroon,
            data=hist.results,
            length=length,
        )
        latest = _extract_latest(result)
        up = latest.get(f"AROONU_{length}")
        down = latest.get(f"AROOND_{length}")
        osc = latest.get(f"AROONOSC_{length}")
        return {
            "symbol": symbol,
            "length": length,
            "aroon_up": up,
            "aroon_down": down,
            "aroon_oscillator": osc,
        }
    except Exception:
        logger.warning("Aroon failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Aroon"}


async def get_cci(
    symbol: str, length: int = 14, days: int = 400
) -> dict[str, Any]:
    """Commodity Channel Index -- cyclical trend identification.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        length: CCI period forwarded to ``obb.technical.cci``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "length", "cci", "signal"}`` on success, otherwise
        ``{"symbol", "error"}``. ``signal`` is ``"overbought"`` when
        CCI > 100, ``"oversold"`` when CCI < -100, and ``"neutral"``
        otherwise (including when CCI is missing).
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.cci,
            data=hist.results,
            length=length,
        )
        latest = _extract_latest(result)
        # The 0.015 suffix is part of the result key that is looked up.
        cci_val = latest.get(f"CCI_{length}_0.015")
        signal = "neutral"
        if cci_val is not None:
            if cci_val > 100:
                signal = "overbought"
            elif cci_val < -100:
                signal = "oversold"
        return {
            "symbol": symbol,
            "length": length,
            "cci": cci_val,
            "signal": signal,
        }
    except Exception:
        logger.warning("CCI failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute CCI"}


async def get_kc(
    symbol: str, length: int = 20, days: int = 400
) -> dict[str, Any]:
    """Keltner Channels -- ATR-based volatility bands.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        length: Period forwarded to ``obb.technical.kc``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "length", "upper", "middle", "lower"}`` on success,
        otherwise ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.kc,
            data=hist.results,
            length=length,
        )
        latest = _extract_latest(result)
        # NOTE(review): the "_2" suffix presumably encodes the band
        # multiplier; no scalar is passed above, so confirm these keys match
        # the columns obb.technical.kc actually emits with its defaults.
        return {
            "symbol": symbol,
            "length": length,
            "upper": latest.get(f"KCUe_{length}_2"),
            "middle": latest.get(f"KCBe_{length}_2"),
            "lower": latest.get(f"KCLe_{length}_2"),
        }
    except Exception:
        logger.warning("Keltner failed for %s", symbol, exc_info=True)
        return {
            "symbol": symbol,
            "error": "Failed to compute Keltner Channels",
        }


async def get_fib(symbol: str, days: int = 120) -> dict[str, Any]:
    """Fibonacci Retracement levels from recent price range.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol": symbol}`` merged with every field of the latest result
        row on success (row fields win on key collisions), otherwise
        ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(obb.technical.fib, data=hist.results)
        latest = _extract_latest(result)
        return {"symbol": symbol, **latest}
    except Exception:
        logger.warning("Fibonacci failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute Fibonacci"}


async def get_ad(symbol: str, days: int = 400) -> dict[str, Any]:
    """Accumulation/Distribution Line -- volume-based trend indicator.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "ad", "ad_obv"}`` on success, otherwise
        ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(obb.technical.ad, data=hist.results)
        latest = _extract_latest(result)
        return {
            "symbol": symbol,
            "ad": latest.get("AD"),
            "ad_obv": latest.get("AD_OBV"),
        }
    except Exception:
        logger.warning("A/D failed for %s", symbol, exc_info=True)
        return {"symbol": symbol, "error": "Failed to compute A/D Line"}


async def get_cones(symbol: str, days: int = 365) -> dict[str, Any]:
    """Volatility Cones -- realized volatility quantiles for options analysis.

    Unlike the other indicator helpers, the full result is returned (via
    ``to_list``) rather than only the latest row.

    Args:
        symbol: Ticker symbol passed to ``fetch_historical``.
        days: Amount of history requested from ``fetch_historical``.

    Returns:
        ``{"symbol", "cones"}`` on success, otherwise ``{"symbol", "error"}``.
    """
    hist = await fetch_historical(symbol, days)
    if hist is None:
        return {"symbol": symbol, "error": "No historical data"}
    try:
        result = await asyncio.to_thread(
            obb.technical.cones,
            data=hist.results,
        )
        items = to_list(result)
        return {"symbol": symbol, "cones": items}
    except Exception:
        logger.warning("Volatility cones failed for %s", symbol, exc_info=True)
        return {
            "symbol": symbol,
            "error": "Failed to compute volatility cones",
        }