Files
openbb-invest-api/analysis_service.py
Yaojia Wang ad45cb429c feat: OpenBB Investment Analysis API
REST API wrapping OpenBB SDK for stock data, sentiment analysis,
technical indicators, macro data, and rule-based portfolio analysis.

- Stock data via yfinance (quote, profile, metrics, financials, historical, news)
- News sentiment via Alpha Vantage (per-article, per-ticker scores)
- Analyst data via Finnhub (recommendations, insider trades, upgrades)
- Macro data via FRED (Fed rate, CPI, GDP, unemployment, treasury yields)
- Technical indicators via openbb-technical (RSI, MACD, SMA, EMA, Bollinger)
- Rule-based portfolio analysis engine (BUY_MORE/HOLD/SELL)
- Stock discovery (gainers, losers, active, undervalued, growth)
- 102 tests, all passing
2026-03-09 00:20:10 +01:00

225 lines
6.8 KiB
Python

import asyncio
import logging
from datetime import datetime, timezone
from typing import Any
from mappers import metrics_from_dict
from models import (
ActionEnum,
AnalysisResult,
ConfidenceEnum,
Holding,
HoldingAnalysis,
PortfolioResponse,
)
import openbb_service
# Module-level logger; used by analyze_portfolio to report per-holding failures.
logger = logging.getLogger(__name__)
# --- Score thresholds ---
# Price, P&L and growth thresholds are fractions (0.15 == 15%); the PE
# thresholds are plain ratios. Every scoring helper compares strictly
# (`>` / `<`), so a value sitting exactly on a threshold scores 0.
UPSIDE_BUY = 0.15 # >15% upside from current price to analyst target -> BUY
UPSIDE_SELL = -0.10 # >10% downside from current price to analyst target -> SELL
PNL_LOSS_BUY = -0.20 # loss >20% vs. buy-in price -> average down (+1)
PNL_PROFIT_SELL = 0.50 # profit >50% vs. buy-in price -> take profit (-1)
REVENUE_GROWTH_BUY = 0.10 # >10% YoY
REVENUE_GROWTH_SELL = 0.0 # negative growth
PE_LOW = 15.0 # below this suggests undervaluation
PE_HIGH = 35.0 # above this suggests overvaluation
# Size of the semaphore that caps how many holdings analyze_portfolio
# analyzes at the same time.
MAX_CONCURRENT_HOLDINGS = 10
def _score_target_price(
current: float | None, target: float | None
) -> tuple[int, str | None]:
"""Score based on analyst target price vs current price.
Returns (score, reason). Score: +1 buy, 0 hold, -1 sell.
"""
if current is None or target is None or current <= 0:
return 0, None
upside = (target - current) / current
if upside > UPSIDE_BUY:
return 1, f"Analyst target {target:.2f} implies {upside:.0%} upside"
if upside < UPSIDE_SELL:
return -1, f"Analyst target {target:.2f} implies {upside:.0%} (downside)"
return 0, f"Near analyst target {target:.2f} ({upside:+.0%})"
def _score_pe(pe: float | None) -> tuple[int, str | None]:
"""Score PE ratio. Using simple absolute thresholds as proxy for industry comparison."""
if pe is None:
return 0, None
if pe < 0:
return -1, f"Negative PE ({pe:.1f}) indicates losses"
if pe < PE_LOW:
return 1, f"Low PE ({pe:.1f}) suggests undervaluation"
if pe > PE_HIGH:
return -1, f"High PE ({pe:.1f}) suggests overvaluation"
return 0, f"PE ({pe:.1f}) within normal range"
def _score_revenue_growth(growth: float | None) -> tuple[int, str | None]:
if growth is None:
return 0, None
if growth > REVENUE_GROWTH_BUY:
return 1, f"Revenue growth {growth:.0%} is strong"
if growth < REVENUE_GROWTH_SELL:
return -1, f"Revenue declining ({growth:.0%})"
return 0, f"Revenue growth {growth:.0%} is moderate"
def _score_pnl(pnl_percent: float) -> tuple[int, str | None]:
    """Score the position's profit/loss relative to its cost basis.

    ``pnl_percent`` is a fraction (-0.25 == 25% loss). A loss deeper than
    ``PNL_LOSS_BUY`` scores +1 (average down), a gain beyond
    ``PNL_PROFIT_SELL`` scores -1 (take profit), anything else scores 0.
    """
    deep_loss = pnl_percent < PNL_LOSS_BUY
    if deep_loss:
        return 1, f"Down {pnl_percent:.0%} — consider averaging down"

    large_gain = pnl_percent > PNL_PROFIT_SELL
    if large_gain:
        return -1, f"Up {pnl_percent:.0%} — consider taking profit"

    return 0, f"P&L {pnl_percent:+.0%} within hold range"
