"""OpenAPI spec endpoint parser.

Extracts all endpoint definitions from a parsed OpenAPI spec dict,
resolving local ``$ref`` references (``#/...``) against the same dict.
"""

from __future__ import annotations

import re

from app.openapi.models import EndpointInfo, ParameterInfo

_HTTP_METHODS = ("get", "post", "put", "patch", "delete", "head", "options")

# Success codes tried first, in this order, when picking a response schema.
_PREFERRED_SUCCESS_CODES = ("200", "201", "202", "204")
# Any explicit 2xx status code, or the "2XX" range key.
_SUCCESS_CODE_RE = re.compile(r"2(?:\d\d|[Xx]{2})")
_PATH_PARAM_RE = re.compile(r"\{[^}]+\}")
_NON_ALNUM_RE = re.compile(r"[^a-zA-Z0-9]+")


def parse_endpoints(spec_dict: dict) -> tuple[EndpointInfo, ...]:
    """Parse all endpoints from a validated OpenAPI spec dict.

    Walks ``spec_dict["paths"]`` and builds one EndpointInfo per HTTP method
    (see ``_HTTP_METHODS``) whose value is a dict. Path items and operations
    that are not dicts are skipped. Parameters declared on the path item
    itself are passed down to every operation under that path.

    Returns an immutable tuple of EndpointInfo instances; the tuple is empty
    when ``paths`` is missing, empty, or not a dict.
    """
    paths = spec_dict.get("paths", {})
    if not isinstance(paths, dict) or not paths:
        return ()

    endpoints: list[EndpointInfo] = []
    for path, path_item in paths.items():
        if not isinstance(path_item, dict):
            continue
        shared_parameters = path_item.get("parameters")
        for method in _HTTP_METHODS:
            operation = path_item.get(method)
            # Skip absent methods and malformed (non-dict) operation values
            # instead of failing later on ``operation.get``.
            if not isinstance(operation, dict):
                continue
            endpoints.append(
                _parse_operation(
                    path, method.upper(), operation, spec_dict, shared_parameters
                )
            )
    return tuple(endpoints)


def _parse_operation(
    path: str,
    method: str,
    operation: dict,
    spec_dict: dict,
    path_parameters: list | None = None,
) -> EndpointInfo:
    """Parse a single operation dict into an EndpointInfo.

    ``path_parameters`` is the optional ``parameters`` list of the enclosing
    path item; its entries are added to the operation's own parameters unless
    the operation declares a parameter with the same name and location.
    When the operation has no ``operationId`` one is generated from the path
    and method.
    """
    operation_id = operation.get("operationId") or _generate_operation_id(path, method)
    parameters = _parse_parameters(
        operation.get("parameters", []), spec_dict, path_parameters
    )
    return EndpointInfo(
        path=path,
        method=method,
        operation_id=operation_id,
        summary=operation.get("summary", ""),
        description=operation.get("description", ""),
        parameters=tuple(parameters),
        request_body_schema=_parse_request_body(
            operation.get("requestBody"), spec_dict
        ),
        response_schema=_parse_response_schema(
            operation.get("responses", {}), spec_dict
        ),
    )


def _parse_parameters(
    params_list: list,
    spec_dict: dict,
    inherited: list | None = None,
) -> list[ParameterInfo]:
    """Parse list of parameter dicts into ParameterInfo instances.

    Each entry may be an inline parameter or a local ``$ref``; references are
    resolved against ``spec_dict``. Non-dict entries are skipped, and a value
    that is not a list or tuple is treated as "no parameters".

    ``inherited`` holds parameters from an enclosing scope. They are appended
    after ``params_list`` entries, omitting any whose (name, location) pair is
    already declared in ``params_list``.
    """
    own = _resolve_parameter_dicts(params_list, spec_dict)
    # Plain list + ``in`` keeps this safe even if a malformed spec puts an
    # unhashable value under "name" or "in".
    declared = [_parameter_key(param) for param in own]
    extra = [
        param
        for param in _resolve_parameter_dicts(inherited, spec_dict)
        if _parameter_key(param) not in declared
    ]
    return [_build_parameter(param, spec_dict) for param in (*own, *extra)]


def _resolve_parameter_dicts(raw_params: object, spec_dict: dict) -> list[dict]:
    """Return the dict entries of ``raw_params`` with local ``$ref``s resolved."""
    if not isinstance(raw_params, (list, tuple)):
        return []
    return [
        _resolve_ref(entry, spec_dict)
        for entry in raw_params
        if isinstance(entry, dict)
    ]


def _parameter_key(param: dict) -> tuple[object, object]:
    """Return the (name, location) pair that identifies a parameter."""
    return (param.get("name"), param.get("in", "query"))


