Files
Stirling-PDF/engine/tests/test_rag_routes.py
James Brunton 5541dd666c Flesh out RAG system (#6197)
# Description of Changes
Flesh out the RAG system and connect it to the PDF Question Agent so it
can respond to questions about PDFs of an extremely large size.

I expect a lot more work will be needed before the RAG system is really
what we need, but this should be a reasonable start: it lets us connect
RAG to tools and handles most of the ingestion automatically. File
deletion and proper file ID management are left for a future PR. We also
need to consider whether all tools should retrieve content exclusively
via RAG, or whether it's beneficial for tools to sometimes fetch the
content directly and at other times fetch it from RAG.

A diagram of the expected interaction is as follows:

```mermaid
sequenceDiagram
    autonumber
    actor U as User
    participant FE as Frontend<br/>(ChatPanel)
    participant J as Java<br/>(AiWorkflowService)
    participant O as Engine:<br/>OrchestratorAgent
    participant QA as Engine:<br/>PdfQuestionAgent
    participant RAG as Engine:<br/>RagService + SqliteVecStore
    participant V as VoyageAI<br/>(embeddings)
    participant L as LLM<br/>(Claude / etc.)

    U->>FE: types "Summarise this PDF"<br/>(PDF already uploaded)
    FE->>J: POST /api/v1/ai/orchestrate/stream<br/>multipart: fileInputs[], userMessage
    Note over J: ByteHashFileIdStrategy<br/>id = sha256(bytes)[:16]
    J->>O: POST /api/v1/orchestrator<br/>{ files:[{id,name}], userMessage }

    O->>L: route via fast model
    L-->>O: delegate_pdf_question
    O->>QA: PdfQuestionRequest

    loop for each file
        QA->>RAG: has_collection(file.id)
        RAG-->>QA: false
    end
    QA-->>O: NeedIngestResponse(files_to_ingest)
    O-->>J: { outcome:"need_ingest", filesToIngest:[...] }

    Note over J: onNeedIngest
    loop per file
        J->>J: PDFBox: extract page text
        J->>O: POST /api/v1/rag/documents<br/>(long-running timeout)
        O->>RAG: chunk + stage documents
        O->>V: embed_documents (batches of 256)
        V-->>O: embeddings
        O->>RAG: add_documents
        O-->>J: { documentId, chunksIndexed: N }
    end

    Note over J: retry with resumeWith=pdf_question
    J->>O: POST /api/v1/orchestrator
    Note over O: fast-path to PdfQuestionAgent

    O->>QA: PdfQuestionRequest
    Note over QA: build RagCapability<br/>pinned to file IDs
    QA->>L: run(prompt) with search_knowledge tool

    loop up to max_searches
        L->>QA: search_knowledge(query)
        QA->>V: embed_query
        V-->>QA: query vector
        QA->>RAG: search(vector, collections=[file.id])
        RAG-->>QA: top-k chunks
        QA-->>L: formatted chunks
    end

    Note over QA: once budget spent,<br/>prepare() hides the tool
    L-->>QA: PdfQuestionAnswerResponse
    QA-->>O: answer
    O-->>J: { outcome:"answer", answer, evidence }
    J-->>FE: SSE "result"
    FE->>U: assistant bubble
```
2026-05-01 14:11:54 +01:00

206 lines
6.9 KiB
Python

from __future__ import annotations
from collections.abc import Iterator
import pytest
from fastapi.testclient import TestClient
from stirling.api import app
from stirling.api.dependencies import get_rag_service
from stirling.models import FileId
from stirling.rag import Document, RagService, SqliteVecStore
class StubEmbedder:
    """Deterministic embeddings for route tests: no network, no provider needed.

    Vectors are derived from a CRC32 of the input text rather than the built-in
    ``hash()``, because ``str`` hashes are salted per process and would make the
    vectors differ from one test run to the next.
    """

    def __init__(self, dim: int = 8) -> None:
        """Create a stub that produces vectors with ``dim`` components."""
        self._dim = dim

    async def embed_query(self, text: str) -> list[float]:
        """Return a ``dim``-length vector that depends only on ``text``.

        The same text always yields the same vector, in every process.
        """
        # Local import, like the chunker import in chunk_and_prepare below.
        import zlib

        # "surrogatepass" keeps lone surrogates from raising during encoding,
        # so any str that hash() accepted is still accepted here.
        seed = zlib.crc32(text.encode("utf-8", errors="surrogatepass")) % 1000
        return [(seed + i) / 1000.0 for i in range(self._dim)]

    async def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed each text with :meth:`embed_query`, preserving input order."""
        return [await self.embed_query(t) for t in texts]

    def chunk_and_prepare(
        self,
        text: str,
        source: str = "",
        base_metadata: dict[str, str] | None = None,
    ) -> list[Document]:
        """Split ``text`` into chunks and wrap each one in a ``Document``.

        Each document's metadata is a copy of ``base_metadata`` (if given) with
        ``"source"`` and ``"chunk_index"`` set. IDs are
        ``"{source}:chunk:{i}"``, or ``"chunk:{i}"`` when ``source`` is empty.
        """
        from stirling.rag.chunker import chunk_text

        # NOTE(review): 100 and 10 are presumably chunk size and overlap —
        # confirm against chunk_text's signature.
        chunks = chunk_text(text, 100, 10)
        docs: list[Document] = []
        for i, chunk in enumerate(chunks):
            # Copy so the caller's base_metadata is never mutated.
            meta = dict(base_metadata) if base_metadata else {}
            meta["source"] = source
            meta["chunk_index"] = str(i)
            doc_id = f"{source}:chunk:{i}" if source else f"chunk:{i}"
            docs.append(Document(id=doc_id, text=chunk, metadata=meta))
        return docs
def _build_service() -> RagService:
    """Assemble a RagService from the stub embedder and an ephemeral sqlite-vec store."""
    stub_embedder = StubEmbedder()
    ephemeral_store = SqliteVecStore.ephemeral()
    return RagService(
        embedder=stub_embedder,  # type: ignore[arg-type]
        store=ephemeral_store,
        default_top_k=3,
    )
@pytest.fixture
def service() -> RagService:
    """Provide a freshly built RagService for each test that requests it."""
    rag_service = _build_service()
    return rag_service
@pytest.fixture
def client(service: RagService) -> Iterator[TestClient]:
    """Yield a TestClient whose ``get_rag_service`` dependency resolves to ``service``.

    The override is removed again once the test finishes, even if it fails.
    """

    def _provide_service() -> RagService:
        return service

    app.dependency_overrides[get_rag_service] = _provide_service
    try:
        yield TestClient(app)
    finally:
        # pop() with a default so teardown never raises if the override is already gone.
        app.dependency_overrides.pop(get_rag_service, None)
# ── POST /documents ─────────────────────────────────────────────────────
def test_ingest_document_indexes_page_text(client: TestClient, service: RagService) -> None:
    """Ingesting two pages of text returns 200, echoes the ID and reports at least two chunks."""
    pages = [
        {"pageNumber": 1, "text": "The introduction covers the main topic."},
        {"pageNumber": 2, "text": "The conclusion summarises the findings."},
    ]
    payload = {"documentId": "doc-123", "source": "report.pdf", "pageText": pages}

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["documentId"] == "doc-123"
    assert body["chunksIndexed"] >= 2
@pytest.mark.anyio
async def test_ingest_document_replaces_existing_content(client: TestClient, service: RagService) -> None:
    """Re-ingesting the same document ID replaces its earlier content instead of appending."""
    first = client.post(
        "/api/v1/rag/documents",
        json={
            "documentId": "replace-me",
            "source": "replace-me.pdf",
            "pageText": [{"pageNumber": 1, "text": "Original content that existed before."}],
        },
    )
    # If the setup ingest failed, the "original is gone" assertion below would
    # pass vacuously, so make sure the original content was accepted.
    assert first.status_code == 200

    # Second ingest with different content should replace the first entirely
    response = client.post(
        "/api/v1/rag/documents",
        json={
            "documentId": "replace-me",
            "source": "replace-me.pdf",
            "pageText": [{"pageNumber": 1, "text": "New content that replaced the old."}],
        },
    )
    assert response.status_code == 200

    results = await service.search("New content", collection=FileId("replace-me"), top_k=5)
    texts = [r.document.text for r in results]
    assert any("New content" in t for t in texts)
    assert not any("Original content" in t for t in texts)
def test_ingest_document_skips_empty_pages(client: TestClient) -> None:
    """A whitespace-only page next to a real page still returns 200 with at least one chunk."""
    blank_page = {"pageNumber": 1, "text": " "}
    real_page = {"pageNumber": 2, "text": "Real content on page 2."}
    payload = {
        "documentId": "mixed",
        "source": "mixed.pdf",
        "pageText": [blank_page, real_page],
    }

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["chunksIndexed"] >= 1
def test_ingest_document_with_no_content_returns_zero(client: TestClient) -> None:
    """A request with no ``pageText`` is accepted and reports zero indexed chunks."""
    payload = {"documentId": "empty", "source": "empty.pdf"}

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["chunksIndexed"] == 0
def test_ingest_document_rejects_empty_id(client: TestClient) -> None:
    """An empty ``documentId`` is rejected with 422."""
    payload = {
        "documentId": "",
        "source": "x.pdf",
        "pageText": [{"pageNumber": 1, "text": "something"}],
    }

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 422
def test_ingest_document_rejects_missing_source(client: TestClient) -> None:
    """A request without a ``source`` field is rejected with 422."""
    payload = {
        "documentId": "doc-1",
        "pageText": [{"pageNumber": 1, "text": "something"}],
    }

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 422
def test_ingest_document_rejects_empty_source(client: TestClient) -> None:
    """An empty ``source`` string is rejected with 422."""
    payload = {
        "documentId": "doc-1",
        "source": "",
        "pageText": [{"pageNumber": 1, "text": "something"}],
    }

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 422
def test_ingest_document_rejects_non_positive_page_number(client: TestClient) -> None:
    """A page with ``pageNumber`` 0 is rejected with 422."""
    zero_page = {"pageNumber": 0, "text": "something"}
    payload = {"documentId": "bad-page", "source": "bad-page.pdf", "pageText": [zero_page]}

    response = client.post("/api/v1/rag/documents", json=payload)

    assert response.status_code == 422
# ── DELETE /documents/{id} ──────────────────────────────────────────────
def test_delete_document_reports_deleted_true_when_existed(client: TestClient) -> None:
    """Deleting a previously ingested document returns 200 with ``deleted`` set to True."""
    ingest_payload = {
        "documentId": "to-delete",
        "source": "to-delete.pdf",
        "pageText": [{"pageNumber": 1, "text": "Text."}],
    }
    client.post("/api/v1/rag/documents", json=ingest_payload)

    response = client.delete("/api/v1/rag/documents/to-delete")

    assert response.status_code == 200
    expected_body = {"documentId": "to-delete", "deleted": True}
    assert response.json() == expected_body
def test_delete_document_is_idempotent(client: TestClient) -> None:
    """Deleting an ID that was never ingested returns 200 with ``deleted`` set to False."""
    response = client.delete("/api/v1/rag/documents/never-existed")

    assert response.status_code == 200
    expected_body = {"documentId": "never-existed", "deleted": False}
    assert response.json() == expected_body
@pytest.mark.anyio
async def test_delete_document_removes_collection(client: TestClient, service: RagService) -> None:
    """After DELETE, the service no longer reports a collection for the document ID."""
    ingest_payload = {
        "documentId": "gone",
        "source": "gone.pdf",
        "pageText": [{"pageNumber": 1, "text": "Text."}],
    }
    client.post("/api/v1/rag/documents", json=ingest_payload)
    existed_before = await service.has_collection(FileId("gone"))
    assert existed_before

    client.delete("/api/v1/rag/documents/gone")

    exists_after = await service.has_collection(FileId("gone"))
    assert not exists_after