Files
vera-ai-v2/app/utils.py
Vera-AI 50593e200d Initial commit: Vera-AI v2 with async Qdrant, singleton pattern, monthly curation, and configurable UID/GID/TZ
Features:
- AsyncQdrantClient for non-blocking Qdrant operations
- Singleton pattern for QdrantService
- Monthly full curation (day 1 at 03:00)
- Configurable UID/GID for Docker
- Timezone support via TZ env var
- Configurable log directory (VERA_LOG_DIR)
- Volume mounts for config/, prompts/, logs/
- Standard Docker format with .env file

Fixes:
- Removed unused system_token_budget
- Added semantic_score_threshold config
- Fixed streaming response handling
- Python-based healthcheck (no curl dependency)
2026-03-26 12:37:25 -05:00

203 lines
6.6 KiB
Python

"""Utility functions for vera-ai."""
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List

import tiktoken

from .config import config
# Use cl100k_base encoding (GPT-4 compatible); shared by all token helpers below.
ENCODING = tiktoken.get_encoding("cl100k_base")
# Configurable paths (can be overridden via environment, e.g. for Docker volume mounts)
PROMPTS_DIR = Path(os.environ.get("VERA_PROMPTS_DIR", "/app/prompts"))
STATIC_DIR = Path(os.environ.get("VERA_STATIC_DIR", "/app/static"))
# Global qdrant_service instance for utils; created lazily by get_qdrant_service()
_qdrant_service = None
def get_qdrant_service():
    """Return the module-wide QdrantService singleton, creating it on first use.

    The config and service imports are deferred to call time so this module
    can be imported without pulling in the Qdrant stack.
    """
    global _qdrant_service
    if _qdrant_service is not None:
        return _qdrant_service
    from .config import config
    from .qdrant_service import QdrantService
    _qdrant_service = QdrantService(
        host=config.qdrant_host,
        collection=config.qdrant_collection,
        embedding_model=config.embedding_model,
        vector_size=config.vector_size,
        ollama_host=config.ollama_host
    )
    return _qdrant_service
def count_tokens(text: str) -> int:
    """Return the number of cl100k_base tokens in *text* (0 for empty/None)."""
    return len(ENCODING.encode(text)) if text else 0
def count_messages_tokens(messages: List[dict]) -> int:
    """Sum the token counts of the "content" field of every message that has one."""
    return sum(count_tokens(msg["content"]) for msg in messages if "content" in msg)
def truncate_by_tokens(text: str, max_tokens: int) -> str:
    """Clip *text* so its encoded length does not exceed *max_tokens* tokens.

    Returns the input unchanged when it is empty or already within budget.
    """
    if not text:
        return text
    encoded = ENCODING.encode(text)
    if len(encoded) > max_tokens:
        return ENCODING.decode(encoded[:max_tokens])
    return text
