- SSRF protection: private IP blocking, DNS rebinding defense, redirect validation - OpenAPI fetcher with SSRF guard, JSON/YAML auto-detection, 10MB limit - Structural spec validator (3.0.x/3.1.x) - Endpoint parser with $ref resolution, auto-generated operation IDs - Heuristic + LLM endpoint classifier with Protocol interface - Review API at /api/openapi (import, job status, classification CRUD, approve) - @tool code generator + Agent YAML generator - Import orchestrator (fetch -> validate -> parse -> classify pipeline) - 125 new tests, 322 total passing, 93.23% coverage
121 lines
4.3 KiB
Python
121 lines
4.3 KiB
Python
"""Tests for OpenAPI spec fetcher module.
|
|
|
|
RED phase: written before implementation.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from app.openapi.ssrf import SSRFError
|
|
|
|
pytestmark = pytest.mark.unit
|
|
|
|
_SAMPLE_JSON = '{"openapi": "3.0.0", "info": {"title": "Test", "version": "1.0"}, "paths": {}}'
|
|
_SAMPLE_YAML = "openapi: '3.0.0'\ninfo:\n title: Test\n version: '1.0'\npaths: {}\n"
|
|
_PUBLIC_IP = "93.184.216.34"
|
|
|
|
|
|
@pytest.fixture
|
|
def _mock_public_dns():
|
|
with patch("app.openapi.ssrf.resolve_hostname", return_value=[_PUBLIC_IP]):
|
|
yield
|
|
|
|
|
|
class TestFetchSpec:
|
|
"""Tests for fetch_spec function."""
|
|
|
|
@pytest.mark.usefixtures("_mock_public_dns")
|
|
async def test_fetch_json_spec_succeeds(self, httpx_mock) -> None:
|
|
"""Fetch a JSON spec and return parsed dict."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
httpx_mock.add_response(
|
|
url="https://example.com/spec.json",
|
|
text=_SAMPLE_JSON,
|
|
headers={"content-type": "application/json"},
|
|
)
|
|
result = await fetch_spec("https://example.com/spec.json")
|
|
assert isinstance(result, dict)
|
|
assert result["openapi"] == "3.0.0"
|
|
|
|
@pytest.mark.usefixtures("_mock_public_dns")
|
|
async def test_fetch_yaml_spec_succeeds(self, httpx_mock) -> None:
|
|
"""Fetch a YAML spec and return parsed dict."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
httpx_mock.add_response(
|
|
url="https://example.com/spec.yaml",
|
|
text=_SAMPLE_YAML,
|
|
headers={"content-type": "application/x-yaml"},
|
|
)
|
|
result = await fetch_spec("https://example.com/spec.yaml")
|
|
assert isinstance(result, dict)
|
|
assert result["openapi"] == "3.0.0"
|
|
|
|
@pytest.mark.usefixtures("_mock_public_dns")
|
|
async def test_fetch_yaml_by_url_extension(self, httpx_mock) -> None:
|
|
"""Auto-detect YAML format from .yaml URL extension."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
httpx_mock.add_response(
|
|
url="https://example.com/api.yaml",
|
|
text=_SAMPLE_YAML,
|
|
headers={"content-type": "text/plain"},
|
|
)
|
|
result = await fetch_spec("https://example.com/api.yaml")
|
|
assert isinstance(result, dict)
|
|
assert result["openapi"] == "3.0.0"
|
|
|
|
async def test_ssrf_blocked_url_raises(self) -> None:
|
|
"""SSRF-blocked URL raises SSRFError."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
with (
|
|
patch("app.openapi.ssrf.resolve_hostname", return_value=["10.0.0.1"]),
|
|
pytest.raises(SSRFError),
|
|
):
|
|
await fetch_spec("http://internal.corp/spec.json")
|
|
|
|
@pytest.mark.usefixtures("_mock_public_dns")
|
|
async def test_oversized_response_raises(self, httpx_mock) -> None:
|
|
"""Response exceeding 10MB raises ValueError."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
big_content = "x" * (10 * 1024 * 1024 + 1)
|
|
httpx_mock.add_response(
|
|
url="https://example.com/huge.json",
|
|
text=big_content,
|
|
headers={"content-type": "application/json"},
|
|
)
|
|
with pytest.raises(ValueError, match="too large"):
|
|
await fetch_spec("https://example.com/huge.json")
|
|
|
|
@pytest.mark.usefixtures("_mock_public_dns")
|
|
async def test_invalid_json_raises(self, httpx_mock) -> None:
|
|
"""Non-parseable JSON raises ValueError."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
httpx_mock.add_response(
|
|
url="https://example.com/bad.json",
|
|
text="not valid json {{{",
|
|
headers={"content-type": "application/json"},
|
|
)
|
|
with pytest.raises(ValueError, match="[Pp]arse|[Ii]nvalid|[Dd]ecode"):
|
|
await fetch_spec("https://example.com/bad.json")
|
|
|
|
@pytest.mark.usefixtures("_mock_public_dns")
|
|
async def test_invalid_yaml_raises(self, httpx_mock) -> None:
|
|
"""Non-parseable YAML raises ValueError."""
|
|
from app.openapi.fetcher import fetch_spec
|
|
|
|
httpx_mock.add_response(
|
|
url="https://example.com/bad.yaml",
|
|
text=": invalid: yaml: {\n",
|
|
headers={"content-type": "application/x-yaml"},
|
|
)
|
|
with pytest.raises(ValueError, match="[Pp]arse|[Ii]nvalid|[Yy]AML"):
|
|
await fetch_spec("https://example.com/bad.yaml")
|