import asyncio import logging from datetime import datetime, timezone from typing import Any from mappers import metrics_from_dict from models import ( ActionEnum, AnalysisResult, ConfidenceEnum, Holding, HoldingAnalysis, PortfolioResponse, ) import openbb_service logger = logging.getLogger(__name__) # --- Score thresholds --- UPSIDE_BUY = 0.15 # >15% upside from target price -> BUY UPSIDE_SELL = -0.10 # >10% downside from target price -> SELL PNL_LOSS_BUY = -0.20 # loss >20% -> average down PNL_PROFIT_SELL = 0.50 # profit >50% -> take profit REVENUE_GROWTH_BUY = 0.10 # >10% YoY REVENUE_GROWTH_SELL = 0.0 # negative growth PE_LOW = 15.0 # below this suggests undervaluation PE_HIGH = 35.0 # above this suggests overvaluation MAX_CONCURRENT_HOLDINGS = 10 def _score_target_price( current: float | None, target: float | None ) -> tuple[int, str | None]: """Score based on analyst target price vs current price. Returns (score, reason). Score: +1 buy, 0 hold, -1 sell. """ if current is None or target is None or current <= 0: return 0, None upside = (target - current) / current if upside > UPSIDE_BUY: return 1, f"Analyst target {target:.2f} implies {upside:.0%} upside" if upside < UPSIDE_SELL: return -1, f"Analyst target {target:.2f} implies {upside:.0%} (downside)" return 0, f"Near analyst target {target:.2f} ({upside:+.0%})" def _score_pe(pe: float | None) -> tuple[int, str | None]: """Score PE ratio. Using simple absolute thresholds as proxy for industry comparison.""" if pe is None: return 0, None if pe < 0: return -1, f"Negative PE ({pe:.1f}) indicates losses" if pe < PE_LOW: return 1, f"Low PE ({pe:.1f}) suggests undervaluation" if pe > PE_HIGH: return -1, f"High PE ({pe:.1f}) suggests overvaluation" return 0, f"PE ({pe:.1f}) within normal range" def _score_revenue_growth(growth: float | None) -> tuple[int, str | None]: if growth is None: return 0, None if growth > REVENUE_GROWTH_BUY: return 1, f"Revenue growth {growth:.0%} is strong" if growth < REVENUE_GROWTH_SELL: return -1, f"Revenue declining ({growth:.0%})" return 0, f"Revenue growth {growth:.0%} is moderate" def _score_pnl(pnl_percent: float) -> tuple[int, str | None]: """Score based on current P&L vs cost basis.""" if pnl_percent < PNL_LOSS_BUY: return 1, f"Down {pnl_percent:.0%} — consider averaging down" if pnl_percent > PNL_PROFIT_SELL: return -1, f"Up {pnl_percent:.0%} — consider taking profit" return 0, f"P&L {pnl_percent:+.0%} within hold range" def compute_analysis( current_price: float | None, buy_in_price: float, target_price: float | None, metrics: dict[str, Any], ) -> AnalysisResult: """Rule engine: compute BUY_MORE / HOLD / SELL with reasons.""" scores: list[int] = [] reasons: list[str] = [] # P&L score if current_price is not None and current_price > 0: pnl_pct = (current_price - buy_in_price) / buy_in_price s, r = _score_pnl(pnl_pct) scores.append(s) if r: reasons.append(r) # Target price score s, r = _score_target_price(current_price, target_price) scores.append(s) if r: reasons.append(r) # PE score s, r = _score_pe(metrics.get("pe_ratio")) scores.append(s) if r: reasons.append(r) # Revenue growth score s, r = _score_revenue_growth(metrics.get("revenue_growth")) scores.append(s) if r: reasons.append(r) # Aggregate total = sum(scores) active_signals = len([s for s in scores if s != 0]) if total >= 2: action = ActionEnum.BUY_MORE elif total <= -2: action = ActionEnum.SELL else: action = ActionEnum.HOLD if active_signals >= 3 and abs(total) >= 2: confidence = ConfidenceEnum.HIGH elif active_signals >= 2 and abs(total) >= 1: confidence = ConfidenceEnum.MEDIUM else: confidence = ConfidenceEnum.LOW if not reasons: reasons.append("Insufficient data for detailed analysis") return AnalysisResult(action=action, confidence=confidence, reasons=reasons) async def analyze_holding(holding: Holding) -> HoldingAnalysis: """Analyze a single holding: fetch data and run rule engine.""" quote_data, metrics_data, target_price = await asyncio.gather( openbb_service.get_quote(holding.symbol), openbb_service.get_metrics(holding.symbol), openbb_service.get_price_target(holding.symbol), ) current_price = quote_data.get("last_price") or quote_data.get("close") pnl = None pnl_percent = None if current_price is not None: pnl = (current_price - holding.buy_in_price) * holding.shares pnl_percent = (current_price - holding.buy_in_price) / holding.buy_in_price metrics = metrics_from_dict(holding.symbol, metrics_data) analysis = compute_analysis( current_price=current_price, buy_in_price=holding.buy_in_price, target_price=target_price, metrics=metrics_data, ) return HoldingAnalysis( symbol=holding.symbol, current_price=current_price, buy_in_price=holding.buy_in_price, shares=holding.shares, pnl=pnl, pnl_percent=pnl_percent, metrics=metrics, target_price=target_price, analysis=analysis, ) def _make_fallback_holding(holding: Holding) -> HoldingAnalysis: return HoldingAnalysis( symbol=holding.symbol, current_price=None, buy_in_price=holding.buy_in_price, shares=holding.shares, analysis=AnalysisResult( action=ActionEnum.HOLD, confidence=ConfidenceEnum.LOW, reasons=["Data unavailable. Please try again later."], ), ) async def analyze_portfolio(holdings: list[Holding]) -> PortfolioResponse: """Analyze entire portfolio with bounded concurrency.""" sem = asyncio.Semaphore(MAX_CONCURRENT_HOLDINGS) async def bounded_analyze(h: Holding) -> HoldingAnalysis: async with sem: return await analyze_holding(h) results = await asyncio.gather( *(bounded_analyze(h) for h in holdings), return_exceptions=True, ) analyzed: list[HoldingAnalysis] = [] for i, result in enumerate(results): if isinstance(result, BaseException): logger.error( "Failed to analyze %s: %s", holdings[i].symbol, result, exc_info=result, ) analyzed.append(_make_fallback_holding(holdings[i])) else: analyzed.append(result) return PortfolioResponse( holdings=analyzed, analyzed_at=datetime.now(timezone.utc), )