Files
openbb-invest-api/finnhub_service.py
Yaojia Wang 0f7341b158 refactor: address architect review findings (6 items)
R1: Extend @safe to catch ValueError->400, simplify routes_backtest
    (eliminated 4 copies of duplicated try/except)
R2: Consolidate PROVIDER constant into obb_utils.py (single source)
R3: Add days_ago() helper to obb_utils.py, replace 8+ duplications
R4: Extract Reddit/ApeWisdom into reddit_service.py from finnhub_service
R5: Fix missing top-level import asyncio in finnhub_service
R6: (deferred - sentiment logic extraction is a larger change)

All 561 tests passing.
2026-03-19 23:15:00 +01:00

232 lines
8.1 KiB
Python

"""Finnhub API client for sentiment, insider trades, and analyst data."""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any
import httpx
from config import settings
logger = logging.getLogger(__name__)
BASE_URL = "https://finnhub.io/api/v1"
TIMEOUT = 15.0
def _client() -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=BASE_URL,
timeout=TIMEOUT,
params={"token": settings.finnhub_api_key},
)
def _is_configured() -> bool:
return bool(settings.finnhub_api_key)
async def get_news_sentiment(symbol: str) -> dict[str, Any]:
"""Get aggregated news sentiment scores for a symbol.
Note: This endpoint requires a Finnhub premium plan.
Returns empty dict on 403 (free tier).
"""
if not _is_configured():
return {}
async with _client() as client:
resp = await client.get("/news-sentiment", params={"symbol": symbol})
if resp.status_code == 403:
logger.debug("news-sentiment endpoint requires premium plan, skipping")
return {}
resp.raise_for_status()
return resp.json()
async def get_company_news(symbol: str, days: int = 7) -> list[dict[str, Any]]:
"""Get recent company news articles."""
if not _is_configured():
return []
end = datetime.now().strftime("%Y-%m-%d")
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/company-news",
params={"symbol": symbol, "from": start, "to": end},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_insider_transactions(symbol: str) -> list[dict[str, Any]]:
"""Get insider transactions for a symbol."""
if not _is_configured():
return []
async with _client() as client:
resp = await client.get(
"/stock/insider-transactions",
params={"symbol": symbol},
)
resp.raise_for_status()
data = resp.json()
return data.get("data", []) if isinstance(data, dict) else []
async def get_recommendation_trends(symbol: str) -> list[dict[str, Any]]:
"""Get analyst recommendation trends (monthly breakdown)."""
if not _is_configured():
return []
async with _client() as client:
resp = await client.get(
"/stock/recommendation",
params={"symbol": symbol},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_upgrade_downgrade(
symbol: str, days: int = 90
) -> list[dict[str, Any]]:
"""Get recent analyst upgrades/downgrades."""
if not _is_configured():
return []
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/stock/upgrade-downgrade",
params={"symbol": symbol, "from": start},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_social_sentiment(symbol: str) -> dict[str, Any]:
"""Get social media sentiment from Reddit and Twitter.
Returns mention counts, positive/negative scores, and trends.
"""
if not _is_configured():
return {"configured": False, "message": "Set INVEST_API_FINNHUB_API_KEY"}
start = (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/stock/social-sentiment",
params={"symbol": symbol, "from": start},
)
if resp.status_code in (403, 401):
logger.debug("social-sentiment requires premium, skipping")
return {"configured": True, "symbol": symbol, "premium_required": True, "reddit": [], "twitter": []}
resp.raise_for_status()
data = resp.json()
if not isinstance(data, dict):
return {"configured": True, "symbol": symbol, "reddit": [], "twitter": []}
reddit = data.get("reddit", [])
twitter = data.get("twitter", [])
# Compute summary stats
reddit_summary = _summarize_social(reddit) if reddit else None
twitter_summary = _summarize_social(twitter) if twitter else None
return {
"configured": True,
"symbol": symbol,
"reddit_summary": reddit_summary,
"twitter_summary": twitter_summary,
"reddit": reddit[-20:],
"twitter": twitter[-20:],
}
def _summarize_social(entries: list[dict[str, Any]]) -> dict[str, Any]:
"""Summarize social sentiment entries into aggregate stats."""
if not entries:
return {}
total_mentions = sum(e.get("mention", 0) for e in entries)
total_positive = sum(e.get("positiveScore", 0) for e in entries)
total_negative = sum(e.get("negativeScore", 0) for e in entries)
avg_score = sum(e.get("score", 0) for e in entries) / len(entries)
return {
"total_mentions": total_mentions,
"total_positive": total_positive,
"total_negative": total_negative,
"avg_score": round(avg_score, 4),
"data_points": len(entries),
}
# Reddit sentiment moved to reddit_service.py
async def get_sentiment_summary(symbol: str) -> dict[str, Any]:
"""Aggregate all sentiment data for a symbol into one response."""
if not _is_configured():
return {"configured": False, "message": "Set INVEST_API_FINNHUB_API_KEY to enable sentiment data"}
news_sentiment, company_news, recommendations, upgrades = await asyncio.gather(
get_news_sentiment(symbol),
get_company_news(symbol, days=7),
get_recommendation_trends(symbol),
get_upgrade_downgrade(symbol, days=90),
return_exceptions=True,
)
def _safe_result(result: Any, default: Any) -> Any:
return default if isinstance(result, BaseException) else result
news_sentiment = _safe_result(news_sentiment, {})
company_news = _safe_result(company_news, [])
recommendations = _safe_result(recommendations, [])
upgrades = _safe_result(upgrades, [])
# Extract key sentiment metrics
sentiment_data = news_sentiment.get("sentiment", {}) if isinstance(news_sentiment, dict) else {}
buzz_data = news_sentiment.get("buzz", {}) if isinstance(news_sentiment, dict) else {}
return {
"symbol": symbol,
"news_sentiment": {
"bullish_percent": sentiment_data.get("bullishPercent"),
"bearish_percent": sentiment_data.get("bearishPercent"),
"news_score": news_sentiment.get("companyNewsScore") if isinstance(news_sentiment, dict) else None,
"sector_avg_score": news_sentiment.get("sectorAverageNewsScore") if isinstance(news_sentiment, dict) else None,
"articles_last_week": buzz_data.get("articlesInLastWeek"),
"weekly_average": buzz_data.get("weeklyAverage"),
"buzz": buzz_data.get("buzz"),
},
"recent_news": [
{
"headline": n.get("headline"),
"source": n.get("source"),
"url": n.get("url"),
"datetime": n.get("datetime"),
"summary": n.get("summary"),
}
for n in (company_news[:10] if isinstance(company_news, list) else [])
],
"analyst_recommendations": [
{
"period": r.get("period"),
"strong_buy": r.get("strongBuy"),
"buy": r.get("buy"),
"hold": r.get("hold"),
"sell": r.get("sell"),
"strong_sell": r.get("strongSell"),
}
for r in (recommendations[:6] if isinstance(recommendations, list) else [])
],
"recent_upgrades_downgrades": [
{
"company": u.get("company"),
"action": u.get("action"),
"from_grade": u.get("fromGrade"),
"to_grade": u.get("toGrade"),
"date": u.get("gradeTime"),
}
for u in (upgrades[:10] if isinstance(upgrades, list) else [])
],
}