REST API wrapping the OpenBB SDK for stock data, sentiment analysis, technical indicators, macro data, and rule-based portfolio analysis. Features: stock data via yfinance (quote, profile, metrics, financials, historical, news); news sentiment via Alpha Vantage (per-article and per-ticker scores); analyst data via Finnhub (recommendations, insider trades, upgrades); macro data via FRED (Fed rate, CPI, GDP, unemployment, treasury yields); technical indicators via openbb-technical (RSI, MACD, SMA, EMA, Bollinger); a rule-based portfolio analysis engine (BUY_MORE/HOLD/SELL); and stock discovery (gainers, losers, active, undervalued, growth). 102 tests, all passing.
225 lines
6.8 KiB
Python
225 lines
6.8 KiB
Python
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from mappers import metrics_from_dict
|
|
from models import (
|
|
ActionEnum,
|
|
AnalysisResult,
|
|
ConfidenceEnum,
|
|
Holding,
|
|
HoldingAnalysis,
|
|
PortfolioResponse,
|
|
)
|
|
import openbb_service
|
|
|
|
logger = logging.getLogger(__name__)

# --- Score thresholds ---

# Analyst target price: fractional gap between target and current price.
UPSIDE_BUY = 0.15  # upside above 15% counts as a BUY signal
UPSIDE_SELL = -0.10  # downside beyond 10% counts as a SELL signal

# Profit/loss relative to the buy-in price, as a fraction.
PNL_LOSS_BUY = -0.20  # loss deeper than 20% -> average down
PNL_PROFIT_SELL = 0.50  # profit above 50% -> take profit

# Revenue growth, as a fraction (0.10 == 10% YoY).
REVENUE_GROWTH_BUY = 0.10  # above 10% YoY
REVENUE_GROWTH_SELL = 0.0  # below zero means revenue is shrinking

# Absolute PE bounds.
PE_LOW = 15.0  # below this suggests undervaluation
PE_HIGH = 35.0  # above this suggests overvaluation

# Upper bound on holdings analysed at the same time.
MAX_CONCURRENT_HOLDINGS = 10
|
|
|
|
|
|
def _score_target_price(
    current: float | None, target: float | None
) -> tuple[int, str | None]:
    """Compare the analyst target price with the current price.

    Returns a ``(score, reason)`` pair where the score is +1 (buy),
    0 (hold) or -1 (sell). When either price is missing, or the current
    price is not positive, the result is ``(0, None)``.
    """
    has_both_prices = current is not None and target is not None
    if not has_both_prices or current <= 0:
        return 0, None

    # Fractional distance from the current price to the analyst target.
    upside = (target - current) / current

    if upside > UPSIDE_BUY:
        score = 1
        reason = f"Analyst target {target:.2f} implies {upside:.0%} upside"
    elif upside < UPSIDE_SELL:
        score = -1
        reason = f"Analyst target {target:.2f} implies {upside:.0%} (downside)"
    else:
        score = 0
        reason = f"Near analyst target {target:.2f} ({upside:+.0%})"
    return score, reason
|
|
|
|
|
|
def _score_pe(pe: float | None) -> tuple[int, str | None]:
    """Score a PE ratio against fixed absolute thresholds.

    The absolute bounds stand in for an industry comparison. Returns a
    ``(score, reason)`` pair: -1 for a negative or high PE, +1 for a low
    PE, 0 otherwise. A missing PE yields ``(0, None)``.
    """
    if pe is None:
        return 0, None

    if pe < 0:
        verdict = (-1, f"Negative PE ({pe:.1f}) indicates losses")
    elif pe < PE_LOW:
        verdict = (1, f"Low PE ({pe:.1f}) suggests undervaluation")
    elif pe > PE_HIGH:
        verdict = (-1, f"High PE ({pe:.1f}) suggests overvaluation")
    else:
        verdict = (0, f"PE ({pe:.1f}) within normal range")
    return verdict
