feat: OpenBB Investment Analysis API

REST API wrapping OpenBB SDK for stock data, sentiment analysis,
technical indicators, macro data, and rule-based portfolio analysis.

- Stock data via yfinance (quote, profile, metrics, financials, historical, news)
- News sentiment via Alpha Vantage (per-article, per-ticker scores)
- Analyst data via Finnhub (recommendations, insider trades, upgrades)
- Macro data via FRED (Fed rate, CPI, GDP, unemployment, treasury yields)
- Technical indicators via openbb-technical (RSI, MACD, SMA, EMA, Bollinger)
- Rule-based portfolio analysis engine (BUY_MORE/HOLD/SELL)
- Stock discovery (gainers, losers, active, undervalued, growth)
- 102 tests, all passing
This commit is contained in:
Yaojia Wang
2026-03-09 00:20:10 +01:00
commit ad45cb429c
30 changed files with 3107 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
.env
__pycache__/
*.pyc
.pytest_cache/
*.egg-info/
dist/
build/
.mypy_cache/
.coverage
.claude/

254
README.md Normal file
View File

@@ -0,0 +1,254 @@
# OpenBB Investment Analysis API
REST API wrapping OpenBB SDK, providing stock data query, sentiment analysis, technical indicators, macro data, and rule-based investment analysis for US and Swedish markets. Designed to be called by OpenClaw (or any AI assistant) — the API returns structured data, all LLM reasoning happens on the caller side.
## API Keys
### Required: None
The core functionality uses **yfinance** (free, no API key). The API works without any keys configured.
### Recommended Free Keys
| Provider | Env Variable | How to Get | What It Unlocks | Free Limit |
|----------|-------------|------------|-----------------|------------|
| **Finnhub** | `INVEST_API_FINNHUB_API_KEY` | https://finnhub.io/register | Insider trades, analyst upgrades, recommendation trends | 60 calls/min |
| **FRED** | `INVEST_API_FRED_API_KEY` | https://fred.stlouisfed.org/docs/api/api_key.html | Macro data: Fed rate, CPI, GDP, unemployment, treasury yields | 120 calls/min |
| **Alpha Vantage** | `INVEST_API_ALPHAVANTAGE_API_KEY` | https://www.alphavantage.co/support/#api-key | News sentiment scores (bullish/bearish per article per ticker) | 25 calls/day |
### Optional Paid Keys (for higher quality data)
| Provider | Env Variable | What It Adds |
|----------|-------------|--------------|
| **FMP** | `OBB_FMP_API_KEY` | More granular financials, earnings transcripts (250 calls/day free) |
| **Intrinio** | `OBB_INTRINIO_API_KEY` | Institutional-grade fundamentals |
| **Tiingo** | `OBB_TIINGO_TOKEN` | Reliable historical price data |
| **Benzinga** | `OBB_BENZINGA_API_KEY` | Real-time news, analyst ratings |
### Configuration
Set environment variables before starting, or add to a `.env` file:
```bash
export INVEST_API_FINNHUB_API_KEY=your_finnhub_key
export INVEST_API_FRED_API_KEY=your_fred_key
export INVEST_API_ALPHAVANTAGE_API_KEY=your_alphavantage_key
```
## Quick Start
### 1. Create conda environment
```bash
conda env create -f environment.yml
conda activate openbb-invest-api
```
### 2. Start the server
```bash
python main.py
```
Server starts at `http://localhost:8000`. Visit `http://localhost:8000/docs` for Swagger UI.
### 3. Test it
```bash
# Health check
curl http://localhost:8000/health
# US stock quote
curl http://localhost:8000/api/v1/stock/AAPL/quote
# Swedish stock quote
curl http://localhost:8000/api/v1/stock/VOLV-B.ST/quote
# Sentiment analysis (requires Finnhub + Alpha Vantage keys)
curl http://localhost:8000/api/v1/stock/AAPL/sentiment
# News sentiment with per-article scores (requires Alpha Vantage key)
curl http://localhost:8000/api/v1/stock/AAPL/news-sentiment
# Technical indicators
curl http://localhost:8000/api/v1/stock/AAPL/technical
# Macro overview (requires FRED key)
curl http://localhost:8000/api/v1/macro/overview
# Portfolio analysis
curl -X POST http://localhost:8000/api/v1/portfolio/analyze \
-H "Content-Type: application/json" \
-d '{"holdings":[{"symbol":"AAPL","shares":100,"buy_in_price":150},{"symbol":"VOLV-B.ST","shares":50,"buy_in_price":250}]}'
```
## API Endpoints
### Health
| Method | Path | Description |
|--------|------|-------------|
| GET | `/health` | Health check |
### Stock Data (yfinance, no key needed)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/stock/{symbol}/quote` | Current price and volume |
| GET | `/api/v1/stock/{symbol}/profile` | Company overview (sector, industry, description) |
| GET | `/api/v1/stock/{symbol}/metrics` | Key ratios (PE, PB, ROE, EPS, etc.) |
| GET | `/api/v1/stock/{symbol}/financials` | Income statement + balance sheet + cash flow |
| GET | `/api/v1/stock/{symbol}/historical?days=365` | Historical OHLCV data |
| GET | `/api/v1/stock/{symbol}/news` | Recent company news |
| GET | `/api/v1/stock/{symbol}/summary` | Aggregated: quote + profile + metrics + financials |
### Sentiment & Analyst Data (Finnhub + Alpha Vantage, free keys)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/stock/{symbol}/sentiment` | Aggregated: news sentiment + recommendations + upgrades |
| GET | `/api/v1/stock/{symbol}/news-sentiment?limit=30` | News articles with per-ticker sentiment scores (Alpha Vantage) |
| GET | `/api/v1/stock/{symbol}/insider-trades` | Insider transactions (CEO/CFO buys and sells) |
| GET | `/api/v1/stock/{symbol}/recommendations` | Monthly analyst buy/hold/sell counts |
| GET | `/api/v1/stock/{symbol}/upgrades` | Recent analyst upgrades and downgrades |
### Technical Analysis (local computation, no key needed)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/stock/{symbol}/technical` | RSI, MACD, SMA, EMA, Bollinger Bands + signal interpretation |
### Macro Economics (FRED, free key)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/macro/overview` | Key indicators: Fed rate, treasury yields, CPI, unemployment, GDP, VIX |
| GET | `/api/v1/macro/series/{series_id}?limit=30` | Any FRED time series by ID |
### Portfolio Analysis (no key needed)
| Method | Path | Description |
|--------|------|-------------|
| POST | `/api/v1/portfolio/analyze` | Rule-based analysis of holdings (max 50) |
Request body:
```json
{
"holdings": [
{"symbol": "AAPL", "shares": 100, "buy_in_price": 150.0},
{"symbol": "VOLV-B.ST", "shares": 50, "buy_in_price": 250.0}
]
}
```
Response includes per-holding: current price, P&L, key metrics, analyst target price, and a rule-engine recommendation (BUY_MORE / HOLD / SELL) with confidence level and reasons.
### Stock Discovery (no key needed)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/discover/gainers` | Top gainers |
| GET | `/api/v1/discover/losers` | Top losers |
| GET | `/api/v1/discover/active` | Most active |
| GET | `/api/v1/discover/undervalued` | Undervalued large caps |
| GET | `/api/v1/discover/growth` | Growth tech stocks |
## Rule Engine
The portfolio analysis endpoint uses a rule-based engine (no LLM) that scores each holding on four signals:
| Signal | BUY_MORE (+1) | HOLD (0) | SELL (-1) |
|--------|---------------|----------|-----------|
| Price vs analyst target | >15% upside | -10% to +15% | >10% downside |
| PE ratio | < 15 | 15 - 35 | > 35 or negative |
| Revenue growth | > 10% YoY | 0 - 10% | Negative |
| P&L vs cost basis | Loss > 20% | -20% to +50% | Profit > 50% |
Scores are summed. Total >= 2 = BUY_MORE, <= -2 = SELL, otherwise HOLD. Confidence is HIGH/MEDIUM/LOW based on how many signals agree.
## Configuration
All settings are configurable via environment variables with the `INVEST_API_` prefix:
| Variable | Default | Description |
|----------|---------|-------------|
| `INVEST_API_HOST` | `0.0.0.0` | Server bind address |
| `INVEST_API_PORT` | `8000` | Server port |
| `INVEST_API_CORS_ORIGINS` | `["http://localhost:3000"]` | Allowed CORS origins (JSON array) |
| `INVEST_API_LOG_LEVEL` | `info` | Logging level |
| `INVEST_API_DEBUG` | `false` | Enable debug mode (auto-reload) |
| `INVEST_API_FINNHUB_API_KEY` | _(empty)_ | Finnhub API key for analyst data |
| `INVEST_API_FRED_API_KEY` | _(empty)_ | FRED API key for macro data |
| `INVEST_API_ALPHAVANTAGE_API_KEY` | _(empty)_ | Alpha Vantage API key for news sentiment |
## Project Structure
```
openbb-invest-api/
├── main.py # FastAPI app entry point
├── config.py # Settings (env-based)
├── models.py # Pydantic request/response models
├── mappers.py # Dict-to-model mapping functions
├── openbb_service.py # OpenBB SDK wrapper (async)
├── finnhub_service.py # Finnhub REST client (insider, analyst data)
├── alphavantage_service.py # Alpha Vantage REST client (news sentiment)
├── macro_service.py # FRED macro data via OpenBB
├── technical_service.py # Technical indicators via openbb-technical
├── analysis_service.py # Rule engine for portfolio analysis
├── routes.py # Core stock data + portfolio + discovery routes
├── routes_sentiment.py # Sentiment & analyst routes (Finnhub + Alpha Vantage)
├── routes_macro.py # Macro economics routes (FRED)
├── routes_technical.py # Technical analysis routes
├── environment.yml # Conda environment
├── pyproject.toml # Project metadata
└── tests/ # 102 tests
├── test_models.py
├── test_mappers.py
├── test_openbb_service.py
├── test_finnhub_service.py
├── test_analysis_service.py
├── test_routes.py
├── test_routes_sentiment.py
├── test_alphavantage_service.py
├── test_routes_macro.py
└── test_routes_technical.py
```
## Running Tests
```bash
conda activate openbb-invest-api
python -m pytest tests/ -v
```
## Swedish Stocks
Swedish stocks are supported via the `.ST` suffix (Stockholm exchange):
- `VOLV-B.ST` (Volvo)
- `ERIC-B.ST` (Ericsson)
- `HM-B.ST` (H&M)
- `SEB-A.ST` (SEB)
- `SAND.ST` (Sandvik)
## Integration with OpenClaw
This API is designed to be called by OpenClaw as an MCP tool or HTTP data source. OpenClaw sends requests to this API to fetch structured stock data and rule-based analysis, then uses its LLM to generate natural language investment advice.
Example OpenClaw workflow:
1. User asks: "Should I buy more AAPL?"
2. OpenClaw calls `GET /api/v1/stock/AAPL/summary` for fundamental data
3. OpenClaw calls `GET /api/v1/stock/AAPL/sentiment` for news/analyst sentiment
4. OpenClaw calls `GET /api/v1/stock/AAPL/technical` for technical signals
5. OpenClaw calls `GET /api/v1/macro/overview` for market context
6. OpenClaw calls `POST /api/v1/portfolio/analyze` with user's holdings
7. OpenClaw's LLM synthesizes all structured data into a personalized recommendation
## Data Sources
| Source | Cost | Key Required | Data Provided |
|--------|------|-------------|---------------|
| **yfinance** | Free | No | Quotes, fundamentals, financials, historical prices, news, discovery |
| **Finnhub** | Free | Yes (free registration) | Insider trades, analyst recommendations, upgrades/downgrades |
| **Alpha Vantage** | Free | Yes (free registration) | News sentiment scores (bullish/bearish per ticker per article), 25 req/day |
| **FRED** | Free | Yes (free registration) | Fed rate, treasury yields, CPI, unemployment, GDP, VIX, 800K+ economic series |
| **openbb-technical** | Free | No (local computation) | RSI, MACD, SMA, EMA, Bollinger Bands |

