"""Memory curator - runs daily (recent 24h) and monthly (full DB) to clean and maintain memory database. Creates INDIVIDUAL cleaned turns (one per raw turn), not merged summaries. Parses JSON response from curator_prompt.md format. """ import logging import os from datetime import datetime, timedelta from typing import List, Dict, Any, Optional from pathlib import Path import httpx import json import re from .qdrant_service import QdrantService logger = logging.getLogger(__name__) # Configurable prompts directory (can be overridden via environment) PROMPTS_DIR = Path(os.environ.get("VERA_PROMPTS_DIR", "/app/prompts")) STATIC_DIR = Path(os.environ.get("VERA_STATIC_DIR", "/app/static")) def load_curator_prompt() -> str: """Load curator prompt from prompts directory.""" # Try prompts directory first, then static for backward compatibility prompts_path = PROMPTS_DIR / "curator_prompt.md" static_path = STATIC_DIR / "curator_prompt.md" if prompts_path.exists(): return prompts_path.read_text().strip() elif static_path.exists(): return static_path.read_text().strip() else: raise FileNotFoundError(f"curator_prompt.md not found in {PROMPTS_DIR} or {STATIC_DIR}") class Curator: def __init__(self, qdrant_service: QdrantService, model: str = "gpt-oss:120b", ollama_host: str = "http://10.0.0.10:11434"): self.qdrant = qdrant_service self.model = model self.ollama_host = ollama_host self.curator_prompt = load_curator_prompt() async def run(self, full: bool = False): """Run the curation process. Args: full: If True, process ALL raw memories (monthly full run). If False, process only recent 24h (daily run). """ logger.info(f"Starting memory curation (full={full})...") try: current_date = datetime.utcnow().strftime("%Y-%m-%d") # Get all memories (async) points, _ = await self.qdrant.client.scroll( collection_name=self.qdrant.collection, limit=10000, with_payload=True, with_vectors=False ) memories = [] for point in points: payload = point.payload or {} memories.append({ "id": str(point.id), "text": payload.get("text", ""), "type": payload.get("type", "raw"), "timestamp": payload.get("timestamp", ""), "payload": payload }) raw_memories = [m for m in memories if m["type"] == "raw"] curated_memories = [m for m in memories if m["type"] == "curated"] logger.info(f"Found {len(raw_memories)} raw, {len(curated_memories)} curated") # Filter by time for daily runs, process all for full runs if full: # Monthly full run: process ALL raw memories recent_raw = raw_memories logger.info(f"FULL RUN: Processing all {len(recent_raw)} raw memories") else: # Daily run: process only recent 24h recent_raw = [m for m in raw_memories if self._is_recent(m, hours=24)] logger.info(f"DAILY RUN: Processing {len(recent_raw)} recent raw memories") existing_sample = curated_memories[-50:] if len(curated_memories) > 50 else curated_memories if not recent_raw: logger.info("No raw memories to process") return raw_turns_text = self._format_raw_turns(recent_raw) existing_text = self._format_existing_memories(existing_sample) prompt = self.curator_prompt.replace("{CURRENT_DATE}", current_date) full_prompt = f"""{prompt} ## {'All' if full else 'Recent'} Raw Turns ({'full database' if full else 'last 24 hours'}): {raw_turns_text} ## Existing Memories (sample): {existing_text} Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the JSON object.""" logger.info(f"Sending {len(recent_raw)} raw turns to LLM...") response_text = await self._call_llm(full_prompt) result = self._parse_json_response(response_text) if not result: logger.error("Failed to parse JSON response from LLM") return new_turns = result.get("new_curated_turns", []) permanent_rules = result.get("permanent_rules", []) deletions = result.get("deletions", []) summary = result.get("summary", "") logger.info(f"Parsed: {len(new_turns)} turns, {len(permanent_rules)} rules, {len(deletions)} deletions") logger.info(f"Summary: {summary}") for turn in new_turns: content = turn.get("content", "") if content: await self.qdrant.store_turn( role="curated", content=content, entry_type="curated" ) logger.info(f"Stored curated turn: {content[:100]}...") for rule in permanent_rules: rule_text = rule.get("rule", "") target_file = rule.get("target_file", "systemprompt.md") if rule_text: await self._append_rule_to_file(target_file, rule_text) logger.info(f"Appended rule to {target_file}: {rule_text[:80]}...") if deletions: valid_deletions = [d for d in deletions if d in [m["id"] for m in memories]] if valid_deletions: await self.qdrant.delete_points(valid_deletions) logger.info(f"Deleted {len(valid_deletions)} points") raw_ids_to_delete = [m["id"] for m in recent_raw] if raw_ids_to_delete: await self.qdrant.delete_points(raw_ids_to_delete) logger.info(f"Deleted {len(raw_ids_to_delete)} processed raw memories") logger.info(f"Memory curation completed successfully (full={full})") except Exception as e: logger.error(f"Error during curation: {e}") raise async def run_full(self): """Run full curation (all raw memories). Convenience method.""" await self.run(full=True) async def run_daily(self): """Run daily curation (recent 24h only). Convenience method.""" await self.run(full=False) def _is_recent(self, memory: Dict, hours: int = 24) -> bool: """Check if memory is within the specified hours.""" timestamp = memory.get("timestamp", "") if not timestamp: return True try: mem_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) cutoff = datetime.utcnow() - timedelta(hours=hours) return mem_time.replace(tzinfo=None) > cutoff except: return True def _format_raw_turns(self, turns: List[Dict]) -> str: """Format raw turns for the LLM prompt.""" formatted = [] for i, turn in enumerate(turns, 1): text = turn.get("text", "") formatted.append(f"--- RAW TURN {i} (ID: {turn.get('id', 'unknown')}) ---\n{text}\n") return "\n".join(formatted) def _format_existing_memories(self, memories: List[Dict]) -> str: """Format existing curated memories for the LLM prompt.""" if not memories: return "No existing curated memories." formatted = [] for i, mem in enumerate(memories[-20:], 1): text = mem.get("text", "") formatted.append(f"{text}\n") return "\n".join(formatted) async def _call_llm(self, prompt: str) -> str: """Call Ollama LLM with the prompt.""" try: async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post( f"{self.ollama_host}/api/generate", json={ "model": self.model, "prompt": prompt, "stream": False, "options": { "temperature": 0.3, "num_predict": 8192 } } ) result = response.json() return result.get("response", "") except Exception as e: logger.error(f"Failed to call LLM: {e}") return "" def _parse_json_response(self, response: str) -> Optional[Dict]: """Parse JSON from LLM response.""" if not response: return None try: return json.loads(response) except json.JSONDecodeError: pass try: start = response.find("{") end = response.rfind("}") + 1 if start >= 0 and end > start: return json.loads(response[start:end]) except json.JSONDecodeError: pass json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response) if json_match: try: return json.loads(json_match.group(1).strip()) except json.JSONDecodeError: pass logger.error(f"Could not parse JSON: {response[:500]}...") return None async def _append_rule_to_file(self, filename: str, rule: str): """Append a permanent rule to a prompts file.""" # Try prompts directory first, then static for backward compatibility prompts_path = PROMPTS_DIR / filename static_path = STATIC_DIR / filename # Use whichever directory is writable target_path = prompts_path if prompts_path.parent.exists() else static_path try: # Ensure parent directory exists target_path.parent.mkdir(parents=True, exist_ok=True) with open(target_path, "a") as f: f.write(f"\n{rule}\n") logger.info(f"Appended rule to {target_path}") except Exception as e: logger.error(f"Failed to append rule to {filename}: {e}")