def compute_analysis(
    current_price: float | None,
    buy_in_price: float,
    target_price: float | None,
    metrics: dict[str, Any],
) -> AnalysisResult:
    """Rule engine: compute BUY_MORE / HOLD / SELL with reasons.

    Up to four signals are collected, each scoring +1 (buy), 0 (neutral)
    or -1 (sell): P&L versus the buy-in price, analyst target price,
    PE ratio (``metrics["pe_ratio"]``) and revenue growth
    (``metrics["revenue_growth"]``).

    The P&L signal is skipped when ``current_price`` is missing or not
    positive, or when ``buy_in_price`` is not positive. A zero cost basis
    has no defined percentage return and would otherwise raise
    ``ZeroDivisionError``.

    Aggregation:
        * action: total >= 2 -> BUY_MORE, total <= -2 -> SELL, else HOLD.
        * confidence: HIGH with at least 3 non-zero signals and |total| >= 2;
          MEDIUM with at least 2 non-zero signals and |total| >= 1;
          LOW otherwise.

    Args:
        current_price: Latest price, or None when unavailable.
        buy_in_price: Cost basis per share.
        target_price: Analyst price target, or None when unavailable.
        metrics: Mapping read via ``.get`` for "pe_ratio" and
            "revenue_growth"; missing keys are treated as no data.

    Returns:
        AnalysisResult with the action, confidence and the list of reasons
        (a single "insufficient data" reason when no signal produced one).
    """
    signals: list[tuple[int, str | None]] = []

    # P&L signal: needs a usable price and a positive cost basis.
    if current_price is not None and current_price > 0 and buy_in_price > 0:
        pnl_pct = (current_price - buy_in_price) / buy_in_price
        signals.append(_score_pnl(pnl_pct))

    # The remaining scorers handle missing inputs themselves.
    signals.append(_score_target_price(current_price, target_price))
    signals.append(_score_pe(metrics.get("pe_ratio")))
    signals.append(_score_revenue_growth(metrics.get("revenue_growth")))

    scores = [score for score, _ in signals]
    reasons = [reason for _, reason in signals if reason]

    # Aggregate
    total = sum(scores)
    active_signals = sum(1 for score in scores if score != 0)

    if total >= 2:
        action = ActionEnum.BUY_MORE
    elif total <= -2:
        action = ActionEnum.SELL
    else:
        action = ActionEnum.HOLD

    if active_signals >= 3 and abs(total) >= 2:
        confidence = ConfidenceEnum.HIGH
    elif active_signals >= 2 and abs(total) >= 1:
        confidence = ConfidenceEnum.MEDIUM
    else:
        confidence = ConfidenceEnum.LOW

    if not reasons:
        reasons.append("Insufficient data for detailed analysis")

    return AnalysisResult(action=action, confidence=confidence, reasons=reasons)


async def analyze_holding(holding: Holding) -> HoldingAnalysis:
    """Analyze a single holding: fetch data and run rule engine.

    The quote, metrics and analyst price target for ``holding.symbol`` are
    requested concurrently from ``openbb_service``. An exception raised by
    any of the three requests propagates to the caller.

    ``pnl`` is the absolute gain/loss over all shares. ``pnl_percent`` is
    the fractional return on the buy-in price, and is left as None when the
    buy-in price is not positive because the ratio is undefined there.
    Both stay None when no current price is available.
    """
    quote_data, metrics_data, target_price = await asyncio.gather(
        openbb_service.get_quote(holding.symbol),
        openbb_service.get_metrics(holding.symbol),
        openbb_service.get_price_target(holding.symbol),
    )

    # Prefer the last traded price; fall back to the close when it is
    # missing or falsy.
    current_price = quote_data.get("last_price") or quote_data.get("close")

    pnl = None
    pnl_percent = None
    if current_price is not None:
        pnl = (current_price - holding.buy_in_price) * holding.shares
        # Guard the division: a zero cost basis has no percentage return.
        if holding.buy_in_price > 0:
            pnl_percent = (
                current_price - holding.buy_in_price
            ) / holding.buy_in_price

    metrics = metrics_from_dict(holding.symbol, metrics_data)

    # The rule engine reads the raw metrics dict, not the mapped model.
    analysis = compute_analysis(
        current_price=current_price,
        buy_in_price=holding.buy_in_price,
        target_price=target_price,
        metrics=metrics_data,
    )

    return HoldingAnalysis(
        symbol=holding.symbol,
        current_price=current_price,
        buy_in_price=holding.buy_in_price,
        shares=holding.shares,
        pnl=pnl,
        pnl_percent=pnl_percent,
        metrics=metrics,
        target_price=target_price,
        analysis=analysis,
    )
def _make_fallback_holding(holding: Holding) -> HoldingAnalysis:
    """Build a placeholder result for a holding that could not be analyzed.

    The holding's own symbol, buy-in price and share count are kept; the
    current price is None and the analysis is a low-confidence HOLD with a
    "data unavailable" reason.
    """
    unavailable = AnalysisResult(
        action=ActionEnum.HOLD,
        confidence=ConfidenceEnum.LOW,
        reasons=["Data unavailable. Please try again later."],
    )
    return HoldingAnalysis(
        symbol=holding.symbol,
        current_price=None,
        buy_in_price=holding.buy_in_price,
        shares=holding.shares,
        analysis=unavailable,
    )
async def analyze_portfolio(holdings: list[Holding]) -> PortfolioResponse:
    """Analyze every holding concurrently, at most MAX_CONCURRENT_HOLDINGS at once.

    A holding whose analysis raises is logged and replaced by a fallback
    entry, so one failure does not abort the whole portfolio. Results are
    returned in the same order as ``holdings``, stamped with the current
    UTC time.
    """
    limiter = asyncio.Semaphore(MAX_CONCURRENT_HOLDINGS)

    async def _guarded(item: Holding) -> HoldingAnalysis:
        # Hold a semaphore slot for the full duration of one analysis.
        async with limiter:
            return await analyze_holding(item)

    # return_exceptions=True hands failures back as values, in input order.
    outcomes = await asyncio.gather(
        *[_guarded(item) for item in holdings],
        return_exceptions=True,
    )

    analyzed: list[HoldingAnalysis] = []
    for holding, outcome in zip(holdings, outcomes):
        if not isinstance(outcome, BaseException):
            analyzed.append(outcome)
            continue
        logger.error(
            "Failed to analyze %s: %s",
            holding.symbol,
            outcome,
            exc_info=outcome,
        )
        analyzed.append(_make_fallback_holding(holding))

    stamp = datetime.now(timezone.utc)
    return PortfolioResponse(holdings=analyzed, analyzed_at=stamp)