- Extract shared route_utils.py (validate_symbol, safe decorator)
removing duplication from 6 route files
- Extract shared obb_utils.py (to_list, extract_single, safe_last)
removing duplication from calendar_service and market_service
- Fix _to_list dict mutation during iteration (use comprehension)
- Fix double vars() call and live __dict__ mutation risk
- Fix route ordering: /etf/search and /crypto/search now registered
before /{symbol} path params to prevent shadowing
- Add date format validation (YYYY-MM-DD pattern) on calendar routes
- Use timezone-aware datetime.now(tz=timezone.utc) in all services
- Add explicit type annotation for asyncio.gather results
164 lines
5.0 KiB
Python
164 lines
5.0 KiB
Python
"""Market data: ETFs, indices, crypto, currencies, and derivatives."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Any
|
|
|
|
from openbb import obb
|
|
|
|
from obb_utils import to_list
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PROVIDER = "yfinance"
|
|
|
|
|
|
# --- ETF ---
|
|
|
|
|
|
async def get_etf_info(symbol: str) -> dict[str, Any]:
|
|
"""Get ETF profile/info."""
|
|
try:
|
|
result = await asyncio.to_thread(obb.etf.info, symbol, provider=PROVIDER)
|
|
items = to_list(result)
|
|
return items[0] if items else {}
|
|
except Exception:
|
|
logger.warning("ETF info failed for %s", symbol, exc_info=True)
|
|
return {}
|
|
|
|
|
|
async def get_etf_historical(symbol: str, days: int = 365) -> list[dict[str, Any]]:
|
|
"""Get ETF price history."""
|
|
start = (datetime.now(tz=timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.etf.historical, symbol, start_date=start, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("ETF historical failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
async def search_etf(query: str) -> list[dict[str, Any]]:
|
|
"""Search for ETFs by name or keyword."""
|
|
try:
|
|
result = await asyncio.to_thread(obb.etf.search, query)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("ETF search failed for %s", query, exc_info=True)
|
|
return []
|
|
|
|
|
|
# --- Index ---
|
|
|
|
|
|
async def get_available_indices() -> list[dict[str, Any]]:
|
|
"""List available market indices."""
|
|
try:
|
|
result = await asyncio.to_thread(obb.index.available, provider=PROVIDER)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Available indices failed", exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_index_historical(symbol: str, days: int = 365) -> list[dict[str, Any]]:
|
|
"""Get index price history."""
|
|
start = (datetime.now(tz=timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.index.price.historical, symbol, start_date=start, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Index historical failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
# --- Crypto ---
|
|
|
|
|
|
async def get_crypto_historical(symbol: str, days: int = 365) -> list[dict[str, Any]]:
|
|
"""Get cryptocurrency price history."""
|
|
start = (datetime.now(tz=timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.crypto.price.historical, symbol, start_date=start, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Crypto historical failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
async def search_crypto(query: str) -> list[dict[str, Any]]:
|
|
"""Search for cryptocurrencies."""
|
|
try:
|
|
result = await asyncio.to_thread(obb.crypto.search, query)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Crypto search failed for %s", query, exc_info=True)
|
|
return []
|
|
|
|
|
|
# --- Currency ---
|
|
|
|
|
|
async def get_currency_historical(
|
|
symbol: str, days: int = 365
|
|
) -> list[dict[str, Any]]:
|
|
"""Get forex price history (e.g., EURUSD)."""
|
|
start = (datetime.now(tz=timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.currency.price.historical, symbol, start_date=start, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Currency historical failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
# --- Derivatives ---
|
|
|
|
|
|
async def get_options_chains(symbol: str) -> list[dict[str, Any]]:
|
|
"""Get options chain data for a symbol."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.derivatives.options.chains, symbol, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Options chains failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_futures_historical(
|
|
symbol: str, days: int = 365
|
|
) -> list[dict[str, Any]]:
|
|
"""Get futures price history."""
|
|
start = (datetime.now(tz=timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.derivatives.futures.historical, symbol, start_date=start, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Futures historical failed for %s", symbol, exc_info=True)
|
|
return []
|
|
|
|
|
|
async def get_futures_curve(symbol: str) -> list[dict[str, Any]]:
|
|
"""Get futures term structure/curve."""
|
|
try:
|
|
result = await asyncio.to_thread(
|
|
obb.derivatives.futures.curve, symbol, provider=PROVIDER
|
|
)
|
|
return to_list(result)
|
|
except Exception:
|
|
logger.warning("Futures curve failed for %s", symbol, exc_info=True)
|
|
return []
|