158
alphavantage_service.py Normal file
View File

@@ -0,0 +1,158 @@
"""Alpha Vantage API client for news sentiment analysis."""
import logging
from typing import Any
import httpx
from config import settings
logger = logging.getLogger(__name__)
BASE_URL = "https://www.alphavantage.co/query"
TIMEOUT = 15.0
def _is_configured() -> bool:
return bool(settings.alphavantage_api_key)
async def get_news_sentiment(
symbol: str, limit: int = 50
) -> dict[str, Any]:
"""Get news articles with per-ticker sentiment scores.
Returns articles with sentiment_score (-1 to 1),
sentiment_label (Bearish/Bullish/Neutral), and relevance_score.
Free tier: 25 requests/day.
"""
if not _is_configured():
return {
"configured": False,
"message": "Set INVEST_API_ALPHAVANTAGE_API_KEY to enable news sentiment",
}
params = {
"function": "NEWS_SENTIMENT",
"tickers": symbol,
"limit": limit,
"apikey": settings.alphavantage_api_key,
}
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
resp = await client.get(BASE_URL, params=params)
resp.raise_for_status()
data = resp.json()
if "Error Message" in data or "Note" in data:
msg = data.get("Error Message") or data.get("Note", "")
logger.warning("Alpha Vantage error: %s", msg)
return {"configured": True, "error": msg, "articles": []}
feed = data.get("feed", [])
articles = [_parse_article(article, symbol) for article in feed]
overall = _compute_overall_sentiment(articles)
return {
"configured": True,
"symbol": symbol,
"article_count": len(articles),
"overall_sentiment": overall,
"articles": articles,
}
def _parse_article(article: dict[str, Any], symbol: str) -> dict[str, Any]:
"""Extract relevant fields from a single news article."""
ticker_sentiment = _find_ticker_sentiment(
article.get("ticker_sentiment", []), symbol
)
return {
"title": article.get("title"),
"url": article.get("url"),
"source": article.get("source"),
"published": article.get("time_published"),
"summary": article.get("summary"),
"overall_sentiment_score": _safe_float(
article.get("overall_sentiment_score")
),
"overall_sentiment_label": article.get("overall_sentiment_label"),
"ticker_sentiment_score": ticker_sentiment.get("score"),
"ticker_sentiment_label": ticker_sentiment.get("label"),
"ticker_relevance": ticker_sentiment.get("relevance"),
"topics": [
t.get("topic") for t in article.get("topics", [])
],
}
def _find_ticker_sentiment(
sentiments: list[dict[str, Any]], symbol: str
) -> dict[str, Any]:
"""Find the sentiment entry matching the requested ticker."""
upper = symbol.upper()
for s in sentiments:
ticker = s.get("ticker", "")
if ticker.upper() == upper:
return {
"score": _safe_float(s.get("ticker_sentiment_score")),
"label": s.get("ticker_sentiment_label"),
"relevance": _safe_float(s.get("relevance_score")),
}
return {"score": None, "label": None, "relevance": None}
def _safe_float(val: Any) -> float | None:
"""Convert value to float, returning None on failure."""
if val is None:
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def _compute_overall_sentiment(
articles: list[dict[str, Any]],
) -> dict[str, Any]:
"""Compute aggregate sentiment stats from parsed articles."""
scores = [
a["ticker_sentiment_score"]
for a in articles
if a["ticker_sentiment_score"] is not None
]
if not scores:
return {
"avg_score": None,
"label": "Unknown",
"bullish_count": 0,
"bearish_count": 0,
"neutral_count": 0,
"total_scored": 0,
}
avg = sum(scores) / len(scores)
bullish = sum(1 for s in scores if s >= 0.15)
bearish = sum(1 for s in scores if s <= -0.15)
neutral = len(scores) - bullish - bearish
if avg >= 0.35:
label = "Bullish"
elif avg >= 0.15:
label = "Somewhat-Bullish"
elif avg >= -0.15:
label = "Neutral"
elif avg >= -0.35:
label = "Somewhat-Bearish"
else:
label = "Bearish"
return {
"avg_score": round(avg, 4),
"label": label,
"bullish_count": bullish,
"bearish_count": bearish,
"neutral_count": neutral,
"total_scored": len(scores),
}

224
analysis_service.py Normal file
View File

@@ -0,0 +1,224 @@
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any
from mappers import metrics_from_dict
from models import (
ActionEnum,
AnalysisResult,
ConfidenceEnum,
Holding,
HoldingAnalysis,
PortfolioResponse,
)
import openbb_service
logger = logging.getLogger(__name__)
# --- Score thresholds ---
UPSIDE_BUY = 0.15 # >15% upside from target price -> BUY
UPSIDE_SELL = -0.10 # >10% downside from target price -> SELL
PNL_LOSS_BUY = -0.20 # loss >20% -> average down
PNL_PROFIT_SELL = 0.50 # profit >50% -> take profit
REVENUE_GROWTH_BUY = 0.10 # >10% YoY
REVENUE_GROWTH_SELL = 0.0 # negative growth
PE_LOW = 15.0 # below this suggests undervaluation
PE_HIGH = 35.0 # above this suggests overvaluation
MAX_CONCURRENT_HOLDINGS = 10
def _score_target_price(
current: float | None, target: float | None
) -> tuple[int, str | None]:
"""Score based on analyst target price vs current price.
Returns (score, reason). Score: +1 buy, 0 hold, -1 sell.
"""
if current is None or target is None or current <= 0:
return 0, None
upside = (target - current) / current
if upside > UPSIDE_BUY:
return 1, f"Analyst target {target:.2f} implies {upside:.0%} upside"
if upside < UPSIDE_SELL:
return -1, f"Analyst target {target:.2f} implies {upside:.0%} (downside)"
return 0, f"Near analyst target {target:.2f} ({upside:+.0%})"
def _score_pe(pe: float | None) -> tuple[int, str | None]:
"""Score PE ratio. Using simple absolute thresholds as proxy for industry comparison."""
if pe is None:
return 0, None
if pe < 0:
return -1, f"Negative PE ({pe:.1f}) indicates losses"
if pe < PE_LOW:
return 1, f"Low PE ({pe:.1f}) suggests undervaluation"
if pe > PE_HIGH:
return -1, f"High PE ({pe:.1f}) suggests overvaluation"
return 0, f"PE ({pe:.1f}) within normal range"
def _score_revenue_growth(growth: float | None) -> tuple[int, str | None]:
if growth is None:
return 0, None
if growth > REVENUE_GROWTH_BUY:
return 1, f"Revenue growth {growth:.0%} is strong"
if growth < REVENUE_GROWTH_SELL:
return -1, f"Revenue declining ({growth:.0%})"
return 0, f"Revenue growth {growth:.0%} is moderate"
def _score_pnl(pnl_percent: float) -> tuple[int, str | None]:
"""Score based on current P&L vs cost basis."""
if pnl_percent < PNL_LOSS_BUY:
return 1, f"Down {pnl_percent:.0%} — consider averaging down"
if pnl_percent > PNL_PROFIT_SELL:
return -1, f"Up {pnl_percent:.0%} — consider taking profit"
return 0, f"P&L {pnl_percent:+.0%} within hold range"
def compute_analysis(
current_price: float | None,
buy_in_price: float,
target_price: float | None,
metrics: dict[str, Any],
) -> AnalysisResult:
"""Rule engine: compute BUY_MORE / HOLD / SELL with reasons."""
scores: list[int] = []
reasons: list[str] = []
# P&L score
if current_price is not None and current_price > 0:
pnl_pct = (current_price - buy_in_price) / buy_in_price
s, r = _score_pnl(pnl_pct)
scores.append(s)
if r:
reasons.append(r)
# Target price score
s, r = _score_target_price(current_price, target_price)
scores.append(s)
if r:
reasons.append(r)
# PE score
s, r = _score_pe(metrics.get("pe_ratio"))
scores.append(s)
if r:
reasons.append(r)
# Revenue growth score
s, r = _score_revenue_growth(metrics.get("revenue_growth"))
scores.append(s)
if r:
reasons.append(r)
# Aggregate
total = sum(scores)
active_signals = len([s for s in scores if s != 0])
if total >= 2:
action = ActionEnum.BUY_MORE
elif total <= -2:
action = ActionEnum.SELL
else:
action = ActionEnum.HOLD
if active_signals >= 3 and abs(total) >= 2:
confidence = ConfidenceEnum.HIGH
elif active_signals >= 2 and abs(total) >= 1:
confidence = ConfidenceEnum.MEDIUM
else:
confidence = ConfidenceEnum.LOW
if not reasons:
reasons.append("Insufficient data for detailed analysis")
return AnalysisResult(action=action, confidence=confidence, reasons=reasons)
async def analyze_holding(holding: Holding) -> HoldingAnalysis:
"""Analyze a single holding: fetch data and run rule engine."""
quote_data, metrics_data, target_price = await asyncio.gather(
openbb_service.get_quote(holding.symbol),
openbb_service.get_metrics(holding.symbol),
openbb_service.get_price_target(holding.symbol),
)
current_price = quote_data.get("last_price") or quote_data.get("close")
pnl = None
pnl_percent = None
if current_price is not None:
pnl = (current_price - holding.buy_in_price) * holding.shares
pnl_percent = (current_price - holding.buy_in_price) / holding.buy_in_price
metrics = metrics_from_dict(holding.symbol, metrics_data)
analysis = compute_analysis(
current_price=current_price,
buy_in_price=holding.buy_in_price,
target_price=target_price,
metrics=metrics_data,
)
return HoldingAnalysis(
symbol=holding.symbol,
current_price=current_price,
buy_in_price=holding.buy_in_price,
shares=holding.shares,
pnl=pnl,
pnl_percent=pnl_percent,
metrics=metrics,
target_price=target_price,
analysis=analysis,
)
def _make_fallback_holding(holding: Holding) -> HoldingAnalysis:
return HoldingAnalysis(
symbol=holding.symbol,
current_price=None,
buy_in_price=holding.buy_in_price,
shares=holding.shares,
analysis=AnalysisResult(
action=ActionEnum.HOLD,
confidence=ConfidenceEnum.LOW,
reasons=["Data unavailable. Please try again later."],
),
)
async def analyze_portfolio(holdings: list[Holding]) -> PortfolioResponse:
"""Analyze entire portfolio with bounded concurrency."""
sem = asyncio.Semaphore(MAX_CONCURRENT_HOLDINGS)
async def bounded_analyze(h: Holding) -> HoldingAnalysis:
async with sem:
return await analyze_holding(h)
results = await asyncio.gather(
*(bounded_analyze(h) for h in holdings),
return_exceptions=True,
)
analyzed: list[HoldingAnalysis] = []
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(
"Failed to analyze %s: %s",
holdings[i].symbol,
result,
exc_info=result,
)
analyzed.append(_make_fallback_holding(holdings[i]))
else:
analyzed.append(result)
return PortfolioResponse(
holdings=analyzed,
analyzed_at=datetime.now(timezone.utc),
)

