Add 3 new service layers and route modules: - quantitative_service: Sharpe ratio, CAPM, normality tests, unit root tests - calendar_service: earnings/dividends/IPO/splits calendars, estimates, SEC ownership - market_service: ETF, index, crypto, forex, options, futures data Total endpoints: 50. All use free providers (yfinance, SEC). Update README with comprehensive endpoint documentation.
160 lines
5.0 KiB
Python
160 lines
5.0 KiB
Python
"""Calendar events (earnings, dividends, IPOs, splits), screening, and ownership."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from openbb import obb
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_earnings_calendar(
    start_date: str | None = None, end_date: str | None = None
) -> list[dict[str, Any]]:
    """Fetch earnings-calendar entries, optionally bounded by a date range.

    Args:
        start_date: Forwarded as ``start_date`` when truthy; omitted otherwise.
        end_date: Forwarded as ``end_date`` when truthy; omitted otherwise.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level
        together with the traceback).
    """
    try:
        # Only forward the bounds the caller actually supplied.
        bounds = (("start_date", start_date), ("end_date", end_date))
        query: dict[str, Any] = {name: value for name, value in bounds if value}
        # The OpenBB call is synchronous, so it is run via asyncio.to_thread.
        response = await asyncio.to_thread(obb.equity.calendar.earnings, **query)
        return _to_list(response)
    except Exception:
        # Best effort: callers receive an empty calendar instead of an error.
        logger.warning("Earnings calendar failed", exc_info=True)
        return []
|
|
|
|
|
|
async def get_dividend_calendar(
    start_date: str | None = None, end_date: str | None = None
) -> list[dict[str, Any]]:
    """Fetch dividend-calendar entries, optionally bounded by a date range.

    Args:
        start_date: Forwarded as ``start_date`` when truthy; omitted otherwise.
        end_date: Forwarded as ``end_date`` when truthy; omitted otherwise.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level
        together with the traceback).
    """
    try:
        # Only forward the bounds the caller actually supplied.
        bounds = (("start_date", start_date), ("end_date", end_date))
        query: dict[str, Any] = {name: value for name, value in bounds if value}
        # The OpenBB call is synchronous, so it is run via asyncio.to_thread.
        response = await asyncio.to_thread(obb.equity.calendar.dividend, **query)
        return _to_list(response)
    except Exception:
        # Best effort: callers receive an empty calendar instead of an error.
        logger.warning("Dividend calendar failed", exc_info=True)
        return []
|
|
|
|
|
|
async def get_ipo_calendar(
    start_date: str | None = None, end_date: str | None = None
) -> list[dict[str, Any]]:
    """Fetch IPO-calendar entries, optionally bounded by a date range.

    Args:
        start_date: Forwarded as ``start_date`` when truthy; omitted otherwise.
        end_date: Forwarded as ``end_date`` when truthy; omitted otherwise.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level
        together with the traceback).
    """
    try:
        # Only forward the bounds the caller actually supplied.
        bounds = (("start_date", start_date), ("end_date", end_date))
        query: dict[str, Any] = {name: value for name, value in bounds if value}
        # The OpenBB call is synchronous, so it is run via asyncio.to_thread.
        response = await asyncio.to_thread(obb.equity.calendar.ipo, **query)
        return _to_list(response)
    except Exception:
        # Best effort: callers receive an empty calendar instead of an error.
        logger.warning("IPO calendar failed", exc_info=True)
        return []
|
|
|
|
|
|
async def get_splits_calendar(
    start_date: str | None = None, end_date: str | None = None
) -> list[dict[str, Any]]:
    """Fetch stock-split calendar entries, optionally bounded by a date range.

    Args:
        start_date: Forwarded as ``start_date`` when truthy; omitted otherwise.
        end_date: Forwarded as ``end_date`` when truthy; omitted otherwise.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level
        together with the traceback).
    """
    try:
        # Only forward the bounds the caller actually supplied.
        bounds = (("start_date", start_date), ("end_date", end_date))
        query: dict[str, Any] = {name: value for name, value in bounds if value}
        # The OpenBB call is synchronous, so it is run via asyncio.to_thread.
        response = await asyncio.to_thread(obb.equity.calendar.splits, **query)
        return _to_list(response)
    except Exception:
        # Best effort: callers receive an empty calendar instead of an error.
        logger.warning("Splits calendar failed", exc_info=True)
        return []
|
|
|
|
|
|
async def get_analyst_estimates(symbol: str) -> dict[str, Any]:
    """Fetch analyst consensus estimates for ``symbol`` via the yfinance provider.

    Args:
        symbol: Ticker passed positionally to the consensus endpoint.

    Returns:
        ``{"symbol": symbol, "estimates": [...]}``. ``estimates`` holds the
        records produced by ``_to_list`` and is empty when the lookup or the
        conversion raised (the failure is logged at WARNING level).
    """
    # Start from the failure shape; it is replaced only on full success.
    estimates: list[dict[str, Any]] = []
    try:
        response = await asyncio.to_thread(
            obb.equity.estimates.consensus, symbol, provider="yfinance"
        )
        estimates = _to_list(response)
    except Exception:
        logger.warning("Analyst estimates failed for %s", symbol, exc_info=True)
    return {"symbol": symbol, "estimates": estimates}
|
|
|
|
|
|
async def get_share_statistics(symbol: str) -> dict[str, Any]:
    """Fetch share statistics for ``symbol`` via the yfinance provider.

    Args:
        symbol: Ticker passed positionally to the share-statistics endpoint.

    Returns:
        The first record produced by ``_to_list`` for the response, or an
        empty dict when there are no records or when anything raised (the
        failure is logged at WARNING level).
    """
    try:
        response = await asyncio.to_thread(
            obb.equity.ownership.share_statistics, symbol, provider="yfinance"
        )
        rows = _to_list(response)
        if not rows:
            return {}
        # Only the first record is surfaced to the caller.
        return rows[0]
    except Exception:
        logger.warning("Share statistics failed for %s", symbol, exc_info=True)
        return {}
|
|
|
|
|
|
async def get_insider_trading(symbol: str) -> list[dict[str, Any]]:
    """Fetch insider-trading records for ``symbol`` via the SEC provider.

    Args:
        symbol: Ticker passed positionally to the insider-trading endpoint.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level).
    """
    try:
        endpoint = obb.equity.ownership.insider_trading
        response = await asyncio.to_thread(endpoint, symbol, provider="sec")
        records = _to_list(response)
        return records
    except Exception:
        # Best effort: a provider failure degrades to "no data".
        logger.warning("SEC insider trading failed for %s", symbol, exc_info=True)
        return []
|
|
|
|
|
|
async def get_institutional_holders(symbol: str) -> list[dict[str, Any]]:
    """Fetch Form 13F data for ``symbol`` via the SEC provider.

    Args:
        symbol: Identifier passed positionally to the ``form_13f`` endpoint.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level).

    NOTE(review): the function name implies "holders of ``symbol``"; confirm
    against the OpenBB ``form_13f`` reference that the endpoint is keyed by
    the held security rather than by the filing institution.
    """
    try:
        endpoint = obb.equity.ownership.form_13f
        response = await asyncio.to_thread(endpoint, symbol, provider="sec")
        records = _to_list(response)
        return records
    except Exception:
        # Best effort: a provider failure degrades to "no data".
        logger.warning("13F data failed for %s", symbol, exc_info=True)
        return []
|
|
|
|
|
|
async def screen_stocks() -> list[dict[str, Any]]:
    """Run the equity screener through the yfinance provider.

    No filter arguments are passed, so the result is whatever the screener
    returns for its defaults.

    Returns:
        The records produced by ``_to_list`` for the response, or an empty
        list if anything raised (the failure is logged at WARNING level).
    """
    try:
        screener = obb.equity.screener
        response = await asyncio.to_thread(screener, provider="yfinance")
        matches = _to_list(response)
        return matches
    except Exception:
        # Best effort: a provider failure degrades to "no matches".
        logger.warning("Stock screener failed", exc_info=True)
        return []
|
|
|
|
|
|
def _to_list(result: Any) -> list[dict[str, Any]]:
|
|
"""Convert OBBject result to list of dicts."""
|
|
if result is None or result.results is None:
|
|
return []
|
|
items = result.results
|
|
if not isinstance(items, list):
|
|
items = [items]
|
|
out = []
|
|
for item in items:
|
|
if hasattr(item, "model_dump"):
|
|
d = item.model_dump()
|
|
else:
|
|
d = vars(item) if vars(item) else {}
|
|
for k, v in d.items():
|
|
if hasattr(v, "isoformat"):
|
|
d[k] = v.isoformat()
|
|
out.append(d)
|
|
return out
|