def filter_memories_by_time(memories: List[Dict], hours: int = 24) -> List[Dict]:
    """Keep only memories newer than the last *hours* hours.

    Memories without a "timestamp", or whose timestamp cannot be parsed, are
    kept (best effort: never drop a memory because its metadata is incomplete).

    Args:
        memories: dicts that may carry a "timestamp" key holding either an
            ISO-8601 string (optionally with a trailing "Z") or a datetime.
        hours: size of the retention window, in hours.

    Returns:
        The memories inside the window, in their original order.

    Note: previously the offset was stripped from the timestamp and compared
    against naive ``utcnow()``; any timestamp carrying a non-UTC offset (e.g.
    "+05:00") stayed timezone-aware, the aware-vs-naive comparison raised
    TypeError, and the memory was silently kept regardless of age. The
    comparison is now done fully timezone-aware; naive values are assumed UTC.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    kept: List[Dict] = []
    for mem in memories:
        ts = mem.get("timestamp")
        if not ts:
            # No timestamp at all -- keep the memory.
            kept.append(mem)
            continue
        try:
            if isinstance(ts, str):
                # fromisoformat() does not accept a literal "Z" before 3.11.
                mem_time = datetime.fromisoformat(ts.replace("Z", "+00:00"))
            else:
                mem_time = ts
            if mem_time.tzinfo is None:
                # Assume naive timestamps are UTC so the comparison below
                # never mixes aware and naive datetimes.
                mem_time = mem_time.replace(tzinfo=timezone.utc)
            if mem_time > cutoff:
                kept.append(mem)
        except Exception:
            # If timestamp parsing fails, include the memory.
            kept.append(mem)
    return kept
def merge_memories(memories: List[Dict]) -> Dict:
    """Combine multiple memories into one text blob plus the source ids.

    Each memory contributes its "text" (falling back to "content"); when a
    role is present the chunk is prefixed "[role]: ". Memories with no text
    are skipped. The collected ids parallel the merged chunks.

    Returns:
        {"text": chunks joined by blank lines, "ids": ids of merged memories}
    """
    merged_chunks = []
    source_ids = []
    for memory in memories:
        body = memory.get("text", "") or memory.get("content", "")
        if not body:
            continue
        speaker = memory.get("role", "")
        merged_chunks.append(f"[{speaker}]: {body}" if speaker else body)
        source_ids.append(memory.get("id"))
    return {
        "text": "\n\n".join(merged_chunks),
        "ids": source_ids
    }
def calculate_token_budget(total_budget: int, system_ratio: float = 0.2,
                           semantic_ratio: float = 0.5, context_ratio: float = 0.3) -> Dict[str, int]:
    """Split *total_budget* tokens across the three prompt layers.

    Args:
        total_budget: total token budget to divide.
        system_ratio: fraction reserved for the system-prompt layer.
        semantic_ratio: fraction reserved for curated semantic memories.
        context_ratio: fraction reserved for recent conversation context.

    Returns:
        Mapping from layer name ("system", "semantic", "context") to its
        integer token budget (each fraction truncated toward zero).
        The previous ``Dict[int, int]`` annotation was wrong: keys are strings.
    """
    return {
        "system": int(total_budget * system_ratio),
        "semantic": int(total_budget * semantic_ratio),
        "context": int(total_budget * context_ratio)
    }
def load_system_prompt() -> str:
    """Read systemprompt.md, preferring PROMPTS_DIR over the legacy STATIC_DIR.

    Returns the stripped file contents, or "" (after logging a warning) when
    neither location has the file.
    """
    import logging
    logger = logging.getLogger(__name__)
    candidates = (PROMPTS_DIR / "systemprompt.md", STATIC_DIR / "systemprompt.md")
    for path in candidates:
        if path.exists():
            return path.read_text().strip()
    logger.warning("systemprompt.md not found")
    return ""
async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
    """Build 4-layer augmented messages from incoming messages.

    Layers, in order:
      1. System prompt: the incoming system message (if any) combined with
         the on-disk prompt from load_system_prompt().
      2. Semantic: curated memories fetched from Qdrant for the conversation.
      3. Context: recent conversation turns fetched from Qdrant.
      4. The incoming messages themselves (minus system, to avoid duplication).

    This is a standalone version that can be used by proxy_handler.py.

    Fix: the system prompt was previously appended with
    ``system_content += "\\n\\n" + system_prompt`` even when there was no
    incoming system message, producing a prompt that started with a spurious
    blank line; non-empty parts are now joined instead. The unused local
    logger has also been removed.
    """
    system_prompt = load_system_prompt()

    # Get user question (last user message); used as a fallback search query.
    user_question = ""
    for msg in reversed(incoming_messages):
        if msg.get("role") == "user":
            user_question = msg.get("content", "")
            break

    # Get search context (last few turns) to drive the semantic search.
    search_context = ""
    for msg in incoming_messages[-6:]:
        if msg.get("role") in ("user", "assistant"):
            search_context += msg.get("content", "") + " "

    messages: List[Dict] = []

    # === LAYER 1: System Prompt ===
    incoming_system = ""
    for msg in incoming_messages:
        if msg.get("role") == "system":
            incoming_system = msg.get("content", "")
            break
    # Join only the non-empty parts so the prompt never starts with "\n\n".
    system_content = "\n\n".join(part for part in (incoming_system, system_prompt) if part)
    if system_content:
        messages.append({"role": "system", "content": system_content})

    # === LAYER 2: Semantic (curated memories) ===
    qdrant = get_qdrant_service()
    semantic_results = await qdrant.semantic_search(
        query=search_context if search_context else user_question,
        limit=20,
        score_threshold=config.semantic_score_threshold,
        entry_type="curated"
    )
    semantic_tokens = 0
    for result in semantic_results:
        text = result.get("payload", {}).get("text", "")
        # Budget is checked before adding, so the final item may overshoot
        # slightly -- kept as-is to preserve the original layering behavior.
        if text and semantic_tokens < config.semantic_token_budget:
            messages.append({"role": "user", "content": text})  # Add as context
            semantic_tokens += count_tokens(text)

    # === LAYER 3: Context (recent turns) ===
    recent_turns = await qdrant.get_recent_turns(limit=20)
    context_tokens = 0
    for turn in reversed(recent_turns):
        text = turn.get("payload", {}).get("text", "")
        if text and context_tokens < config.context_token_budget:
            messages.append({"role": "user", "content": text})  # Add as context
            context_tokens += count_tokens(text)

    # === LAYER 4: Current messages (passed through) ===
    for msg in incoming_messages:
        if msg.get("role") != "system":  # Do not duplicate system
            messages.append(msg)
    return messages