def _build_parameter(param: dict, spec_dict: dict) -> ParameterInfo:
    """Build a ParameterInfo from one already-resolved parameter dict.

    The schema type defaults to ``"string"`` when the parameter has no usable
    schema or the schema has no ``type``.
    """
    # _resolve_ref returns {} for a non-dict schema, so .get() is always safe.
    schema = _resolve_ref(param.get("schema", {}), spec_dict)
    return ParameterInfo(
        name=param.get("name", ""),
        location=param.get("in", "query"),
        required=bool(param.get("required", False)),
        schema_type=schema.get("type", "string"),
        description=param.get("description", ""),
    )


def _schema_from_content(content: object, spec_dict: dict) -> dict | None:
    """Return the first non-empty schema found in a ``content`` mapping.

    ``application/json`` is tried first, then the remaining media types in
    their declared order. Returns None when ``content`` is not a dict or no
    media type carries a schema.
    """
    if not isinstance(content, dict):
        return None
    # Prefer application/json
    for media_type in ("application/json", *content):
        media = content.get(media_type)
        if not isinstance(media, dict):
            continue
        schema = media.get("schema")
        if schema:
            return _resolve_ref(schema, spec_dict)
    return None


def _parse_request_body(request_body: dict | None, spec_dict: dict) -> dict | None:
    """Extract schema from requestBody, resolving $ref if present.

    Both the request body object itself and the schema it carries may be
    local ``$ref``s. Returns None when there is no usable request body.
    """
    if not isinstance(request_body, dict):
        return None
    body = _resolve_ref(request_body, spec_dict)
    return _schema_from_content(body.get("content", {}), spec_dict)


def _parse_response_schema(responses: dict, spec_dict: dict) -> dict | None:
    """Extract schema from the first 2xx response that carries one.

    Status codes 200, 201, 202 and 204 are tried first (in that order),
    followed by any other 2xx key in ascending order. Keys are compared by
    their string form, so integer keys such as ``200`` are recognised too.
    A response object given as a local ``$ref`` is resolved before its
    content is inspected.
    """
    if not isinstance(responses, dict):
        return None

    by_code: dict[str, object] = {}
    for code, response in responses.items():
        # An exact string key wins over a stringified one ("200" over 200).
        if isinstance(code, str) or str(code) not in by_code:
            by_code[str(code)] = response

    other_success = sorted(
        code
        for code in by_code
        if code not in _PREFERRED_SUCCESS_CODES and _SUCCESS_CODE_RE.fullmatch(code)
    )
    for status_code in (*_PREFERRED_SUCCESS_CODES, *other_success):
        response = by_code.get(status_code)
        if not isinstance(response, dict):
            continue
        resolved = _resolve_ref(response, spec_dict)
        schema = _schema_from_content(resolved.get("content", {}), spec_dict)
        if schema is not None:
            return schema
    return None


def _resolve_ref(schema: object, spec_dict: dict) -> dict:
    """Resolve a $ref to its target schema, or return the schema as-is.

    Only local references of the form ``#/a/b/c`` are handled. Reference
    tokens are unescaped as JSON Pointer tokens (``~1`` -> ``/``,
    ``~0`` -> ``~``). A target that is itself a ``$ref`` is followed in turn;
    a reference already seen in the chain stops the walk.

    Returns ``{}`` when ``schema`` is not a dict. When a reference cannot be
    resolved to a dict, the last dict reached is returned (the input itself
    if the first hop fails).
    """
    if not isinstance(schema, dict):
        return {}

    current = schema
    visited: set[str] = set()
    while True:
        ref = current.get("$ref")
        if not isinstance(ref, str) or not ref.startswith("#/") or ref in visited:
            return current
        visited.add(ref)

        target: object = spec_dict
        # Slice off the "#/" prefix; str.lstrip would strip a character set.
        for token in ref[2:].split("/"):
            if not isinstance(target, dict):
                return current
            # "~1" must be decoded before "~0" so that "~01" yields "~1".
            target = target.get(token.replace("~1", "/").replace("~0", "~"))
        if not isinstance(target, dict):
            return current
        current = target


def _generate_operation_id(path: str, method: str) -> str:
    """Generate a snake_case operation_id from path and method.

    Each ``{param}`` path segment becomes ``by_param`` and every run of
    non-alphanumeric characters becomes a single underscore. A path with no
    alphanumeric characters yields the lower-cased method alone.
    """
    # Remove path parameters braces and replace / with _
    clean = _PATH_PARAM_RE.sub("by_param", path)
    clean = _NON_ALNUM_RE.sub("_", clean).strip("_")
    return f"{method.lower()}_{clean}" if clean else method.lower()