|
|
|
|
|
|
def _score_revenue_growth(growth: float | None) -> tuple[int, str | None]:
    """Score revenue growth, given as a fraction.

    Returns a ``(score, reason)`` pair: +1 above ``REVENUE_GROWTH_BUY``,
    -1 below ``REVENUE_GROWTH_SELL``, 0 in between. A missing value
    yields ``(0, None)``.
    """
    if growth is None:
        return 0, None

    if growth > REVENUE_GROWTH_BUY:
        outcome = (1, f"Revenue growth {growth:.0%} is strong")
    elif growth < REVENUE_GROWTH_SELL:
        outcome = (-1, f"Revenue declining ({growth:.0%})")
    else:
        outcome = (0, f"Revenue growth {growth:.0%} is moderate")
    return outcome
|
|
|
|
|
|
def _score_pnl(pnl_percent: float) -> tuple[int, str | None]:
    """Score the position's profit/loss relative to its cost basis.

    ``pnl_percent`` is a fraction (e.g. -0.25 for a 25% loss). Returns a
    ``(score, reason)`` pair: +1 for a loss deeper than ``PNL_LOSS_BUY``,
    -1 for a gain above ``PNL_PROFIT_SELL``, 0 otherwise.
    """
    deep_loss = pnl_percent < PNL_LOSS_BUY
    large_gain = pnl_percent > PNL_PROFIT_SELL

    if deep_loss:
        return 1, f"Down {pnl_percent:.0%} — consider averaging down"
    if large_gain:
        return -1, f"Up {pnl_percent:.0%} — consider taking profit"
    return 0, f"P&L {pnl_percent:+.0%} within hold range"
|
|
|
|
|
|
def compute_analysis(
    current_price: float | None,
    buy_in_price: float,
    target_price: float | None,
    metrics: dict[str, Any],
) -> AnalysisResult:
    """Rule engine: compute BUY_MORE / HOLD / SELL with reasons.

    Each signal (P&L, analyst target, PE, revenue growth) contributes a
    score of +1, 0 or -1 plus an optional reason string.

    Args:
        current_price: Latest price, or None when unavailable.
        buy_in_price: Cost basis per share. The P&L signal is skipped
            when this is not positive, because the percentage is
            undefined (and would raise ZeroDivisionError at zero).
        target_price: Analyst target price, or None when unavailable.
        metrics: Mapping read for the "pe_ratio" and "revenue_growth" keys.

    Returns:
        AnalysisResult with the action, a confidence level and the
        collected reasons. A total score of +2 or more means BUY_MORE,
        -2 or less means SELL, anything else HOLD.
    """
    signals: list[tuple[int, str | None]] = []

    # P&L signal: needs a positive price and a positive cost basis.
    has_price = current_price is not None and current_price > 0
    if has_price and buy_in_price > 0:
        pnl_pct = (current_price - buy_in_price) / buy_in_price
        signals.append(_score_pnl(pnl_pct))

    # The remaining scorers handle missing inputs themselves.
    signals.append(_score_target_price(current_price, target_price))
    signals.append(_score_pe(metrics.get("pe_ratio")))
    signals.append(_score_revenue_growth(metrics.get("revenue_growth")))

    scores = [score for score, _ in signals]
    reasons = [reason for _, reason in signals if reason]

    # Aggregate
    total = sum(scores)
    active_signals = sum(1 for score in scores if score != 0)

    if total >= 2:
        action = ActionEnum.BUY_MORE
    elif total <= -2:
        action = ActionEnum.SELL
    else:
        action = ActionEnum.HOLD

    # Confidence grows with the number of non-neutral signals and with
    # how strongly they agree (absolute total).
    if active_signals >= 3 and abs(total) >= 2:
        confidence = ConfidenceEnum.HIGH
    elif active_signals >= 2 and abs(total) >= 1:
        confidence = ConfidenceEnum.MEDIUM
    else:
        confidence = ConfidenceEnum.LOW

    if not reasons:
        reasons.append("Insufficient data for detailed analysis")

    return AnalysisResult(action=action, confidence=confidence, reasons=reasons)
