10 Commits

Author SHA1 Message Date
Vera-AI
5b44ff16ac docs: add Docker Hub publish workflow and git mirror remotes 2026-04-01 17:49:03 -05:00
Vera-AI
346f2c26fe feat: semantic search includes raw turns, deduplicate layers, fix recent turn ordering
- Layer 2 semantic search now queries both curated and raw types,
  closing the blind spot for turns past the 50-turn window pre-curation
- Layer 3 skips turns already returned by Layer 2 to avoid duplicate
  context and wasted token budget
- get_recent_turns uses Qdrant OrderBy for server-side timestamp sort
  with payload index; fallback to client-side sort if unavailable
- Bump version to 2.0.4
2026-04-01 17:43:47 -05:00
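The Layer 2/Layer 3 deduplication this commit describes can be sketched in a few lines (illustrative only; `dedupe_recent` is a hypothetical helper, not the app's API):

```python
def dedupe_recent(recent_turns, semantic_results):
    """Drop recent turns whose ids were already returned by semantic search."""
    seen = {r["id"] for r in semantic_results}
    return [t for t in recent_turns if t["id"] not in seen]

semantic = [{"id": "a"}, {"id": "b"}]
recent = [{"id": "b"}, {"id": "c"}]
print(dedupe_recent(recent, semantic))  # [{'id': 'c'}]
```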
Claude Code
de7f3a78ab test: expand coverage to 92% with production-realistic fixtures
Add conftest.py with shared fixtures, new test_qdrant_service.py covering
all QdrantService methods, and expand curator/proxy/integration/utils tests
to cover debug_log, forward_to_ollama, curator.run(), load_system_prompt,
and health check edge cases.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:20:35 -05:00
Claude Code
6154e7974e fix: add debug log file locking, improve error logging, validate cloud API key
- Add portalocker file locking to debug_log() to prevent interleaved entries
- Add exc_info=True to curator _call_llm error logging for stack traces
- Add debug log message on JSON parse fallback in _parse_json_response
- Warn when cloud is enabled but API key env var is not set

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:15:56 -05:00
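The append-under-lock pattern this commit adds via `portalocker` looks roughly like the sketch below; to stay stdlib-only, the sketch uses POSIX `fcntl.flock`, one of the primitives `portalocker` wraps:

```python
import fcntl
import json
import os
import tempfile
from datetime import datetime, timezone

def append_log_entry(path, entry):
    # Hold an exclusive lock for the duration of the append, so
    # concurrent writers cannot interleave partial JSON lines.
    with open(path, "a") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        f.write(json.dumps(entry) + "\n")
        fcntl.flock(f, fcntl.LOCK_UN)

path = os.path.join(tempfile.mkdtemp(), "debug_demo.log")
append_log_entry(path, {"timestamp": datetime.now(timezone.utc).isoformat(), "category": "demo"})
```

(`fcntl` is POSIX-only; `portalocker` provides the same behavior portably.)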
Claude Code
cbe12f0ebd chore: remove dead calculate_token_budget, fix hardcoded timestamp
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:13:40 -05:00
Claude Code
9fa5d08ce0 refactor: consolidate duplicate QdrantService singleton into singleton.py
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:12:21 -05:00
Claude Code
90dd87edeb fix: system prompt appends to caller's system message, empty = passthrough
Handle all 4 combinations of caller system message and systemprompt.md
correctly: append when both exist, passthrough when only one exists,
omit when neither exists. Fixes leading \n\n when no caller system msg.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:10:24 -05:00
Claude Code
2801a63b11 fix: correct timestamp parsing bug - chained .replace() was stripping timezone
The chained .replace("Z", "+00:00").replace("+00:00", "") calls were
undoing each other, causing Z-suffixed timestamps to lose timezone info.
Now strips "Z" directly and ensures naive datetime for cutoff comparison.
Added regression test for old Z-suffixed timestamps.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:07:34 -05:00
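A minimal sketch of the fixed parsing path (`parse_stored_timestamp` is a hypothetical name mirroring the logic this commit describes):

```python
from datetime import datetime, timedelta, timezone

def parse_stored_timestamp(ts):
    # Fixed approach: strip the "Z" suffix directly and force a naive
    # datetime, instead of chaining .replace("Z", "+00:00").replace("+00:00", "")
    return datetime.fromisoformat(ts.replace("Z", "")).replace(tzinfo=None)

cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24)
mem_time = parse_stored_timestamp("2026-03-27T12:50:37.451593Z")
is_recent = mem_time > cutoff  # naive-vs-naive comparison, no TypeError
```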
Claude Code
355986a59f fix: replace deprecated datetime.utcnow() with timezone-aware alternative
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:06:00 -05:00
Claude Code
600f9deec1 chore: pin deps to production versions, replace toml with tomllib
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:03:16 -05:00
15 changed files with 1221 additions and 242 deletions

CLAUDE.md

@@ -1,128 +1,187 @@
-# Vera-AI Project
-**Persistent Memory Proxy for Ollama**
-> **Status:** Built and running on deb8. Goal: Validate and improve.
-Vera-AI sits between AI clients and Ollama, storing conversations in Qdrant and retrieving context semantically — giving AI **true memory**.
+# CLAUDE.md
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+## Infrastructure
+| Role | Host | Access |
|------|------|--------|
| Source (deb9) | 10.0.0.48 | `ssh deb9` — `/home/n8n/vera-ai/` |
| Production (deb8) | 10.0.0.46 | `ssh deb8` — runs vera-ai in Docker |
| Gitea | 10.0.0.61:3000 | `SpeedyFoxAi/vera-ai-v2`, HTTPS only (SSH disabled) |
User `n8n` on deb8/deb9. SSH key `~/.ssh/vera-ai`. Gitea credentials in `~/.netrc`.
## Git Workflow
Three locations — all point to `origin` on Gitea:
```
local (/home/adm1n/claude/vera-ai) ←→ Gitea (10.0.0.61:3000) ←→ deb9 (/home/n8n/vera-ai)
↓ ↓
github/gitlab deb8 (scp files + docker build)
(mirrors)
```
```bash
# Edit on deb9, commit, push
ssh deb9
cd /home/n8n/vera-ai
git pull origin main # sync first
git add -p && git commit -m "..."
git push origin main
# Pull to local working copy
cd /home/adm1n/claude/vera-ai
git pull origin main
# Deploy to production (deb8 has no git repo — scp files then build)
scp app/*.py n8n@10.0.0.46:/home/n8n/vera-ai/app/
ssh deb8 'cd /home/n8n/vera-ai && docker compose build && docker compose up -d'
```
## Publishing (Docker Hub + Git Mirrors)
Image: `mdkrushr/vera-ai` on Docker Hub. Build and push from deb8:
```bash
ssh deb8
cd /home/n8n/vera-ai
docker build -t mdkrushr/vera-ai:2.0.4 -t mdkrushr/vera-ai:latest .
docker push mdkrushr/vera-ai:2.0.4
docker push mdkrushr/vera-ai:latest
```
The local repo has two mirror remotes for public distribution. After committing and pushing to `origin` (Gitea), mirror with:
```bash
git push github main --tags
git push gitlab main --tags
```
| Remote | URL |
|--------|-----|
| `origin` | `10.0.0.61:3000/SpeedyFoxAi/vera-ai-v2` (Gitea, primary) |
| `github` | `github.com/speedyfoxai/vera-ai` |
| `gitlab` | `gitlab.com/mdkrush/vera-ai` |
## Build & Run (deb8, production)
```bash
ssh deb8
cd /home/n8n/vera-ai
docker compose build
docker compose up -d
docker logs vera-ai --tail 30
curl http://localhost:11434/ # health check
curl -X POST http://localhost:11434/curator/run # trigger curation
```
## Tests (deb9, source)
```bash
ssh deb9
cd /home/n8n/vera-ai
python3 -m pytest tests/ # all tests
python3 -m pytest tests/test_utils.py # single file
python3 -m pytest tests/test_utils.py::TestParseCuratedTurn::test_single_turn # single test
python3 -m pytest tests/ --cov=app --cov-report=term-missing # with coverage
```
Tests are unit-only — no live Qdrant/Ollama required. `pytest.ini` sets `asyncio_mode=auto`. Shared fixtures with production-realistic data in `tests/conftest.py`.
Test files and what they cover:
| File | Covers |
|------|--------|
| `tests/test_utils.py` | Token counting, truncation, memory filtering/merging, `parse_curated_turn`, `load_system_prompt`, `build_augmented_messages` |
| `tests/test_config.py` | Config defaults, TOML loading, `CloudConfig`, env var overrides |
| `tests/test_curator.py` | JSON parsing, `_is_recent`, `_format_raw_turns`, `_format_existing_memories`, `_call_llm`, `_append_rule_to_file`, `load_curator_prompt`, full `run()` scenarios |
| `tests/test_proxy_handler.py` | `clean_message_content`, `handle_chat_non_streaming`, `debug_log`, `forward_to_ollama` |
| `tests/test_integration.py` | FastAPI health check, `/api/tags` (with cloud models), `/api/chat` round-trips (streaming + non-streaming), curator trigger, proxy passthrough |
| `tests/test_qdrant_service.py` | `_ensure_collection`, `get_embedding`, `store_turn`, `store_qa_turn`, `semantic_search`, `get_recent_turns`, `delete_points`, `close` |
## Architecture
```
-Client → Vera-AI (port 11434) → Ollama
+Client → Vera-AI :11434 → Ollama :11434
        ↓↑
-    Qdrant (vector DB)
-    Memory Storage
+    Qdrant :6333
```
-## Key Components
-| File | Purpose |
-|------|---------|
-| `app/main.py` | FastAPI application entry point |
-| `app/proxy_handler.py` | Chat request handling |
-| `app/qdrant_service.py` | Vector DB operations |
-| `app/curator.py` | Memory curation (daily/monthly) |
-| `app/config.py` | Configuration loader |
-| `config/config.toml` | Main configuration file |
-## 4-Layer Context System
-1. **System Prompt** — From `prompts/systemprompt.md`
-2. **Semantic Memory** — Curated Q&A from Qdrant (relevance search)
-3. **Recent Context** — Last N conversation turns
-4. **Current Messages** — User's current request
+Vera-AI is a FastAPI proxy. Every `/api/chat` request is intercepted, augmented with memory context, forwarded to Ollama, and the response Q&A is stored back in Qdrant.
+### 4-Layer Context System (`app/utils.py:build_augmented_messages`)
+Each chat request builds an augmented message list in this order:
+1. **System** — caller's system prompt passed through; `prompts/systemprompt.md` appended if non-empty (if empty, the caller's prompt passes through unchanged; if there is no caller system prompt, vera's prompt is used alone)
+2. **Semantic** — curated AND raw Q&A pairs from Qdrant matching the query (score ≥ `semantic_score_threshold`, up to `semantic_token_budget` tokens). Searches both types to avoid a blind spot where raw turns fall off the recent window before curation runs.
+3. **Recent context** — last 50 turns from Qdrant (server-sorted by timestamp via payload index), oldest first, up to `context_token_budget` tokens. Deduplicates against Layer 2 results to avoid wasting token budget.
+4. **Current** — the incoming messages (non-system) passed through unchanged
The system prompt is **never truncated**. Semantic and context layers are budget-limited and drop excess entries silently.
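The budget-limited behavior of Layers 2 and 3 can be illustrated with a small sketch (`fit_to_budget` is a hypothetical helper; the real logic lives in `build_augmented_messages` and counts tokens with tiktoken):

```python
def fit_to_budget(texts, budget, count_tokens=len):
    # Accumulate entries until the token budget is exhausted; excess
    # entries are dropped silently, mirroring the layer behavior above.
    kept, used = [], 0
    for text in texts:
        cost = count_tokens(text)
        if used + cost > budget:
            break
        kept.append(text)
        used += cost
    return kept

print(fit_to_budget(["aaaa", "bbb", "cc"], budget=8))  # ['aaaa', 'bbb']
```

The demo counts characters via `len` for simplicity; substituting a real tokenizer changes only the `count_tokens` callable.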
### Memory Types in Qdrant
| Type | When created | Retention |
|------|-------------|-----------|
| `raw` | After each chat turn | Until curation runs |
| `curated` | After curator processes `raw` | Permanent |
Payload format: `{type, text, timestamp, role, content}`. Curated entries use `role="curated"` with text formatted as `User: ...\nAssistant: ...\nTimestamp: ...`, which `parse_curated_turn()` deserializes back into proper message role pairs at retrieval time.
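The round-trip described above can be illustrated with a sketch (an illustrative reimplementation, not the app's actual `parse_curated_turn`):

```python
import re
from typing import Dict, List

def parse_curated_turn(text: str) -> List[Dict[str, str]]:
    # Split a "User: ...\nAssistant: ...\nTimestamp: ..." blob back
    # into user/assistant message dicts; trailing timestamp is optional.
    m = re.match(
        r"User: (?P<q>.*?)\nAssistant: (?P<a>.*?)(?:\nTimestamp: .*)?$",
        text,
        re.DOTALL,
    )
    if not m:
        return []
    return [
        {"role": "user", "content": m.group("q")},
        {"role": "assistant", "content": m.group("a")},
    ]

msgs = parse_curated_turn("User: hi\nAssistant: hello\nTimestamp: 2026-03-27T12:50:37Z")
```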
### Curator (`app/curator.py`)
Scheduled via APScheduler at `config.run_time` (default 02:00). Automatically detects day 01 of month for monthly mode (processes ALL raw) vs. daily mode (last 24h only). Sends raw memories to `curator_model` LLM with `prompts/curator_prompt.md`, expects JSON response:
```json
{
"new_curated_turns": [{"content": "User: ...\nAssistant: ..."}],
"permanent_rules": [{"rule": "...", "target_file": "systemprompt.md"}],
"deletions": ["uuid1", "uuid2"],
"summary": "..."
}
```
`permanent_rules` are appended to the named file in `prompts/`. After curation, all processed raw entries are deleted.
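Validating that response shape before applying it might look like the sketch below (hypothetical helper; per the diff, the real curator also falls back to brace extraction for markdown-wrapped JSON):

```python
import json

REQUIRED_KEYS = {"new_curated_turns", "permanent_rules", "deletions", "summary"}

def parse_curator_response(raw: str) -> dict:
    # Parse the curator LLM's JSON reply and check the expected shape.
    data = json.loads(raw)
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise ValueError(f"curator response missing keys: {sorted(missing)}")
    return data

resp = parse_curator_response(json.dumps({
    "new_curated_turns": [{"content": "User: hi\nAssistant: hello"}],
    "permanent_rules": [],
    "deletions": [],
    "summary": "ok",
}))
```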
### Cloud Model Routing
Optional `[cloud]` section in `config.toml` routes specific model names to an OpenRouter-compatible API instead of Ollama. Cloud models are injected into `/api/tags` so clients see them alongside local models.
```toml
[cloud]
enabled = true
api_base = "https://openrouter.ai/api/v1"
api_key_env = "OPENROUTER_API_KEY"
[cloud.models]
"gpt-oss:120b" = "openai/gpt-4o"
```
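The routing decision reduces to a plain lookup, sketched here with a dict standing in for `config.cloud.models` (`route_model` is a hypothetical helper, not the app's API):

```python
# Mirrors the [cloud.models] table from the config above.
CLOUD_MODELS = {"gpt-oss:120b": "openai/gpt-4o"}

def route_model(name):
    """Return (backend, model): cloud mapping wins, otherwise local Ollama."""
    if name in CLOUD_MODELS:
        return ("cloud", CLOUD_MODELS[name])
    return ("ollama", name)

print(route_model("gpt-oss:120b"))  # ('cloud', 'openai/gpt-4o')
print(route_model("llama3:8b"))     # ('ollama', 'llama3:8b')
```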
### Key Implementation Details
- **Config loading** uses stdlib `tomllib` (read-only, Python 3.11+). No third-party TOML dependency.
- **QdrantService singleton** lives in `app/singleton.py`. All modules import from there — `app/utils.py` re-exports via `from .singleton import get_qdrant_service`.
- **Datetime handling** uses `datetime.now(timezone.utc)` throughout. No `utcnow()` calls. Stored timestamps are naive UTC with "Z" suffix; comparison code strips tzinfo for naive-vs-naive matching.
- **Debug logging** in `proxy_handler.py` uses `portalocker` for file locking under concurrent requests. Controlled by `config.debug`.
## Configuration
-Key settings in `config/config.toml`:
-```toml
-[general]
-ollama_host = "http://10.0.0.10:11434"
-qdrant_host = "http://10.0.0.22:6333"
-qdrant_collection = "memories"
-embedding_model = "snowflake-arctic-embed2"
-[layers]
-semantic_token_budget = 25000
-context_token_budget = 22000
-semantic_search_turns = 2
-semantic_score_threshold = 0.6
-[curator]
-run_time = "02:00" # Daily curation time
-curator_model = "gpt-oss:120b"
-```
-## Environment Variables
-| Variable | Default | Description |
-|----------|---------|-------------|
-| `APP_UID` | `999` | Container user ID |
-| `APP_GID` | `999` | Container group ID |
-| `TZ` | `UTC` | Timezone |
-| `VERA_DEBUG` | `false` | Enable debug logging |
-## Running
-```bash
-# Build and start
-docker compose build
-docker compose up -d
-# Check status
-docker ps
-docker logs VeraAI --tail 20
-# Health check
-curl http://localhost:11434/
-```
-## API Endpoints
-| Endpoint | Method | Description |
-|----------|--------|-------------|
-| `/` | GET | Health check |
-| `/api/chat` | POST | Chat completion (with memory) |
-| `/api/tags` | GET | List models |
-| `/api/generate` | POST | Generate completion |
-| `/curator/run` | POST | Trigger curation manually |
-## Development Workflow
-This project is synced with **deb9** (10.0.0.48). To sync changes:
-```bash
-# Pull from deb9
-sshpass -p 'passw0rd' scp -r -o StrictHostKeyChecking=no n8n@10.0.0.48:/home/n8n/vera-ai/* /home/n8n/vera-ai/
-# Push to deb9 (after local changes)
-sshpass -p 'passw0rd' scp -r -o StrictHostKeyChecking=no /home/n8n/vera-ai/* n8n@10.0.0.48:/home/n8n/vera-ai/
-```
-## Memory System
-- **raw** memories — Unprocessed conversation turns (until curation)
-- **curated** memories — Cleaned Q&A pairs (permanent)
-- **test** memories — Test entries (can be ignored)
-Curation runs daily at 02:00 and monthly on the 1st at 03:00.
-## Related Infrastructure
+All settings in `config/config.toml`. Key tuning knobs:
+- `semantic_token_budget` / `context_token_budget` — controls how much memory gets injected
+- `semantic_score_threshold` — lower = more (but less relevant) memories returned
+- `curator_model` — model used for daily curation (needs strong reasoning)
+- `debug = true` — enables per-request JSON logs written to `logs/debug_YYYY-MM-DD.log`
+Environment variable overrides: `VERA_CONFIG_DIR`, `VERA_PROMPTS_DIR`, `VERA_LOG_DIR`.
+## Related Services
| Service | Host | Port |
|---------|------|------|
| Ollama | 10.0.0.10 | 11434 |
| Qdrant | 10.0.0.22 | 6333 |
-| deb9 | 10.0.0.48 | Source project (SSH) |
-| deb8 | 10.0.0.46 | Docker runtime |
-## Qdrant Collections
-| Collection | Purpose |
-|------------|---------|
-| `python_kb` | Python code patterns reference for this project |
-| `memories` | Conversation memory storage (default) |
-| `vera_memories` | Alternative memory collection |
+Qdrant collections: `memories` (default), `vera_memories` (alternative), `python_kb` (reference patterns).


@@ -1,5 +1,5 @@
# app/config.py
-import toml
+import tomllib
import os
from pathlib import Path
from dataclasses import dataclass, field
@@ -83,8 +83,8 @@ class Config:
        config = cls()
        if config_path.exists():
-            with open(config_path, "r") as f:
-                data = toml.load(f)
+            with open(config_path, "rb") as f:
+                data = tomllib.load(f)
            if "general" in data:
                config.ollama_host = data["general"].get("ollama_host", config.ollama_host)
@@ -112,7 +112,14 @@ class Config:
                api_key_env=cloud_data.get("api_key_env", "OPENROUTER_API_KEY"),
                models=cloud_data.get("models", {})
            )
+        if config.cloud.enabled and not config.cloud.api_key:
+            import logging
+            logging.getLogger(__name__).warning(
+                "Cloud is enabled but API key env var '%s' is not set",
+                config.cloud.api_key_env
+            )
        return config
config = Config.load()


@@ -6,7 +6,7 @@ The prompt determines behavior based on current date.
""" """
import logging import logging
import os import os
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from pathlib import Path from pathlib import Path
import httpx import httpx
@@ -49,7 +49,7 @@ class Curator:
Otherwise runs daily mode (processes recent 24h only). Otherwise runs daily mode (processes recent 24h only).
The prompt determines behavior based on current date. The prompt determines behavior based on current date.
""" """
current_date = datetime.utcnow() current_date = datetime.now(timezone.utc)
is_monthly = current_date.day == 1 is_monthly = current_date.day == 1
mode = "MONTHLY" if is_monthly else "DAILY" mode = "MONTHLY" if is_monthly else "DAILY"
@@ -169,7 +169,7 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
return True return True
try: try:
mem_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) mem_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
cutoff = datetime.utcnow() - timedelta(hours=hours) cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=hours)
return mem_time.replace(tzinfo=None) > cutoff return mem_time.replace(tzinfo=None) > cutoff
except (ValueError, TypeError): except (ValueError, TypeError):
logger.debug(f"Could not parse timestamp: {timestamp}") logger.debug(f"Could not parse timestamp: {timestamp}")
@@ -212,7 +212,7 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
result = response.json() result = response.json()
return result.get("response", "") return result.get("response", "")
except Exception as e: except Exception as e:
logger.error(f"Failed to call LLM: {e}") logger.error(f"LLM call failed: {e}", exc_info=True)
return "" return ""
def _parse_json_response(self, response: str) -> Optional[Dict]: def _parse_json_response(self, response: str) -> Optional[Dict]:
@@ -223,6 +223,7 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
try: try:
return json.loads(response) return json.loads(response)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.debug("Direct JSON parse failed, trying brace extraction")
pass pass
try: try:


@@ -4,7 +4,7 @@ from fastapi.responses import StreamingResponse, JSONResponse
from contextlib import asynccontextmanager
import httpx
import logging
-from datetime import datetime
+from datetime import datetime, timezone
from .config import config
from .singleton import get_qdrant_service
@@ -68,7 +68,7 @@ async def lifespan(app: FastAPI):
    await qdrant_service.close()
-app = FastAPI(title="Vera-AI", version="2.0.0", lifespan=lifespan)
+app = FastAPI(title="Vera-AI", version="2.0.4", lifespan=lifespan)
@app.get("/")
@@ -96,7 +96,7 @@ async def api_tags():
        for name in config.cloud.models.keys():
            data["models"].append({
                "name": name,
-                "modified_at": "2026-03-25T00:00:00Z",
+                "modified_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                "size": 0,
                "digest": "cloud",
                "details": {"family": "cloud"}


@@ -6,6 +6,7 @@ import json
import re
import logging
import os
+import portalocker
from pathlib import Path
from .config import config
from .singleton import get_qdrant_service
@@ -48,17 +49,17 @@ def debug_log(category: str, message: str, data: dict = None):
    if not config.debug:
        return
-    from datetime import datetime
+    from datetime import datetime, timezone
    # Create logs directory
    log_dir = DEBUG_LOG_DIR
    log_dir.mkdir(parents=True, exist_ok=True)
-    today = datetime.utcnow().strftime("%Y-%m-%d")
+    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    log_path = log_dir / f"debug_{today}.log"
    entry = {
-        "timestamp": datetime.utcnow().isoformat() + "Z",
+        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "category": category,
        "message": message
    }
@@ -66,7 +67,9 @@ def debug_log(category: str, message: str, data: dict = None):
        entry["data"] = data
    with open(log_path, "a") as f:
+        portalocker.lock(f, portalocker.LOCK_EX)
        f.write(json.dumps(entry) + "\n")
+        portalocker.unlock(f)
async def handle_chat_non_streaming(body: dict):


@@ -1,8 +1,8 @@
"""Qdrant service for memory storage - ASYNC VERSION.""" """Qdrant service for memory storage - ASYNC VERSION."""
from qdrant_client import AsyncQdrantClient from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue, PayloadSchemaType
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from datetime import datetime from datetime import datetime, timezone
import uuid import uuid
import logging import logging
import httpx import httpx
@@ -34,6 +34,15 @@ class QdrantService:
vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE) vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE)
) )
logger.info(f"Created collection {self.collection} with vector size {self.vector_size}") logger.info(f"Created collection {self.collection} with vector size {self.vector_size}")
# Ensure payload index on timestamp for ordered scroll
try:
await self.client.create_payload_index(
collection_name=self.collection,
field_name="timestamp",
field_schema=PayloadSchemaType.KEYWORD
)
except Exception:
pass # Index may already exist
self._collection_ensured = True self._collection_ensured = True
async def get_embedding(self, text: str) -> List[float]: async def get_embedding(self, text: str) -> List[float]:
@@ -54,7 +63,7 @@ class QdrantService:
point_id = str(uuid.uuid4()) point_id = str(uuid.uuid4())
embedding = await self.get_embedding(content) embedding = await self.get_embedding(content)
timestamp = datetime.utcnow().isoformat() + "Z" timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
text = content text = content
if role == "user": if role == "user":
text = f"User: {content}" text = f"User: {content}"
@@ -85,7 +94,7 @@ class QdrantService:
"""Store a complete Q&A turn as one document.""" """Store a complete Q&A turn as one document."""
await self._ensure_collection() await self._ensure_collection()
timestamp = datetime.utcnow().isoformat() + "Z" timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
text = f"User: {user_question}\nAssistant: {assistant_answer}\nTimestamp: {timestamp}" text = f"User: {user_question}\nAssistant: {assistant_answer}\nTimestamp: {timestamp}"
point_id = str(uuid.uuid4()) point_id = str(uuid.uuid4())
@@ -105,20 +114,28 @@ class QdrantService:
) )
return point_id return point_id
async def semantic_search(self, query: str, limit: int = 10, score_threshold: float = 0.6, entry_type: str = "curated") -> List[Dict]: async def semantic_search(self, query: str, limit: int = 10, score_threshold: float = 0.6, entry_type: str = "curated", entry_types: Optional[List[str]] = None) -> List[Dict]:
"""Semantic search for relevant turns, filtered by type.""" """Semantic search for relevant turns, filtered by type(s)."""
await self._ensure_collection() await self._ensure_collection()
embedding = await self.get_embedding(query) embedding = await self.get_embedding(query)
if entry_types and len(entry_types) > 1:
type_filter = Filter(
should=[FieldCondition(key="type", match=MatchValue(value=t)) for t in entry_types]
)
else:
filter_type = entry_types[0] if entry_types else entry_type
type_filter = Filter(
must=[FieldCondition(key="type", match=MatchValue(value=filter_type))]
)
results = await self.client.query_points( results = await self.client.query_points(
collection_name=self.collection, collection_name=self.collection,
query=embedding, query=embedding,
limit=limit, limit=limit,
score_threshold=score_threshold, score_threshold=score_threshold,
query_filter=Filter( query_filter=type_filter
must=[FieldCondition(key="type", match=MatchValue(value=entry_type))]
)
) )
return [{"id": str(r.id), "score": r.score, "payload": r.payload} for r in results.points] return [{"id": str(r.id), "score": r.score, "payload": r.payload} for r in results.points]
@@ -126,21 +143,29 @@ class QdrantService:
async def get_recent_turns(self, limit: int = 20) -> List[Dict]: async def get_recent_turns(self, limit: int = 20) -> List[Dict]:
"""Get recent turns from Qdrant (both raw and curated).""" """Get recent turns from Qdrant (both raw and curated)."""
await self._ensure_collection() await self._ensure_collection()
points, _ = await self.client.scroll( try:
collection_name=self.collection, from qdrant_client.models import OrderBy
limit=limit * 2, points, _ = await self.client.scroll(
with_payload=True collection_name=self.collection,
) limit=limit,
with_payload=True,
# Sort by timestamp descending order_by=OrderBy(key="timestamp", direction="desc")
sorted_points = sorted( )
points, except Exception:
key=lambda p: p.payload.get("timestamp", ""), # Fallback: fetch extra points and sort client-side
reverse=True points, _ = await self.client.scroll(
) collection_name=self.collection,
limit=limit * 5,
return [{"id": str(p.id), "payload": p.payload} for p in sorted_points[:limit]] with_payload=True
)
points = sorted(
points,
key=lambda p: p.payload.get("timestamp", ""),
reverse=True
)[:limit]
return [{"id": str(p.id), "payload": p.payload} for p in points]
async def delete_points(self, point_ids: List[str]) -> None: async def delete_points(self, point_ids: List[str]) -> None:
"""Delete points by ID.""" """Delete points by ID."""


@@ -1,8 +1,9 @@
"""Global singleton instances for Vera-AI.""" """Global singleton instances for Vera-AI."""
from typing import Optional
from .qdrant_service import QdrantService from .qdrant_service import QdrantService
from .config import config from .config import config
_qdrant_service: QdrantService = None _qdrant_service: Optional[QdrantService] = None
def get_qdrant_service() -> QdrantService: def get_qdrant_service() -> QdrantService:


@@ -1,9 +1,10 @@
"""Utility functions for vera-ai.""" """Utility functions for vera-ai."""
from .config import config from .config import config
from .singleton import get_qdrant_service
import tiktoken import tiktoken
import os import os
from typing import List, Dict, Optional from typing import List, Dict, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
# Use cl100k_base encoding (GPT-4 compatible) # Use cl100k_base encoding (GPT-4 compatible)
@@ -13,24 +14,6 @@ ENCODING = tiktoken.get_encoding("cl100k_base")
PROMPTS_DIR = Path(os.environ.get("VERA_PROMPTS_DIR", "/app/prompts")) PROMPTS_DIR = Path(os.environ.get("VERA_PROMPTS_DIR", "/app/prompts"))
STATIC_DIR = Path(os.environ.get("VERA_STATIC_DIR", "/app/static")) STATIC_DIR = Path(os.environ.get("VERA_STATIC_DIR", "/app/static"))
# Global qdrant_service instance for utils
_qdrant_service = None
def get_qdrant_service():
"""Get or create the QdrantService singleton."""
global _qdrant_service
if _qdrant_service is None:
from .config import config
from .qdrant_service import QdrantService
_qdrant_service = QdrantService(
host=config.qdrant_host,
collection=config.qdrant_collection,
embedding_model=config.embedding_model,
vector_size=config.vector_size,
ollama_host=config.ollama_host
)
return _qdrant_service
def count_tokens(text: str) -> int: def count_tokens(text: str) -> int:
"""Count tokens in text.""" """Count tokens in text."""
if not text: if not text:
@@ -56,7 +39,7 @@ def truncate_by_tokens(text: str, max_tokens: int) -> str:
def filter_memories_by_time(memories: List[Dict], hours: int = 24) -> List[Dict]: def filter_memories_by_time(memories: List[Dict], hours: int = 24) -> List[Dict]:
"""Filter memories from the last N hours.""" """Filter memories from the last N hours."""
cutoff = datetime.utcnow() - timedelta(hours=hours) cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=hours)
filtered = [] filtered = []
for mem in memories: for mem in memories:
ts = mem.get("timestamp") ts = mem.get("timestamp")
@@ -64,7 +47,7 @@ def filter_memories_by_time(memories: List[Dict], hours: int = 24) -> List[Dict]
try: try:
# Parse ISO timestamp # Parse ISO timestamp
if isinstance(ts, str): if isinstance(ts, str):
mem_time = datetime.fromisoformat(ts.replace("Z", "+00:00").replace("+00:00", "")) mem_time = datetime.fromisoformat(ts.replace("Z", "")).replace(tzinfo=None)
else: else:
mem_time = ts mem_time = ts
if mem_time > cutoff: if mem_time > cutoff:
@@ -100,15 +83,6 @@ def merge_memories(memories: List[Dict]) -> Dict:
"ids": ids "ids": ids
} }
def calculate_token_budget(total_budget: int, system_ratio: float = 0.2,
semantic_ratio: float = 0.5, context_ratio: float = 0.3) -> Dict[int, int]:
"""Calculate token budgets for each layer."""
return {
"system": int(total_budget * system_ratio),
"semantic": int(total_budget * semantic_ratio),
"context": int(total_budget * context_ratio)
}
def load_system_prompt() -> str: def load_system_prompt() -> str:
"""Load system prompt from prompts directory.""" """Load system prompt from prompts directory."""
import logging import logging
@@ -219,36 +193,45 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
} }
# === LAYER 1: System Prompt === # === LAYER 1: System Prompt ===
system_content = "" # Caller's system message passes through; systemprompt.md appends if non-empty.
caller_system = ""
for msg in incoming_messages: for msg in incoming_messages:
if msg.get("role") == "system": if msg.get("role") == "system":
system_content = msg.get("content", "") caller_system = msg.get("content", "")
break break
if system_prompt: if caller_system and system_prompt:
system_content += "\n\n" + system_prompt system_content = caller_system + "\n\n" + system_prompt
elif caller_system:
system_content = caller_system
elif system_prompt:
system_content = system_prompt
else:
system_content = ""
if system_content: if system_content:
messages.append({"role": "system", "content": system_content}) messages.append({"role": "system", "content": system_content})
logger.info(f"Layer 1 (system): {count_tokens(system_content)} tokens") logger.info(f"Layer 1 (system): {count_tokens(system_content)} tokens")
# === LAYER 2: Semantic (curated memories) === # === LAYER 2: Semantic (curated + raw memories) ===
qdrant = get_qdrant_service() qdrant = get_qdrant_service()
semantic_results = await qdrant.semantic_search( semantic_results = await qdrant.semantic_search(
query=search_context if search_context else user_question, query=search_context if search_context else user_question,
limit=20, limit=20,
score_threshold=config.semantic_score_threshold, score_threshold=config.semantic_score_threshold,
entry_type="curated" entry_types=["curated", "raw"]
) )
semantic_messages = [] semantic_messages = []
semantic_tokens_used = 0 semantic_tokens_used = 0
semantic_ids = set()
for result in semantic_results: for result in semantic_results:
semantic_ids.add(result.get("id"))
payload = result.get("payload", {}) payload = result.get("payload", {})
text = payload.get("text", "") text = payload.get("text", "")
if text: if text:
# Parse curated turn into proper user/assistant messages # Parse curated/raw turn into proper user/assistant messages
parsed = parse_curated_turn(text) parsed = parse_curated_turn(text)
for msg in parsed: for msg in parsed:
msg_tokens = count_tokens(msg.get("content", "")) msg_tokens = count_tokens(msg.get("content", ""))
@@ -273,8 +256,10 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
    context_messages = []
    context_tokens_used = 0
    # Process oldest first for chronological order, skip duplicates from Layer 2
    for turn in reversed(recent_turns):
        if turn.get("id") in semantic_ids:
            continue
        payload = turn.get("payload", {})
        text = payload.get("text", "")
        entry_type = payload.get("type", "raw")
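The Layer 2 / Layer 3 interaction above reduces to set-based deduplication on point ids: Layer 2 records every id it returns, and Layer 3 skips those ids so the same turn never consumes token budget twice. A minimal standalone sketch of that idea (helper name and data shapes are illustrative, not from the codebase):

```python
def merge_layers(semantic_results, recent_turns):
    """Return Layer 2 results plus only those recent turns not already seen."""
    # Layer 2: remember every id already emitted
    semantic_ids = {r["id"] for r in semantic_results}
    merged = list(semantic_results)
    # Layer 3: oldest first for chronological order, mirroring reversed(recent_turns)
    for turn in reversed(recent_turns):
        if turn["id"] in semantic_ids:
            continue  # already provided by semantic search
        merged.append(turn)
    return merged

semantic = [{"id": "a", "text": "curated turn"}]
recent = [{"id": "b", "text": "newest raw"}, {"id": "a", "text": "duplicate"}]
print([t["id"] for t in merge_layers(semantic, recent)])  # ['a', 'b']
```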


@@ -1,11 +1,11 @@
fastapi==0.135.2
uvicorn[standard]==0.42.0
httpx==0.28.1
qdrant-client==1.17.1
ollama==0.6.1
tiktoken==0.12.0
apscheduler==3.11.2
portalocker==3.2.0
pytest==9.0.2
pytest-asyncio==1.3.0
pytest-cov==7.1.0

tests/conftest.py (new file)

@@ -0,0 +1,62 @@
"""Shared test fixtures using production-realistic data."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from app.config import Config
@pytest.fixture
def production_config():
"""Config matching production deployment on deb8."""
config = MagicMock(spec=Config)
config.ollama_host = "http://10.0.0.10:11434"
config.qdrant_host = "http://10.0.0.22:6333"
config.qdrant_collection = "memories"
config.embedding_model = "snowflake-arctic-embed2"
config.semantic_token_budget = 25000
config.context_token_budget = 22000
config.semantic_search_turns = 2
config.semantic_score_threshold = 0.6
config.run_time = "02:00"
config.curator_model = "gpt-oss:120b"
config.debug = False
config.vector_size = 1024
config.cloud = MagicMock()
config.cloud.enabled = False
config.cloud.models = {}
config.cloud.get_cloud_model.return_value = None
return config
@pytest.fixture
def sample_qdrant_raw_payload():
"""Sample raw payload from production Qdrant."""
return {
"type": "raw",
"text": "User: only change settings, not models\nAssistant: Changed semantic_token_budget from 25000 to 30000\nTimestamp: 2026-03-27T12:50:37.451593Z",
"timestamp": "2026-03-27T12:50:37.451593Z",
"role": "qa",
"content": "User: only change settings, not models\nAssistant: Changed semantic_token_budget from 25000 to 30000\nTimestamp: 2026-03-27T12:50:37.451593Z"
}
@pytest.fixture
def sample_ollama_models():
"""Model list from production Ollama."""
return {
"models": [
{
"name": "snowflake-arctic-embed2:latest",
"model": "snowflake-arctic-embed2:latest",
"modified_at": "2026-02-16T16:43:44Z",
"size": 1160296718,
"details": {"family": "bert", "parameter_size": "566.70M", "quantization_level": "F16"}
},
{
"name": "gpt-oss:120b",
"model": "gpt-oss:120b",
"modified_at": "2026-03-11T12:45:48Z",
"size": 65369818941,
"details": {"family": "gptoss", "parameter_size": "116.8B", "quantization_level": "MXFP4"}
}
]
}


@@ -2,9 +2,9 @@
import pytest
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import MagicMock, AsyncMock, patch

def make_curator():
@@ -77,14 +77,14 @@ class TestIsRecent:
    def test_memory_within_window(self):
        """Memory timestamped 1 hour ago is recent (within 24h)."""
        curator, _ = make_curator()
        ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)).isoformat() + "Z"
        memory = {"timestamp": ts}
        assert curator._is_recent(memory, hours=24) is True

    def test_memory_outside_window(self):
        """Memory timestamped 48 hours ago is not recent."""
        curator, _ = make_curator()
        ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=48)).isoformat() + "Z"
        memory = {"timestamp": ts}
        assert curator._is_recent(memory, hours=24) is False
@@ -109,7 +109,7 @@ class TestIsRecent:
    def test_boundary_edge_just_inside(self):
        """Memory at exactly hours-1 minutes ago should be recent."""
        curator, _ = make_curator()
        ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=23, minutes=59)).isoformat() + "Z"
        memory = {"timestamp": ts}
        assert curator._is_recent(memory, hours=24) is True
@@ -197,3 +197,294 @@ class TestAppendRuleToFile:
        target = prompts_dir / "newfile.md"
        assert target.exists()
        assert "New rule here." in target.read_text()
class TestFormatExistingMemories:
    """Tests for Curator._format_existing_memories."""

    def test_empty_list_returns_no_memories_message(self):
        """Empty list returns a 'no memories' message."""
        curator, _ = make_curator()
        result = curator._format_existing_memories([])
        assert "No existing curated memories" in result

    def test_single_memory_formatted(self):
        """Single memory text is included in output."""
        curator, _ = make_curator()
        memories = [{"text": "User: hello\nAssistant: hi there"}]
        result = curator._format_existing_memories(memories)
        assert "hello" in result
        assert "hi there" in result

    def test_limits_to_last_20(self):
        """Only the last 20 memories are included."""
        curator, _ = make_curator()
        memories = [{"text": f"memory {i}"} for i in range(30)]
        result = curator._format_existing_memories(memories)
        # Should contain memory 10-29 (last 20), not memory 0-9
        assert "memory 29" in result
        assert "memory 10" in result
class TestCallLlm:
    """Tests for Curator._call_llm."""

    @pytest.mark.asyncio
    async def test_call_llm_returns_response(self):
        """_call_llm returns the response text from Ollama."""
        curator, _ = make_curator()
        mock_resp = MagicMock()
        mock_resp.json.return_value = {"response": "some LLM output"}
        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_resp)
        with patch("httpx.AsyncClient", return_value=mock_client):
            result = await curator._call_llm("test prompt")
        assert result == "some LLM output"
        call_kwargs = mock_client.post.call_args
        assert "test-model" in call_kwargs[1]["json"]["model"]

    @pytest.mark.asyncio
    async def test_call_llm_returns_empty_on_error(self):
        """_call_llm returns empty string when Ollama errors."""
        curator, _ = make_curator()
        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(side_effect=Exception("connection refused"))
        with patch("httpx.AsyncClient", return_value=mock_client):
            result = await curator._call_llm("test prompt")
        assert result == ""
class TestCuratorRun:
    """Tests for Curator.run() method."""

    @pytest.mark.asyncio
    async def test_run_no_raw_memories_exits_early(self):
        """run() exits early when no raw memories found."""
        curator, mock_qdrant = make_curator()
        # Mock scroll to return no points
        mock_qdrant.client = AsyncMock()
        mock_qdrant.client.scroll = AsyncMock(return_value=([], None))
        mock_qdrant.collection = "memories"
        await curator.run()
        # Should not call LLM since there are no raw memories;
        # completing without error is the success condition here

    @pytest.mark.asyncio
    async def test_run_processes_raw_memories(self):
        """run() processes raw memories and stores curated results."""
        curator, mock_qdrant = make_curator()
        # Create mock points
        mock_point = MagicMock()
        mock_point.id = "point-1"
        mock_point.payload = {
            "type": "raw",
            "text": "User: hello\nAssistant: hi",
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        mock_qdrant.client = AsyncMock()
        mock_qdrant.client.scroll = AsyncMock(return_value=([mock_point], None))
        mock_qdrant.collection = "memories"
        mock_qdrant.store_turn = AsyncMock(return_value="new-id")
        mock_qdrant.delete_points = AsyncMock()
        llm_response = json.dumps({
            "new_curated_turns": [{"content": "User: hello\nAssistant: hi"}],
            "permanent_rules": [],
            "deletions": [],
            "summary": "Curated one turn"
        })
        with patch.object(curator, "_call_llm", AsyncMock(return_value=llm_response)):
            await curator.run()
        mock_qdrant.store_turn.assert_called_once()
        mock_qdrant.delete_points.assert_called()

    @pytest.mark.asyncio
    async def test_run_monthly_mode_on_day_01(self):
        """run() uses monthly mode on day 01, processing all raw memories."""
        curator, mock_qdrant = make_curator()
        # Create a mock point with an old timestamp (outside 24h window)
        old_ts = (datetime.now(timezone.utc) - timedelta(hours=72)).isoformat().replace("+00:00", "Z")
        mock_point = MagicMock()
        mock_point.id = "old-point"
        mock_point.payload = {
            "type": "raw",
            "text": "User: old question\nAssistant: old answer",
            "timestamp": old_ts,
        }
        mock_qdrant.client = AsyncMock()
        mock_qdrant.client.scroll = AsyncMock(return_value=([mock_point], None))
        mock_qdrant.collection = "memories"
        mock_qdrant.store_turn = AsyncMock(return_value="new-id")
        mock_qdrant.delete_points = AsyncMock()
        llm_response = json.dumps({
            "new_curated_turns": [],
            "permanent_rules": [],
            "deletions": [],
            "summary": "Nothing to curate"
        })
        # Mock day 01
        mock_now = datetime(2026, 4, 1, 2, 0, 0, tzinfo=timezone.utc)
        with patch.object(curator, "_call_llm", AsyncMock(return_value=llm_response)), \
             patch("app.curator.datetime") as mock_dt:
            mock_dt.now.return_value = mock_now
            mock_dt.fromisoformat = datetime.fromisoformat
            mock_dt.side_effect = lambda *args, **kw: datetime(*args, **kw)
            await curator.run()
        # In monthly mode, even old memories are processed, so the LLM is called
        # and delete_points is called for the processed raw memory
        mock_qdrant.delete_points.assert_called()

    @pytest.mark.asyncio
    async def test_run_handles_permanent_rules(self):
        """run() appends permanent rules to prompt files."""
        curator, mock_qdrant = make_curator()
        mock_point = MagicMock()
        mock_point.id = "point-1"
        mock_point.payload = {
            "type": "raw",
            "text": "User: remember this\nAssistant: ok",
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        mock_qdrant.client = AsyncMock()
        mock_qdrant.client.scroll = AsyncMock(return_value=([mock_point], None))
        mock_qdrant.collection = "memories"
        mock_qdrant.store_turn = AsyncMock(return_value="new-id")
        mock_qdrant.delete_points = AsyncMock()
        llm_response = json.dumps({
            "new_curated_turns": [],
            "permanent_rules": [{"rule": "Always be concise.", "target_file": "systemprompt.md"}],
            "deletions": [],
            "summary": "Added a rule"
        })
        with patch.object(curator, "_call_llm", AsyncMock(return_value=llm_response)), \
             patch.object(curator, "_append_rule_to_file", AsyncMock()) as mock_append:
            await curator.run()
        mock_append.assert_called_once_with("systemprompt.md", "Always be concise.")

    @pytest.mark.asyncio
    async def test_run_handles_deletions(self):
        """run() deletes specified point IDs when they exist in the database."""
        curator, mock_qdrant = make_curator()
        mock_point = MagicMock()
        mock_point.id = "point-1"
        mock_point.payload = {
            "type": "raw",
            "text": "User: delete me\nAssistant: ok",
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        mock_qdrant.client = AsyncMock()
        mock_qdrant.client.scroll = AsyncMock(return_value=([mock_point], None))
        mock_qdrant.collection = "memories"
        mock_qdrant.store_turn = AsyncMock(return_value="new-id")
        mock_qdrant.delete_points = AsyncMock()
        llm_response = json.dumps({
            "new_curated_turns": [],
            "permanent_rules": [],
            "deletions": ["point-1"],
            "summary": "Deleted one"
        })
        with patch.object(curator, "_call_llm", AsyncMock(return_value=llm_response)):
            await curator.run()
        # delete_points should be called at least once: for the valid
        # deletions and/or the processed raw memory
        assert mock_qdrant.delete_points.call_count >= 1
    @pytest.mark.asyncio
    async def test_run_handles_llm_parse_failure(self):
        """run() handles LLM returning unparseable response gracefully."""
        curator, mock_qdrant = make_curator()
        mock_point = MagicMock()
        mock_point.id = "point-1"
        mock_point.payload = {
            "type": "raw",
            "text": "User: test\nAssistant: ok",
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        mock_qdrant.client = AsyncMock()
        mock_qdrant.client.scroll = AsyncMock(return_value=([mock_point], None))
        mock_qdrant.collection = "memories"
        # Set up the mock BEFORE run() so the assertion below is meaningful
        mock_qdrant.store_turn = AsyncMock()
        with patch.object(curator, "_call_llm", AsyncMock(return_value="not json at all!!!")):
            # Should not raise - just return early
            await curator.run()
        # store_turn should NOT be called since parsing failed
        mock_qdrant.store_turn.assert_not_called()
class TestLoadCuratorPrompt:
    """Tests for load_curator_prompt function."""

    def test_loads_from_prompts_dir(self, tmp_path):
        """load_curator_prompt loads from PROMPTS_DIR."""
        import app.curator as curator_module
        prompts_dir = tmp_path / "prompts"
        prompts_dir.mkdir()
        (prompts_dir / "curator_prompt.md").write_text("Test curator prompt")
        with patch.object(curator_module, "PROMPTS_DIR", prompts_dir):
            from app.curator import load_curator_prompt
            result = load_curator_prompt()
        assert result == "Test curator prompt"

    def test_falls_back_to_static_dir(self, tmp_path):
        """load_curator_prompt falls back to STATIC_DIR."""
        import app.curator as curator_module
        prompts_dir = tmp_path / "prompts"  # does not exist
        static_dir = tmp_path / "static"
        static_dir.mkdir()
        (static_dir / "curator_prompt.md").write_text("Static prompt")
        with patch.object(curator_module, "PROMPTS_DIR", prompts_dir), \
             patch.object(curator_module, "STATIC_DIR", static_dir):
            from app.curator import load_curator_prompt
            result = load_curator_prompt()
        assert result == "Static prompt"

    def test_raises_when_not_found(self, tmp_path):
        """load_curator_prompt raises FileNotFoundError when file missing."""
        import app.curator as curator_module
        with patch.object(curator_module, "PROMPTS_DIR", tmp_path / "nope"), \
             patch.object(curator_module, "STATIC_DIR", tmp_path / "also_nope"):
            from app.curator import load_curator_prompt
            with pytest.raises(FileNotFoundError):
                load_curator_prompt()
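These tests repeatedly build naive-UTC timestamps with a trailing `Z` via `datetime.now(timezone.utc).replace(tzinfo=None)`. A small sketch of that idiom as a pair of helpers (hypothetical names, not part of the app; shown only to make the stored-timestamp convention explicit):

```python
from datetime import datetime, timezone

def utc_now_z() -> str:
    """Naive UTC ISO-8601 timestamp with trailing 'Z', matching the stored payload format."""
    # .replace(tzinfo=None) drops the offset so isoformat() emits no "+00:00"
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

def parse_z(ts: str) -> datetime:
    """Parse a Z-suffixed timestamp back to a naive UTC datetime."""
    # rstrip the Z so fromisoformat() accepts it on older Pythons too
    return datetime.fromisoformat(ts.rstrip("Z"))

ts = utc_now_z()
print(ts.endswith("Z"), parse_z(ts).tzinfo is None)  # True True
```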


@@ -349,3 +349,83 @@ class TestApiChatStreaming:
        # Response body should contain both chunks concatenated
        body_text = resp.text
        assert "Hello" in body_text or len(body_text) > 0
# ---------------------------------------------------------------------------
# Health check edge cases
# ---------------------------------------------------------------------------
class TestHealthCheckEdgeCases:
    def test_health_ollama_timeout(self, app_with_mocks):
        """GET / handles Ollama timeout gracefully."""
        import httpx
        from fastapi.testclient import TestClient
        vera_app, _ = app_with_mocks
        mock_client_instance = AsyncMock()
        mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
        mock_client_instance.__aexit__ = AsyncMock(return_value=False)
        mock_client_instance.get = AsyncMock(side_effect=httpx.TimeoutException("timeout"))
        with patch("httpx.AsyncClient", return_value=mock_client_instance):
            with TestClient(vera_app, raise_server_exceptions=True) as client:
                resp = client.get("/")
        assert resp.status_code == 200
        assert resp.json()["ollama"] == "unreachable"


# ---------------------------------------------------------------------------
# POST /curator/run
# ---------------------------------------------------------------------------
class TestTriggerCurator:
    def test_trigger_curator_endpoint(self, app_with_mocks):
        """POST /curator/run triggers curation and returns status."""
        from fastapi.testclient import TestClient
        import app.main as main_module
        vera_app, _ = app_with_mocks
        mock_curator = MagicMock()
        mock_curator.run = AsyncMock()
        with patch.object(main_module, "curator", mock_curator):
            with TestClient(vera_app) as client:
                resp = client.post("/curator/run")
        assert resp.status_code == 200
        assert resp.json()["status"] == "curation completed"
        mock_curator.run.assert_called_once()


# ---------------------------------------------------------------------------
# Proxy catch-all
# ---------------------------------------------------------------------------
class TestProxyAll:
    def test_non_chat_api_proxied(self, app_with_mocks):
        """Non-chat API paths are proxied to Ollama."""
        from fastapi.testclient import TestClient
        vera_app, _ = app_with_mocks

        async def fake_aiter_bytes():
            yield b'{"status": "ok"}'

        mock_resp = MagicMock()
        mock_resp.status_code = 200
        mock_resp.headers = {"content-type": "application/json"}
        mock_resp.aiter_bytes = fake_aiter_bytes
        mock_client_instance = AsyncMock()
        mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
        mock_client_instance.__aexit__ = AsyncMock(return_value=False)
        mock_client_instance.request = AsyncMock(return_value=mock_resp)
        with patch("httpx.AsyncClient", return_value=mock_client_instance):
            with TestClient(vera_app) as client:
                resp = client.get("/api/show")
        assert resp.status_code == 200


@@ -219,3 +219,94 @@ class TestHandleChatNonStreaming:
        # The wrapper should be stripped
        assert "Memory context" not in stored_question
        assert "What is the answer?" in stored_question
class TestDebugLog:
    """Tests for debug_log function."""

    def test_debug_log_writes_json_when_enabled(self, tmp_path):
        """Debug log appends valid JSON line to file when debug=True."""
        import json
        from unittest.mock import patch, MagicMock
        mock_config = MagicMock()
        mock_config.debug = True
        with patch("app.proxy_handler.config", mock_config), \
             patch("app.proxy_handler.DEBUG_LOG_DIR", tmp_path):
            from app.proxy_handler import debug_log
            debug_log("test_cat", "test message", {"key": "value"})
        log_files = list(tmp_path.glob("debug_*.log"))
        assert len(log_files) == 1
        content = log_files[0].read_text().strip()
        entry = json.loads(content)
        assert entry["category"] == "test_cat"
        assert entry["message"] == "test message"
        assert entry["data"]["key"] == "value"

    def test_debug_log_skips_when_disabled(self, tmp_path):
        """Debug log does nothing when debug=False."""
        from unittest.mock import patch, MagicMock
        mock_config = MagicMock()
        mock_config.debug = False
        with patch("app.proxy_handler.config", mock_config), \
             patch("app.proxy_handler.DEBUG_LOG_DIR", tmp_path):
            from app.proxy_handler import debug_log
            debug_log("test_cat", "test message")
        log_files = list(tmp_path.glob("debug_*.log"))
        assert len(log_files) == 0

    def test_debug_log_without_data(self, tmp_path):
        """Debug log works without optional data parameter."""
        import json
        from unittest.mock import patch, MagicMock
        mock_config = MagicMock()
        mock_config.debug = True
        with patch("app.proxy_handler.config", mock_config), \
             patch("app.proxy_handler.DEBUG_LOG_DIR", tmp_path):
            from app.proxy_handler import debug_log
            debug_log("simple_cat", "no data here")
        log_files = list(tmp_path.glob("debug_*.log"))
        assert len(log_files) == 1
        entry = json.loads(log_files[0].read_text().strip())
        assert "data" not in entry
        assert entry["category"] == "simple_cat"


class TestForwardToOllama:
    """Tests for forward_to_ollama function."""

    @pytest.mark.asyncio
    async def test_forwards_request_to_ollama(self):
        """forward_to_ollama proxies request to Ollama host."""
        from app.proxy_handler import forward_to_ollama
        from unittest.mock import patch, AsyncMock, MagicMock
        mock_request = AsyncMock()
        mock_request.body = AsyncMock(return_value=b'{"model": "llama3"}')
        mock_request.method = "POST"
        mock_request.headers = {"content-type": "application/json", "content-length": "20"}
        mock_resp = MagicMock()
        mock_resp.status_code = 200
        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.request = AsyncMock(return_value=mock_resp)
        with patch("httpx.AsyncClient", return_value=mock_client):
            result = await forward_to_ollama(mock_request, "/api/show")
        assert result == mock_resp
        mock_client.request.assert_called_once()
        call_kwargs = mock_client.request.call_args
        assert call_kwargs[1]["method"] == "POST"
        assert "/api/show" in call_kwargs[1]["url"]


@@ -0,0 +1,256 @@
"""Tests for QdrantService — all Qdrant and Ollama calls are mocked."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
def make_qdrant_service():
"""Create a QdrantService with mocked AsyncQdrantClient."""
with patch("app.qdrant_service.AsyncQdrantClient") as MockClient:
mock_client = AsyncMock()
MockClient.return_value = mock_client
from app.qdrant_service import QdrantService
svc = QdrantService(
host="http://localhost:6333",
collection="test_memories",
embedding_model="snowflake-arctic-embed2",
vector_size=1024,
ollama_host="http://localhost:11434",
)
return svc, mock_client
class TestEnsureCollection:
"""Tests for _ensure_collection."""
@pytest.mark.asyncio
async def test_creates_collection_when_missing(self):
"""Creates collection if it does not exist."""
svc, mock_client = make_qdrant_service()
mock_client.get_collection = AsyncMock(side_effect=Exception("not found"))
mock_client.create_collection = AsyncMock()
await svc._ensure_collection()
mock_client.create_collection.assert_called_once()
assert svc._collection_ensured is True
@pytest.mark.asyncio
async def test_skips_if_collection_exists(self):
"""Does not create collection if it already exists."""
svc, mock_client = make_qdrant_service()
mock_client.get_collection = AsyncMock(return_value=MagicMock())
mock_client.create_collection = AsyncMock()
await svc._ensure_collection()
mock_client.create_collection.assert_not_called()
assert svc._collection_ensured is True
@pytest.mark.asyncio
async def test_skips_if_already_ensured(self):
"""Skips entirely if _collection_ensured is True."""
svc, mock_client = make_qdrant_service()
svc._collection_ensured = True
mock_client.get_collection = AsyncMock()
await svc._ensure_collection()
mock_client.get_collection.assert_not_called()
class TestGetEmbedding:
    """Tests for get_embedding."""

    @pytest.mark.asyncio
    async def test_returns_embedding_vector(self):
        """Returns embedding from Ollama response."""
        svc, _ = make_qdrant_service()
        fake_embedding = [0.1] * 1024
        mock_resp = MagicMock()
        mock_resp.json.return_value = {"embedding": fake_embedding}
        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_resp)
        with patch("httpx.AsyncClient", return_value=mock_client):
            result = await svc.get_embedding("test text")
        assert result == fake_embedding
        assert len(result) == 1024


class TestStoreTurn:
    """Tests for store_turn."""

    @pytest.mark.asyncio
    async def test_stores_raw_user_turn(self):
        """Stores a user turn with proper payload."""
        svc, mock_client = make_qdrant_service()
        svc._collection_ensured = True
        mock_client.upsert = AsyncMock()
        fake_embedding = [0.1] * 1024
        with patch.object(svc, "get_embedding", AsyncMock(return_value=fake_embedding)):
            point_id = await svc.store_turn(role="user", content="hello world")
        assert isinstance(point_id, str)
        mock_client.upsert.assert_called_once()
        call_args = mock_client.upsert.call_args
        point = call_args[1]["points"][0]
        assert point.payload["type"] == "raw"
        assert point.payload["role"] == "user"
        assert "User: hello world" in point.payload["text"]

    @pytest.mark.asyncio
    async def test_stores_curated_turn(self):
        """Stores a curated turn without role prefix in text."""
        svc, mock_client = make_qdrant_service()
        svc._collection_ensured = True
        mock_client.upsert = AsyncMock()
        fake_embedding = [0.1] * 1024
        with patch.object(svc, "get_embedding", AsyncMock(return_value=fake_embedding)):
            point_id = await svc.store_turn(
                role="curated",
                content="User: q\nAssistant: a",
                entry_type="curated"
            )
        call_args = mock_client.upsert.call_args
        point = call_args[1]["points"][0]
        assert point.payload["type"] == "curated"
        # Curated text should be the content directly, not prefixed
        assert point.payload["text"] == "User: q\nAssistant: a"

    @pytest.mark.asyncio
    async def test_stores_with_topic_and_metadata(self):
        """Stores turn with optional topic and metadata."""
        svc, mock_client = make_qdrant_service()
        svc._collection_ensured = True
        mock_client.upsert = AsyncMock()
        fake_embedding = [0.1] * 1024
        with patch.object(svc, "get_embedding", AsyncMock(return_value=fake_embedding)):
            await svc.store_turn(
                role="assistant",
                content="some response",
                topic="python",
                metadata={"source": "test"}
            )
        call_args = mock_client.upsert.call_args
        point = call_args[1]["points"][0]
        assert point.payload["topic"] == "python"
        assert point.payload["source"] == "test"


class TestStoreQaTurn:
    """Tests for store_qa_turn."""

    @pytest.mark.asyncio
    async def test_stores_qa_turn(self):
        """Stores a complete Q&A turn."""
        svc, mock_client = make_qdrant_service()
        svc._collection_ensured = True
        mock_client.upsert = AsyncMock()
        fake_embedding = [0.1] * 1024
        with patch.object(svc, "get_embedding", AsyncMock(return_value=fake_embedding)):
            point_id = await svc.store_qa_turn("What is Python?", "A programming language.")
        assert isinstance(point_id, str)
        call_args = mock_client.upsert.call_args
        point = call_args[1]["points"][0]
        assert point.payload["type"] == "raw"
        assert point.payload["role"] == "qa"
        assert "What is Python?" in point.payload["text"]
        assert "A programming language." in point.payload["text"]
class TestSemanticSearch:
    """Tests for semantic_search."""

    @pytest.mark.asyncio
    async def test_returns_matching_results(self):
        """Returns formatted search results."""
        svc, mock_client = make_qdrant_service()
        svc._collection_ensured = True
        mock_point = MagicMock()
        mock_point.id = "result-1"
        mock_point.score = 0.85
        mock_point.payload = {"text": "User: hello\nAssistant: hi", "type": "curated"}
        mock_query_result = MagicMock()
        mock_query_result.points = [mock_point]
        mock_client.query_points = AsyncMock(return_value=mock_query_result)
        fake_embedding = [0.1] * 1024
        with patch.object(svc, "get_embedding", AsyncMock(return_value=fake_embedding)):
            results = await svc.semantic_search("hello", limit=10, score_threshold=0.6)
        assert len(results) == 1
        assert results[0]["id"] == "result-1"
        assert results[0]["score"] == 0.85
        assert results[0]["payload"]["type"] == "curated"


class TestGetRecentTurns:
    """Tests for get_recent_turns."""

    @pytest.mark.asyncio
    async def test_returns_sorted_recent_turns(self):
        """Returns turns sorted by timestamp descending."""
        svc, mock_client = make_qdrant_service()
        svc._collection_ensured = True
        mock_point1 = MagicMock()
        mock_point1.id = "old"
        mock_point1.payload = {"timestamp": "2026-01-01T00:00:00Z", "text": "old turn"}
        mock_point2 = MagicMock()
        mock_point2.id = "new"
        mock_point2.payload = {"timestamp": "2026-03-01T00:00:00Z", "text": "new turn"}
        # OrderBy returns server-sorted results (newest first)
        mock_client.scroll = AsyncMock(return_value=([mock_point2, mock_point1], None))
        results = await svc.get_recent_turns(limit=2)
        assert len(results) == 2
        # Newest first (server-sorted via OrderBy)
        assert results[0]["id"] == "new"
        assert results[1]["id"] == "old"


class TestDeletePoints:
    """Tests for delete_points."""

    @pytest.mark.asyncio
    async def test_deletes_by_ids(self):
        """Deletes points by their IDs."""
        svc, mock_client = make_qdrant_service()
        mock_client.delete = AsyncMock()
        await svc.delete_points(["id1", "id2"])
        mock_client.delete.assert_called_once()


class TestClose:
    """Tests for close."""

    @pytest.mark.asyncio
    async def test_closes_client(self):
        """Closes the async Qdrant client."""
        svc, mock_client = make_qdrant_service()
        mock_client.close = AsyncMock()
        await svc.close()
        mock_client.close.assert_called_once()
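Per the commit message, `get_recent_turns` uses Qdrant's server-side OrderBy for the timestamp sort and falls back to a client-side sort when no payload index is available. The fallback half is plain Python and can be sketched on its own (function name is illustrative; the commented OrderBy call is an assumption about the qdrant-client API, which requires a payload index on the order-by key):

```python
# Server-side equivalent (qdrant-client, assumed usage; needs a payload
# index on "timestamp"):
#   client.scroll(collection_name=..., limit=limit, with_payload=True,
#                 order_by=models.OrderBy(key="timestamp",
#                                         direction=models.Direction.DESC))

def sort_turns_newest_first(points: list[dict], limit: int) -> list[dict]:
    """Client-side fallback: sort scroll results by payload timestamp, newest first."""
    # ISO-8601 timestamps in a fixed format sort correctly as strings
    ordered = sorted(
        points,
        key=lambda p: p.get("payload", {}).get("timestamp", ""),
        reverse=True,
    )
    return ordered[:limit]

points = [
    {"id": "old", "payload": {"timestamp": "2026-01-01T00:00:00Z"}},
    {"id": "new", "payload": {"timestamp": "2026-03-01T00:00:00Z"}},
]
print([p["id"] for p in sort_turns_newest_first(points, limit=2)])  # ['new', 'old']
```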


@@ -1,6 +1,7 @@
"""Tests for utility functions.""" """Tests for utility functions."""
import pytest import pytest
from app.utils import count_tokens, truncate_by_tokens, parse_curated_turn from unittest.mock import AsyncMock, MagicMock, patch
from app.utils import count_tokens, truncate_by_tokens, parse_curated_turn, build_augmented_messages, count_messages_tokens
class TestCountTokens: class TestCountTokens:
@@ -85,25 +86,95 @@ Assistant: Yes, very popular."""
assert "Line 3" in result[0]["content"] assert "Line 3" in result[0]["content"]
class TestCountMessagesTokens:
    """Tests for count_messages_tokens function."""

    def test_empty_list(self):
        """Empty message list returns 0."""
        assert count_messages_tokens([]) == 0

    def test_single_message(self):
        """Single message counts tokens of its content."""
        msgs = [{"role": "user", "content": "Hello world"}]
        result = count_messages_tokens(msgs)
        assert result > 0

    def test_multiple_messages(self):
        """Multiple messages sum up their token counts."""
        msgs = [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there, how can I help you today?"},
        ]
        result = count_messages_tokens(msgs)
        assert result > count_messages_tokens([msgs[0]])

    def test_message_without_content(self):
        """Message without content field contributes 0 tokens."""
        msgs = [{"role": "system"}]
        assert count_messages_tokens(msgs) == 0


class TestLoadSystemPrompt:
    """Tests for load_system_prompt function."""

    def test_loads_from_prompts_dir(self, tmp_path):
        """Loads systemprompt.md from PROMPTS_DIR."""
        import app.utils as utils_module
        prompts_dir = tmp_path / "prompts"
        prompts_dir.mkdir()
        (prompts_dir / "systemprompt.md").write_text("You are Vera.")
        with patch.object(utils_module, "PROMPTS_DIR", prompts_dir):
            result = utils_module.load_system_prompt()
        assert result == "You are Vera."

    def test_falls_back_to_static_dir(self, tmp_path):
        """Falls back to STATIC_DIR when PROMPTS_DIR has no file."""
        import app.utils as utils_module
        prompts_dir = tmp_path / "no_prompts"  # does not exist
        static_dir = tmp_path / "static"
        static_dir.mkdir()
        (static_dir / "systemprompt.md").write_text("Static Vera.")
        with patch.object(utils_module, "PROMPTS_DIR", prompts_dir), \
             patch.object(utils_module, "STATIC_DIR", static_dir):
            result = utils_module.load_system_prompt()
        assert result == "Static Vera."

    def test_returns_empty_when_not_found(self, tmp_path):
        """Returns empty string when systemprompt.md not found anywhere."""
        import app.utils as utils_module
        with patch.object(utils_module, "PROMPTS_DIR", tmp_path / "nope"), \
             patch.object(utils_module, "STATIC_DIR", tmp_path / "also_nope"):
            result = utils_module.load_system_prompt()
        assert result == ""
 class TestFilterMemoriesByTime:
     """Tests for filter_memories_by_time function."""

     def test_includes_recent_memory(self):
         """Memory with timestamp in the last 24h should be included."""
-        from datetime import datetime, timedelta
+        from datetime import datetime, timedelta, timezone
         from app.utils import filter_memories_by_time
-        ts = (datetime.utcnow() - timedelta(hours=1)).isoformat()
+        ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)).isoformat()
         memories = [{"timestamp": ts, "text": "recent"}]
         result = filter_memories_by_time(memories, hours=24)
         assert len(result) == 1

     def test_excludes_old_memory(self):
         """Memory older than cutoff should be excluded."""
-        from datetime import datetime, timedelta
+        from datetime import datetime, timedelta, timezone
         from app.utils import filter_memories_by_time
-        ts = (datetime.utcnow() - timedelta(hours=48)).isoformat()
+        ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=48)).isoformat()
         memories = [{"timestamp": ts, "text": "old"}]
         result = filter_memories_by_time(memories, hours=24)
         assert len(result) == 0
@@ -124,6 +195,16 @@ class TestFilterMemoriesByTime:
         result = filter_memories_by_time(memories, hours=24)
         assert len(result) == 1

+    def test_z_suffix_old_timestamp_excluded(self):
+        """Regression: chained .replace() was not properly handling Z suffix on old timestamps."""
+        from datetime import datetime, timedelta, timezone
+        from app.utils import filter_memories_by_time
+        old_ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=48)).isoformat() + "Z"
+        memories = [{"timestamp": old_ts, "text": "old with Z"}]
+        result = filter_memories_by_time(memories, hours=24)
+        assert len(result) == 0, f"Old Z-suffixed timestamp should be excluded but wasn't: {old_ts}"
+
     def test_empty_list(self):
         """Empty input returns empty list."""
         from app.utils import filter_memories_by_time
@@ -132,10 +213,10 @@ class TestFilterMemoriesByTime:
     def test_z_suffix_timestamp(self):
         """ISO timestamp with Z suffix should be handled correctly."""
-        from datetime import datetime, timedelta
+        from datetime import datetime, timedelta, timezone
         from app.utils import filter_memories_by_time
-        ts = (datetime.utcnow() - timedelta(hours=1)).isoformat() + "Z"
+        ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)).isoformat() + "Z"
         memories = [{"timestamp": ts, "text": "recent with Z"}]
         result = filter_memories_by_time(memories, hours=24)
         assert len(result) == 1
@@ -190,37 +271,6 @@ class TestMergeMemories:
         assert len(result["ids"]) == 2

-class TestCalculateTokenBudget:
-    """Tests for calculate_token_budget function."""
-
-    def test_default_ratios_sum(self):
-        """Default ratios should sum to 1.0 (system+semantic+context)."""
-        from app.utils import calculate_token_budget
-        result = calculate_token_budget(1000)
-        assert result["system"] + result["semantic"] + result["context"] == 1000
-
-    def test_custom_ratios(self):
-        """Custom ratios should produce correct proportional budgets."""
-        from app.utils import calculate_token_budget
-        result = calculate_token_budget(
-            100, system_ratio=0.1, semantic_ratio=0.6, context_ratio=0.3
-        )
-        assert result["system"] == 10
-        assert result["semantic"] == 60
-        assert result["context"] == 30
-
-    def test_zero_budget(self):
-        """Zero total budget yields all zeros."""
-        from app.utils import calculate_token_budget
-        result = calculate_token_budget(0)
-        assert result["system"] == 0
-        assert result["semantic"] == 0
-        assert result["context"] == 0
-
 class TestBuildAugmentedMessages:
     """Tests for build_augmented_messages function (mocked I/O)."""
@@ -316,4 +366,72 @@ class TestBuildAugmentedMessages:
         )
         contents = [m["content"] for m in result]
         assert any("Old question" in c or "Old answer" in c for c in contents)
+
+    @pytest.mark.asyncio
+    async def test_system_prompt_appends_to_caller_system(self):
+        """systemprompt.md content appends to caller's system message."""
+        import app.utils as utils_module
+        mock_qdrant = self._make_qdrant_mock()
+        with patch.object(utils_module, "load_system_prompt", return_value="Vera memory context"), \
+             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
+            incoming = [
+                {"role": "system", "content": "You are a helpful assistant."},
+                {"role": "user", "content": "Hello"}
+            ]
+            result = await build_augmented_messages(incoming)
+        system_msg = result[0]
+        assert system_msg["role"] == "system"
+        assert system_msg["content"] == "You are a helpful assistant.\n\nVera memory context"
+
+    @pytest.mark.asyncio
+    async def test_empty_system_prompt_passthrough(self):
+        """When systemprompt.md is empty, only caller's system message passes through."""
+        import app.utils as utils_module
+        mock_qdrant = self._make_qdrant_mock()
+        with patch.object(utils_module, "load_system_prompt", return_value=""), \
+             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
+            incoming = [
+                {"role": "system", "content": "You are a helpful assistant."},
+                {"role": "user", "content": "Hello"}
+            ]
+            result = await build_augmented_messages(incoming)
+        system_msg = result[0]
+        assert system_msg["content"] == "You are a helpful assistant."
+
+    @pytest.mark.asyncio
+    async def test_no_caller_system_with_vera_prompt(self):
+        """When caller sends no system message but systemprompt.md exists, use vera prompt."""
+        import app.utils as utils_module
+        mock_qdrant = self._make_qdrant_mock()
+        with patch.object(utils_module, "load_system_prompt", return_value="Vera memory context"), \
+             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
+            incoming = [{"role": "user", "content": "Hello"}]
+            result = await build_augmented_messages(incoming)
+        system_msg = result[0]
+        assert system_msg["role"] == "system"
+        assert system_msg["content"] == "Vera memory context"
+
+    @pytest.mark.asyncio
+    async def test_no_system_anywhere(self):
+        """When neither caller nor systemprompt.md provides system content, no system message."""
+        import app.utils as utils_module
+        mock_qdrant = self._make_qdrant_mock()
+        with patch.object(utils_module, "load_system_prompt", return_value=""), \
+             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
+            incoming = [{"role": "user", "content": "Hello"}]
+            result = await build_augmented_messages(incoming)
+        # First message should be user, not system
+        assert result[0]["role"] == "user"