"""Database connection pool and PostgresSaver checkpoint setup.""" from __future__ import annotations from typing import TYPE_CHECKING from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver from psycopg.rows import dict_row from psycopg_pool import AsyncConnectionPool if TYPE_CHECKING: from app.config import Settings _CONVERSATIONS_DDL = """ CREATE TABLE IF NOT EXISTS conversations ( thread_id TEXT PRIMARY KEY, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), last_activity TIMESTAMPTZ NOT NULL DEFAULT NOW(), total_tokens INTEGER NOT NULL DEFAULT 0, total_cost_usd DOUBLE PRECISION NOT NULL DEFAULT 0.0, status TEXT NOT NULL DEFAULT 'active' ); """ _INTERRUPTS_DDL = """ CREATE TABLE IF NOT EXISTS active_interrupts ( interrupt_id TEXT PRIMARY KEY, thread_id TEXT NOT NULL REFERENCES conversations(thread_id), action TEXT NOT NULL, params JSONB NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), resolved_at TIMESTAMPTZ, resolution TEXT ); """ _ANALYTICS_EVENTS_DDL = """ CREATE TABLE IF NOT EXISTS analytics_events ( id BIGSERIAL PRIMARY KEY, thread_id TEXT NOT NULL, event_type TEXT NOT NULL, agent_name TEXT, tool_name TEXT, tokens_used INTEGER NOT NULL DEFAULT 0, cost_usd DOUBLE PRECISION NOT NULL DEFAULT 0.0, duration_ms INTEGER, success BOOLEAN, error_message TEXT, metadata JSONB NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ _CONVERSATIONS_MIGRATION_DDL = """ ALTER TABLE conversations ADD COLUMN IF NOT EXISTS resolution_type TEXT, ADD COLUMN IF NOT EXISTS agents_used TEXT[], ADD COLUMN IF NOT EXISTS turn_count INTEGER NOT NULL DEFAULT 0, ADD COLUMN IF NOT EXISTS ended_at TIMESTAMPTZ; """ async def create_pool(settings: Settings) -> AsyncConnectionPool: """Create an async connection pool with the required psycopg settings.""" pool = AsyncConnectionPool( conninfo=settings.database_url, kwargs={"autocommit": True, "row_factory": dict_row}, min_size=2, max_size=10, ) await pool.open() return pool async def create_checkpointer(pool: AsyncConnectionPool) -> AsyncPostgresSaver: """Create and initialize the LangGraph checkpointer.""" checkpointer = AsyncPostgresSaver(conn=pool) await checkpointer.setup() return checkpointer async def setup_app_tables(pool: AsyncConnectionPool) -> None: """Create application-specific tables and apply migrations.""" async with pool.connection() as conn: await conn.execute(_CONVERSATIONS_DDL) await conn.execute(_INTERRUPTS_DDL) await conn.execute(_ANALYTICS_EVENTS_DDL) await conn.execute(_CONVERSATIONS_MIGRATION_DDL)