- Extract shared route_utils.py (validate_symbol, safe decorator)
removing duplication from 6 route files
- Extract shared obb_utils.py (to_list, extract_single, safe_last)
removing duplication from calendar_service and market_service
- Fix _to_list dict mutation during iteration (use comprehension)
- Fix double vars() call and live __dict__ mutation risk
- Fix route ordering: /etf/search and /crypto/search now registered
before /{symbol} path params to prevent shadowing
- Add date format validation (YYYY-MM-DD pattern) on calendar routes
- Use timezone-aware datetime.now(tz=timezone.utc) in all services
- Add explicit type annotation for asyncio.gather results
142 lines
4.5 KiB
Python
142 lines
4.5 KiB
Python
"""Calendar events (earnings, dividends, IPOs, splits), screening, and ownership."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from openbb import obb
|
|
|
|
from obb_utils import to_list
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_earnings_calendar(
|
|
start_date: str | None = None, end_date: str | None = None
|
|
) -> list[dict[str, Any]]:
|
|
"""Get upcoming earnings announcements."""
|
|
try:
|
|
kwargs: dict[str, Any] = {}
|
|
if start_date:
|
|
kwargs["start_date"] = start_date
|
|
if end_date:
|
|
kwargs["end_date"] = end_date
|
|
result = await asyncio.to_thread(obb.equity.calendar.earnings, **kwargs)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Earnings calendar failed", exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_dividend_calendar(
|
|
start_date: str | None = None, end_date: str | None = None
|
|
) -> list[dict[str, Any]]:
|
|
"""Get upcoming dividend dates."""
|
|
try:
|
|
kwargs: dict[str, Any] = {}
|
|
if start_date:
|
|
kwargs["start_date"] = start_date
|
|
if end_date:
|
|
kwargs["end_date"] = end_date
|
|
result = await asyncio.to_thread(obb.equity.calendar.dividend, **kwargs)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Dividend calendar failed", exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_ipo_calendar(
|
|
start_date: str | None = None, end_date: str | None = None
|
|
) -> list[dict[str, Any]]:
|
|
"""Get upcoming IPO dates."""
|
|
try:
|
|
kwargs: dict[str, Any] = {}
|
|
if start_date:
|
|
kwargs["start_date"] = start_date
|
|
if end_date:
|
|
kwargs["end_date"] = end_date
|
|
result = await asyncio.to_thread(obb.equity.calendar.ipo, **kwargs)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("IPO calendar failed", exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_splits_calendar(
|
|
start_date: str | None = None, end_date: str | None = None
|
|
) -> list[dict[str, Any]]:
|
|
"""Get upcoming stock split dates."""
|
|
try:
|
|
kwargs: dict[str, Any] = {}
|
|
if start_date:
|
|
kwargs["start_date"] = start_date
|
|
if end_date:
|
|
kwargs["end_date"] = end_date
|
|
result = await asyncio.to_thread(obb.equity.calendar.splits, **kwargs)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Splits calendar failed", exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_analyst_estimates(symbol: str) -> dict[str, Any]:
|
|
"""Get analyst consensus estimates for a symbol."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.equity.estimates.consensus, symbol, provider="yfinance"
|
|
)
|
|
items = to_list(result)
|
|
return {"symbol": symbol, "estimates": items}
|
|
except Exception:
|
|
logger.warning("Analyst estimates failed for %s", symbol, exc_info=True)
|
|
return {"symbol": symbol, "estimates": []}
|
|
|
|
|
|
async def get_share_statistics(symbol: str) -> dict[str, Any]:
|
|
"""Get share statistics (float, shares outstanding, etc.)."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.equity.ownership.share_statistics, symbol, provider="yfinance"
|
|
)
|
|
items = to_list(result)
|
|
return items[0] if items else {}
|
|
except Exception:
|
|
logger.warning("Share statistics failed for %s", symbol, exc_info=True)
|
|
return {}
|
|
|
|
|
|
async def get_insider_trading(symbol: str) -> list[dict[str, Any]]:
|
|
"""Get insider trading data from SEC (free)."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.equity.ownership.insider_trading, symbol, provider="sec"
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("SEC insider trading failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_institutional_holders(symbol: str) -> list[dict[str, Any]]:
|
|
"""Get institutional holders from SEC 13F filings."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.equity.ownership.form_13f, symbol, provider="sec"
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("13F data failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
async def screen_stocks() -> list[dict[str, Any]]:
|
|
"""Screen stocks using available screener."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.equity.screener, provider="yfinance"
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Stock screener failed", exc_info=True)
|
|
return []
|