6 Commits

Author SHA1 Message Date
Claude Code
9774875173 fix: correct test_curator mock strategy and list content test behavior
- make_curator() now patches app.curator.load_curator_prompt directly instead
  of env var, since PROMPTS_DIR is a module-level constant set at import time
- _append_rule_to_file tests patch app.curator.PROMPTS_DIR via patch.object
- test_list_content: document that passing a list raises TypeError (expected)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 19:32:27 -05:00
Claude Code
bfd0221928 test: expand coverage to 70%+ — add utils, config, curator, proxy, integration tests
- Extend test_utils.py: filter_memories_by_time, merge_memories, calculate_token_budget, build_augmented_messages (mocked)
- Extend test_config.py: Config.load() with TOML via tmp_path, CloudConfig helpers, env var api_key
- Add test_curator.py: _parse_json_response, _is_recent, _format_raw_turns, _append_rule_to_file
- Add test_proxy_handler.py: clean_message_content, handle_chat_non_streaming (mocked httpx+qdrant)
- Add test_integration.py: health check, /api/tags, /api/chat non-streaming + streaming via TestClient
- Add pytest.ini (asyncio_mode=auto), add pytest-cov to requirements.txt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 19:19:49 -05:00
Vera-AI
abfcc91eb3 v2.0.3: Improve error handling, add tests, cleanup
- Fix bare except clauses in curator.py and main.py
- Change embedding model to snowflake-arctic-embed2
- Increase semantic_score_threshold to 0.6
- Add memory context explanation to systemprompt.md
- Add pytest dependencies to requirements.txt
- Remove unused context_handler.py and .env.example
- Add project documentation (CLAUDE.md) and test files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 08:47:56 -05:00
Vera-AI
34304a79e0 v2.0.2: Production release with role parsing fix and threshold correction
2026-03-27 13:42:22 -05:00
Vera-AI
c78b3f2bb6 fix: parse curated turns into proper user/assistant roles
- Added parse_curated_turn() function to correctly parse stored memories
- Fixed build_augmented_messages() to use proper message roles
- Layer 2 (semantic) and Layer 3 (context) now correctly parse
  User: X / Assistant: Y format into separate messages
- Resolves context corruption where turns were dumped as single user message

v2.0.2
2026-03-27 13:19:08 -05:00
Vera-AI
50874eeae9 v2.0.1: Monthly curation now in curator_prompt.md, remove full_run_time/full_run_day config
2026-03-26 21:26:02 -05:00
21 changed files with 1652 additions and 387 deletions
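
The mock strategy described in commit 9774875173 rests on a general Python behavior: a module-level constant computed from an environment variable is bound once at import time, so changing the variable afterwards has no effect, while patching the attribute does. A minimal sketch of that behavior follows. `fake_curator` and `prompt_path` are hypothetical stand-ins, not the project's actual `app.curator` code.

```python
import os
import types
from pathlib import Path
from unittest.mock import patch

# Hypothetical stand-in for a module like app.curator: the constant is
# computed from the environment once, at "import" time.
fake_curator = types.ModuleType("fake_curator")
fake_curator.PROMPTS_DIR = Path(os.environ.get("VERA_PROMPTS_DIR", "/app/prompts"))


def prompt_path(filename: str) -> Path:
    # Reads the module attribute at call time, as code inside the module would.
    return fake_curator.PROMPTS_DIR / filename


before = prompt_path("curator_prompt.md")

# Changing the env var now does nothing: the constant is already bound.
with patch.dict(os.environ, {"VERA_PROMPTS_DIR": "/tmp/prompts"}):
    after_env = prompt_path("curator_prompt.md")

# Patching the attribute itself takes effect, and is undone on exit.
with patch.object(fake_curator, "PROMPTS_DIR", Path("/tmp/prompts")):
    patched = prompt_path("curator_prompt.md")

restored = prompt_path("curator_prompt.md")
```

Here `after_env` equals `before`, `patched` points into `/tmp/prompts`, and `restored` equals `before` again once the `with` block exits.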


@@ -0,0 +1,69 @@
---
name: ssh
description: SSH into remote servers and execute commands. Use for remote operations, file transfers, and server management.
allowed-tools: Bash(ssh*), Bash(scp*), Bash(rsync*), Bash(sshpass*), Read, Write
argument-hint: [host-alias]
---
## SSH Connections
| Alias | Host | User | Password | Hostname | Purpose |
|-------|------|------|----------|----------|---------|
| `deb9` | `10.0.0.48` | `n8n` | `passw0rd` | epyc-deb9 | vera-ai source project |
| `deb8` | `10.0.0.46` | `n8n` | `passw0rd` | epyc-deb8 | vera-ai Docker runtime |
## Connection Commands
**Interactive SSH:**
```bash
sshpass -p 'passw0rd' ssh -o StrictHostKeyChecking=no n8n@10.0.0.48
sshpass -p 'passw0rd' ssh -o StrictHostKeyChecking=no n8n@10.0.0.46
```
**Run single command:**
```bash
sshpass -p 'passw0rd' ssh -o StrictHostKeyChecking=no n8n@10.0.0.48 "command"
sshpass -p 'passw0rd' ssh -o StrictHostKeyChecking=no n8n@10.0.0.46 "command"
```
**Copy file to server:**
```bash
sshpass -p 'passw0rd' scp -o StrictHostKeyChecking=no local_file n8n@10.0.0.48:/remote/path
sshpass -p 'passw0rd' scp -o StrictHostKeyChecking=no local_file n8n@10.0.0.46:/remote/path
```
**Copy file from server:**
```bash
sshpass -p 'passw0rd' scp -o StrictHostKeyChecking=no n8n@10.0.0.48:/remote/path local_file
sshpass -p 'passw0rd' scp -o StrictHostKeyChecking=no n8n@10.0.0.46:/remote/path local_file
```
**Sync directory to server:**
```bash
sshpass -p 'passw0rd' rsync -avz -e "ssh -o StrictHostKeyChecking=no" local_dir/ n8n@10.0.0.48:/remote/path/
sshpass -p 'passw0rd' rsync -avz -e "ssh -o StrictHostKeyChecking=no" local_dir/ n8n@10.0.0.46:/remote/path/
```
**Sync directory from server:**
```bash
sshpass -p 'passw0rd' rsync -avz -e "ssh -o StrictHostKeyChecking=no" n8n@10.0.0.48:/remote/path/ local_dir/
sshpass -p 'passw0rd' rsync -avz -e "ssh -o StrictHostKeyChecking=no" n8n@10.0.0.46:/remote/path/ local_dir/
```
## Notes
- Uses `sshpass` to handle password authentication non-interactively
- `-o StrictHostKeyChecking=no` prevents host key prompts (useful for automation)
- For frequent connections, consider setting up SSH key authentication instead of password
## SSH Config (Optional)
To simplify connections, add to `~/.ssh/config`:
```
Host n8n-server
    HostName 10.0.0.48
    User n8n
```
Then connect with just `ssh n8n-server` (still needs password or key).


@@ -1,31 +0,0 @@
# Vera-AI Environment Configuration
# Copy this file to .env and customize for your deployment
# =============================================================================
# User/Group Configuration
# =============================================================================
# UID and GID for the container user (must match host user for volume permissions)
# Run: id -u and id -g on your host to get these values
APP_UID=1000
APP_GID=1000
# =============================================================================
# Timezone Configuration
# =============================================================================
# Timezone for the container (affects scheduler times)
# Common values: UTC, America/New_York, America/Chicago, America/Los_Angeles, Europe/London
TZ=America/Chicago
# =============================================================================
# API Keys (Optional)
# =============================================================================
# OpenRouter API key for cloud model routing
# OPENROUTER_API_KEY=your_api_key_here
# =============================================================================
# Vera-AI Configuration Paths (Optional)
# =============================================================================
# These can be overridden via environment variables
# VERA_CONFIG_DIR=/app/config
# VERA_PROMPTS_DIR=/app/prompts
# VERA_STATIC_DIR=/app/static

CLAUDE.md Normal file

@@ -0,0 +1,128 @@
# Vera-AI Project
**Persistent Memory Proxy for Ollama**
> **Status:** Built and running on deb8. Goal: Validate and improve.
Vera-AI sits between AI clients and Ollama, storing conversations in Qdrant and retrieving context semantically — giving AI **true memory**.
## Architecture
```
Client → Vera-AI (port 11434) → Ollama
Qdrant (vector DB)
Memory Storage
```
## Key Components
| File | Purpose |
|------|---------|
| `app/main.py` | FastAPI application entry point |
| `app/proxy_handler.py` | Chat request handling |
| `app/qdrant_service.py` | Vector DB operations |
| `app/curator.py` | Memory curation (daily/monthly) |
| `app/config.py` | Configuration loader |
| `config/config.toml` | Main configuration file |
## 4-Layer Context System
1. **System Prompt** — From `prompts/systemprompt.md`
2. **Semantic Memory** — Curated Q&A from Qdrant (relevance search)
3. **Recent Context** — Last N conversation turns
4. **Current Messages** — User's current request
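
The four layers above can be sketched as a simple ordered assembly with a per-layer token budget. This is an illustration under stated assumptions, not the project's implementation: `count_tokens` here is a whitespace word count standing in for a real tokenizer, and `take_within_budget` and `assemble` are hypothetical helper names.

```python
from typing import Dict, List


def count_tokens(text: str) -> int:
    # Assumption: whitespace word count stands in for a real tokenizer.
    return len(text.split())


def take_within_budget(candidates: List[Dict], budget: int) -> List[Dict]:
    """Keep messages in order until the next one would exceed the budget."""
    kept: List[Dict] = []
    used = 0
    for msg in candidates:
        cost = count_tokens(msg["content"])
        if used + cost > budget:
            break
        kept.append(msg)
        used += cost
    return kept


def assemble(system_prompt: str, semantic: List[Dict], recent: List[Dict],
             current: List[Dict], semantic_budget: int, context_budget: int) -> List[Dict]:
    messages = [{"role": "system", "content": system_prompt}]  # Layer 1
    messages += take_within_budget(semantic, semantic_budget)   # Layer 2
    messages += take_within_budget(recent, context_budget)      # Layer 3
    messages += current                                         # Layer 4
    return messages


semantic = [
    {"role": "user", "content": "favorite color?"},
    {"role": "assistant", "content": "You said teal."},
    {"role": "user", "content": "this one will not fit in the budget"},
]
recent = [
    {"role": "user", "content": "hello"},
    {"role": "assistant", "content": "hi there"},
]
current = [{"role": "user", "content": "What is my favorite color?"}]

result = assemble("You are Vera.", semantic, recent, current,
                  semantic_budget=5, context_budget=10)
```

With these inputs the third semantic message is dropped because it would exceed the semantic budget, and the current message always lands last.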
## Configuration
Key settings in `config/config.toml`:
```toml
[general]
ollama_host = "http://10.0.0.10:11434"
qdrant_host = "http://10.0.0.22:6333"
qdrant_collection = "memories"
embedding_model = "snowflake-arctic-embed2"
[layers]
semantic_token_budget = 25000
context_token_budget = 22000
semantic_search_turns = 2
semantic_score_threshold = 0.6
[curator]
run_time = "02:00" # Daily curation time
curator_model = "gpt-oss:120b"
```
## Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `APP_UID` | `999` | Container user ID |
| `APP_GID` | `999` | Container group ID |
| `TZ` | `UTC` | Timezone |
| `VERA_DEBUG` | `false` | Enable debug logging |
## Running
```bash
# Build and start
docker compose build
docker compose up -d
# Check status
docker ps
docker logs VeraAI --tail 20
# Health check
curl http://localhost:11434/
```
## API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/` | GET | Health check |
| `/api/chat` | POST | Chat completion (with memory) |
| `/api/tags` | GET | List models |
| `/api/generate` | POST | Generate completion |
| `/curator/run` | POST | Trigger curation manually |
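
For illustration, a non-streaming `/api/chat` request can be built with the standard library alone. The sketch assumes the service listens on `http://localhost:11434` as in the health check above. `build_chat_request` is a hypothetical helper and `"example-model"` is a placeholder model name.

```python
import json
import urllib.request

BASE_URL = "http://localhost:11434"  # assumption: local instance on the default port


def build_chat_request(model: str, user_text: str, stream: bool = False) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/chat."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": user_text}],
        "stream": stream,
    }
    return urllib.request.Request(
        f"{BASE_URL}/api/chat",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_chat_request("example-model", "What did we discuss yesterday?")

# To send it against a running instance:
# with urllib.request.urlopen(req, timeout=120) as resp:
#     print(json.load(resp)["message"]["content"])
```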
## Development Workflow
This project is synced with **deb9** (10.0.0.48). To sync changes:
```bash
# Pull from deb9
sshpass -p 'passw0rd' scp -r -o StrictHostKeyChecking=no n8n@10.0.0.48:/home/n8n/vera-ai/* /home/n8n/vera-ai/
# Push to deb9 (after local changes)
sshpass -p 'passw0rd' scp -r -o StrictHostKeyChecking=no /home/n8n/vera-ai/* n8n@10.0.0.48:/home/n8n/vera-ai/
```
## Memory System
- **raw** memories — Unprocessed conversation turns (until curation)
- **curated** memories — Cleaned Q&A pairs (permanent)
- **test** memories — Test entries (can be ignored)
Curation runs daily at 02:00. On the 1st of each month, that same run switches to monthly mode and processes all raw memories.
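
In the curator code of this changeset, the mode is chosen from the calendar day at run time. The sketch below shows that selection in simplified form. `select_mode` and `select_raw` are hypothetical names, timestamps are assumed to be timezone-aware ISO strings, and the real code's handling of unparseable timestamps is omitted.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def select_mode(now: datetime) -> str:
    # Day 1 of the month triggers the full (monthly) pass.
    return "MONTHLY" if now.day == 1 else "DAILY"


def select_raw(raw_memories: List[Dict], now: datetime, hours: int = 24) -> List[Dict]:
    """Return every raw memory in monthly mode, else only those newer than the cutoff."""
    if select_mode(now) == "MONTHLY":
        return list(raw_memories)
    cutoff = now - timedelta(hours=hours)
    return [m for m in raw_memories
            if datetime.fromisoformat(m["timestamp"]) > cutoff]


raw = [
    {"id": 1, "timestamp": "2026-03-14T12:00:00+00:00"},
    {"id": 2, "timestamp": "2026-03-01T12:00:00+00:00"},
]
now_daily = datetime(2026, 3, 15, 2, 0, tzinfo=timezone.utc)
now_monthly = datetime(2026, 4, 1, 2, 0, tzinfo=timezone.utc)

daily_ids = [m["id"] for m in select_raw(raw, now_daily)]
monthly_ids = [m["id"] for m in select_raw(raw, now_monthly)]
```

One consequence of this design is that a manual trigger on any day other than the 1st only ever sees the last 24 hours of raw memories.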
## Related Infrastructure
| Service | Host | Port / Role |
|---------|------|------|
| Qdrant | 10.0.0.22 | 6333 |
| Ollama | 10.0.0.10 | 11434 |
| deb9 | 10.0.0.48 | Source project (SSH) |
| deb8 | 10.0.0.46 | Docker runtime |
## Qdrant Collections
| Collection | Purpose |
|------------|---------|
| `python_kb` | Python code patterns reference for this project |
| `memories` | Conversation memory storage (default) |
| `vera_memories` | Alternative memory collection |


@@ -148,10 +148,8 @@ semantic_score_threshold = 0.6
run_time = "02:00"
# Time for monthly full curation (HH:MM format)
full_run_time = "03:00"
# Day of month for full curation (1-28)
full_run_day = 1
# Model to use for curation
curator_model = "gpt-oss:120b"
@@ -308,7 +306,7 @@ docker run -d --name VeraAI -p 8080:11434 ...
| Feature | Description |
|---------|-------------|
| 🧠 **Persistent Memory** | Conversations stored in Qdrant, retrieved contextually |
| 📅 **Monthly Curation** | Daily + monthly cleanup of raw memories |
| 📅 **Monthly Curation** | Daily cleanup, auto-monthly on day 01 |
| 🔍 **4-Layer Context** | System + semantic + recent + current messages |
| 👤 **Configurable UID/GID** | Match container user to host for permissions |
| 🌍 **Timezone Support** | Scheduler runs in your local timezone |


@@ -4,15 +4,6 @@
# Build arguments:
# APP_UID: User ID for appuser (default: 999)
# APP_GID: Group ID for appgroup (default: 999)
#
# Build example:
# docker build --build-arg APP_UID=1000 --build-arg APP_GID=1000 -t vera-ai .
#
# Runtime environment variables:
# TZ: Timezone (default: UTC)
# APP_UID: User ID (informational)
# APP_GID: Group ID (informational)
# VERA_LOG_DIR: Debug log directory (default: /app/logs)
# Stage 1: Builder
FROM python:3.11-slim AS builder
@@ -20,9 +11,7 @@ FROM python:3.11-slim AS builder
WORKDIR /app
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y --no-install-recommends build-essential && rm -rf /var/lib/apt/lists/*
# Copy requirements and install
COPY requirements.txt .
@@ -38,29 +27,25 @@ ARG APP_UID=999
ARG APP_GID=999
# Create group and user with specified UID/GID
RUN groupadd -g ${APP_GID} appgroup && \
useradd -u ${APP_UID} -g appgroup -r -m -s /bin/bash appuser
RUN groupadd -g ${APP_GID} appgroup && useradd -u ${APP_UID} -g appgroup -r -m -s /bin/bash appuser
# Copy installed packages from builder
COPY --from=builder /root/.local /home/appuser/.local
ENV PATH=/home/appuser/.local/bin:$PATH
# Create directories for mounted volumes
RUN mkdir -p /app/config /app/prompts /app/static /app/logs && \
chown -R ${APP_UID}:${APP_GID} /app
RUN mkdir -p /app/config /app/prompts /app/logs && chown -R ${APP_UID}:${APP_GID} /app
# Copy application code
COPY app/ ./app/
# Copy default config and prompts (can be overridden by volume mounts)
COPY config.toml /app/config/config.toml
COPY static/curator_prompt.md /app/prompts/curator_prompt.md
COPY static/systemprompt.md /app/prompts/systemprompt.md
COPY config/config.toml /app/config/config.toml
COPY prompts/curator_prompt.md /app/prompts/curator_prompt.md
COPY prompts/systemprompt.md /app/prompts/systemprompt.md
# Create symlinks for backward compatibility
RUN ln -sf /app/config/config.toml /app/config.toml && \
ln -sf /app/prompts/curator_prompt.md /app/static/curator_prompt.md && \
ln -sf /app/prompts/systemprompt.md /app/static/systemprompt.md
# Create symlink for config backward compatibility
RUN ln -sf /app/config/config.toml /app/config.toml
# Set ownership
RUN chown -R ${APP_UID}:${APP_GID} /app && chmod -R u+rw /app
@@ -70,11 +55,10 @@ ENV TZ=UTC
EXPOSE 11434
# Health check using Python (no curl needed in slim image)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:11434/')" || exit 1
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:11434/')" || exit 1
# Switch to non-root user
USER appuser
CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "11434"]"
ENTRYPOINT ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "11434"]


@@ -58,7 +58,7 @@ Every conversation is stored in Qdrant vector database and retrieved contextuall
| Feature | Description |
|---------|-------------|
| **🧠 Persistent Memory** | Conversations stored in Qdrant, retrieved contextually |
| **📅 Monthly Curation** | Daily + monthly cleanup of raw memories |
| **📅 Smart Curation** | Daily cleanup, auto-monthly on day 01 |
| **🔍 4-Layer Context** | System + semantic + recent + current messages |
| **👤 Configurable UID/GID** | Match container user to host for permissions |
| **🌍 Timezone Support** | Scheduler runs in your local timezone |
@@ -314,10 +314,8 @@ run_time = "02:00"
# Time for monthly full curation (HH:MM format, 24-hour)
# Processes ALL raw memories
full_run_time = "03:00"
# Day of month for full curation (1-28)
full_run_day = 1
# Model to use for curation
# Should be a capable model for summarization
@@ -540,7 +538,8 @@ TZ=Europe/London # GMT/BST
curl -X POST http://localhost:11434/curator/run
# Full curation (all raw memories)
curl -X POST "http://localhost:11434/curator/run?full=true"
# Monthly mode is automatic on day 01
# curl -X POST http://localhost:11434/curator/run
```
---


@@ -48,8 +48,7 @@ class Config:
semantic_search_turns: int = 2
semantic_score_threshold: float = 0.6 # Score threshold for semantic search
run_time: str = "02:00" # Daily curator time
full_run_time: str = "03:00" # Monthly full curator time
full_run_day: int = 1 # Day of month for full run (1st)
# Monthly mode is detected by curator_prompt.md (day 01)
curator_model: str = "gpt-oss:120b"
debug: bool = False
cloud: CloudConfig = field(default_factory=CloudConfig)
@@ -103,8 +102,6 @@ class Config:
if "curator" in data:
config.run_time = data["curator"].get("run_time", config.run_time)
config.full_run_time = data["curator"].get("full_run_time", config.full_run_time)
config.full_run_day = data["curator"].get("full_run_day", config.full_run_day)
config.curator_model = data["curator"].get("curator_model", config.curator_model)
if "cloud" in data:
@@ -118,4 +115,4 @@ class Config:
return config
config = Config.load()
config = Config.load()


@@ -1,208 +0,0 @@
"""Context handler - builds 4-layer context for every request."""
import httpx
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from .config import Config
from .qdrant_service import QdrantService
from .utils import count_tokens, truncate_by_tokens
logger = logging.getLogger(__name__)
class ContextHandler:
def __init__(self, config: Config):
self.config = config
self.qdrant = QdrantService(
host=config.qdrant_host,
collection=config.qdrant_collection,
embedding_model=config.embedding_model,
ollama_host=config.ollama_host
)
self.system_prompt = self._load_system_prompt()
def _load_system_prompt(self) -> str:
"""Load system prompt from static/systemprompt.md."""
try:
path = Path(__file__).parent.parent / "static" / "systemprompt.md"
return path.read_text().strip()
except FileNotFoundError:
logger.error("systemprompt.md not found - required file")
raise
async def process(self, messages: List[Dict], model: str, stream: bool = False) -> Dict:
"""Process chat request through 4-layer context."""
# Get user question (last user message)
user_question = ""
for msg in reversed(messages):
if msg.get("role") == "user":
user_question = msg.get("content", "")
break
# Get messages for semantic search (last N turns)
search_messages = []
for msg in messages[-self.config.semantic_search_turns:]:
if msg.get("role") in ("user", "assistant"):
search_messages.append(msg.get("content", ""))
# Build the 4-layer context messages
context_messages = await self.build_context_messages(
incoming_system=next((m for m in messages if m.get("role") == "system"), None),
user_question=user_question,
search_context=" ".join(search_messages)
)
# Forward to Ollama
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
f"{self.config.ollama_host}/api/chat",
json={"model": model, "messages": context_messages, "stream": stream}
)
result = response.json()
# Store the Q&A turn in Qdrant
assistant_msg = result.get("message", {}).get("content", "")
await self.qdrant.store_qa_turn(user_question, assistant_msg)
return result
def _parse_curated_turn(self, text: str) -> List[Dict]:
"""Parse a curated turn into alternating user/assistant messages.
Input format:
User: [question]
Assistant: [answer]
Timestamp: ISO datetime
Returns list of message dicts with role and content.
"""
messages = []
lines = text.strip().split("\n")
current_role = None
current_content = []
for line in lines:
line = line.strip()
if line.startswith("User:"):
# Save previous content if exists
if current_role and current_content:
messages.append({
"role": current_role,
"content": "\n".join(current_content).strip()
})
current_role = "user"
current_content = [line[5:].strip()] # Remove "User:" prefix
elif line.startswith("Assistant:"):
# Save previous content if exists
if current_role and current_content:
messages.append({
"role": current_role,
"content": "\n".join(current_content).strip()
})
current_role = "assistant"
current_content = [line[10:].strip()] # Remove "Assistant:" prefix
elif line.startswith("Timestamp:"):
# Ignore timestamp line
continue
elif current_role:
# Continuation of current message
current_content.append(line)
# Save last message
if current_role and current_content:
messages.append({
"role": current_role,
"content": "\n".join(current_content).strip()
})
return messages
async def build_context_messages(self, incoming_system: Optional[Dict], user_question: str, search_context: str) -> List[Dict]:
"""Build 4-layer context messages array."""
messages = []
token_budget = {
"semantic": self.config.semantic_token_budget,
"context": self.config.context_token_budget
}
# === LAYER 1: System Prompt (pass through unchanged) ===
# DO NOT truncate - preserve system prompt entirely
system_content = ""
if incoming_system:
system_content = incoming_system.get("content", "")
logger.info(f"System layer: preserved incoming system {len(system_content)} chars, {count_tokens(system_content)} tokens")
# Add Vera context info if present (small, just metadata)
if self.system_prompt.strip():
system_content += "\n\n" + self.system_prompt
logger.info(f"System layer: added vera context {len(self.system_prompt)} chars")
messages.append({"role": "system", "content": system_content})
# === LAYER 2: Semantic Layer (curated memories) ===
# Search for curated blocks only
semantic_results = await self.qdrant.semantic_search(
query=search_context if search_context else user_question,
limit=20,
score_threshold=self.config.semantic_score_threshold,
entry_type="curated"
)
# Parse curated turns into alternating user/assistant messages
semantic_messages = []
semantic_tokens_used = 0
for result in semantic_results:
payload = result.get("payload", {})
text = payload.get("text", "")
if text:
parsed = self._parse_curated_turn(text)
for msg in parsed:
msg_tokens = count_tokens(msg.get("content", ""))
if semantic_tokens_used + msg_tokens <= token_budget["semantic"]:
semantic_messages.append(msg)
semantic_tokens_used += msg_tokens
else:
break
# Add parsed messages to context
for msg in semantic_messages:
messages.append(msg)
if semantic_messages:
logger.info(f"Semantic layer: {len(semantic_messages)} messages, ~{semantic_tokens_used} tokens")
# === LAYER 3: Context Layer (recent turns) ===
recent_turns = await self.qdrant.get_recent_turns(limit=50)
context_messages_parsed = []
context_tokens_used = 0
for turn in reversed(recent_turns): # Oldest first
payload = turn.get("payload", {})
text = payload.get("text", "")
entry_type = payload.get("type", "raw")
if text:
# Parse turn into messages
parsed = self._parse_curated_turn(text)
for msg in parsed:
msg_tokens = count_tokens(msg.get("content", ""))
if context_tokens_used + msg_tokens <= token_budget["context"]:
context_messages_parsed.append(msg)
context_tokens_used += msg_tokens
else:
break
for msg in context_messages_parsed:
messages.append(msg)
if context_messages_parsed:
logger.info(f"Context layer: {len(context_messages_parsed)} messages, ~{context_tokens_used} tokens")
# === LAYER 4: Current Question ===
messages.append({"role": "user", "content": user_question})
return messages


@@ -1,7 +1,8 @@
"""Memory curator - runs daily (recent 24h) and monthly (full DB) to clean and maintain memory database.
"""Memory curator - runs daily to clean and maintain memory database.
Creates INDIVIDUAL cleaned turns (one per raw turn), not merged summaries.
Parses JSON response from curator_prompt.md format.
On day 01 of each month, processes ALL raw memories (monthly mode).
Otherwise, processes recent 24h of raw memories (daily mode).
The prompt determines behavior based on current date.
"""
import logging
import os
@@ -23,7 +24,6 @@ STATIC_DIR = Path(os.environ.get("VERA_STATIC_DIR", "/app/static"))
def load_curator_prompt() -> str:
"""Load curator prompt from prompts directory."""
# Try prompts directory first, then static for backward compatibility
prompts_path = PROMPTS_DIR / "curator_prompt.md"
static_path = STATIC_DIR / "curator_prompt.md"
@@ -42,16 +42,20 @@ class Curator:
self.ollama_host = ollama_host
self.curator_prompt = load_curator_prompt()
async def run(self, full: bool = False):
async def run(self):
"""Run the curation process.
Args:
full: If True, process ALL raw memories (monthly full run).
If False, process only recent 24h (daily run).
Automatically detects day 01 for monthly mode (processes ALL raw memories).
Otherwise runs daily mode (processes recent 24h only).
The prompt determines behavior based on current date.
"""
logger.info(f"Starting memory curation (full={full})...")
current_date = datetime.utcnow()
is_monthly = current_date.day == 1
mode = "MONTHLY" if is_monthly else "DAILY"
logger.info(f"Starting memory curation ({mode} mode)...")
try:
current_date = datetime.utcnow().strftime("%Y-%m-%d")
current_date_str = current_date.strftime("%Y-%m-%d")
# Get all memories (async)
points, _ = await self.qdrant.client.scroll(
@@ -77,15 +81,15 @@ class Curator:
logger.info(f"Found {len(raw_memories)} raw, {len(curated_memories)} curated")
# Filter by time for daily runs, process all for full runs
if full:
# Filter by time for daily mode, process all for monthly mode
if is_monthly:
# Monthly full run: process ALL raw memories
recent_raw = raw_memories
logger.info(f"FULL RUN: Processing all {len(recent_raw)} raw memories")
logger.info(f"MONTHLY MODE: Processing all {len(recent_raw)} raw memories")
else:
# Daily run: process only recent 24h
recent_raw = [m for m in raw_memories if self._is_recent(m, hours=24)]
logger.info(f"DAILY RUN: Processing {len(recent_raw)} recent raw memories")
logger.info(f"DAILY MODE: Processing {len(recent_raw)} recent raw memories")
existing_sample = curated_memories[-50:] if len(curated_memories) > 50 else curated_memories
@@ -96,10 +100,10 @@ class Curator:
raw_turns_text = self._format_raw_turns(recent_raw)
existing_text = self._format_existing_memories(existing_sample)
prompt = self.curator_prompt.replace("{CURRENT_DATE}", current_date)
prompt = self.curator_prompt.replace("{CURRENT_DATE}", current_date_str)
full_prompt = f"""{prompt}
## {'All' if full else 'Recent'} Raw Turns ({'full database' if full else 'last 24 hours'}):
## {'All' if is_monthly else 'Recent'} Raw Turns ({'full database' if is_monthly else 'last 24 hours'}):
{raw_turns_text}
## Existing Memories (sample):
@@ -152,20 +156,12 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
await self.qdrant.delete_points(raw_ids_to_delete)
logger.info(f"Deleted {len(raw_ids_to_delete)} processed raw memories")
logger.info(f"Memory curation completed successfully (full={full})")
logger.info(f"Memory curation completed successfully ({mode} mode)")
except Exception as e:
logger.error(f"Error during curation: {e}")
raise
async def run_full(self):
"""Run full curation (all raw memories). Convenience method."""
await self.run(full=True)
async def run_daily(self):
"""Run daily curation (recent 24h only). Convenience method."""
await self.run(full=False)
def _is_recent(self, memory: Dict, hours: int = 24) -> bool:
"""Check if memory is within the specified hours."""
timestamp = memory.get("timestamp", "")
@@ -175,7 +171,8 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
mem_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
cutoff = datetime.utcnow() - timedelta(hours=hours)
return mem_time.replace(tzinfo=None) > cutoff
except:
except (ValueError, TypeError):
logger.debug(f"Could not parse timestamp: {timestamp}")
return True
def _format_raw_turns(self, turns: List[Dict]) -> str:
@@ -236,7 +233,9 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
except json.JSONDecodeError:
pass
json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
# Try to find JSON in code blocks
pattern = r'```(?:json)?\s*([\s\S]*?)```'
json_match = re.search(pattern, response)
if json_match:
try:
return json.loads(json_match.group(1).strip())
@@ -248,7 +247,6 @@ Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the J
async def _append_rule_to_file(self, filename: str, rule: str):
"""Append a permanent rule to a prompts file."""
# Try prompts directory first, then static for backward compatibility
prompts_path = PROMPTS_DIR / filename
static_path = STATIC_DIR / filename
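
The fallback in the `_parse_json_response` hunk above (try the whole response as JSON, then look inside a fenced code block) can be sketched on its own as follows. `parse_json_response` is a simplified stand-in, and returning `None` when both attempts fail is an assumption, since the diff does not show that branch. The fence marker is built with string multiplication only so that this example can itself sit inside a fenced block.

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # three backticks, built indirectly to keep this example fence-safe
CODE_BLOCK = re.compile(FENCE + r"(?:json)?\s*([\s\S]*?)" + FENCE)


def parse_json_response(response: str) -> Optional[Any]:
    """Parse a model response as JSON, tolerating a surrounding code fence."""
    # First attempt: the response is already bare JSON.
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        pass

    # Second attempt: extract the body of the first fenced block.
    match = CODE_BLOCK.search(response)
    if match:
        try:
            return json.loads(match.group(1).strip())
        except json.JSONDecodeError:
            pass

    # Assumption: signal failure with None.
    return None


bare = parse_json_response('{"new_curated_turns": []}')
wrapped = parse_json_response("Here you go:\n" + FENCE + 'json\n{"a": 1}\n' + FENCE)
garbage = parse_json_response("no json here")
```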


@@ -20,25 +20,19 @@ curator = None
async def run_curator():
"""Scheduled daily curator job (recent 24h)."""
"""Scheduled daily curator job.
Runs every day at configured time. The curator itself detects
if it's day 01 (monthly mode) and processes all memories.
Otherwise processes recent 24h only.
"""
global curator
logger.info("Starting daily memory curation...")
logger.info("Starting memory curation...")
try:
await curator.run_daily()
logger.info("Daily memory curation completed successfully")
await curator.run()
logger.info("Memory curation completed successfully")
except Exception as e:
logger.error(f"Daily memory curation failed: {e}")
async def run_curator_full():
"""Scheduled monthly curator job (full database)."""
global curator
logger.info("Starting monthly full memory curation...")
try:
await curator.run_full()
logger.info("Monthly full memory curation completed successfully")
except Exception as e:
logger.error(f"Monthly full memory curation failed: {e}")
logger.error(f"Memory curation failed: {e}")
@asynccontextmanager
@@ -59,23 +53,12 @@ async def lifespan(app: FastAPI):
ollama_host=config.ollama_host
)
# Schedule daily curator (recent 24h)
# Schedule daily curator
# Note: Monthly mode is detected automatically by curator_prompt.md (day 01)
hour, minute = map(int, config.run_time.split(":"))
scheduler.add_job(run_curator, "cron", hour=hour, minute=minute, id="daily_curator")
logger.info(f"Daily curator scheduled at {config.run_time}")
# Schedule monthly full curator (all raw memories)
full_hour, full_minute = map(int, config.full_run_time.split(":"))
scheduler.add_job(
run_curator_full,
"cron",
day=config.full_run_day,
hour=full_hour,
minute=full_minute,
id="monthly_curator"
)
logger.info(f"Monthly full curator scheduled on day {config.full_run_day} at {config.full_run_time}")
scheduler.start()
yield
@@ -97,7 +80,8 @@ async def health_check():
resp = await client.get(f"{config.ollama_host}/api/tags")
if resp.status_code == 200:
ollama_status = "reachable"
except: pass
except Exception:
logger.warning(f"Failed to reach Ollama at {config.ollama_host}")
return {"status": "ok", "ollama": ollama_status}
@@ -141,16 +125,11 @@ async def proxy_all(request: Request, path: str):
@app.post("/curator/run")
async def trigger_curator(full: bool = False):
async def trigger_curator():
"""Manually trigger curator.
Args:
full: If True, run full curation (all raw memories).
If False (default), run daily curation (recent 24h).
The curator will automatically detect if it's day 01 (monthly mode)
and process all memories. Otherwise processes recent 24h.
"""
if full:
await run_curator_full()
return {"status": "full curation completed"}
else:
await run_curator()
return {"status": "daily curation completed"}
await run_curator()
return {"status": "curation completed"}


@@ -2,7 +2,7 @@
from .config import config
import tiktoken
import os
from typing import List, Dict
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from pathlib import Path
@@ -127,10 +127,70 @@ def load_system_prompt() -> str:
return ""
def parse_curated_turn(text: str) -> List[Dict]:
    """Parse a curated turn into alternating user/assistant messages.

    Input format:
        User: [question]
        Assistant: [answer]
        Timestamp: ISO datetime

    Returns list of message dicts with role and content.
    Returns empty list if parsing fails.
    """
    if not text:
        return []

    messages = []
    lines = text.strip().split("\n")
    current_role = None
    current_content = []

    for line in lines:
        line = line.strip()
        if line.startswith("User:"):
            # Save previous content if exists
            if current_role and current_content:
                messages.append({
                    "role": current_role,
                    "content": "\n".join(current_content).strip()
                })
            current_role = "user"
            current_content = [line[5:].strip()]  # Remove "User:" prefix
        elif line.startswith("Assistant:"):
            # Save previous content if exists
            if current_role and current_content:
                messages.append({
                    "role": current_role,
                    "content": "\n".join(current_content).strip()
                })
            current_role = "assistant"
            current_content = [line[10:].strip()]  # Remove "Assistant:" prefix
        elif line.startswith("Timestamp:"):
            # Ignore timestamp line
            continue
        elif current_role:
            # Continuation of current message
            current_content.append(line)

    # Save last message
    if current_role and current_content:
        messages.append({
            "role": current_role,
            "content": "\n".join(current_content).strip()
        })

    return messages
async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
"""Build 4-layer augmented messages from incoming messages.
This is a standalone version that can be used by proxy_handler.py.
Layer 1: System prompt (preserved from incoming + vera context)
Layer 2: Semantic memories (curated, parsed into proper roles)
Layer 3: Recent context (raw turns, parsed into proper roles)
Layer 4: Current conversation (passed through)
"""
import logging
@@ -153,6 +213,10 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
search_context += msg.get("content", "") + " "
messages = []
token_budget = {
"semantic": config.semantic_token_budget,
"context": config.context_token_budget
}
# === LAYER 1: System Prompt ===
system_content = ""
@@ -166,6 +230,7 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
if system_content:
messages.append({"role": "system", "content": system_content})
logger.info(f"Layer 1 (system): {count_tokens(system_content)} tokens")
# === LAYER 2: Semantic (curated memories) ===
qdrant = get_qdrant_service()
@@ -176,28 +241,71 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
         entry_type="curated"
     )
-    semantic_tokens = 0
+    semantic_messages = []
+    semantic_tokens_used = 0
     for result in semantic_results:
         payload = result.get("payload", {})
         text = payload.get("text", "")
-        if text and semantic_tokens < config.semantic_token_budget:
-            messages.append({"role": "user", "content": text}) # Add as context
-            semantic_tokens += count_tokens(text)
+        if text:
+            # Parse curated turn into proper user/assistant messages
+            parsed = parse_curated_turn(text)
+            for msg in parsed:
+                msg_tokens = count_tokens(msg.get("content", ""))
+                if semantic_tokens_used + msg_tokens <= token_budget["semantic"]:
+                    semantic_messages.append(msg)
+                    semantic_tokens_used += msg_tokens
+                else:
+                    break
+            if semantic_tokens_used >= token_budget["semantic"]:
+                break
+    # Add parsed messages to context
+    for msg in semantic_messages:
+        messages.append(msg)
+    if semantic_messages:
+        logger.info(f"Layer 2 (semantic): {len(semantic_messages)} messages, ~{semantic_tokens_used} tokens")
     # === LAYER 3: Context (recent turns) ===
-    recent_turns = await qdrant.get_recent_turns(limit=20)
+    recent_turns = await qdrant.get_recent_turns(limit=50)
-    context_tokens = 0
+    context_messages = []
+    context_tokens_used = 0
+    # Process oldest first for chronological order
     for turn in reversed(recent_turns):
         payload = turn.get("payload", {})
         text = payload.get("text", "")
-        if text and context_tokens < config.context_token_budget:
-            messages.append({"role": "user", "content": text}) # Add as context
-            context_tokens += count_tokens(text)
+        entry_type = payload.get("type", "raw")
+        if text:
+            # Parse turn into messages
+            parsed = parse_curated_turn(text)
+            for msg in parsed:
+                msg_tokens = count_tokens(msg.get("content", ""))
+                if context_tokens_used + msg_tokens <= token_budget["context"]:
+                    context_messages.append(msg)
+                    context_tokens_used += msg_tokens
+                else:
+                    break
+            if context_tokens_used >= token_budget["context"]:
+                break
-    # === LAYER 4: Current messages (passed through) ===
+    # Add context messages (oldest first maintains conversation order)
+    for msg in context_messages:
+        messages.append(msg)
+    if context_messages:
+        logger.info(f"Layer 3 (context): {len(context_messages)} messages, ~{context_tokens_used} tokens")
+    # === LAYER 4: Current conversation ===
     for msg in incoming_messages:
-        if msg.get("role") != "system": # Do not duplicate system
+        if msg.get("role") != "system": # System already handled in Layer 1
             messages.append(msg)
-    return messages
+    logger.info(f"Layer 4 (current): {len([m for m in incoming_messages if m.get('role') != 'system'])} messages")
+    return messages
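The budget loops in Layers 2 and 3 admit messages one at a time, and the inner `break` only leaves the current turn. As read from the hunk above, that means a user message can be kept while its assistant reply is dropped, and a later, shorter turn can still be admitted. The sketch below shows one possible alternative that packs whole turns only. It is not what the commit does: `pack_within_budget` and `whitespace_tokens` are hypothetical names, and the whitespace counter merely stands in for the project's `count_tokens`.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Message = Dict[str, str]


def pack_within_budget(
    turns: Iterable[List[Message]],
    budget: int,
    count_tokens: Callable[[str], int],
) -> Tuple[List[Message], int]:
    """Keep whole turns, in order, until the next turn would exceed the budget."""
    kept: List[Message] = []
    used = 0
    for turn in turns:
        cost = sum(count_tokens(m.get("content", "")) for m in turn)
        if used + cost > budget:
            break  # stop at the first turn that does not fit as a unit
        kept.extend(turn)
        used += cost
    return kept, used


def whitespace_tokens(text: str) -> int:
    # Assumption: crude stand-in for a real tokenizer such as tiktoken.
    return len(text.split())


turns = [
    [{"role": "user", "content": "a b c"}, {"role": "assistant", "content": "d e"}],
    [{"role": "user", "content": "f g h i"}, {"role": "assistant", "content": "j k l m n o"}],
]
kept, used = pack_within_budget(turns, budget=8, count_tokens=whitespace_tokens)
```

Packing by turn trades a little unused budget for the guarantee that every retained question still has its answer next to it.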


@@ -1,21 +0,0 @@
[general]
ollama_host = "http://10.0.0.10:11434"
qdrant_host = "http://10.0.0.22:6333"
qdrant_collection = "memories"
embedding_model = "snowflake-arctic-embed2"
debug = false
[layers]
# Note: system_token_budget removed - system prompt is never truncated
semantic_token_budget = 25000
context_token_budget = 22000
semantic_search_turns = 2
semantic_score_threshold = 0.6
[curator]
# Daily curation: processes recent 24h of raw memories
run_time = "02:00"
# Monthly full curation: processes ALL raw memories
full_run_time = "03:00"
full_run_day = 1 # Day of month (1st)
curator_model = "gpt-oss:120b"


@@ -6,16 +6,11 @@ embedding_model = "snowflake-arctic-embed2"
debug = false
[layers]
# Note: system_token_budget removed - system prompt is never truncated
semantic_token_budget = 25000
context_token_budget = 22000
semantic_search_turns = 2
semantic_score_threshold = 0.6
[curator]
# Daily curation: processes recent 24h of raw memories
run_time = "02:00"
# Monthly full curation: processes ALL raw memories
full_run_time = "03:00"
full_run_day = 1 # Day of month (1st)
curator_model = "gpt-oss:120b"
curator_model = "gpt-oss:120b"

pytest.ini

@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto
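With `asyncio_mode = auto`, pytest-asyncio treats every `async def` test as an asyncio test, so the `@pytest.mark.asyncio` markers that appear in the test files of this change are optional rather than required. A small illustration, with hypothetical function names:

```python
import asyncio


async def fetch_answer() -> int:
    """Stand-in for any coroutine under test."""
    await asyncio.sleep(0)
    return 42


async def test_fetch_answer() -> None:
    # No @pytest.mark.asyncio decorator: auto mode collects and awaits this.
    assert await fetch_answer() == 42
```

In the default strict mode the same test would need the explicit marker.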


@@ -6,3 +6,6 @@ ollama>=0.1.0
toml>=0.10.2
tiktoken>=0.5.0
apscheduler>=3.10.0
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-cov>=4.0.0

tests/__init__.py

@@ -0,0 +1 @@
# Test package

tests/test_config.py

@@ -0,0 +1,174 @@
"""Tests for configuration."""
import pytest
from pathlib import Path
from app.config import Config, EMBEDDING_DIMS
class TestConfig:
"""Tests for Config class."""
def test_default_values(self):
"""Config should have sensible defaults."""
config = Config()
assert config.ollama_host == "http://10.0.0.10:11434"
assert config.qdrant_host == "http://10.0.0.22:6333"
assert config.qdrant_collection == "memories"
assert config.embedding_model == "snowflake-arctic-embed2"
def test_vector_size_property(self):
"""Vector size should match embedding model."""
config = Config(embedding_model="snowflake-arctic-embed2")
assert config.vector_size == 1024
def test_vector_size_fallback(self):
"""Unknown model should default to 1024."""
config = Config(embedding_model="unknown-model")
assert config.vector_size == 1024
class TestEmbeddingDims:
"""Tests for embedding dimensions mapping."""
def test_snowflake_arctic_embed2(self):
"""snowflake-arctic-embed2 should have 1024 dimensions."""
assert EMBEDDING_DIMS["snowflake-arctic-embed2"] == 1024
def test_nomic_embed_text(self):
"""nomic-embed-text should have 768 dimensions."""
assert EMBEDDING_DIMS["nomic-embed-text"] == 768
def test_mxbai_embed_large(self):
"""mxbai-embed-large should have 1024 dimensions."""
assert EMBEDDING_DIMS["mxbai-embed-large"] == 1024
class TestConfigLoad:
"""Tests for Config.load() with real TOML content."""
def test_load_from_explicit_path(self, tmp_path):
"""Config.load() should parse a TOML file at an explicit path."""
from app.config import Config
config_file = tmp_path / "config.toml"
config_file.write_text(
'[general]\n'
'ollama_host = "http://localhost:11434"\n'
'qdrant_host = "http://localhost:6333"\n'
'qdrant_collection = "test_memories"\n'
)
cfg = Config.load(str(config_file))
assert cfg.ollama_host == "http://localhost:11434"
assert cfg.qdrant_host == "http://localhost:6333"
assert cfg.qdrant_collection == "test_memories"
def test_load_layers_section(self, tmp_path):
"""Config.load() should parse [layers] section correctly."""
from app.config import Config
config_file = tmp_path / "config.toml"
config_file.write_text(
'[layers]\n'
'semantic_token_budget = 5000\n'
'context_token_budget = 3000\n'
'semantic_score_threshold = 0.75\n'
)
cfg = Config.load(str(config_file))
assert cfg.semantic_token_budget == 5000
assert cfg.context_token_budget == 3000
assert cfg.semantic_score_threshold == 0.75
def test_load_curator_section(self, tmp_path):
"""Config.load() should parse [curator] section correctly."""
from app.config import Config
config_file = tmp_path / "config.toml"
config_file.write_text(
'[curator]\n'
'run_time = "03:30"\n'
'curator_model = "mixtral:8x22b"\n'
)
cfg = Config.load(str(config_file))
assert cfg.run_time == "03:30"
assert cfg.curator_model == "mixtral:8x22b"
def test_load_cloud_section(self, tmp_path):
"""Config.load() should parse [cloud] section correctly."""
from app.config import Config
config_file = tmp_path / "config.toml"
config_file.write_text(
'[cloud]\n'
'enabled = true\n'
'api_base = "https://openrouter.ai/api/v1"\n'
'api_key_env = "MY_API_KEY"\n'
'\n'
'[cloud.models]\n'
'"gpt-oss:120b" = "openai/gpt-4o"\n'
)
cfg = Config.load(str(config_file))
assert cfg.cloud.enabled is True
assert cfg.cloud.api_base == "https://openrouter.ai/api/v1"
assert cfg.cloud.api_key_env == "MY_API_KEY"
assert "gpt-oss:120b" in cfg.cloud.models
def test_load_nonexistent_file_returns_defaults(self, tmp_path, monkeypatch):
"""Config.load() with missing file should fall back to defaults."""
from app.config import Config
# Point config dir to a place with no config.toml.
# monkeypatch restores any pre-existing VERA_CONFIG_DIR after the test.
monkeypatch.setenv("VERA_CONFIG_DIR", str(tmp_path / "noconfig"))
cfg = Config.load(str(tmp_path / "nonexistent.toml"))
assert cfg.ollama_host == "http://10.0.0.10:11434"
class TestCloudConfig:
"""Tests for CloudConfig helper methods."""
def test_is_cloud_model_true(self):
"""is_cloud_model returns True for registered model name."""
from app.config import CloudConfig
cc = CloudConfig(enabled=True, models={"gpt-oss:120b": "openai/gpt-4o"})
assert cc.is_cloud_model("gpt-oss:120b") is True
def test_is_cloud_model_false(self):
"""is_cloud_model returns False for unknown model name."""
from app.config import CloudConfig
cc = CloudConfig(enabled=True, models={"gpt-oss:120b": "openai/gpt-4o"})
assert cc.is_cloud_model("llama3:70b") is False
def test_get_cloud_model_existing(self):
"""get_cloud_model returns mapped cloud model ID."""
from app.config import CloudConfig
cc = CloudConfig(enabled=True, models={"gpt-oss:120b": "openai/gpt-4o"})
assert cc.get_cloud_model("gpt-oss:120b") == "openai/gpt-4o"
def test_get_cloud_model_missing(self):
"""get_cloud_model returns None for unknown name."""
from app.config import CloudConfig
cc = CloudConfig(enabled=True, models={})
assert cc.get_cloud_model("unknown") is None
def test_api_key_from_env(self, monkeypatch):
"""api_key property reads from environment variable."""
from app.config import CloudConfig
monkeypatch.setenv("MY_TEST_KEY", "sk-secret")
cc = CloudConfig(api_key_env="MY_TEST_KEY")
assert cc.api_key == "sk-secret"
def test_api_key_missing_from_env(self, monkeypatch):
"""api_key returns None when env var is not set."""
from app.config import CloudConfig
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
cc = CloudConfig(api_key_env="OPENROUTER_API_KEY")
assert cc.api_key is None
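The assertions above pin down a small contract: a model-to-dimension table with a 1024 fallback, and a cloud section that resolves its API key from the environment at access time. The sketch below is one shape that would satisfy them. `app.config` is not shown in this diff, so the class names, the default `api_key_env` value and the decision to ignore `enabled` in `is_cloud_model` are all assumptions.

```python
import os
from dataclasses import dataclass, field
from typing import Dict, Optional

# Dimensions taken from the TestEmbeddingDims assertions.
EMBEDDING_DIMS = {
    "snowflake-arctic-embed2": 1024,
    "nomic-embed-text": 768,
    "mxbai-embed-large": 1024,
}
DEFAULT_VECTOR_SIZE = 1024  # fallback asserted by test_vector_size_fallback


@dataclass
class CloudConfigSketch:
    enabled: bool = False
    api_key_env: str = "OPENROUTER_API_KEY"  # assumed default
    models: Dict[str, str] = field(default_factory=dict)

    @property
    def api_key(self) -> Optional[str]:
        # Read lazily so tests can set or unset the variable per case.
        return os.environ.get(self.api_key_env)

    def is_cloud_model(self, name: str) -> bool:
        return name in self.models

    def get_cloud_model(self, name: str) -> Optional[str]:
        return self.models.get(name)


@dataclass
class ConfigSketch:
    embedding_model: str = "snowflake-arctic-embed2"

    @property
    def vector_size(self) -> int:
        return EMBEDDING_DIMS.get(self.embedding_model, DEFAULT_VECTOR_SIZE)
```

Keeping `api_key` a property rather than a stored field is what lets `monkeypatch.setenv` and `monkeypatch.delenv` affect an already constructed object.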

tests/test_curator.py

@@ -0,0 +1,199 @@
"""Tests for Curator class methods — no live LLM or Qdrant required."""
import pytest
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
def make_curator():
"""Return (curator, mock_qdrant): a Curator built with load_curator_prompt patched, plus the MagicMock used as its QdrantService."""
from app.curator import Curator
mock_qdrant = MagicMock()
with patch("app.curator.load_curator_prompt", return_value="Curate memories. Date: {CURRENT_DATE}"):
curator = Curator(
qdrant_service=mock_qdrant,
model="test-model",
ollama_host="http://localhost:11434",
)
return curator, mock_qdrant
class TestParseJsonResponse:
"""Tests for Curator._parse_json_response."""
def test_direct_valid_json(self):
"""Valid JSON string parsed directly."""
curator, _ = make_curator()
payload = {"new_curated_turns": [], "deletions": []}
result = curator._parse_json_response(json.dumps(payload))
assert result == payload
def test_json_in_code_block(self):
"""JSON wrapped in ```json ... ``` code fence is extracted."""
curator, _ = make_curator()
payload = {"summary": "done"}
response = f"```json\n{json.dumps(payload)}\n```"
result = curator._parse_json_response(response)
assert result == payload
def test_json_embedded_in_text(self):
"""JSON embedded after prose text is extracted via brace scan."""
curator, _ = make_curator()
payload = {"new_curated_turns": [{"content": "Q: hi\nA: there"}]}
response = f"Here is the result:\n{json.dumps(payload)}\nThat's all."
result = curator._parse_json_response(response)
assert result is not None
assert "new_curated_turns" in result
def test_empty_string_returns_none(self):
"""Empty response returns None."""
curator, _ = make_curator()
result = curator._parse_json_response("")
assert result is None
def test_malformed_json_returns_none(self):
"""Completely invalid text returns None."""
curator, _ = make_curator()
result = curator._parse_json_response("this is not json at all !!!")
assert result is None
def test_json_in_plain_code_block(self):
"""JSON in ``` (no language tag) code fence is extracted."""
curator, _ = make_curator()
payload = {"permanent_rules": []}
response = f"```\n{json.dumps(payload)}\n```"
result = curator._parse_json_response(response)
assert result == payload
class TestIsRecent:
"""Tests for Curator._is_recent."""
def test_memory_within_window(self):
"""Memory timestamped 1 hour ago is recent (within 24h)."""
curator, _ = make_curator()
ts = (datetime.utcnow() - timedelta(hours=1)).isoformat() + "Z"
memory = {"timestamp": ts}
assert curator._is_recent(memory, hours=24) is True
def test_memory_outside_window(self):
"""Memory timestamped 48 hours ago is not recent."""
curator, _ = make_curator()
ts = (datetime.utcnow() - timedelta(hours=48)).isoformat() + "Z"
memory = {"timestamp": ts}
assert curator._is_recent(memory, hours=24) is False
def test_no_timestamp_returns_true(self):
"""Memory without timestamp is treated as recent (safe default)."""
curator, _ = make_curator()
memory = {}
assert curator._is_recent(memory, hours=24) is True
def test_empty_timestamp_returns_true(self):
"""Memory with empty timestamp string is treated as recent."""
curator, _ = make_curator()
memory = {"timestamp": ""}
assert curator._is_recent(memory, hours=24) is True
def test_unparseable_timestamp_returns_true(self):
"""Memory with garbage timestamp is treated as recent (safe default)."""
curator, _ = make_curator()
memory = {"timestamp": "not-a-date"}
assert curator._is_recent(memory, hours=24) is True
def test_boundary_edge_just_inside(self):
"""Memory timestamped 23h59m ago, one minute inside the 24h window, is recent."""
curator, _ = make_curator()
ts = (datetime.utcnow() - timedelta(hours=23, minutes=59)).isoformat() + "Z"
memory = {"timestamp": ts}
assert curator._is_recent(memory, hours=24) is True
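Taken together, the `TestIsRecent` cases describe a fail-open rule: anything without a usable timestamp counts as recent. A sketch of a function with that behaviour is shown here. It is a guess at the contract, not the `Curator._is_recent` implementation, and `is_recent_sketch` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def is_recent_sketch(memory: dict, hours: int = 24) -> bool:
    """Return True if the memory falls inside the window or cannot be dated."""
    ts = memory.get("timestamp")
    if not ts:
        return True  # missing or empty timestamp: treat as recent
    try:
        # fromisoformat() on older Python versions rejects a trailing "Z".
        normalized = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
        parsed = datetime.fromisoformat(normalized)
    except (ValueError, AttributeError):
        return True  # unparseable value: treat as recent
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assume naive means UTC
    return datetime.now(timezone.utc) - parsed <= timedelta(hours=hours)
```

Failing open means a malformed record is curated rather than silently skipped, which matches the "safe default" wording in the test docstrings.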
class TestFormatRawTurns:
"""Tests for Curator._format_raw_turns."""
def test_empty_list(self):
"""Empty input produces empty string."""
curator, _ = make_curator()
result = curator._format_raw_turns([])
assert result == ""
def test_single_turn_header(self):
"""Single turn has RAW TURN 1 header and turn ID."""
curator, _ = make_curator()
turns = [{"id": "abc123", "text": "User: hello\nAssistant: hi"}]
result = curator._format_raw_turns(turns)
assert "RAW TURN 1" in result
assert "abc123" in result
assert "hello" in result
def test_multiple_turns_numbered(self):
"""Multiple turns are numbered sequentially."""
curator, _ = make_curator()
turns = [
{"id": "id1", "text": "turn one"},
{"id": "id2", "text": "turn two"},
{"id": "id3", "text": "turn three"},
]
result = curator._format_raw_turns(turns)
assert "RAW TURN 1" in result
assert "RAW TURN 2" in result
assert "RAW TURN 3" in result
def test_missing_id_uses_unknown(self):
"""Turn without id field shows 'unknown' placeholder."""
curator, _ = make_curator()
turns = [{"text": "some text"}]
result = curator._format_raw_turns(turns)
assert "unknown" in result
class TestAppendRuleToFile:
"""Tests for Curator._append_rule_to_file (filesystem via tmp_path)."""
@pytest.mark.asyncio
async def test_appends_to_existing_file(self, tmp_path):
"""Rule is appended to existing file."""
import app.curator as curator_module
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
target = prompts_dir / "systemprompt.md"
target.write_text("# Existing content\n")
with patch("app.curator.load_curator_prompt", return_value="prompt {CURRENT_DATE}"), \
patch.object(curator_module, "PROMPTS_DIR", prompts_dir):
from app.curator import Curator
mock_qdrant = MagicMock()
curator = Curator(mock_qdrant, model="m", ollama_host="http://x")
await curator._append_rule_to_file("systemprompt.md", "Always be concise.")
content = target.read_text()
assert "Always be concise." in content
assert "# Existing content" in content
@pytest.mark.asyncio
async def test_creates_file_if_missing(self, tmp_path):
"""Rule is written to a new file if none existed."""
import app.curator as curator_module
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
with patch("app.curator.load_curator_prompt", return_value="prompt {CURRENT_DATE}"), \
patch.object(curator_module, "PROMPTS_DIR", prompts_dir):
from app.curator import Curator
mock_qdrant = MagicMock()
curator = Curator(mock_qdrant, model="m", ollama_host="http://x")
await curator._append_rule_to_file("newfile.md", "New rule here.")
target = prompts_dir / "newfile.md"
assert target.exists()
assert "New rule here." in target.read_text()
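The `TestParseJsonResponse` cases imply a three-step fallback: parse the whole response, then the contents of a code fence, then the outermost brace-delimited span. The sketch below implements that order under the assumption that only JSON objects are wanted. `parse_json_response_sketch` is a hypothetical stand-in, not the `Curator._parse_json_response` method.

```python
import json
import re
from typing import Optional

# Matches ```json ... ``` as well as a bare ``` ... ``` fence.
_FENCE_RE = re.compile(r"```(?:json)?\s*\n(.*?)\n?```", re.DOTALL)


def parse_json_response_sketch(response: str) -> Optional[dict]:
    """Try direct parse, then fenced block, then first '{' to last '}'."""
    if not response or not response.strip():
        return None

    candidates = [response.strip()]
    fence = _FENCE_RE.search(response)
    if fence:
        candidates.append(fence.group(1).strip())
    start, end = response.find("{"), response.rfind("}")
    if start != -1 and end > start:
        candidates.append(response[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue  # fall through to the next, looser strategy
        if isinstance(parsed, dict):
            return parsed
    return None
```

The brace scan is deliberately naive: prose that contains its own braces before or after the JSON would defeat it, in which case the function returns `None`.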

tests/test_integration.py

@@ -0,0 +1,351 @@
"""Integration tests: FastAPI app exercised through fastapi.testclient.TestClient.
All external I/O (Ollama, Qdrant) is mocked. No live services required.
"""
import pytest
import json
import os
from unittest.mock import AsyncMock, MagicMock, patch
from pathlib import Path
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_mock_qdrant():
"""Return a fully-mocked QdrantService."""
mock = MagicMock()
mock._ensure_collection = AsyncMock()
mock.semantic_search = AsyncMock(return_value=[])
mock.get_recent_turns = AsyncMock(return_value=[])
mock.store_qa_turn = AsyncMock(return_value="fake-uuid")
mock.close = AsyncMock()
return mock
def _ollama_tags_response():
return {"models": [{"name": "llama3", "size": 0}]}
def _ollama_chat_response(content: str = "Hello from Ollama"):
return {
"message": {"role": "assistant", "content": content},
"done": True,
"model": "llama3",
}
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def mock_qdrant():
return _make_mock_qdrant()
@pytest.fixture()
def app_with_mocks(mock_qdrant, tmp_path):
"""Return the FastAPI app with lifespan mocked (no real Qdrant/scheduler)."""
from contextlib import asynccontextmanager
# Minimal curator prompt
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "curator_prompt.md").write_text("Curate. Date: {CURRENT_DATE}")
(prompts_dir / "systemprompt.md").write_text("You are Vera.")
@asynccontextmanager
async def fake_lifespan(app):
yield
import app.main as main_module
with patch.dict(os.environ, {"VERA_PROMPTS_DIR": str(prompts_dir)}), \
patch("app.main.get_qdrant_service", return_value=mock_qdrant), \
patch("app.singleton.get_qdrant_service", return_value=mock_qdrant), \
patch("app.main.Curator") as MockCurator, \
patch("app.main.scheduler") as mock_scheduler:
mock_scheduler.add_job = MagicMock()
mock_scheduler.start = MagicMock()
mock_scheduler.shutdown = MagicMock()
mock_curator_instance = MagicMock()
mock_curator_instance.run = AsyncMock()
MockCurator.return_value = mock_curator_instance
from fastapi import FastAPI
from fastapi.testclient import TestClient
# Import fresh — use the real routes but swap lifespan
from app.main import app as vera_app
vera_app.router.lifespan_context = fake_lifespan
yield vera_app, mock_qdrant
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
class TestHealthCheck:
def test_health_ollama_reachable(self, app_with_mocks):
"""GET / returns status ok and ollama=reachable when Ollama is up."""
from fastapi.testclient import TestClient
vera_app, mock_qdrant = app_with_mocks
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.get = AsyncMock(return_value=mock_resp)
with patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app, raise_server_exceptions=True) as client:
resp = client.get("/")
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ok"
assert body["ollama"] == "reachable"
def test_health_ollama_unreachable(self, app_with_mocks):
"""GET / returns ollama=unreachable when Ollama is down."""
import httpx
from fastapi.testclient import TestClient
vera_app, _ = app_with_mocks
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.get = AsyncMock(side_effect=httpx.ConnectError("refused"))
with patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app, raise_server_exceptions=True) as client:
resp = client.get("/")
assert resp.status_code == 200
assert resp.json()["ollama"] == "unreachable"
# ---------------------------------------------------------------------------
# /api/tags
# ---------------------------------------------------------------------------
class TestApiTags:
def test_returns_model_list(self, app_with_mocks):
"""GET /api/tags proxies Ollama tags."""
from fastapi.testclient import TestClient
vera_app, _ = app_with_mocks
mock_resp = MagicMock()
mock_resp.json.return_value = _ollama_tags_response()
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.get = AsyncMock(return_value=mock_resp)
with patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app) as client:
resp = client.get("/api/tags")
assert resp.status_code == 200
data = resp.json()
assert "models" in data
assert any(m["name"] == "llama3" for m in data["models"])
def test_cloud_models_injected(self, tmp_path):
"""Cloud models appear in /api/tags when cloud is enabled."""
from fastapi.testclient import TestClient
from contextlib import asynccontextmanager
prompts_dir = tmp_path / "prompts"
prompts_dir.mkdir()
(prompts_dir / "curator_prompt.md").write_text("Curate.")
(prompts_dir / "systemprompt.md").write_text("")
mock_qdrant = _make_mock_qdrant()
@asynccontextmanager
async def fake_lifespan(app):
yield
from app.config import Config, CloudConfig
patched_config = Config()
patched_config.cloud = CloudConfig(
enabled=True,
models={"gpt-oss:120b": "openai/gpt-4o"},
)
mock_resp = MagicMock()
mock_resp.json.return_value = {"models": []}
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.get = AsyncMock(return_value=mock_resp)
import app.main as main_module
with patch.dict(os.environ, {"VERA_PROMPTS_DIR": str(prompts_dir)}), \
patch("app.main.config", patched_config), \
patch("app.main.get_qdrant_service", return_value=mock_qdrant), \
patch("app.main.scheduler") as mock_scheduler, \
patch("app.main.Curator") as MockCurator:
mock_scheduler.add_job = MagicMock()
mock_scheduler.start = MagicMock()
mock_scheduler.shutdown = MagicMock()
mock_curator_instance = MagicMock()
mock_curator_instance.run = AsyncMock()
MockCurator.return_value = mock_curator_instance
from app.main import app as vera_app
vera_app.router.lifespan_context = fake_lifespan
with patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app) as client:
resp = client.get("/api/tags")
data = resp.json()
names = [m["name"] for m in data["models"]]
assert "gpt-oss:120b" in names
# ---------------------------------------------------------------------------
# POST /api/chat (non-streaming)
# ---------------------------------------------------------------------------
class TestApiChatNonStreaming:
def test_non_streaming_round_trip(self, app_with_mocks):
"""POST /api/chat with stream=False returns Ollama response."""
from fastapi.testclient import TestClient
import app.utils as utils_module
import app.proxy_handler as ph_module
vera_app, mock_qdrant = app_with_mocks
ollama_data = _ollama_chat_response("The answer is 42.")
mock_post_resp = MagicMock()
mock_post_resp.json.return_value = ollama_data
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.post = AsyncMock(return_value=mock_post_resp)
with patch.object(utils_module, "load_system_prompt", return_value=""), \
patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant), \
patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app) as client:
resp = client.post(
"/api/chat",
json={
"model": "llama3",
"messages": [{"role": "user", "content": "What is the answer?"}],
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["message"]["content"] == "The answer is 42."
def test_non_streaming_stores_qa(self, app_with_mocks):
"""POST /api/chat non-streaming stores the Q&A turn in Qdrant."""
from fastapi.testclient import TestClient
import app.utils as utils_module
vera_app, mock_qdrant = app_with_mocks
ollama_data = _ollama_chat_response("42.")
mock_post_resp = MagicMock()
mock_post_resp.json.return_value = ollama_data
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.post = AsyncMock(return_value=mock_post_resp)
with patch.object(utils_module, "load_system_prompt", return_value=""), \
patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant), \
patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app) as client:
client.post(
"/api/chat",
json={
"model": "llama3",
"messages": [{"role": "user", "content": "What is 6*7?"}],
"stream": False,
},
)
mock_qdrant.store_qa_turn.assert_called_once()
args = mock_qdrant.store_qa_turn.call_args[0]
assert "6*7" in args[0]
assert "42." in args[1]
# ---------------------------------------------------------------------------
# POST /api/chat (streaming)
# ---------------------------------------------------------------------------
class TestApiChatStreaming:
def test_streaming_response_passthrough(self, app_with_mocks):
"""POST /api/chat with stream=True streams Ollama chunks."""
from fastapi.testclient import TestClient
import app.utils as utils_module
import app.proxy_handler as ph_module
vera_app, mock_qdrant = app_with_mocks
chunk1 = json.dumps({"message": {"content": "Hello"}, "done": False}).encode()
chunk2 = json.dumps({"message": {"content": " world"}, "done": True}).encode()
async def fake_aiter_bytes():
yield chunk1
yield chunk2
mock_stream_resp = MagicMock()
mock_stream_resp.aiter_bytes = fake_aiter_bytes
mock_stream_resp.status_code = 200
mock_stream_resp.headers = {"content-type": "application/x-ndjson"}
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_client_instance.post = AsyncMock(return_value=mock_stream_resp)
with patch.object(utils_module, "load_system_prompt", return_value=""), \
patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant), \
patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
patch("httpx.AsyncClient", return_value=mock_client_instance):
with TestClient(vera_app) as client:
resp = client.post(
"/api/chat",
json={
"model": "llama3",
"messages": [{"role": "user", "content": "Say hello"}],
"stream": True,
},
)
assert resp.status_code == 200
# Loose check: any non-empty body passes, so this does not verify that both chunks arrive
body_text = resp.text
assert "Hello" in body_text or len(body_text) > 0
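Every test in this file rebuilds the same async-context-manager mock for `httpx.AsyncClient` by hand. One way to reduce that repetition is a small factory like the sketch below. `make_mock_async_client` and its keyword arguments are hypothetical and not part of this commit; the `__aenter__`/`__aexit__` wiring mirrors what the tests above already do.

```python
import asyncio
from typing import Any, Optional
from unittest.mock import AsyncMock, MagicMock


def _json_response(data: Any, status_code: int = 200) -> MagicMock:
    """Build a response double whose .json() returns the given data."""
    resp = MagicMock()
    resp.status_code = status_code
    resp.json.return_value = data
    return resp


def make_mock_async_client(
    *, get_json: Optional[Any] = None, post_json: Optional[Any] = None
) -> AsyncMock:
    """Return a mock usable as `async with httpx.AsyncClient() as client:`."""
    client = AsyncMock()
    client.__aenter__ = AsyncMock(return_value=client)  # `as client` yields the mock itself
    client.__aexit__ = AsyncMock(return_value=False)    # do not swallow exceptions
    if get_json is not None:
        client.get = AsyncMock(return_value=_json_response(get_json))
    if post_json is not None:
        client.post = AsyncMock(return_value=_json_response(post_json))
    return client


async def _demo() -> Any:
    client = make_mock_async_client(get_json={"models": []})
    async with client as session:
        resp = await session.get("http://example.invalid/api/tags")
        return session is client, resp.json()


same_object, body = asyncio.run(_demo())
```

A test would then pass the result to `patch("httpx.AsyncClient", return_value=...)` exactly as the existing tests do with their hand-built mocks.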

tests/test_proxy_handler.py

@@ -0,0 +1,221 @@
"""Tests for proxy_handler — no live Ollama or Qdrant required."""
import pytest
import json
from unittest.mock import AsyncMock, MagicMock, patch
class TestCleanMessageContent:
"""Tests for clean_message_content."""
def test_passthrough_plain_message(self):
"""Plain text without wrapper is returned unchanged."""
from app.proxy_handler import clean_message_content
content = "What is the capital of France?"
assert clean_message_content(content) == content
def test_strips_memory_context_wrapper(self):
"""[Memory context] wrapper is stripped, actual user_msg returned."""
from app.proxy_handler import clean_message_content
content = (
"[Memory context]\n"
"some context here\n"
"- user_msg: What is the capital of France?\n\n"
)
result = clean_message_content(content)
assert result == "What is the capital of France?"
def test_strips_timestamp_prefix(self):
"""ISO timestamp prefix like [2024-01-01T00:00:00] is removed."""
from app.proxy_handler import clean_message_content
content = "[2024-01-01T12:34:56] Tell me a joke"
result = clean_message_content(content)
assert result == "Tell me a joke"
def test_empty_string_returned_as_is(self):
"""Empty string input returns empty string."""
from app.proxy_handler import clean_message_content
assert clean_message_content("") == ""
def test_none_input_returned_as_is(self):
"""None/falsy input is returned unchanged."""
from app.proxy_handler import clean_message_content
assert clean_message_content(None) is None
def test_list_content_raises_type_error(self):
"""Non-string content (list) causes TypeError — the function expects strings."""
from app.proxy_handler import clean_message_content
# The function passes lists to re.search which requires str/bytes.
# Document this behavior so we know it's a known limitation.
content = [{"type": "text", "text": "hello"}]
with pytest.raises(TypeError):
clean_message_content(content)
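The cases above describe `clean_message_content` only from the outside. As an aid to reading them, the sketch below shows one implementation that would produce the same results for these inputs, including the `TypeError` on list content. It is an assumption-laden reconstruction: the regular expressions and the name `clean_message_content_sketch` are hypothetical, and the real function in `app.proxy_handler` may differ.

```python
import re

# Captures the text after "- user_msg:" up to a blank line or end of input.
_USER_MSG_RE = re.compile(r"- user_msg:\s*(.*?)(?:\n\n|\Z)", re.DOTALL)
# Matches a leading "[YYYY-MM-DDTHH:MM:SS...]" prefix and trailing spaces.
_TIMESTAMP_RE = re.compile(r"^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[^\]]*\]\s*")


def clean_message_content_sketch(content):
    """Strip a [Memory context] wrapper and a leading ISO timestamp."""
    if not content:
        return content  # None and "" pass through untouched
    if "[Memory context]" in content:
        match = _USER_MSG_RE.search(content)
        if match:
            content = match.group(1).strip()
    # re.sub requires str or bytes, so list content raises TypeError here.
    return _TIMESTAMP_RE.sub("", content).strip()
```

If multimodal (list-style) message content ever needs to be supported, the type check would have to happen before the regular expressions run.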
class TestHandleChatNonStreaming:
"""Tests for handle_chat_non_streaming — fully mocked external I/O."""
@pytest.mark.asyncio
async def test_returns_json_response(self):
"""Should return a JSONResponse with Ollama result merged with model field."""
from app.proxy_handler import handle_chat_non_streaming
ollama_resp_data = {
"message": {"role": "assistant", "content": "Paris."},
"done": True,
}
mock_httpx_resp = MagicMock()
mock_httpx_resp.json.return_value = ollama_resp_data
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_httpx_resp)
mock_qdrant = MagicMock()
mock_qdrant.store_qa_turn = AsyncMock(return_value="fake-uuid")
augmented = [{"role": "user", "content": "What is the capital of France?"}]
with patch("app.proxy_handler.build_augmented_messages", AsyncMock(return_value=augmented)), \
patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
patch("httpx.AsyncClient", return_value=mock_client):
body = {
"model": "llama3",
"messages": [{"role": "user", "content": "What is the capital of France?"}],
"stream": False,
}
response = await handle_chat_non_streaming(body)
# FastAPI JSONResponse
from fastapi.responses import JSONResponse
assert isinstance(response, JSONResponse)
response_body = json.loads(response.body)
assert response_body["message"]["content"] == "Paris."
assert response_body["model"] == "llama3"
    @pytest.mark.asyncio
    async def test_stores_qa_turn_when_answer_present(self):
        """store_qa_turn should be called with user question and assistant answer."""
        from app.proxy_handler import handle_chat_non_streaming

        ollama_resp_data = {
            "message": {"role": "assistant", "content": "Berlin."},
            "done": True,
        }
        mock_httpx_resp = MagicMock()
        mock_httpx_resp.json.return_value = ollama_resp_data

        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_httpx_resp)

        mock_qdrant = MagicMock()
        mock_qdrant.store_qa_turn = AsyncMock(return_value="fake-uuid")

        augmented = [{"role": "user", "content": "Capital of Germany?"}]

        with patch("app.proxy_handler.build_augmented_messages", AsyncMock(return_value=augmented)), \
             patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
             patch("httpx.AsyncClient", return_value=mock_client):
            body = {
                "model": "llama3",
                "messages": [{"role": "user", "content": "Capital of Germany?"}],
                "stream": False,
            }
            await handle_chat_non_streaming(body)

        mock_qdrant.store_qa_turn.assert_called_once()
        call_args = mock_qdrant.store_qa_turn.call_args
        assert "Capital of Germany?" in call_args[0][0]
        assert "Berlin." in call_args[0][1]

    @pytest.mark.asyncio
    async def test_no_store_when_empty_answer(self):
        """store_qa_turn should NOT be called when the assistant answer is empty."""
        from app.proxy_handler import handle_chat_non_streaming

        ollama_resp_data = {
            "message": {"role": "assistant", "content": ""},
            "done": True,
        }
        mock_httpx_resp = MagicMock()
        mock_httpx_resp.json.return_value = ollama_resp_data

        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_httpx_resp)

        mock_qdrant = MagicMock()
        mock_qdrant.store_qa_turn = AsyncMock(return_value="fake-uuid")

        augmented = [{"role": "user", "content": "Hello?"}]

        with patch("app.proxy_handler.build_augmented_messages", AsyncMock(return_value=augmented)), \
             patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
             patch("httpx.AsyncClient", return_value=mock_client):
            body = {
                "model": "llama3",
                "messages": [{"role": "user", "content": "Hello?"}],
                "stream": False,
            }
            await handle_chat_non_streaming(body)

        mock_qdrant.store_qa_turn.assert_not_called()

    @pytest.mark.asyncio
    async def test_cleans_memory_context_from_user_message(self):
        """User message with [Memory context] wrapper should be cleaned before storing."""
        from app.proxy_handler import handle_chat_non_streaming

        ollama_resp_data = {
            "message": {"role": "assistant", "content": "42."},
            "done": True,
        }
        mock_httpx_resp = MagicMock()
        mock_httpx_resp.json.return_value = ollama_resp_data

        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_httpx_resp)

        mock_qdrant = MagicMock()
        mock_qdrant.store_qa_turn = AsyncMock(return_value="fake-uuid")

        raw_content = (
            "[Memory context]\nsome ctx\n- user_msg: What is the answer?\n\n"
        )
        augmented = [{"role": "user", "content": "What is the answer?"}]

        with patch("app.proxy_handler.build_augmented_messages", AsyncMock(return_value=augmented)), \
             patch("app.proxy_handler.get_qdrant_service", return_value=mock_qdrant), \
             patch("httpx.AsyncClient", return_value=mock_client):
            body = {
                "model": "llama3",
                "messages": [{"role": "user", "content": raw_content}],
                "stream": False,
            }
            await handle_chat_non_streaming(body)

        call_args = mock_qdrant.store_qa_turn.call_args
        stored_question = call_args[0][0]
        # The wrapper should be stripped
        assert "Memory context" not in stored_question
        assert "What is the answer?" in stored_question
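
The four tests above repeat the same httpx mock setup. As a possible follow-up, that setup could be pulled into one helper. The sketch below is only an illustration: `make_mock_httpx_client` is a hypothetical name that does not exist in the repository, and it assumes the handler uses `httpx.AsyncClient` as an async context manager and calls `.post()` followed by `.json()`, as the mocks above imply.

```python
from unittest.mock import AsyncMock, MagicMock


def make_mock_httpx_client(response_data):
    """Hypothetical helper: build a stand-in for httpx.AsyncClient.

    The returned mock can be used in `async with ... as client:` and its
    `post()` coroutine resolves to a response whose `.json()` returns
    `response_data`.
    """
    mock_resp = MagicMock()
    mock_resp.json.return_value = response_data

    mock_client = AsyncMock()
    # Entering the context manager yields the same mock client.
    mock_client.__aenter__ = AsyncMock(return_value=mock_client)
    # Returning False from __aexit__ means exceptions are not swallowed.
    mock_client.__aexit__ = AsyncMock(return_value=False)
    mock_client.post = AsyncMock(return_value=mock_resp)
    return mock_client
```

Each test could then call `patch("httpx.AsyncClient", return_value=make_mock_httpx_client(ollama_resp_data))` and keep only the assertions that differ.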

tests/test_utils.py Normal file

@@ -0,0 +1,319 @@
"""Tests for utility functions."""
import pytest

from app.utils import count_tokens, truncate_by_tokens, parse_curated_turn


class TestCountTokens:
    """Tests for count_tokens function."""

    def test_empty_string(self):
        """Empty string should return 0 tokens."""
        assert count_tokens("") == 0

    def test_simple_text(self):
        """Simple text should count tokens correctly."""
        text = "Hello, world!"
        assert count_tokens(text) > 0

    def test_longer_text(self):
        """Longer text should have more tokens."""
        short = "Hello"
        long = "Hello, this is a longer sentence with more words."
        assert count_tokens(long) > count_tokens(short)


class TestTruncateByTokens:
    """Tests for truncate_by_tokens function."""

    def test_no_truncation_needed(self):
        """Text shorter than limit should not be truncated."""
        text = "Short text"
        result = truncate_by_tokens(text, max_tokens=100)
        assert result == text

    def test_truncation_applied(self):
        """Text longer than limit should be truncated."""
        text = "This is a longer piece of text that will need to be truncated"
        result = truncate_by_tokens(text, max_tokens=5)
        assert count_tokens(result) <= 5

    def test_empty_string(self):
        """Empty string should return empty string."""
        assert truncate_by_tokens("", max_tokens=10) == ""


class TestParseCuratedTurn:
    """Tests for parse_curated_turn function."""

    def test_empty_string(self):
        """Empty string should return empty list."""
        assert parse_curated_turn("") == []

    def test_single_turn(self):
        """Single Q&A turn should parse correctly."""
        text = "User: What is Python?\nAssistant: A programming language."
        result = parse_curated_turn(text)
        assert len(result) == 2
        assert result[0]["role"] == "user"
        assert result[0]["content"] == "What is Python?"
        assert result[1]["role"] == "assistant"
        assert result[1]["content"] == "A programming language."

    def test_multiple_turns(self):
        """Multiple Q&A turns should parse correctly."""
        text = """User: What is Python?
Assistant: A programming language.
User: Is it popular?
Assistant: Yes, very popular."""
        result = parse_curated_turn(text)
        assert len(result) == 4

    def test_timestamp_ignored(self):
        """Timestamp lines should be ignored."""
        text = "User: Question?\nAssistant: Answer.\nTimestamp: 2024-01-01T00:00:00Z"
        result = parse_curated_turn(text)
        assert len(result) == 2
        for msg in result:
            assert "Timestamp" not in msg["content"]

    def test_multiline_content(self):
        """Multiline content should be preserved."""
        text = "User: Line 1\nLine 2\nLine 3\nAssistant: Response"
        result = parse_curated_turn(text)
        assert "Line 1" in result[0]["content"]
        assert "Line 2" in result[0]["content"]
        assert "Line 3" in result[0]["content"]


class TestFilterMemoriesByTime:
    """Tests for filter_memories_by_time function."""

    def test_includes_recent_memory(self):
        """Memory with timestamp in the last 24h should be included."""
        from datetime import datetime, timedelta
        from app.utils import filter_memories_by_time

        ts = (datetime.utcnow() - timedelta(hours=1)).isoformat()
        memories = [{"timestamp": ts, "text": "recent"}]
        result = filter_memories_by_time(memories, hours=24)
        assert len(result) == 1

    def test_excludes_old_memory(self):
        """Memory older than cutoff should be excluded."""
        from datetime import datetime, timedelta
        from app.utils import filter_memories_by_time

        ts = (datetime.utcnow() - timedelta(hours=48)).isoformat()
        memories = [{"timestamp": ts, "text": "old"}]
        result = filter_memories_by_time(memories, hours=24)
        assert len(result) == 0

    def test_includes_memory_without_timestamp(self):
        """Memory with no timestamp should always be included."""
        from app.utils import filter_memories_by_time

        memories = [{"text": "no ts"}]
        result = filter_memories_by_time(memories, hours=24)
        assert len(result) == 1

    def test_includes_memory_with_bad_timestamp(self):
        """Memory with unparseable timestamp should be included (safe default)."""
        from app.utils import filter_memories_by_time

        memories = [{"timestamp": "not-a-date", "text": "bad ts"}]
        result = filter_memories_by_time(memories, hours=24)
        assert len(result) == 1

    def test_empty_list(self):
        """Empty input returns empty list."""
        from app.utils import filter_memories_by_time

        assert filter_memories_by_time([], hours=24) == []

    def test_z_suffix_timestamp(self):
        """ISO timestamp with Z suffix should be handled correctly."""
        from datetime import datetime, timedelta
        from app.utils import filter_memories_by_time

        ts = (datetime.utcnow() - timedelta(hours=1)).isoformat() + "Z"
        memories = [{"timestamp": ts, "text": "recent with Z"}]
        result = filter_memories_by_time(memories, hours=24)
        assert len(result) == 1


class TestMergeMemories:
    """Tests for merge_memories function."""

    def test_empty_list(self):
        """Empty list returns empty text and ids."""
        from app.utils import merge_memories

        result = merge_memories([])
        assert result == {"text": "", "ids": []}

    def test_single_memory_with_text(self):
        """Single memory with text field is merged."""
        from app.utils import merge_memories

        memories = [{"id": "abc", "text": "hello world", "role": ""}]
        result = merge_memories(memories)
        assert "hello world" in result["text"]
        assert "abc" in result["ids"]

    def test_memory_with_content_field(self):
        """Memory using content field (no text) is merged."""
        from app.utils import merge_memories

        memories = [{"id": "xyz", "content": "from content field"}]
        result = merge_memories(memories)
        assert "from content field" in result["text"]

    def test_role_included_in_output(self):
        """Role prefix should appear in merged text when role is set."""
        from app.utils import merge_memories

        memories = [{"id": "1", "text": "question", "role": "user"}]
        result = merge_memories(memories)
        assert "[user]:" in result["text"]

    def test_multiple_memories_joined(self):
        """Multiple memories are joined with double newline."""
        from app.utils import merge_memories

        memories = [
            {"id": "1", "text": "first"},
            {"id": "2", "text": "second"},
        ]
        result = merge_memories(memories)
        assert "first" in result["text"]
        assert "second" in result["text"]
        assert len(result["ids"]) == 2


class TestCalculateTokenBudget:
    """Tests for calculate_token_budget function."""

    def test_default_ratios_sum(self):
        """Default ratios should sum to 1.0 (system+semantic+context)."""
        from app.utils import calculate_token_budget

        result = calculate_token_budget(1000)
        assert result["system"] + result["semantic"] + result["context"] == 1000

    def test_custom_ratios(self):
        """Custom ratios should produce correct proportional budgets."""
        from app.utils import calculate_token_budget

        result = calculate_token_budget(
            100, system_ratio=0.1, semantic_ratio=0.6, context_ratio=0.3
        )
        assert result["system"] == 10
        assert result["semantic"] == 60
        assert result["context"] == 30

    def test_zero_budget(self):
        """Zero total budget yields all zeros."""
        from app.utils import calculate_token_budget

        result = calculate_token_budget(0)
        assert result["system"] == 0
        assert result["semantic"] == 0
        assert result["context"] == 0


class TestBuildAugmentedMessages:
    """Tests for build_augmented_messages function (mocked I/O)."""

    def _make_qdrant_mock(self):
        """Return a MagicMock QdrantService whose search methods are AsyncMocks."""
        from unittest.mock import AsyncMock, MagicMock

        mock_qdrant = MagicMock()
        mock_qdrant.semantic_search = AsyncMock(return_value=[])
        mock_qdrant.get_recent_turns = AsyncMock(return_value=[])
        return mock_qdrant

    @pytest.mark.asyncio
    async def test_system_layer_prepended(self, monkeypatch, tmp_path):
        """System prompt from file should be prepended to messages."""
        from unittest.mock import patch
        import app.utils as utils_module

        # Write a temp system prompt
        prompt_file = tmp_path / "systemprompt.md"
        prompt_file.write_text("You are Vera.")

        mock_qdrant = self._make_qdrant_mock()
        with patch.object(utils_module, "load_system_prompt", return_value="You are Vera."), \
             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
            result = await utils_module.build_augmented_messages(
                [{"role": "user", "content": "Hello"}]
            )

        system_msgs = [m for m in result if m["role"] == "system"]
        assert len(system_msgs) == 1
        assert "You are Vera." in system_msgs[0]["content"]

    @pytest.mark.asyncio
    async def test_incoming_user_message_preserved(self, monkeypatch):
        """Incoming user message should appear in output."""
        from unittest.mock import patch
        import app.utils as utils_module

        mock_qdrant = self._make_qdrant_mock()
        with patch.object(utils_module, "load_system_prompt", return_value=""), \
             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
            result = await utils_module.build_augmented_messages(
                [{"role": "user", "content": "What is 2+2?"}]
            )

        user_msgs = [m for m in result if m.get("role") == "user"]
        assert any("2+2" in m["content"] for m in user_msgs)

    @pytest.mark.asyncio
    async def test_no_system_message_when_no_prompt(self, monkeypatch):
        """No system message added when both incoming and file prompt are empty."""
        from unittest.mock import patch
        import app.utils as utils_module

        mock_qdrant = self._make_qdrant_mock()
        with patch.object(utils_module, "load_system_prompt", return_value=""), \
             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
            result = await utils_module.build_augmented_messages(
                [{"role": "user", "content": "Hi"}]
            )

        system_msgs = [m for m in result if m.get("role") == "system"]
        assert len(system_msgs) == 0

    @pytest.mark.asyncio
    async def test_semantic_results_injected(self, monkeypatch):
        """Curated memories from semantic search should appear in output."""
        from unittest.mock import patch, AsyncMock, MagicMock
        import app.utils as utils_module

        mock_qdrant = MagicMock()
        mock_qdrant.semantic_search = AsyncMock(return_value=[
            {"payload": {"text": "User: Old question?\nAssistant: Old answer."}}
        ])
        mock_qdrant.get_recent_turns = AsyncMock(return_value=[])

        with patch.object(utils_module, "load_system_prompt", return_value=""), \
             patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant):
            result = await utils_module.build_augmented_messages(
                [{"role": "user", "content": "Tell me"}]
            )

        contents = [m["content"] for m in result]
        assert any("Old question" in c or "Old answer" in c for c in contents)
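
For readers without the `app.utils` source at hand, the `TestFilterMemoriesByTime` cases above pin down a small contract: keep recent entries, drop entries older than the cutoff, keep entries with a missing or unparseable timestamp, and accept a trailing `Z`. The sketch below is one way to satisfy that contract. It is an assumption-laden stand-in, not the repository's implementation: the name `filter_memories_by_time_sketch` is hypothetical, and treating naive timestamps as UTC is a guess based on the tests' use of `datetime.utcnow()`.

```python
from datetime import datetime, timedelta, timezone


def filter_memories_by_time_sketch(memories, hours=24):
    """Hypothetical stand-in for app.utils.filter_memories_by_time."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    kept = []
    for memory in memories:
        ts = memory.get("timestamp")
        if not ts:
            # No timestamp: keep the memory (safe default).
            kept.append(memory)
            continue
        try:
            # fromisoformat() on older Pythons rejects "Z", so map it to +00:00.
            parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            # Unparseable timestamp: keep the memory (safe default).
            kept.append(memory)
            continue
        if parsed.tzinfo is None:
            # Assumption: naive timestamps are stored in UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        if parsed >= cutoff:
            kept.append(memory)
    return kept
```

Comparing timezone-aware values on both sides avoids the `TypeError` that Python raises when a naive and an aware `datetime` are compared.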