"""A-share and HK stock data service using AKShare.""" import asyncio import logging import re from datetime import datetime, timedelta from typing import Any import akshare as ak import pandas as pd logger = logging.getLogger(__name__) # --- Symbol validation patterns --- _A_SHARE_PATTERN = re.compile(r"^[036]\d{5}$") _HK_PATTERN = re.compile(r"^\d{5}$") # --- Chinese column name mappings --- _HIST_COLUMNS: dict[str, str] = { "日期": "date", "开盘": "open", "收盘": "close", "最高": "high", "最低": "low", "成交量": "volume", "成交额": "turnover", "振幅": "amplitude", "涨跌幅": "change_percent", "涨跌额": "change", "换手率": "turnover_rate", } _QUOTE_COLUMNS: dict[str, str] = { "代码": "symbol", "名称": "name", "最新价": "price", "涨跌幅": "change_percent", "涨跌额": "change", "成交量": "volume", "成交额": "turnover", "今开": "open", "最高": "high", "最低": "low", "昨收": "prev_close", } # --- Validation helpers --- def validate_a_share_symbol(symbol: str) -> bool: """Return True if symbol matches A-share format (6 digits, starts with 0, 3, or 6).""" return bool(_A_SHARE_PATTERN.match(symbol)) def validate_hk_symbol(symbol: str) -> bool: """Return True if symbol matches HK stock format (exactly 5 digits).""" return bool(_HK_PATTERN.match(symbol)) # --- DataFrame parsers --- def _parse_hist_df(df: pd.DataFrame) -> list[dict[str, Any]]: """Convert a Chinese-column historical DataFrame to a list of English-key dicts.""" if df.empty: return [] df = df.rename(columns=_HIST_COLUMNS) # Serialize date column to ISO string if "date" in df.columns: df["date"] = df["date"].astype(str) return df.to_dict(orient="records") def _parse_spot_row(df: pd.DataFrame, symbol: str) -> dict[str, Any] | None: """ Filter a spot quote DataFrame by symbol code column and return an English-key dict for the matching row, or None if not found. """ if df.empty: return None code_col = "代码" if code_col not in df.columns: return None matched = df[df[code_col] == symbol] if matched.empty: return None row = matched.iloc[0] result: dict[str, Any] = {} for cn_key, en_key in _QUOTE_COLUMNS.items(): result[en_key] = row.get(cn_key) return result # --- Date helpers --- def _date_range(days: int) -> tuple[str, str]: """Return (start_date, end_date) strings in YYYYMMDD format for the given window.""" end = datetime.now() start = end - timedelta(days=days) return start.strftime("%Y%m%d"), end.strftime("%Y%m%d") # --- A-share service functions --- async def get_a_share_quote(symbol: str) -> dict[str, Any] | None: """ Fetch real-time A-share quote for a single symbol. Returns a dict with English keys, or None if the symbol is not found in the spot market data. Propagates AKShare exceptions to the caller. """ df: pd.DataFrame = await asyncio.to_thread(ak.stock_zh_a_spot_em) return _parse_spot_row(df, symbol) async def get_a_share_historical( symbol: str, *, days: int = 365 ) -> list[dict[str, Any]]: """ Fetch daily OHLCV history for an A-share symbol with qfq (前复权) adjustment. Propagates AKShare exceptions to the caller. """ start_date, end_date = _date_range(days) df: pd.DataFrame = await asyncio.to_thread( ak.stock_zh_a_hist, symbol=symbol, period="daily", start_date=start_date, end_date=end_date, adjust="qfq", ) return _parse_hist_df(df) async def search_a_shares(query: str) -> list[dict[str, Any]]: """ Search A-share stocks by name substring. Returns a list of {code, name} dicts. An empty query returns all stocks. Propagates AKShare exceptions to the caller. """ df: pd.DataFrame = await asyncio.to_thread(ak.stock_info_a_code_name) if query: df = df[df["name"].str.contains(query, na=False)] return df[["code", "name"]].to_dict(orient="records") # --- HK stock service functions --- async def get_hk_quote(symbol: str) -> dict[str, Any] | None: """ Fetch real-time HK stock quote for a single symbol. Returns a dict with English keys, or None if the symbol is not found. Propagates AKShare exceptions to the caller. """ df: pd.DataFrame = await asyncio.to_thread(ak.stock_hk_spot_em) return _parse_spot_row(df, symbol) async def get_hk_historical( symbol: str, *, days: int = 365 ) -> list[dict[str, Any]]: """ Fetch daily OHLCV history for a HK stock symbol with qfq adjustment. Propagates AKShare exceptions to the caller. """ start_date, end_date = _date_range(days) df: pd.DataFrame = await asyncio.to_thread( ak.stock_hk_hist, symbol=symbol, period="daily", start_date=start_date, end_date=end_date, adjust="qfq", ) return _parse_hist_df(df)