# vera-ai-v2/app/curator.py

"""Memory curator - runs daily to clean and maintain memory database.
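On day 01 of each month (UTC), processes ALL raw memories (monthly mode).
Otherwise, processes only raw memories from the last 24 hours (daily mode).
The current date is also substituted into the curator prompt's {CURRENT_DATE}
placeholder so the prompt can adapt its instructions to it.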
"""
import json
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

from .qdrant_service import QdrantService
logger = logging.getLogger(__name__)

# Directories that may hold the prompt markdown files. Either location can be
# overridden through its environment variable (read once, at import time).
PROMPTS_DIR = Path(os.getenv("VERA_PROMPTS_DIR", "/app/prompts"))
STATIC_DIR = Path(os.getenv("VERA_STATIC_DIR", "/app/static"))
def load_curator_prompt() -> str:
    """Load the curator prompt template.

    Looks for ``curator_prompt.md`` in PROMPTS_DIR first, then in STATIC_DIR,
    and returns the first regular file found, with surrounding whitespace
    stripped. The file is decoded as UTF-8 so the result does not depend on
    the process locale.

    Raises:
        FileNotFoundError: If the file exists in neither directory.
    """
    for directory in (PROMPTS_DIR, STATIC_DIR):
        candidate = directory / "curator_prompt.md"
        # is_file() (not exists()) so a stray directory with this name is skipped
        # instead of failing later in read_text().
        if candidate.is_file():
            return candidate.read_text(encoding="utf-8").strip()
    raise FileNotFoundError(f"curator_prompt.md not found in {PROMPTS_DIR} or {STATIC_DIR}")
class Curator:
    """LLM-driven curator for the Qdrant memory collection.

    Each run reads the whole collection and selects the raw memories due for
    processing: all of them on day 01 of the UTC month, otherwise those from
    the last 24 hours. It sends them to an Ollama model together with a
    sample of existing curated memories. The model's JSON reply is then
    applied in this order:

    1. New curated turns are stored.
    2. Permanent rules are appended to prompt files.
    3. Requested deletions are executed.
    4. The raw memories that were sent are deleted.
    """

    # Points requested per Qdrant scroll page; pages are followed until the
    # collection is exhausted.
    SCROLL_PAGE_SIZE = 10000
    # Seconds allowed for a single Ollama /api/generate request.
    LLM_TIMEOUT_SECONDS = 300.0

    def __init__(
        self,
        qdrant_service: QdrantService,
        model: str = "gpt-oss:120b",
        ollama_host: str = "http://10.0.0.10:11434",
    ):
        """Create a curator.

        Args:
            qdrant_service: Service wrapping Qdrant. This class uses its
                ``client``, ``collection``, ``store_turn`` and
                ``delete_points`` members.
            model: Ollama model name used for curation.
            ollama_host: Base URL of the Ollama server.

        Raises:
            FileNotFoundError: Propagated from ``load_curator_prompt`` when the
                curator prompt file cannot be found.
        """
        self.qdrant = qdrant_service
        self.model = model
        self.ollama_host = ollama_host
        self.curator_prompt = load_curator_prompt()

    async def run(self):
        """Run one curation pass.

        The mode comes from the current UTC date:
        - MONTHLY on day 01: every raw memory is processed.
        - DAILY on any other day: only raw memories from the last 24 hours.

        ``{CURRENT_DATE}`` in the curator prompt is replaced with today's date
        (YYYY-MM-DD) before sending. Nothing is stored or deleted unless the
        reply parses into a JSON object whose list fields are well-formed.

        Raises:
            Exception: Errors from Qdrant or the store/delete calls are logged
                and re-raised.
        """
        current_date = datetime.now(timezone.utc)
        is_monthly = current_date.day == 1
        mode = "MONTHLY" if is_monthly else "DAILY"
        logger.info(f"Starting memory curation ({mode} mode)...")
        try:
            current_date_str = current_date.strftime("%Y-%m-%d")
            memories = await self._fetch_all_memories()
            raw_memories = [m for m in memories if m["type"] == "raw"]
            curated_memories = [m for m in memories if m["type"] == "curated"]
            logger.info(f"Found {len(raw_memories)} raw, {len(curated_memories)} curated")

            # Filter by time for daily mode, process all for monthly mode
            if is_monthly:
                recent_raw = raw_memories
                logger.info(f"MONTHLY MODE: Processing all {len(recent_raw)} raw memories")
            else:
                recent_raw = [m for m in raw_memories if self._is_recent(m, hours=24)]
                logger.info(f"DAILY MODE: Processing {len(recent_raw)} recent raw memories")
            if not recent_raw:
                logger.info("No raw memories to process")
                return

            full_prompt = self._build_prompt(
                recent_raw, curated_memories[-50:], current_date_str, is_monthly
            )
            logger.info(f"Sending {len(recent_raw)} raw turns to LLM...")
            response_text = await self._call_llm(full_prompt)
            result = self._parse_json_response(response_text)
            if not result:
                logger.error("Failed to parse JSON response from LLM")
                return

            # Validate the whole reply before any side effect, so a malformed
            # answer can never leave a half-applied curation behind.
            new_turns = self._list_field(result, "new_curated_turns")
            permanent_rules = self._list_field(result, "permanent_rules")
            deletions = self._list_field(result, "deletions")
            if new_turns is None or permanent_rules is None or deletions is None:
                logger.error(
                    "LLM response field 'new_curated_turns', 'permanent_rules' or "
                    "'deletions' is not a list; no changes made"
                )
                return
            if not all(isinstance(entry, dict) for entry in new_turns + permanent_rules):
                logger.error("LLM response contains turn/rule entries that are not JSON objects; no changes made")
                return
            summary = result.get("summary", "")
            logger.info(f"Parsed: {len(new_turns)} turns, {len(permanent_rules)} rules, {len(deletions)} deletions")
            logger.info(f"Summary: {summary}")

            for turn in new_turns:
                content = turn.get("content", "")
                if isinstance(content, str) and content:
                    await self.qdrant.store_turn(
                        role="curated",
                        content=content,
                        entry_type="curated",
                    )
                    logger.info(f"Stored curated turn: {content[:100]}...")

            for rule in permanent_rules:
                rule_text = rule.get("rule", "")
                target_file = rule.get("target_file", "systemprompt.md")
                if isinstance(rule_text, str) and rule_text:
                    if await self._append_rule_to_file(target_file, rule_text):
                        logger.info(f"Appended rule to {target_file}: {rule_text[:80]}...")

            if deletions:
                # Only ids that actually exist in the collection are deleted.
                # NOTE(review): existing curated memories are shown to the LLM
                # without ids (see _format_existing_memories), so in practice only
                # raw-turn ids can match here. Confirm whether that is intended.
                known_ids = {m["id"] for m in memories}
                valid_deletions = [d for d in deletions if isinstance(d, str) and d in known_ids]
                if valid_deletions:
                    await self.qdrant.delete_points(valid_deletions)
                    logger.info(f"Deleted {len(valid_deletions)} points")

            # Every raw memory that was sent to the LLM is now considered
            # processed. recent_raw is non-empty here (early return above).
            # NOTE(review): ids are str(point.id). Confirm delete_points accepts
            # the string form of integer point ids.
            raw_ids_to_delete = [m["id"] for m in recent_raw]
            await self.qdrant.delete_points(raw_ids_to_delete)
            logger.info(f"Deleted {len(raw_ids_to_delete)} processed raw memories")
            logger.info(f"Memory curation completed successfully ({mode} mode)")
        except Exception as e:
            logger.error(f"Error during curation: {e}")
            raise

    async def _fetch_all_memories(self) -> List[Dict[str, Any]]:
        """Read every point in the collection, following scroll pagination.

        Returns one dict per point with keys ``id`` (stringified), ``text``,
        ``type`` (default ``"raw"``), ``timestamp`` and the raw ``payload``.
        """
        memories: List[Dict[str, Any]] = []
        offset = None
        while True:
            points, offset = await self.qdrant.client.scroll(
                collection_name=self.qdrant.collection,
                limit=self.SCROLL_PAGE_SIZE,
                offset=offset,
                with_payload=True,
                with_vectors=False,
            )
            for point in points:
                payload = point.payload or {}
                memories.append({
                    "id": str(point.id),
                    "text": payload.get("text", ""),
                    "type": payload.get("type", "raw"),
                    "timestamp": payload.get("timestamp", ""),
                    "payload": payload,
                })
            # The client reports no next-page offset once the last page is read.
            if offset is None or not points:
                break
        return memories

    def _build_prompt(
        self,
        raw_turns: List[Dict],
        existing_sample: List[Dict],
        current_date_str: str,
        is_monthly: bool,
    ) -> str:
        """Assemble the full LLM prompt from the template, raw turns and existing memories."""
        raw_turns_text = self._format_raw_turns(raw_turns)
        existing_text = self._format_existing_memories(existing_sample)
        prompt = self.curator_prompt.replace("{CURRENT_DATE}", current_date_str)
        return f"""{prompt}
## {'All' if is_monthly else 'Recent'} Raw Turns ({'full database' if is_monthly else 'last 24 hours'}):
{raw_turns_text}
## Existing Memories (sample):
{existing_text}
Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the JSON object."""

    @staticmethod
    def _list_field(result: Dict[str, Any], key: str) -> Optional[List[Any]]:
        """Return ``result[key]`` as a list.

        A missing or null field yields ``[]``. Any other non-list value yields
        ``None`` so the caller can reject the whole reply.
        """
        value = result.get(key)
        if value is None:
            return []
        return value if isinstance(value, list) else None

    def _is_recent(self, memory: Dict, hours: int = 24) -> bool:
        """Return True if *memory*'s timestamp falls within the last *hours* hours.

        The timestamp is parsed as ISO-8601. A trailing ``Z`` is accepted, and
        a timestamp without an offset is treated as UTC. Memories whose
        timestamp is missing or unparseable count as recent, so they are
        processed rather than silently skipped.
        """
        timestamp = memory.get("timestamp", "")
        if not timestamp:
            return True
        try:
            mem_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            return True
        # Compare in UTC: convert offset-aware values instead of discarding the offset.
        if mem_time.tzinfo is None:
            mem_time = mem_time.replace(tzinfo=timezone.utc)
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        return mem_time > cutoff

    def _format_raw_turns(self, turns: List[Dict]) -> str:
        """Format raw turns for the LLM prompt, each tagged with a 1-based index and its id."""
        formatted = []
        for i, turn in enumerate(turns, 1):
            text = turn.get("text", "")
            formatted.append(f"--- RAW TURN {i} (ID: {turn.get('id', 'unknown')}) ---\n{text}\n")
        return "\n".join(formatted)

    def _format_existing_memories(self, memories: List[Dict]) -> str:
        """Format the last 20 curated memories (text only, no ids) for the LLM prompt."""
        if not memories:
            return "No existing curated memories."
        return "\n".join(f"{mem.get('text', '')}\n" for mem in memories[-20:])

    async def _call_llm(self, prompt: str) -> str:
        """Send *prompt* to Ollama's ``/api/generate`` endpoint without streaming.

        Returns the generated text. Returns ``""`` when any of these happen:
        - the request fails;
        - the server answers with an HTTP error status;
        - the reply has no string ``response`` field.

        Failures are logged, not raised.
        """
        try:
            # rstrip so a host configured with a trailing slash does not yield "//api/generate".
            url = f"{self.ollama_host.rstrip('/')}/api/generate"
            async with httpx.AsyncClient(timeout=self.LLM_TIMEOUT_SECONDS) as client:
                response = await client.post(
                    url,
                    json={
                        "model": self.model,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "temperature": 0.3,
                            "num_predict": 8192,
                        },
                    },
                )
                # Surface HTTP errors here instead of as a later "could not parse JSON".
                response.raise_for_status()
                result = response.json()
        except Exception as e:
            logger.error(f"Failed to call LLM: {e}")
            return ""
        text = result.get("response", "") if isinstance(result, dict) else ""
        return text if isinstance(text, str) else ""

    def _parse_json_response(self, response: str) -> Optional[Dict]:
        """Extract a JSON object from the LLM reply.

        Candidates are tried in order:
        1. the whole reply;
        2. the span from the first ``{`` to the last ``}``;
        3. the body of the first fenced code block.

        Only a JSON *object* is accepted; arrays and scalars fall through to
        the next candidate. Returns None, and logs the reply prefix, when
        nothing parses.
        """
        if not response:
            return None
        candidates = [response]
        start = response.find("{")
        end = response.rfind("}") + 1
        if start >= 0 and end > start:
            candidates.append(response[start:end])
        # Try to find JSON in code blocks
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
        if json_match:
            candidates.append(json_match.group(1).strip())
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        logger.error(f"Could not parse JSON: {response[:500]}...")
        return None

    async def _append_rule_to_file(self, filename: str, rule: str) -> bool:
        """Append *rule* on its own line to a prompt file.

        The file lives in PROMPTS_DIR when that directory exists; otherwise it
        goes in STATIC_DIR, which is created if needed. *filename* comes from
        LLM output, so only a bare file name is accepted. Anything with a
        directory component, an absolute path, ``.`` or ``..`` is rejected so
        writes stay inside the prompt directories.

        Returns:
            True if the rule was written, False otherwise (the reason is logged).
        """
        if (
            not isinstance(filename, str)
            or filename in ("", ".", "..")
            or Path(filename).name != filename
        ):
            logger.error(f"Refusing to append rule to unsafe filename: {filename!r}")
            return False
        prompts_path = PROMPTS_DIR / filename
        static_path = STATIC_DIR / filename
        # Prefer the prompts directory when it exists (this is an existence
        # check, not a writability check); otherwise fall back to the static dir.
        target_path = prompts_path if prompts_path.parent.exists() else static_path
        try:
            # Ensure parent directory exists
            target_path.parent.mkdir(parents=True, exist_ok=True)
            with open(target_path, "a", encoding="utf-8") as f:
                f.write(f"\n{rule}\n")
        except (OSError, ValueError) as e:
            logger.error(f"Failed to append rule to {filename}: {e}")
            return False
        logger.info(f"Appended rule to {target_path}")
        return True