|
|
|
|
|
|
async def analyze_holding(holding: Holding) -> HoldingAnalysis:
    """Analyze a single holding: fetch data and run rule engine.

    Fetches the quote, metrics and analyst price target for the holding's
    symbol concurrently, derives the P&L figures, and passes the data to
    ``compute_analysis``.

    Args:
        holding: The position to analyze (symbol, buy-in price, shares).

    Returns:
        HoldingAnalysis with the price, P&L, metrics, target price and
        rule-engine result. ``pnl`` and ``pnl_percent`` are None when no
        current price is available; ``pnl_percent`` is also None when the
        buy-in price is not positive, since the ratio is undefined.

    Raises:
        Whatever the ``openbb_service`` calls raise; nothing is caught here.
    """
    symbol = holding.symbol
    buy_in_price = holding.buy_in_price

    quote_data, metrics_data, target_price = await asyncio.gather(
        openbb_service.get_quote(symbol),
        openbb_service.get_metrics(symbol),
        openbb_service.get_price_target(symbol),
    )

    # Falls back to "close" when "last_price" is missing or falsy.
    current_price = quote_data.get("last_price") or quote_data.get("close")

    pnl = None
    pnl_percent = None
    if current_price is not None:
        price_delta = current_price - buy_in_price
        pnl = price_delta * holding.shares
        # Guard the division: a zero cost basis would raise
        # ZeroDivisionError and a negative one gives a meaningless ratio.
        if buy_in_price > 0:
            pnl_percent = price_delta / buy_in_price

    metrics = metrics_from_dict(symbol, metrics_data)

    # The rule engine reads the raw metrics dict, not the mapped model.
    analysis = compute_analysis(
        current_price=current_price,
        buy_in_price=buy_in_price,
        target_price=target_price,
        metrics=metrics_data,
    )

    return HoldingAnalysis(
        symbol=symbol,
        current_price=current_price,
        buy_in_price=buy_in_price,
        shares=holding.shares,
        pnl=pnl,
        pnl_percent=pnl_percent,
        metrics=metrics,
        target_price=target_price,
        analysis=analysis,
    )
|
|
|
|
|
|
def _make_fallback_holding(holding: Holding) -> HoldingAnalysis:
    """Build a placeholder analysis for a holding with no price data.

    The result keeps the holding's symbol, buy-in price and shares, has
    no current price, and carries a LOW-confidence HOLD verdict.
    """
    placeholder = AnalysisResult(
        action=ActionEnum.HOLD,
        confidence=ConfidenceEnum.LOW,
        reasons=["Data unavailable. Please try again later."],
    )
    return HoldingAnalysis(
        symbol=holding.symbol,
        current_price=None,
        buy_in_price=holding.buy_in_price,
        shares=holding.shares,
        analysis=placeholder,
    )
|
|
|
|
|
|
async def analyze_portfolio(holdings: list[Holding]) -> PortfolioResponse:
    """Analyze entire portfolio with bounded concurrency.

    At most ``MAX_CONCURRENT_HOLDINGS`` holdings are analyzed at once.
    A holding whose analysis raises is logged and replaced by the
    fallback entry, so one failure does not abort the whole portfolio.
    Results are returned in the same order as ``holdings``.
    """
    limiter = asyncio.Semaphore(MAX_CONCURRENT_HOLDINGS)

    async def bounded_analyze(h: Holding) -> HoldingAnalysis:
        async with limiter:
            return await analyze_holding(h)

    pending = [bounded_analyze(h) for h in holdings]
    # return_exceptions=True hands failures back as values, not raises.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    analyzed: list[HoldingAnalysis] = []
    for holding, outcome in zip(holdings, outcomes):
        if not isinstance(outcome, BaseException):
            analyzed.append(outcome)
            continue
        logger.error(
            "Failed to analyze %s: %s",
            holding.symbol,
            outcome,
            exc_info=outcome,
        )
        analyzed.append(_make_fallback_holding(holding))

    timestamp = datetime.now(timezone.utc)
    return PortfolioResponse(holdings=analyzed, analyzed_at=timestamp)
|