20
config.py Normal file
View File

@@ -0,0 +1,20 @@
from pydantic import Field
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
host: str = "0.0.0.0"
port: int = 8000
cors_origins: list[str] = Field(default_factory=lambda: ["http://localhost:3000"])
log_level: str = "info"
debug: bool = False
# Optional API keys (free tiers)
finnhub_api_key: str = ""
fred_api_key: str = ""
alphavantage_api_key: str = ""
model_config = {"env_prefix": "INVEST_API_", "env_file": ".env"}
settings = Settings()

19
environment.yml Normal file
View File

@@ -0,0 +1,19 @@
name: openbb-invest-api
channels:
- conda-forge
- defaults
dependencies:
- python=3.12
- pip
- setuptools
- pip:
- fastapi
- uvicorn[standard]
- openbb
- openbb-yfinance
- openbb-technical
- pydantic-settings
- httpx
- pytest
- pytest-asyncio
- pytest-cov

175
finnhub_service.py Normal file
View File

@@ -0,0 +1,175 @@
"""Finnhub API client for sentiment, insider trades, and analyst data."""
import logging
from datetime import datetime, timedelta
from typing import Any
import httpx
from config import settings
logger = logging.getLogger(__name__)
BASE_URL = "https://finnhub.io/api/v1"
TIMEOUT = 15.0
def _client() -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=BASE_URL,
timeout=TIMEOUT,
params={"token": settings.finnhub_api_key},
)
def _is_configured() -> bool:
return bool(settings.finnhub_api_key)
async def get_news_sentiment(symbol: str) -> dict[str, Any]:
"""Get aggregated news sentiment scores for a symbol.
Note: This endpoint requires a Finnhub premium plan.
Returns empty dict on 403 (free tier).
"""
if not _is_configured():
return {}
async with _client() as client:
resp = await client.get("/news-sentiment", params={"symbol": symbol})
if resp.status_code == 403:
logger.debug("news-sentiment endpoint requires premium plan, skipping")
return {}
resp.raise_for_status()
return resp.json()
async def get_company_news(symbol: str, days: int = 7) -> list[dict[str, Any]]:
"""Get recent company news articles."""
if not _is_configured():
return []
end = datetime.now().strftime("%Y-%m-%d")
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/company-news",
params={"symbol": symbol, "from": start, "to": end},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_insider_transactions(symbol: str) -> list[dict[str, Any]]:
"""Get insider transactions for a symbol."""
if not _is_configured():
return []
async with _client() as client:
resp = await client.get(
"/stock/insider-transactions",
params={"symbol": symbol},
)
resp.raise_for_status()
data = resp.json()
return data.get("data", []) if isinstance(data, dict) else []
async def get_recommendation_trends(symbol: str) -> list[dict[str, Any]]:
"""Get analyst recommendation trends (monthly breakdown)."""
if not _is_configured():
return []
async with _client() as client:
resp = await client.get(
"/stock/recommendation",
params={"symbol": symbol},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_upgrade_downgrade(
symbol: str, days: int = 90
) -> list[dict[str, Any]]:
"""Get recent analyst upgrades/downgrades."""
if not _is_configured():
return []
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/stock/upgrade-downgrade",
params={"symbol": symbol, "from": start},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_sentiment_summary(symbol: str) -> dict[str, Any]:
"""Aggregate all sentiment data for a symbol into one response."""
if not _is_configured():
return {"configured": False, "message": "Set INVEST_API_FINNHUB_API_KEY to enable sentiment data"}
import asyncio
news_sentiment, company_news, recommendations, upgrades = await asyncio.gather(
get_news_sentiment(symbol),
get_company_news(symbol, days=7),
get_recommendation_trends(symbol),
get_upgrade_downgrade(symbol, days=90),
return_exceptions=True,
)
def _safe_result(result: Any, default: Any) -> Any:
return default if isinstance(result, BaseException) else result
news_sentiment = _safe_result(news_sentiment, {})
company_news = _safe_result(company_news, [])
recommendations = _safe_result(recommendations, [])
upgrades = _safe_result(upgrades, [])
# Extract key sentiment metrics
sentiment_data = news_sentiment.get("sentiment", {}) if isinstance(news_sentiment, dict) else {}
buzz_data = news_sentiment.get("buzz", {}) if isinstance(news_sentiment, dict) else {}
return {
"symbol": symbol,
"news_sentiment": {
"bullish_percent": sentiment_data.get("bullishPercent"),
"bearish_percent": sentiment_data.get("bearishPercent"),
"news_score": news_sentiment.get("companyNewsScore") if isinstance(news_sentiment, dict) else None,
"sector_avg_score": news_sentiment.get("sectorAverageNewsScore") if isinstance(news_sentiment, dict) else None,
"articles_last_week": buzz_data.get("articlesInLastWeek"),
"weekly_average": buzz_data.get("weeklyAverage"),
"buzz": buzz_data.get("buzz"),
},
"recent_news": [
{
"headline": n.get("headline"),
"source": n.get("source"),
"url": n.get("url"),
"datetime": n.get("datetime"),
"summary": n.get("summary"),
}
for n in (company_news[:10] if isinstance(company_news, list) else [])
],
"analyst_recommendations": [
{
"period": r.get("period"),
"strong_buy": r.get("strongBuy"),
"buy": r.get("buy"),
"hold": r.get("hold"),
"sell": r.get("sell"),
"strong_sell": r.get("strongSell"),
}
for r in (recommendations[:6] if isinstance(recommendations, list) else [])
],
"recent_upgrades_downgrades": [
{
"company": u.get("company"),
"action": u.get("action"),
"from_grade": u.get("fromGrade"),
"to_grade": u.get("toGrade"),
"date": u.get("gradeTime"),
}
for u in (upgrades[:10] if isinstance(upgrades, list) else [])
],
}

80
macro_service.py Normal file
View File

@@ -0,0 +1,80 @@
"""Macro economic data via OpenBB FRED provider."""
import asyncio
import logging
from typing import Any
from openbb import obb
logger = logging.getLogger(__name__)
PROVIDER = "fred"
# Key FRED series IDs
SERIES = {
"fed_funds_rate": "FEDFUNDS",
"us_10y_treasury": "DGS10",
"us_2y_treasury": "DGS2",
"cpi_yoy": "CPIAUCSL",
"unemployment_rate": "UNRATE",
"gdp_growth": "A191RL1Q225SBEA",
"sp500": "SP500",
"vix": "VIXCLS",
}
def _to_dicts(result: Any) -> list[dict[str, Any]]:
if result is None or result.results is None:
return []
if isinstance(result.results, list):
return [
item.model_dump() if hasattr(item, "model_dump") else vars(item)
for item in result.results
]
if hasattr(result.results, "model_dump"):
return [result.results.model_dump()]
return [vars(result.results)]
async def get_series(series_id: str, limit: int = 10) -> list[dict[str, Any]]:
"""Get a FRED time series by ID."""
try:
result = await asyncio.to_thread(
obb.economy.fred_series,
symbol=series_id,
limit=limit,
provider=PROVIDER,
)
items = _to_dicts(result)
for item in items:
if "date" in item and not isinstance(item["date"], str):
item = {**item, "date": str(item["date"])}
return items
except Exception:
logger.warning("Failed to fetch FRED series %s", series_id, exc_info=True)
return []
async def get_macro_overview() -> dict[str, Any]:
"""Get a summary of key macro indicators."""
tasks = {
name: get_series(series_id, limit=1)
for name, series_id in SERIES.items()
}
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
overview: dict[str, Any] = {}
for name, result in zip(tasks.keys(), results):
if isinstance(result, BaseException):
logger.warning("Failed to fetch %s: %s", name, result)
overview[name] = None
elif result and len(result) > 0:
entry = result[0]
overview[name] = {
"value": entry.get("value"),
"date": str(entry.get("date", "")),
}
else:
overview[name] = None
return overview

49
main.py Normal file
View File

@@ -0,0 +1,49 @@
import logging
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from routes import router
from routes_sentiment import router as sentiment_router
from routes_macro import router as macro_router
from routes_technical import router as technical_router
logging.basicConfig(
level=settings.log_level.upper(),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
app = FastAPI(
title="OpenBB Investment Analysis API",
version="0.1.0",
description="REST API for stock data and rule-based investment analysis, powered by OpenBB SDK.",
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=False,
allow_methods=["GET", "POST"],
allow_headers=["Content-Type", "Authorization"],
)
app.include_router(router)
app.include_router(sentiment_router)
app.include_router(macro_router)
app.include_router(technical_router)
@app.get("/health", response_model=dict[str, str])
async def health() -> dict[str, str]:
return {"status": "ok"}
if __name__ == "__main__":
uvicorn.run(
"main:app",
host=settings.host,
port=settings.port,
reload=settings.debug,
)

68
mappers.py Normal file
View File

@@ -0,0 +1,68 @@
"""Shared mapping functions from raw dicts to response models."""
from typing import Any
from models import (
DiscoverItem,
MetricsResponse,
ProfileResponse,
QuoteResponse,
)
def quote_from_dict(symbol: str, data: dict[str, Any]) -> QuoteResponse:
return QuoteResponse(
symbol=symbol,
name=data.get("name"),
price=data.get("last_price") or data.get("close"),
change=data.get("change"),
change_percent=data.get("change_percent"),
volume=data.get("volume"),
market_cap=data.get("market_cap"),
currency=data.get("currency"),
)
def profile_from_dict(symbol: str, data: dict[str, Any]) -> ProfileResponse:
return ProfileResponse(
symbol=symbol,
name=data.get("name"),
sector=data.get("sector"),
industry=data.get("industry"),
country=data.get("country"),
description=data.get("long_description") or data.get("description"),
website=data.get("website"),
employees=data.get("full_time_employees") or data.get("employees"),
)
def metrics_from_dict(symbol: str, data: dict[str, Any]) -> MetricsResponse:
return MetricsResponse(
symbol=symbol,
pe_ratio=data.get("pe_ratio"),
pb_ratio=data.get("pb_ratio"),
ps_ratio=data.get("ps_ratio"),
peg_ratio=data.get("peg_ratio"),
roe=data.get("roe") or data.get("return_on_equity"),
roa=data.get("roa") or data.get("return_on_assets"),
dividend_yield=data.get("dividend_yield"),
beta=data.get("beta"),
eps=data.get("eps_ttm") or data.get("eps"),
revenue_growth=data.get("revenue_growth"),
earnings_growth=data.get("earnings_growth"),
)
def discover_item_from_dict(data: dict[str, Any]) -> DiscoverItem:
return DiscoverItem(
symbol=data.get("symbol"),
name=data.get("name"),
price=data.get("price") or data.get("last_price"),
change_percent=data.get("change_percent") or data.get("percent_change"),
volume=data.get("volume"),
market_cap=data.get("market_cap"),
)
def discover_items_from_list(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [discover_item_from_dict(item).model_dump() for item in items]

150
models.py Normal file
View File

@@ -0,0 +1,150 @@
import re
from enum import Enum
from typing import Any
from pydantic import AwareDatetime, BaseModel, Field, field_validator
# --- Constants ---
SYMBOL_PATTERN = re.compile(r"^[A-Za-z0-9.\-]{1,20}$")
# --- Request Models ---
class Holding(BaseModel):
symbol: str = Field(..., description="Stock symbol, e.g. AAPL or VOLV-B.ST")
shares: float = Field(..., gt=0, description="Number of shares held")
buy_in_price: float = Field(..., gt=0, description="Average cost basis per share")
@field_validator("symbol")
@classmethod
def validate_symbol(cls, v: str) -> str:
if not SYMBOL_PATTERN.match(v):
raise ValueError("Invalid symbol format. Use 1-20 alphanumeric chars, dots, or hyphens.")
return v.upper()
class PortfolioRequest(BaseModel):
holdings: list[Holding] = Field(..., min_length=1, max_length=50)
# --- Response Models ---
class ActionEnum(str, Enum):
BUY_MORE = "BUY_MORE"
HOLD = "HOLD"
SELL = "SELL"
class ConfidenceEnum(str, Enum):
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
class QuoteResponse(BaseModel):
symbol: str
name: str | None = None
price: float | None = None
change: float | None = None
change_percent: float | None = None
volume: int | None = None
market_cap: float | None = None
currency: str | None = None
class ProfileResponse(BaseModel):
symbol: str
name: str | None = None
sector: str | None = None
industry: str | None = None
country: str | None = None
description: str | None = None
website: str | None = None
employees: int | None = None
class MetricsResponse(BaseModel):
symbol: str
pe_ratio: float | None = None
pb_ratio: float | None = None
ps_ratio: float | None = None
peg_ratio: float | None = None
roe: float | None = None
roa: float | None = None
dividend_yield: float | None = None
beta: float | None = None
eps: float | None = None
revenue_growth: float | None = None
earnings_growth: float | None = None
class HistoricalBar(BaseModel):
date: str
open: float | None = None
high: float | None = None
low: float | None = None
close: float | None = None
volume: int | None = None
class FinancialsResponse(BaseModel):
symbol: str
income: list[dict] = Field(default_factory=list)
balance: list[dict] = Field(default_factory=list)
cash_flow: list[dict] = Field(default_factory=list)
class NewsItem(BaseModel):
title: str | None = None
url: str | None = None
date: str | None = None
source: str | None = None
class SummaryResponse(BaseModel):
quote: QuoteResponse | None = None
profile: ProfileResponse | None = None
metrics: MetricsResponse | None = None
financials: FinancialsResponse | None = None
class AnalysisResult(BaseModel):
action: ActionEnum
confidence: ConfidenceEnum
reasons: list[str]
class HoldingAnalysis(BaseModel):
symbol: str
current_price: float | None = None
buy_in_price: float
shares: float
pnl: float | None = None
pnl_percent: float | None = None
metrics: MetricsResponse | None = None
target_price: float | None = None
analysis: AnalysisResult
class PortfolioResponse(BaseModel):
holdings: list[HoldingAnalysis]
analyzed_at: AwareDatetime
class DiscoverItem(BaseModel):
symbol: str | None = None
name: str | None = None
price: float | None = None
change_percent: float | None = None
volume: int | None = None
market_cap: float | None = None
class ApiResponse(BaseModel):
success: bool = True
data: dict[str, Any] | list[Any] | None = None
error: str | None = None

173
openbb_service.py Normal file
View File

@@ -0,0 +1,173 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any
from openbb import obb
logger = logging.getLogger(__name__)
PROVIDER = "yfinance"
def _to_dicts(result: Any) -> list[dict[str, Any]]:
"""Convert OBBject results to list of dicts."""
if result is None or result.results is None:
return []
if isinstance(result.results, list):
return [
item.model_dump() if hasattr(item, "model_dump") else vars(item)
for item in result.results
]
if hasattr(result.results, "model_dump"):
return [result.results.model_dump()]
return [vars(result.results)]
def _first_or_empty(result: Any) -> dict[str, Any]:
"""Get first result as dict, or empty dict."""
items = _to_dicts(result)
return items[0] if items else {}
async def get_quote(symbol: str) -> dict:
result = await asyncio.to_thread(
obb.equity.price.quote, symbol, provider=PROVIDER
)
return _first_or_empty(result)
async def get_historical(symbol: str, days: int = 365) -> list[dict]:
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
result = await asyncio.to_thread(
obb.equity.price.historical,
symbol,
start_date=start,
provider=PROVIDER,
)
items = _to_dicts(result)
return [
{**item, "date": str(item["date"])}
if "date" in item and not isinstance(item["date"], str)
else item
for item in items
]
async def get_profile(symbol: str) -> dict:
result = await asyncio.to_thread(
obb.equity.profile, symbol, provider=PROVIDER
)
return _first_or_empty(result)
async def get_metrics(symbol: str) -> dict:
result = await asyncio.to_thread(
obb.equity.fundamental.metrics, symbol, provider=PROVIDER
)
return _first_or_empty(result)
async def get_income(symbol: str) -> list[dict]:
result = await asyncio.to_thread(
obb.equity.fundamental.income, symbol, provider=PROVIDER
)
return _to_dicts(result)
async def get_balance(symbol: str) -> list[dict]:
result = await asyncio.to_thread(
obb.equity.fundamental.balance, symbol, provider=PROVIDER
)
return _to_dicts(result)
async def get_cash_flow(symbol: str) -> list[dict]:
result = await asyncio.to_thread(
obb.equity.fundamental.cash, symbol, provider=PROVIDER
)
return _to_dicts(result)
async def get_financials(symbol: str) -> dict:
income, balance, cash_flow = await asyncio.gather(
get_income(symbol),
get_balance(symbol),
get_cash_flow(symbol),
)
return {
"symbol": symbol,
"income": income,
"balance": balance,
"cash_flow": cash_flow,
}
async def get_price_target(symbol: str) -> float | None:
try:
result = await asyncio.to_thread(
obb.equity.estimates.price_target, symbol, provider=PROVIDER
)
items = _to_dicts(result)
if items:
return items[0].get("adj_price_target") or items[0].get("price_target")
except Exception:
logger.warning("Failed to get price target for %s", symbol, exc_info=True)
return None
async def get_news(symbol: str) -> list[dict]:
result = await asyncio.to_thread(
obb.news.company, symbol, provider=PROVIDER
)
return _to_dicts(result)
async def get_summary(symbol: str) -> dict:
quote, profile, metrics, financials = await asyncio.gather(
get_quote(symbol),
get_profile(symbol),
get_metrics(symbol),
get_financials(symbol),
)
return {
"quote": quote,
"profile": profile,
"metrics": metrics,
"financials": financials,
}
async def get_gainers() -> list[dict]:
result = await asyncio.to_thread(
obb.equity.discovery.gainers, provider=PROVIDER
)
return _to_dicts(result)
async def get_losers() -> list[dict]:
result = await asyncio.to_thread(
obb.equity.discovery.losers, provider=PROVIDER
)
return _to_dicts(result)
async def get_active() -> list[dict]:
result = await asyncio.to_thread(
obb.equity.discovery.active, provider=PROVIDER
)
return _to_dicts(result)
async def get_undervalued() -> list[dict]:
result = await asyncio.to_thread(
obb.equity.discovery.undervalued_large_caps, provider=PROVIDER
)
return _to_dicts(result)
async def get_growth() -> list[dict]:
result = await asyncio.to_thread(
obb.equity.discovery.growth_tech, provider=PROVIDER
)
return _to_dicts(result)

24
pyproject.toml Normal file
View File

@@ -0,0 +1,24 @@
[project]
name = "openbb-invest-api"
version = "0.1.0"
description = "REST API wrapping OpenBB SDK for stock investment analysis"
requires-python = ">=3.12"
dependencies = [
"fastapi",
"uvicorn[standard]",
"openbb[yfinance]",
"pydantic-settings",
"httpx",
]
[project.optional-dependencies]
dev = [
"pytest",
"pytest-asyncio",
"pytest-cov",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
pythonpath = ["."]

208
routes.py Normal file
View File

@@ -0,0 +1,208 @@
import functools
import logging
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar
from fastapi import APIRouter, HTTPException, Path, Query
from mappers import (
discover_items_from_list,
metrics_from_dict,
profile_from_dict,
quote_from_dict,
)
from models import (
SYMBOL_PATTERN,
ApiResponse,
FinancialsResponse,
HistoricalBar,
NewsItem,
PortfolioRequest,
PortfolioResponse,
SummaryResponse,
)
import openbb_service
import analysis_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1")
P = ParamSpec("P")
R = TypeVar("R")
def _validate_symbol(symbol: str) -> str:
if not SYMBOL_PATTERN.match(symbol):
raise HTTPException(status_code=400, detail="Invalid symbol format")
return symbol.upper()
def _safe(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
"""Decorator to catch OpenBB errors and return 502."""
@functools.wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
return await fn(*args, **kwargs)
except HTTPException:
raise
except Exception:
logger.exception("Upstream data error")
raise HTTPException(
status_code=502,
detail="Data provider error. Check server logs.",
)
return wrapper # type: ignore[return-value]
# --- Stock Data ---
@router.get("/stock/{symbol}/quote", response_model=ApiResponse)
@_safe
async def stock_quote(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get current quote for a stock."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_quote(symbol)
return ApiResponse(data=quote_from_dict(symbol, data).model_dump())
@router.get("/stock/{symbol}/profile", response_model=ApiResponse)
@_safe
async def stock_profile(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get company profile."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_profile(symbol)
return ApiResponse(data=profile_from_dict(symbol, data).model_dump())
@router.get("/stock/{symbol}/metrics", response_model=ApiResponse)
@_safe
async def stock_metrics(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get key financial metrics (PE, PB, ROE, etc.)."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_metrics(symbol)
return ApiResponse(data=metrics_from_dict(symbol, data).model_dump())
@router.get("/stock/{symbol}/financials", response_model=ApiResponse)
@_safe
async def stock_financials(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get income statement, balance sheet, and cash flow."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_financials(symbol)
return ApiResponse(data=FinancialsResponse(**data).model_dump())
@router.get("/stock/{symbol}/historical", response_model=ApiResponse)
@_safe
async def stock_historical(
symbol: str = Path(..., min_length=1, max_length=20),
days: int = Query(default=365, ge=1, le=3650),
):
"""Get historical price data."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_historical(symbol, days=days)
bars = [
HistoricalBar(
date=str(item.get("date", "")),
open=item.get("open"),
high=item.get("high"),
low=item.get("low"),
close=item.get("close"),
volume=item.get("volume"),
).model_dump()
for item in data
]
return ApiResponse(data=bars)
@router.get("/stock/{symbol}/news", response_model=ApiResponse)
@_safe
async def stock_news(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get recent company news."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_news(symbol)
news = [
NewsItem(
title=item.get("title"),
url=item.get("url"),
date=str(item.get("date", "")),
source=item.get("source"),
).model_dump()
for item in data
]
return ApiResponse(data=news)
@router.get("/stock/{symbol}/summary", response_model=ApiResponse)
@_safe
async def stock_summary(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get aggregated stock data: quote + profile + metrics + financials."""
symbol = _validate_symbol(symbol)
data = await openbb_service.get_summary(symbol)
summary = SummaryResponse(
quote=quote_from_dict(symbol, data.get("quote", {})),
profile=profile_from_dict(symbol, data.get("profile", {})),
metrics=metrics_from_dict(symbol, data.get("metrics", {})),
financials=FinancialsResponse(
**data.get("financials", {"symbol": symbol})
),
)
return ApiResponse(data=summary.model_dump())
# --- Portfolio Analysis ---
@router.post("/portfolio/analyze", response_model=ApiResponse)
@_safe
async def portfolio_analyze(request: PortfolioRequest):
"""Analyze portfolio holdings with rule-based engine."""
result: PortfolioResponse = await analysis_service.analyze_portfolio(
request.holdings
)
return ApiResponse(data=result.model_dump())
# --- Discovery ---
@router.get("/discover/gainers", response_model=ApiResponse)
@_safe
async def discover_gainers():
"""Get top gainers (US market)."""
data = await openbb_service.get_gainers()
return ApiResponse(data=discover_items_from_list(data))
@router.get("/discover/losers", response_model=ApiResponse)
@_safe
async def discover_losers():
"""Get top losers (US market)."""
data = await openbb_service.get_losers()
return ApiResponse(data=discover_items_from_list(data))
@router.get("/discover/active", response_model=ApiResponse)
@_safe
async def discover_active():
"""Get most active stocks (US market)."""
data = await openbb_service.get_active()
return ApiResponse(data=discover_items_from_list(data))
@router.get("/discover/undervalued", response_model=ApiResponse)
@_safe
async def discover_undervalued():
"""Get undervalued large cap stocks."""
data = await openbb_service.get_undervalued()
return ApiResponse(data=discover_items_from_list(data))
@router.get("/discover/growth", response_model=ApiResponse)
@_safe
async def discover_growth():
"""Get growth tech stocks."""
data = await openbb_service.get_growth()
return ApiResponse(data=discover_items_from_list(data))

53
routes_macro.py Normal file
View File

@@ -0,0 +1,53 @@
"""Routes for macroeconomic data (FRED-powered)."""
import functools
import logging
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar
from fastapi import APIRouter, HTTPException, Query
from models import ApiResponse
import macro_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1")
P = ParamSpec("P")
R = TypeVar("R")
def _safe(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
@functools.wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
return await fn(*args, **kwargs)
except HTTPException:
raise
except Exception:
logger.exception("Upstream data error")
raise HTTPException(
status_code=502,
detail="Data provider error. Check server logs.",
)
return wrapper # type: ignore[return-value]
@router.get("/macro/overview", response_model=ApiResponse)
@_safe
async def macro_overview():
"""Get key macro indicators: Fed rate, treasury yields, CPI, unemployment, GDP, VIX."""
data = await macro_service.get_macro_overview()
return ApiResponse(data=data)
@router.get("/macro/series/{series_id}", response_model=ApiResponse)
@_safe
async def macro_series(
series_id: str,
limit: int = Query(default=30, ge=1, le=1000),
):
"""Get a specific FRED time series by ID."""
data = await macro_service.get_series(series_id, limit=limit)
return ApiResponse(data=data)

138
routes_sentiment.py Normal file
View File

@@ -0,0 +1,138 @@
"""Routes for sentiment, insider trades, and analyst data (Finnhub + Alpha Vantage)."""
import asyncio
import functools
import logging
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar
from fastapi import APIRouter, HTTPException, Path, Query
from models import SYMBOL_PATTERN, ApiResponse
import alphavantage_service
import finnhub_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1")
P = ParamSpec("P")
R = TypeVar("R")
def _validate_symbol(symbol: str) -> str:
if not SYMBOL_PATTERN.match(symbol):
raise HTTPException(status_code=400, detail="Invalid symbol format")
return symbol.upper()
def _safe(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
@functools.wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
return await fn(*args, **kwargs)
except HTTPException:
raise
except Exception:
logger.exception("Upstream data error")
raise HTTPException(
status_code=502,
detail="Data provider error. Check server logs.",
)
return wrapper # type: ignore[return-value]
# --- Sentiment & News ---
@router.get("/stock/{symbol}/sentiment", response_model=ApiResponse)
@_safe
async def stock_sentiment(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get aggregated sentiment: Alpha Vantage news sentiment + Finnhub analyst data."""
symbol = _validate_symbol(symbol)
finnhub_data, av_data = await asyncio.gather(
finnhub_service.get_sentiment_summary(symbol),
alphavantage_service.get_news_sentiment(symbol, limit=20),
return_exceptions=True,
)
if isinstance(finnhub_data, BaseException):
logger.exception("Finnhub error", exc_info=finnhub_data)
finnhub_data = {}
if isinstance(av_data, BaseException):
logger.exception("Alpha Vantage error", exc_info=av_data)
av_data = {}
data = {**finnhub_data, "alpha_vantage_sentiment": av_data}
return ApiResponse(data=data)
@router.get("/stock/{symbol}/news-sentiment", response_model=ApiResponse)
@_safe
async def stock_news_sentiment(
symbol: str = Path(..., min_length=1, max_length=20),
limit: int = Query(default=30, ge=1, le=200),
):
"""Get news articles with per-ticker sentiment scores (Alpha Vantage)."""
symbol = _validate_symbol(symbol)
data = await alphavantage_service.get_news_sentiment(symbol, limit=limit)
return ApiResponse(data=data)
@router.get("/stock/{symbol}/insider-trades", response_model=ApiResponse)
@_safe
async def stock_insider_trades(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get insider transactions (CEO/CFO buys and sells)."""
symbol = _validate_symbol(symbol)
raw = await finnhub_service.get_insider_transactions(symbol)
trades = [
{
"name": t.get("name"),
"shares": t.get("share"),
"change": t.get("change"),
"transaction_date": t.get("transactionDate"),
"transaction_code": t.get("transactionCode"),
"transaction_price": t.get("transactionPrice"),
"filing_date": t.get("filingDate"),
}
for t in raw[:20]
]
return ApiResponse(data=trades)
@router.get("/stock/{symbol}/recommendations", response_model=ApiResponse)
@_safe
async def stock_recommendations(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get analyst recommendation trends (monthly buy/hold/sell counts)."""
symbol = _validate_symbol(symbol)
raw = await finnhub_service.get_recommendation_trends(symbol)
recs = [
{
"period": r.get("period"),
"strong_buy": r.get("strongBuy"),
"buy": r.get("buy"),
"hold": r.get("hold"),
"sell": r.get("sell"),
"strong_sell": r.get("strongSell"),
}
for r in raw[:12]
]
return ApiResponse(data=recs)
@router.get("/stock/{symbol}/upgrades", response_model=ApiResponse)
@_safe
async def stock_upgrades(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get recent analyst upgrades and downgrades."""
symbol = _validate_symbol(symbol)
raw = await finnhub_service.get_upgrade_downgrade(symbol)
upgrades = [
{
"company": u.get("company"),
"action": u.get("action"),
"from_grade": u.get("fromGrade"),
"to_grade": u.get("toGrade"),
"date": u.get("gradeTime"),
}
for u in raw[:20]
]
return ApiResponse(data=upgrades)

49
routes_technical.py Normal file
View File

@@ -0,0 +1,49 @@
"""Routes for technical analysis indicators."""
import functools
import logging
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar
from fastapi import APIRouter, HTTPException, Path
from models import SYMBOL_PATTERN, ApiResponse
import technical_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1")
P = ParamSpec("P")
R = TypeVar("R")
def _validate_symbol(symbol: str) -> str:
if not SYMBOL_PATTERN.match(symbol):
raise HTTPException(status_code=400, detail="Invalid symbol format")
return symbol.upper()
def _safe(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
@functools.wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
return await fn(*args, **kwargs)
except HTTPException:
raise
except Exception:
logger.exception("Upstream data error")
raise HTTPException(
status_code=502,
detail="Data provider error. Check server logs.",
)
return wrapper # type: ignore[return-value]
@router.get("/stock/{symbol}/technical", response_model=ApiResponse)
@_safe
async def stock_technical(symbol: str = Path(..., min_length=1, max_length=20)):
"""Get technical indicators: RSI, MACD, SMA, EMA, Bollinger Bands + signal interpretation."""
symbol = _validate_symbol(symbol)
data = await technical_service.get_technical_indicators(symbol)
return ApiResponse(data=data)

144
technical_service.py Normal file
View File

@@ -0,0 +1,144 @@
"""Technical analysis indicators via openbb-technical (local computation)."""
import asyncio
import logging
from typing import Any
from openbb import obb
logger = logging.getLogger(__name__)
PROVIDER = "yfinance"
async def get_technical_indicators(
symbol: str, days: int = 200
) -> dict[str, Any]:
"""Compute key technical indicators for a symbol."""
from datetime import datetime, timedelta
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
# Fetch historical data first
hist = await asyncio.to_thread(
obb.equity.price.historical,
symbol,
start_date=start,
provider=PROVIDER,
)
if hist is None or hist.results is None:
return {"symbol": symbol, "error": "No historical data available"}
result: dict[str, Any] = {"symbol": symbol}
# RSI (14-period)
try:
rsi = await asyncio.to_thread(obb.technical.rsi, data=hist.results, length=14)
rsi_items = _extract_latest(rsi)
result["rsi_14"] = rsi_items.get("RSI_14")
except Exception:
logger.warning("RSI calculation failed for %s", symbol, exc_info=True)
result["rsi_14"] = None
# MACD (12, 26, 9)
try:
macd = await asyncio.to_thread(
obb.technical.macd, data=hist.results, fast=12, slow=26, signal=9
)
macd_items = _extract_latest(macd)
result["macd"] = {
"macd": macd_items.get("MACD_12_26_9"),
"signal": macd_items.get("MACDs_12_26_9"),
"histogram": macd_items.get("MACDh_12_26_9"),
}
except Exception:
logger.warning("MACD calculation failed for %s", symbol, exc_info=True)
result["macd"] = None
# SMA (20, 50, 200)
for period in [20, 50, 200]:
try:
sma = await asyncio.to_thread(
obb.technical.sma, data=hist.results, length=period
)
sma_items = _extract_latest(sma)
result[f"sma_{period}"] = sma_items.get(f"SMA_{period}")
except Exception:
logger.warning("SMA_%d failed for %s", period, symbol, exc_info=True)
result[f"sma_{period}"] = None
# EMA (12, 26)
for period in [12, 26]:
try:
ema = await asyncio.to_thread(
obb.technical.ema, data=hist.results, length=period
)
ema_items = _extract_latest(ema)
result[f"ema_{period}"] = ema_items.get(f"EMA_{period}")
except Exception:
logger.warning("EMA_%d failed for %s", period, symbol, exc_info=True)
result[f"ema_{period}"] = None
# Bollinger Bands (20, 2)
try:
bbands = await asyncio.to_thread(
obb.technical.bbands, data=hist.results, length=20, std=2
)
bb_items = _extract_latest(bbands)
result["bollinger_bands"] = {
"upper": bb_items.get("BBU_20_2.0"),
"middle": bb_items.get("BBM_20_2.0"),
"lower": bb_items.get("BBL_20_2.0"),
}
except Exception:
logger.warning("Bollinger Bands failed for %s", symbol, exc_info=True)
result["bollinger_bands"] = None
# Add interpretation
result["signals"] = _interpret_signals(result)
return result
def _extract_latest(result: Any) -> dict[str, Any]:
"""Get the last row from a technical indicator result as a dict."""
if result is None or result.results is None:
return {}
items = result.results
if isinstance(items, list) and items:
last = items[-1]
return last.model_dump() if hasattr(last, "model_dump") else vars(last)
return {}
def _interpret_signals(data: dict[str, Any]) -> list[str]:
"""Generate simple text signals from technical indicators."""
signals: list[str] = []
rsi = data.get("rsi_14")
if rsi is not None:
if rsi > 70:
signals.append(f"RSI {rsi:.1f}: Overbought (bearish signal)")
elif rsi < 30:
signals.append(f"RSI {rsi:.1f}: Oversold (bullish signal)")
else:
signals.append(f"RSI {rsi:.1f}: Neutral")
macd = data.get("macd")
if macd and macd.get("histogram") is not None:
hist = macd["histogram"]
if hist > 0:
signals.append("MACD histogram positive (bullish momentum)")
else:
signals.append("MACD histogram negative (bearish momentum)")
sma_50 = data.get("sma_50")
sma_200 = data.get("sma_200")
if sma_50 is not None and sma_200 is not None:
if sma_50 > sma_200:
signals.append("Golden cross: SMA50 above SMA200 (bullish trend)")
else:
signals.append("Death cross: SMA50 below SMA200 (bearish trend)")
return signals

0
tests/__init__.py Normal file
View File

1
tests/conftest.py Normal file
View File

@@ -0,0 +1 @@
# pythonpath configured in pyproject.toml [tool.pytest.ini_options]

View File

@@ -0,0 +1,193 @@
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from alphavantage_service import (
_compute_overall_sentiment,
_find_ticker_sentiment,
_parse_article,
_safe_float,
get_news_sentiment,
)
# --- Unit tests for helpers ---
def test_safe_float_valid():
assert _safe_float("0.35") == 0.35
assert _safe_float(1) == 1.0
assert _safe_float(0.0) == 0.0
def test_safe_float_invalid():
assert _safe_float(None) is None
assert _safe_float("abc") is None
assert _safe_float({}) is None
def test_find_ticker_sentiment_found():
sentiments = [
{"ticker": "MSFT", "ticker_sentiment_score": "0.2", "ticker_sentiment_label": "Bullish", "relevance_score": "0.5"},
{"ticker": "AAPL", "ticker_sentiment_score": "0.45", "ticker_sentiment_label": "Bullish", "relevance_score": "0.9"},
]
result = _find_ticker_sentiment(sentiments, "AAPL")
assert result["score"] == 0.45
assert result["label"] == "Bullish"
assert result["relevance"] == 0.9
def test_find_ticker_sentiment_not_found():
result = _find_ticker_sentiment([], "AAPL")
assert result["score"] is None
assert result["label"] is None
def test_find_ticker_sentiment_case_insensitive():
sentiments = [{"ticker": "aapl", "ticker_sentiment_score": "0.1", "ticker_sentiment_label": "Neutral", "relevance_score": "0.5"}]
result = _find_ticker_sentiment(sentiments, "AAPL")
assert result["score"] == 0.1
def test_parse_article():
article = {
"title": "Apple rises",
"url": "https://example.com",
"source": "Reuters",
"time_published": "20260308T120000",
"summary": "Apple stock rose today.",
"overall_sentiment_score": "0.3",
"overall_sentiment_label": "Somewhat-Bullish",
"ticker_sentiment": [
{"ticker": "AAPL", "ticker_sentiment_score": "0.5", "ticker_sentiment_label": "Bullish", "relevance_score": "0.95"},
],
"topics": [{"topic": "Technology"}, {"topic": "Earnings"}],
}
result = _parse_article(article, "AAPL")
assert result["title"] == "Apple rises"
assert result["ticker_sentiment_score"] == 0.5
assert result["ticker_sentiment_label"] == "Bullish"
assert result["topics"] == ["Technology", "Earnings"]
def test_compute_overall_sentiment_bullish():
articles = [
{"ticker_sentiment_score": 0.5},
{"ticker_sentiment_score": 0.4},
{"ticker_sentiment_score": 0.3},
]
result = _compute_overall_sentiment(articles)
assert result["label"] == "Bullish"
assert result["bullish_count"] == 3
assert result["bearish_count"] == 0
assert result["total_scored"] == 3
def test_compute_overall_sentiment_bearish():
articles = [
{"ticker_sentiment_score": -0.5},
{"ticker_sentiment_score": -0.4},
]
result = _compute_overall_sentiment(articles)
assert result["label"] == "Bearish"
assert result["bearish_count"] == 2
def test_compute_overall_sentiment_neutral():
articles = [
{"ticker_sentiment_score": 0.05},
{"ticker_sentiment_score": -0.05},
]
result = _compute_overall_sentiment(articles)
assert result["label"] == "Neutral"
def test_compute_overall_sentiment_empty():
result = _compute_overall_sentiment([])
assert result["label"] == "Unknown"
assert result["avg_score"] is None
assert result["total_scored"] == 0
def test_compute_overall_sentiment_skips_none():
articles = [
{"ticker_sentiment_score": 0.5},
{"ticker_sentiment_score": None},
]
result = _compute_overall_sentiment(articles)
assert result["total_scored"] == 1
# --- Integration tests for get_news_sentiment ---
@pytest.mark.asyncio
@patch("alphavantage_service.settings")
async def test_get_news_sentiment_not_configured(mock_settings):
mock_settings.alphavantage_api_key = ""
result = await get_news_sentiment("AAPL")
assert result["configured"] is False
@pytest.mark.asyncio
@patch("alphavantage_service.httpx.AsyncClient")
@patch("alphavantage_service.settings")
async def test_get_news_sentiment_success(mock_settings, mock_client_cls):
mock_settings.alphavantage_api_key = "test-key"
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.raise_for_status = MagicMock()
mock_resp.json.return_value = {
"feed": [
{
"title": "AAPL up",
"url": "https://example.com",
"source": "Reuters",
"time_published": "20260308T120000",
"summary": "Good news",
"overall_sentiment_score": "0.4",
"overall_sentiment_label": "Bullish",
"ticker_sentiment": [
{"ticker": "AAPL", "ticker_sentiment_score": "0.5", "ticker_sentiment_label": "Bullish", "relevance_score": "0.9"},
],
"topics": [{"topic": "Tech"}],
}
],
}
mock_client = AsyncMock()
mock_client.get.return_value = mock_resp
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client_cls.return_value = mock_client
result = await get_news_sentiment("AAPL")
assert result["configured"] is True
assert result["symbol"] == "AAPL"
assert result["article_count"] == 1
assert result["overall_sentiment"]["label"] == "Bullish"
assert result["articles"][0]["ticker_sentiment_score"] == 0.5
@pytest.mark.asyncio
@patch("alphavantage_service.httpx.AsyncClient")
@patch("alphavantage_service.settings")
async def test_get_news_sentiment_api_error(mock_settings, mock_client_cls):
mock_settings.alphavantage_api_key = "test-key"
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.raise_for_status = MagicMock()
mock_resp.json.return_value = {"Error Message": "Invalid API call"}
mock_client = AsyncMock()
mock_client.get.return_value = mock_resp
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client_cls.return_value = mock_client
result = await get_news_sentiment("AAPL")
assert result["configured"] is True
assert result["articles"] == []
assert "Invalid API call" in result["error"]

View File

@@ -0,0 +1,132 @@
from models import ActionEnum, ConfidenceEnum
from analysis_service import (
compute_analysis,
_score_pnl,
_score_pe,
_score_revenue_growth,
_score_target_price,
)
class TestScorePnl:
def test_large_loss_suggests_buy(self):
score, reason = _score_pnl(-0.25)
assert score == 1
assert "averaging down" in reason
def test_large_profit_suggests_sell(self):
score, reason = _score_pnl(0.60)
assert score == -1
assert "taking profit" in reason
def test_moderate_suggests_hold(self):
score, reason = _score_pnl(0.10)
assert score == 0
class TestScorePe:
def test_low_pe(self):
score, _ = _score_pe(10.0)
assert score == 1
def test_high_pe(self):
score, _ = _score_pe(50.0)
assert score == -1
def test_normal_pe(self):
score, _ = _score_pe(20.0)
assert score == 0
def test_negative_pe(self):
score, _ = _score_pe(-5.0)
assert score == -1
def test_none_pe(self):
score, reason = _score_pe(None)
assert score == 0
assert reason is None
class TestScoreRevenueGrowth:
def test_strong_growth(self):
score, _ = _score_revenue_growth(0.20)
assert score == 1
def test_negative_growth(self):
score, _ = _score_revenue_growth(-0.05)
assert score == -1
def test_moderate_growth(self):
score, _ = _score_revenue_growth(0.05)
assert score == 0
def test_none(self):
score, reason = _score_revenue_growth(None)
assert score == 0
class TestScoreTargetPrice:
def test_big_upside(self):
score, _ = _score_target_price(100.0, 120.0)
assert score == 1
def test_big_downside(self):
score, _ = _score_target_price(100.0, 85.0)
assert score == -1
def test_near_target(self):
score, _ = _score_target_price(100.0, 105.0)
assert score == 0
def test_none_price(self):
score, _ = _score_target_price(None, 120.0)
assert score == 0
class TestComputeAnalysis:
def test_strong_buy_signals(self):
result = compute_analysis(
current_price=100.0,
buy_in_price=130.0, # loss > 20%
target_price=120.0, # upside > 15%
metrics={"pe_ratio": 10.0, "revenue_growth": 0.20},
)
assert result.action == ActionEnum.BUY_MORE
assert result.confidence == ConfidenceEnum.HIGH
def test_strong_sell_signals(self):
result = compute_analysis(
current_price=200.0,
buy_in_price=100.0, # profit > 50%
target_price=170.0, # downside
metrics={"pe_ratio": 50.0, "revenue_growth": -0.10},
)
assert result.action == ActionEnum.SELL
def test_mixed_signals_hold(self):
result = compute_analysis(
current_price=100.0,
buy_in_price=95.0,
target_price=105.0,
metrics={"pe_ratio": 20.0, "revenue_growth": 0.05},
)
assert result.action == ActionEnum.HOLD
def test_no_data(self):
result = compute_analysis(
current_price=None,
buy_in_price=100.0,
target_price=None,
metrics={},
)
assert result.action == ActionEnum.HOLD
assert result.confidence == ConfidenceEnum.LOW
def test_reasons_populated(self):
result = compute_analysis(
current_price=100.0,
buy_in_price=90.0,
target_price=110.0,
metrics={"pe_ratio": 25.0},
)
assert len(result.reasons) > 0

View File

@@ -0,0 +1,94 @@
from unittest.mock import patch, AsyncMock, MagicMock
import pytest
import finnhub_service
@pytest.mark.asyncio
@patch("finnhub_service.settings")
async def test_not_configured_returns_empty(mock_settings):
mock_settings.finnhub_api_key = ""
result = await finnhub_service.get_news_sentiment("AAPL")
assert result == {}
@pytest.mark.asyncio
@patch("finnhub_service.settings")
async def test_not_configured_returns_message(mock_settings):
mock_settings.finnhub_api_key = ""
result = await finnhub_service.get_sentiment_summary("AAPL")
assert result["configured"] is False
@pytest.mark.asyncio
@patch("finnhub_service.settings")
async def test_insider_not_configured(mock_settings):
mock_settings.finnhub_api_key = ""
result = await finnhub_service.get_insider_transactions("AAPL")
assert result == []
@pytest.mark.asyncio
@patch("finnhub_service.settings")
async def test_recommendations_not_configured(mock_settings):
mock_settings.finnhub_api_key = ""
result = await finnhub_service.get_recommendation_trends("AAPL")
assert result == []
@pytest.mark.asyncio
@patch("finnhub_service.settings")
async def test_upgrade_downgrade_not_configured(mock_settings):
mock_settings.finnhub_api_key = ""
result = await finnhub_service.get_upgrade_downgrade("AAPL")
assert result == []
@pytest.mark.asyncio
@patch("finnhub_service.settings")
@patch("finnhub_service.httpx.AsyncClient")
async def test_news_sentiment_with_key(mock_client_cls, mock_settings):
mock_settings.finnhub_api_key = "test_key"
mock_resp = MagicMock()
mock_resp.json.return_value = {
"buzz": {"articlesInLastWeek": 10},
"sentiment": {"bullishPercent": 0.7, "bearishPercent": 0.3},
"companyNewsScore": 0.65,
"symbol": "AAPL",
}
mock_resp.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.get.return_value = mock_resp
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client_cls.return_value = mock_client
result = await finnhub_service.get_news_sentiment("AAPL")
assert result["symbol"] == "AAPL"
assert result["sentiment"]["bullishPercent"] == 0.7
@pytest.mark.asyncio
@patch("finnhub_service.settings")
@patch("finnhub_service.httpx.AsyncClient")
async def test_insider_transactions_with_key(mock_client_cls, mock_settings):
mock_settings.finnhub_api_key = "test_key"
mock_resp = MagicMock()
mock_resp.json.return_value = {
"data": [
{"name": "Tim Cook", "share": 100000, "change": -50000, "transactionCode": "S"}
]
}
mock_resp.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.get.return_value = mock_resp
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client_cls.return_value = mock_client
result = await finnhub_service.get_insider_transactions("AAPL")
assert len(result) == 1
assert result[0]["name"] == "Tim Cook"

68
tests/test_mappers.py Normal file
View File

@@ -0,0 +1,68 @@
from mappers import (
discover_items_from_list,
metrics_from_dict,
profile_from_dict,
quote_from_dict,
)
class TestQuoteFromDict:
def test_basic(self):
q = quote_from_dict("AAPL", {"name": "Apple", "last_price": 180.0})
assert q.symbol == "AAPL"
assert q.price == 180.0
def test_fallback_to_close(self):
q = quote_from_dict("AAPL", {"close": 175.0})
assert q.price == 175.0
def test_empty_dict(self):
q = quote_from_dict("AAPL", {})
assert q.price is None
class TestProfileFromDict:
def test_basic(self):
p = profile_from_dict("AAPL", {"name": "Apple", "sector": "Tech"})
assert p.sector == "Tech"
def test_description_fallback(self):
p = profile_from_dict("AAPL", {"long_description": "A company"})
assert p.description == "A company"
def test_employees_fallback(self):
p = profile_from_dict("AAPL", {"full_time_employees": 150000})
assert p.employees == 150000
class TestMetricsFromDict:
def test_basic(self):
m = metrics_from_dict("AAPL", {"pe_ratio": 28.0, "roe": 0.15})
assert m.pe_ratio == 28.0
assert m.roe == 0.15
def test_roe_fallback(self):
m = metrics_from_dict("AAPL", {"return_on_equity": 0.20})
assert m.roe == 0.20
def test_eps_fallback(self):
m = metrics_from_dict("AAPL", {"eps_ttm": 6.5})
assert m.eps == 6.5
def test_empty_dict(self):
m = metrics_from_dict("AAPL", {})
assert m.pe_ratio is None
class TestDiscoverItemsFromList:
def test_basic(self):
items = discover_items_from_list([
{"symbol": "TSLA", "price": 250.0},
{"symbol": "AAPL", "last_price": 180.0},
])
assert len(items) == 2
assert items[0]["symbol"] == "TSLA"
assert items[1]["price"] == 180.0
def test_empty_list(self):
assert discover_items_from_list([]) == []

149
tests/test_models.py Normal file
View File

@@ -0,0 +1,149 @@
from datetime import datetime, timezone
from pydantic import ValidationError
from models import (
ActionEnum,
AnalysisResult,
ConfidenceEnum,
Holding,
HoldingAnalysis,
MetricsResponse,
PortfolioRequest,
PortfolioResponse,
QuoteResponse,
)
def test_holding_valid():
h = Holding(symbol="AAPL", shares=100, buy_in_price=150.0)
assert h.symbol == "AAPL"
assert h.shares == 100
assert h.buy_in_price == 150.0
def test_holding_swedish_symbol():
h = Holding(symbol="VOLV-B.ST", shares=50, buy_in_price=250.0)
assert h.symbol == "VOLV-B.ST"
def test_holding_symbol_uppercased():
h = Holding(symbol="aapl", shares=10, buy_in_price=150.0)
assert h.symbol == "AAPL"
def test_holding_invalid_symbol_format():
try:
Holding(symbol="AAPL;DROP TABLE", shares=10, buy_in_price=150.0)
assert False, "Should have raised"
except ValidationError:
pass
def test_holding_symbol_too_long():
try:
Holding(symbol="A" * 21, shares=10, buy_in_price=150.0)
assert False, "Should have raised"
except ValidationError:
pass
def test_holding_invalid_shares():
try:
Holding(symbol="AAPL", shares=0, buy_in_price=150.0)
assert False, "Should have raised"
except ValidationError:
pass
def test_holding_invalid_price():
try:
Holding(symbol="AAPL", shares=10, buy_in_price=-5.0)
assert False, "Should have raised"
except ValidationError:
pass
def test_portfolio_request_empty():
try:
PortfolioRequest(holdings=[])
assert False, "Should have raised"
except ValidationError:
pass
def test_portfolio_request_too_many():
holdings = [
Holding(symbol="AAPL", shares=1, buy_in_price=100)
for _ in range(51)
]
try:
PortfolioRequest(holdings=holdings)
assert False, "Should have raised"
except ValidationError:
pass
def test_portfolio_request_valid():
req = PortfolioRequest(
holdings=[Holding(symbol="AAPL", shares=10, buy_in_price=150)]
)
assert len(req.holdings) == 1
def test_quote_response_defaults():
q = QuoteResponse(symbol="AAPL")
assert q.price is None
assert q.change is None
def test_metrics_response():
m = MetricsResponse(symbol="AAPL", pe_ratio=25.0, roe=0.15)
assert m.pe_ratio == 25.0
assert m.pb_ratio is None
def test_analysis_result():
a = AnalysisResult(
action=ActionEnum.BUY_MORE,
confidence=ConfidenceEnum.HIGH,
reasons=["Low PE", "Strong growth"],
)
assert a.action == ActionEnum.BUY_MORE
assert len(a.reasons) == 2
def test_holding_analysis():
ha = HoldingAnalysis(
symbol="AAPL",
current_price=180.0,
buy_in_price=150.0,
shares=100,
pnl=3000.0,
pnl_percent=0.2,
analysis=AnalysisResult(
action=ActionEnum.HOLD,
confidence=ConfidenceEnum.MEDIUM,
reasons=["Within hold range"],
),
)
assert ha.pnl == 3000.0
def test_portfolio_response():
pr = PortfolioResponse(
holdings=[
HoldingAnalysis(
symbol="AAPL",
buy_in_price=150.0,
shares=100,
analysis=AnalysisResult(
action=ActionEnum.HOLD,
confidence=ConfidenceEnum.LOW,
reasons=["No data"],
),
)
],
analyzed_at=datetime.now(timezone.utc),
)
assert len(pr.holdings) == 1

View File

@@ -0,0 +1,47 @@
from openbb_service import _to_dicts, _first_or_empty
class MockModel:
def __init__(self, data: dict):
self._data = data
def model_dump(self):
return self._data
class MockOBBject:
def __init__(self, results):
self.results = results
class TestToDicts:
def test_none_result(self):
assert _to_dicts(None) == []
def test_none_results(self):
obj = MockOBBject(results=None)
assert _to_dicts(obj) == []
def test_list_results(self):
obj = MockOBBject(results=[
MockModel({"a": 1}),
MockModel({"b": 2}),
])
result = _to_dicts(obj)
assert len(result) == 2
assert result[0] == {"a": 1}
def test_single_result(self):
obj = MockOBBject(results=MockModel({"x": 42}))
result = _to_dicts(obj)
assert result == [{"x": 42}]
class TestFirstOrEmpty:
def test_empty(self):
assert _first_or_empty(None) == {}
def test_with_data(self):
obj = MockOBBject(results=[MockModel({"price": 150.0})])
result = _first_or_empty(obj)
assert result == {"price": 150.0}

242
tests/test_routes.py Normal file
View File

@@ -0,0 +1,242 @@
import pytest
from datetime import datetime, timezone
from unittest.mock import patch, AsyncMock
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
@pytest.mark.asyncio
async def test_health(client):
resp = await client.get("/health")
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
# --- Symbol Validation ---
@pytest.mark.asyncio
async def test_invalid_symbol_returns_400(client):
resp = await client.get("/api/v1/stock/AAPL;DROP/quote")
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_symbol_too_long_returns_422(client):
resp = await client.get(f"/api/v1/stock/{'A' * 25}/quote")
assert resp.status_code == 422
# --- Stock Endpoints ---
@pytest.mark.asyncio
@patch("routes.openbb_service.get_quote", new_callable=AsyncMock)
async def test_stock_quote(mock_quote, client):
mock_quote.return_value = {
"name": "Apple Inc.",
"last_price": 180.0,
"change": 2.5,
"change_percent": 1.4,
"volume": 50000000,
"market_cap": 2800000000000,
"currency": "USD",
}
resp = await client.get("/api/v1/stock/AAPL/quote")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["data"]["price"] == 180.0
assert data["data"]["symbol"] == "AAPL"
@pytest.mark.asyncio
@patch("routes.openbb_service.get_profile", new_callable=AsyncMock)
async def test_stock_profile(mock_profile, client):
mock_profile.return_value = {
"name": "Apple Inc.",
"sector": "Technology",
"industry": "Consumer Electronics",
"country": "US",
}
resp = await client.get("/api/v1/stock/AAPL/profile")
assert resp.status_code == 200
data = resp.json()
assert data["data"]["sector"] == "Technology"
@pytest.mark.asyncio
@patch("routes.openbb_service.get_metrics", new_callable=AsyncMock)
async def test_stock_metrics(mock_metrics, client):
mock_metrics.return_value = {"pe_ratio": 28.5, "roe": 0.15}
resp = await client.get("/api/v1/stock/AAPL/metrics")
assert resp.status_code == 200
assert resp.json()["data"]["pe_ratio"] == 28.5
@pytest.mark.asyncio
@patch("routes.openbb_service.get_historical", new_callable=AsyncMock)
async def test_stock_historical(mock_hist, client):
mock_hist.return_value = [
{"date": "2025-01-01", "open": 150, "close": 155, "volume": 1000000}
]
resp = await client.get("/api/v1/stock/AAPL/historical?days=30")
assert resp.status_code == 200
data = resp.json()["data"]
assert len(data) == 1
@pytest.mark.asyncio
@patch("routes.openbb_service.get_news", new_callable=AsyncMock)
async def test_stock_news(mock_news, client):
mock_news.return_value = [
{"title": "Apple reports earnings", "url": "https://example.com", "date": "2025-01-01"}
]
resp = await client.get("/api/v1/stock/AAPL/news")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 1
@pytest.mark.asyncio
@patch("routes.openbb_service.get_summary", new_callable=AsyncMock)
async def test_stock_summary(mock_summary, client):
mock_summary.return_value = {
"quote": {"name": "Apple", "last_price": 180.0},
"profile": {"name": "Apple", "sector": "Tech"},
"metrics": {"pe_ratio": 28.0},
"financials": {"symbol": "AAPL", "income": [], "balance": [], "cash_flow": []},
}
resp = await client.get("/api/v1/stock/AAPL/summary")
assert resp.status_code == 200
data = resp.json()["data"]
assert data["quote"]["price"] == 180.0
assert data["profile"]["sector"] == "Tech"
# --- Error Handling ---
@pytest.mark.asyncio
@patch("routes.openbb_service.get_quote", new_callable=AsyncMock)
async def test_upstream_error_returns_502(mock_quote, client):
mock_quote.side_effect = RuntimeError("Connection failed")
resp = await client.get("/api/v1/stock/AAPL/quote")
assert resp.status_code == 502
assert "Data provider error" in resp.json()["detail"]
# Verify raw exception is NOT leaked
assert "Connection failed" not in resp.json()["detail"]
# --- Portfolio Analysis ---
@pytest.mark.asyncio
@patch("routes.analysis_service.analyze_portfolio", new_callable=AsyncMock)
async def test_portfolio_analyze(mock_analyze, client):
from models import (
ActionEnum,
AnalysisResult,
ConfidenceEnum,
HoldingAnalysis,
PortfolioResponse,
)
mock_analyze.return_value = PortfolioResponse(
holdings=[
HoldingAnalysis(
symbol="AAPL",
current_price=180.0,
buy_in_price=150.0,
shares=100,
pnl=3000.0,
pnl_percent=0.20,
analysis=AnalysisResult(
action=ActionEnum.HOLD,
confidence=ConfidenceEnum.MEDIUM,
reasons=["Within hold range"],
),
)
],
analyzed_at=datetime.now(timezone.utc),
)
resp = await client.post(
"/api/v1/portfolio/analyze",
json={
"holdings": [
{"symbol": "AAPL", "shares": 100, "buy_in_price": 150.0}
]
},
)
assert resp.status_code == 200
data = resp.json()["data"]
assert len(data["holdings"]) == 1
assert data["holdings"][0]["analysis"]["action"] == "HOLD"
@pytest.mark.asyncio
async def test_portfolio_analyze_empty_body(client):
resp = await client.post(
"/api/v1/portfolio/analyze",
json={"holdings": []},
)
assert resp.status_code == 422
# --- Discovery Endpoints ---
@pytest.mark.asyncio
@patch("routes.openbb_service.get_gainers", new_callable=AsyncMock)
async def test_discover_gainers(mock_gainers, client):
mock_gainers.return_value = [
{"symbol": "TSLA", "name": "Tesla", "price": 250.0, "change_percent": 5.0}
]
resp = await client.get("/api/v1/discover/gainers")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 1
@pytest.mark.asyncio
@patch("routes.openbb_service.get_losers", new_callable=AsyncMock)
async def test_discover_losers(mock_losers, client):
mock_losers.return_value = [
{"symbol": "XYZ", "name": "XYZ Corp", "price": 10.0, "change_percent": -8.0}
]
resp = await client.get("/api/v1/discover/losers")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 1
@pytest.mark.asyncio
@patch("routes.openbb_service.get_active", new_callable=AsyncMock)
async def test_discover_active(mock_active, client):
mock_active.return_value = [
{"symbol": "NVDA", "name": "NVIDIA", "volume": 99000000}
]
resp = await client.get("/api/v1/discover/active")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 1
@pytest.mark.asyncio
@patch("routes.openbb_service.get_undervalued", new_callable=AsyncMock)
async def test_discover_undervalued(mock_uv, client):
mock_uv.return_value = [{"symbol": "IBM", "name": "IBM"}]
resp = await client.get("/api/v1/discover/undervalued")
assert resp.status_code == 200
@pytest.mark.asyncio
@patch("routes.openbb_service.get_growth", new_callable=AsyncMock)
async def test_discover_growth(mock_growth, client):
mock_growth.return_value = [{"symbol": "PLTR", "name": "Palantir"}]
resp = await client.get("/api/v1/discover/growth")
assert resp.status_code == 200

View File

@@ -0,0 +1,39 @@
from unittest.mock import patch, AsyncMock
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
@pytest.mark.asyncio
@patch("routes_macro.macro_service.get_macro_overview", new_callable=AsyncMock)
async def test_macro_overview(mock_overview, client):
mock_overview.return_value = {
"fed_funds_rate": {"value": 5.33, "date": "2026-01-01"},
"us_10y_treasury": {"value": 4.25, "date": "2026-01-01"},
"unemployment_rate": {"value": 3.7, "date": "2026-01-01"},
}
resp = await client.get("/api/v1/macro/overview")
assert resp.status_code == 200
data = resp.json()["data"]
assert data["fed_funds_rate"]["value"] == 5.33
@pytest.mark.asyncio
@patch("routes_macro.macro_service.get_series", new_callable=AsyncMock)
async def test_macro_series(mock_series, client):
mock_series.return_value = [
{"date": "2026-01-01", "value": 5.33}
]
resp = await client.get("/api/v1/macro/series/FEDFUNDS?limit=5")
assert resp.status_code == 200
data = resp.json()["data"]
assert len(data) == 1

View File

@@ -0,0 +1,104 @@
from unittest.mock import patch, AsyncMock
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
@pytest.mark.asyncio
@patch("routes_sentiment.alphavantage_service.get_news_sentiment", new_callable=AsyncMock)
@patch("routes_sentiment.finnhub_service.get_sentiment_summary", new_callable=AsyncMock)
async def test_stock_sentiment(mock_sentiment, mock_av, client):
mock_sentiment.return_value = {
"symbol": "AAPL",
"news_sentiment": {"bullish_percent": 0.7, "bearish_percent": 0.3},
"recent_news": [],
"analyst_recommendations": [],
"recent_upgrades_downgrades": [],
}
mock_av.return_value = {
"configured": True,
"symbol": "AAPL",
"article_count": 1,
"overall_sentiment": {"avg_score": 0.4, "label": "Bullish"},
"articles": [],
}
resp = await client.get("/api/v1/stock/AAPL/sentiment")
assert resp.status_code == 200
data = resp.json()["data"]
assert data["symbol"] == "AAPL"
assert data["news_sentiment"]["bullish_percent"] == 0.7
assert data["alpha_vantage_sentiment"]["overall_sentiment"]["label"] == "Bullish"
@pytest.mark.asyncio
@patch("routes_sentiment.finnhub_service.get_insider_transactions", new_callable=AsyncMock)
async def test_stock_insider_trades(mock_insider, client):
mock_insider.return_value = [
{"name": "Tim Cook", "share": 100000, "change": -50000, "transactionCode": "S"}
]
resp = await client.get("/api/v1/stock/AAPL/insider-trades")
assert resp.status_code == 200
data = resp.json()["data"]
assert len(data) == 1
assert data[0]["name"] == "Tim Cook"
@pytest.mark.asyncio
@patch("routes_sentiment.finnhub_service.get_recommendation_trends", new_callable=AsyncMock)
async def test_stock_recommendations(mock_recs, client):
mock_recs.return_value = [
{"period": "2026-01-01", "strongBuy": 10, "buy": 15, "hold": 5, "sell": 1, "strongSell": 0}
]
resp = await client.get("/api/v1/stock/AAPL/recommendations")
assert resp.status_code == 200
data = resp.json()["data"]
assert len(data) == 1
assert data[0]["strong_buy"] == 10
@pytest.mark.asyncio
@patch("routes_sentiment.finnhub_service.get_upgrade_downgrade", new_callable=AsyncMock)
async def test_stock_upgrades(mock_upgrades, client):
mock_upgrades.return_value = [
{"company": "Morgan Stanley", "action": "upgrade", "fromGrade": "Hold", "toGrade": "Buy"}
]
resp = await client.get("/api/v1/stock/AAPL/upgrades")
assert resp.status_code == 200
data = resp.json()["data"]
assert len(data) == 1
assert data[0]["action"] == "upgrade"
@pytest.mark.asyncio
@patch("routes_sentiment.alphavantage_service.get_news_sentiment", new_callable=AsyncMock)
async def test_stock_news_sentiment(mock_av, client):
mock_av.return_value = {
"configured": True,
"symbol": "AAPL",
"article_count": 2,
"overall_sentiment": {"avg_score": 0.3, "label": "Somewhat-Bullish"},
"articles": [
{"title": "Apple up", "ticker_sentiment_score": 0.5},
{"title": "Apple flat", "ticker_sentiment_score": 0.1},
],
}
resp = await client.get("/api/v1/stock/AAPL/news-sentiment")
assert resp.status_code == 200
data = resp.json()["data"]
assert data["article_count"] == 2
assert data["overall_sentiment"]["label"] == "Somewhat-Bullish"
@pytest.mark.asyncio
async def test_invalid_symbol_sentiment(client):
resp = await client.get("/api/v1/stock/AAPL;DROP/sentiment")
assert resp.status_code == 400

View File

@@ -0,0 +1,42 @@
from unittest.mock import patch, AsyncMock
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
@pytest.mark.asyncio
@patch("routes_technical.technical_service.get_technical_indicators", new_callable=AsyncMock)
async def test_stock_technical(mock_tech, client):
mock_tech.return_value = {
"symbol": "AAPL",
"rsi_14": 55.3,
"macd": {"macd": 1.5, "signal": 1.2, "histogram": 0.3},
"sma_20": 180.0,
"sma_50": 175.0,
"sma_200": 170.0,
"ema_12": 179.0,
"ema_26": 176.0,
"bollinger_bands": {"upper": 190.0, "middle": 180.0, "lower": 170.0},
"signals": ["RSI 55.3: Neutral", "MACD histogram positive (bullish momentum)"],
}
resp = await client.get("/api/v1/stock/AAPL/technical")
assert resp.status_code == 200
data = resp.json()["data"]
assert data["symbol"] == "AAPL"
assert data["rsi_14"] == 55.3
assert len(data["signals"]) == 2
@pytest.mark.asyncio
async def test_invalid_symbol_technical(client):
resp = await client.get("/api/v1/stock/INVALID!!!/technical")
assert resp.status_code == 400