Files
openbb-invest-api/finnhub_service.py
Yaojia Wang 4eb06dd8e5 feat: add social media sentiment endpoints
- /stock/{symbol}/social-sentiment -- Finnhub Reddit+Twitter sentiment
  (requires premium, gracefully degrades)
- /stock/{symbol}/reddit-sentiment -- Reddit WSB/stocks/investing
  mentions, upvotes, rank via ApeWisdom (free, no key)
- /discover/reddit-trending -- Top 25 trending stocks on Reddit
  (free, no key)

ApeWisdom provides real-time Reddit data without API key.
Finnhub social-sentiment requires premium plan but endpoint
responds gracefully with premium_required flag.
2026-03-19 20:50:28 +01:00

304 lines
11 KiB
Python

"""Finnhub API client for sentiment, insider trades, and analyst data."""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any

import httpx

from config import settings
logger = logging.getLogger(__name__)
BASE_URL = "https://finnhub.io/api/v1"
TIMEOUT = 15.0
def _client() -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=BASE_URL,
timeout=TIMEOUT,
params={"token": settings.finnhub_api_key},
)
def _is_configured() -> bool:
return bool(settings.finnhub_api_key)
async def get_news_sentiment(symbol: str) -> dict[str, Any]:
"""Get aggregated news sentiment scores for a symbol.
Note: This endpoint requires a Finnhub premium plan.
Returns empty dict on 403 (free tier).
"""
if not _is_configured():
return {}
async with _client() as client:
resp = await client.get("/news-sentiment", params={"symbol": symbol})
if resp.status_code == 403:
logger.debug("news-sentiment endpoint requires premium plan, skipping")
return {}
resp.raise_for_status()
return resp.json()
async def get_company_news(symbol: str, days: int = 7) -> list[dict[str, Any]]:
"""Get recent company news articles."""
if not _is_configured():
return []
end = datetime.now().strftime("%Y-%m-%d")
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/company-news",
params={"symbol": symbol, "from": start, "to": end},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_insider_transactions(symbol: str) -> list[dict[str, Any]]:
"""Get insider transactions for a symbol."""
if not _is_configured():
return []
async with _client() as client:
resp = await client.get(
"/stock/insider-transactions",
params={"symbol": symbol},
)
resp.raise_for_status()
data = resp.json()
return data.get("data", []) if isinstance(data, dict) else []
async def get_recommendation_trends(symbol: str) -> list[dict[str, Any]]:
"""Get analyst recommendation trends (monthly breakdown)."""
if not _is_configured():
return []
async with _client() as client:
resp = await client.get(
"/stock/recommendation",
params={"symbol": symbol},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_upgrade_downgrade(
symbol: str, days: int = 90
) -> list[dict[str, Any]]:
"""Get recent analyst upgrades/downgrades."""
if not _is_configured():
return []
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/stock/upgrade-downgrade",
params={"symbol": symbol, "from": start},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
async def get_social_sentiment(symbol: str) -> dict[str, Any]:
"""Get social media sentiment from Reddit and Twitter.
Returns mention counts, positive/negative scores, and trends.
"""
if not _is_configured():
return {"configured": False, "message": "Set INVEST_API_FINNHUB_API_KEY"}
start = (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d")
async with _client() as client:
resp = await client.get(
"/stock/social-sentiment",
params={"symbol": symbol, "from": start},
)
if resp.status_code in (403, 401):
logger.debug("social-sentiment requires premium, skipping")
return {"configured": True, "symbol": symbol, "premium_required": True, "reddit": [], "twitter": []}
resp.raise_for_status()
data = resp.json()
if not isinstance(data, dict):
return {"configured": True, "symbol": symbol, "reddit": [], "twitter": []}
reddit = data.get("reddit", [])
twitter = data.get("twitter", [])
# Compute summary stats
reddit_summary = _summarize_social(reddit) if reddit else None
twitter_summary = _summarize_social(twitter) if twitter else None
return {
"configured": True,
"symbol": symbol,
"reddit_summary": reddit_summary,
"twitter_summary": twitter_summary,
"reddit": reddit[-20:],
"twitter": twitter[-20:],
}
def _summarize_social(entries: list[dict[str, Any]]) -> dict[str, Any]:
"""Summarize social sentiment entries into aggregate stats."""
if not entries:
return {}
total_mentions = sum(e.get("mention", 0) for e in entries)
total_positive = sum(e.get("positiveScore", 0) for e in entries)
total_negative = sum(e.get("negativeScore", 0) for e in entries)
avg_score = sum(e.get("score", 0) for e in entries) / len(entries)
return {
"total_mentions": total_mentions,
"total_positive": total_positive,
"total_negative": total_negative,
"avg_score": round(avg_score, 4),
"data_points": len(entries),
}
async def get_reddit_sentiment(symbol: str) -> dict[str, Any]:
"""Get Reddit sentiment from ApeWisdom (free, no key needed).
Tracks mentions and upvotes across r/wallstreetbets, r/stocks, r/investing.
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
"https://apewisdom.io/api/v1.0/filter/all-stocks/page/1"
)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
# Find the requested symbol
match = next(
(r for r in results if r.get("ticker", "").upper() == symbol.upper()),
None,
)
if match is None:
return {
"symbol": symbol,
"found": False,
"message": f"{symbol} not in Reddit top trending (not enough mentions)",
}
mentions_prev = match.get("mentions_24h_ago", 0)
mentions_now = match.get("mentions", 0)
change_pct = (
round((mentions_now - mentions_prev) / mentions_prev * 100, 1)
if mentions_prev > 0
else None
)
return {
"symbol": symbol,
"found": True,
"rank": match.get("rank"),
"mentions_24h": mentions_now,
"mentions_24h_ago": mentions_prev,
"mentions_change_pct": change_pct,
"upvotes": match.get("upvotes"),
"rank_24h_ago": match.get("rank_24h_ago"),
}
except Exception:
logger.warning("Reddit sentiment failed for %s", symbol, exc_info=True)
return {"symbol": symbol, "error": "Failed to fetch Reddit sentiment"}
async def get_reddit_trending() -> list[dict[str, Any]]:
"""Get top trending stocks on Reddit (ApeWisdom, free, no key)."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
"https://apewisdom.io/api/v1.0/filter/all-stocks/page/1"
)
resp.raise_for_status()
data = resp.json()
return [
{
"rank": r.get("rank"),
"symbol": r.get("ticker"),
"name": r.get("name"),
"mentions_24h": r.get("mentions"),
"upvotes": r.get("upvotes"),
"rank_24h_ago": r.get("rank_24h_ago"),
"mentions_24h_ago": r.get("mentions_24h_ago"),
}
for r in data.get("results", [])[:25]
]
except Exception:
logger.warning("Reddit trending failed", exc_info=True)
return []
async def get_sentiment_summary(symbol: str) -> dict[str, Any]:
"""Aggregate all sentiment data for a symbol into one response."""
if not _is_configured():
return {"configured": False, "message": "Set INVEST_API_FINNHUB_API_KEY to enable sentiment data"}
import asyncio
news_sentiment, company_news, recommendations, upgrades = await asyncio.gather(
get_news_sentiment(symbol),
get_company_news(symbol, days=7),
get_recommendation_trends(symbol),
get_upgrade_downgrade(symbol, days=90),
return_exceptions=True,
)
def _safe_result(result: Any, default: Any) -> Any:
return default if isinstance(result, BaseException) else result
news_sentiment = _safe_result(news_sentiment, {})
company_news = _safe_result(company_news, [])
recommendations = _safe_result(recommendations, [])
upgrades = _safe_result(upgrades, [])
# Extract key sentiment metrics
sentiment_data = news_sentiment.get("sentiment", {}) if isinstance(news_sentiment, dict) else {}
buzz_data = news_sentiment.get("buzz", {}) if isinstance(news_sentiment, dict) else {}
return {
"symbol": symbol,
"news_sentiment": {
"bullish_percent": sentiment_data.get("bullishPercent"),
"bearish_percent": sentiment_data.get("bearishPercent"),
"news_score": news_sentiment.get("companyNewsScore") if isinstance(news_sentiment, dict) else None,
"sector_avg_score": news_sentiment.get("sectorAverageNewsScore") if isinstance(news_sentiment, dict) else None,
"articles_last_week": buzz_data.get("articlesInLastWeek"),
"weekly_average": buzz_data.get("weeklyAverage"),
"buzz": buzz_data.get("buzz"),
},
"recent_news": [
{
"headline": n.get("headline"),
"source": n.get("source"),
"url": n.get("url"),
"datetime": n.get("datetime"),
"summary": n.get("summary"),
}
for n in (company_news[:10] if isinstance(company_news, list) else [])
],
"analyst_recommendations": [
{
"period": r.get("period"),
"strong_buy": r.get("strongBuy"),
"buy": r.get("buy"),
"hold": r.get("hold"),
"sell": r.get("sell"),
"strong_sell": r.get("strongSell"),
}
for r in (recommendations[:6] if isinstance(recommendations, list) else [])
],
"recent_upgrades_downgrades": [
{
"company": u.get("company"),
"action": u.get("action"),
"from_grade": u.get("fromGrade"),
"to_grade": u.get("toGrade"),
"date": u.get("gradeTime"),
}
for u in (upgrades[:10] if isinstance(upgrades, list) else [])
],
}