"""Economy surveys: Michigan, SLOOS, NFP, Empire State, BLS.""" import asyncio import logging from typing import Any from openbb import obb from obb_utils import to_list logger = logging.getLogger(__name__) async def get_michigan() -> list[dict[str, Any]]: """Get University of Michigan Consumer Sentiment + inflation expectations.""" try: result = await asyncio.to_thread( obb.economy.survey.university_of_michigan, provider="fred" ) return to_list(result) except Exception: logger.warning("Michigan survey failed", exc_info=True) return [] async def get_sloos() -> list[dict[str, Any]]: """Get Senior Loan Officer Opinion Survey (recession predictor).""" try: result = await asyncio.to_thread( obb.economy.survey.sloos, provider="fred" ) return to_list(result) except Exception: logger.warning("SLOOS failed", exc_info=True) return [] async def get_nonfarm_payrolls() -> list[dict[str, Any]]: """Get detailed employment data (NFP).""" try: result = await asyncio.to_thread( obb.economy.survey.nonfarm_payrolls, provider="fred" ) return to_list(result) except Exception: logger.warning("NFP failed", exc_info=True) return [] async def get_empire_state() -> list[dict[str, Any]]: """Get Empire State Manufacturing Survey.""" try: result = await asyncio.to_thread( obb.economy.survey.manufacturing_outlook_ny, provider="fred" ) return to_list(result) except Exception: logger.warning("Empire State failed", exc_info=True) return [] async def bls_search(query: str) -> list[dict[str, Any]]: """Search BLS data series.""" try: result = await asyncio.to_thread( obb.economy.survey.bls_search, query, provider="bls" ) return to_list(result) except Exception: logger.warning("BLS search failed for %s", query, exc_info=True) return []