2026-03-26 21:26:02 -05:00
|
|
|
"""Memory curator - runs daily to clean and maintain memory database.
|
2026-03-26 12:37:25 -05:00
|
|
|
|
2026-03-26 21:26:02 -05:00
|
|
|
On day 01 of each month, processes ALL raw memories (monthly mode).
|
|
|
|
|
Otherwise, processes recent 24h of raw memories (daily mode).
|
|
|
|
|
The prompt determines behavior based on current date.
|
2026-03-26 12:37:25 -05:00
|
|
|
"""
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
2026-04-01 16:06:00 -05:00
|
|
|
from datetime import datetime, timedelta, timezone
|
2026-03-26 12:37:25 -05:00
|
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import httpx
|
|
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
from .qdrant_service import QdrantService
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Directories searched for prompt files. Each one can be redirected through
# its environment variable; the defaults are the in-container locations.
PROMPTS_DIR = Path(os.getenv("VERA_PROMPTS_DIR", "/app/prompts"))
STATIC_DIR = Path(os.getenv("VERA_STATIC_DIR", "/app/static"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_curator_prompt() -> str:
    """Return the curator prompt text with surrounding whitespace stripped.

    ``curator_prompt.md`` is looked up in ``PROMPTS_DIR`` first and in
    ``STATIC_DIR`` second; the first file that can be read wins.

    Raises:
        FileNotFoundError: if neither directory contains the file.
    """
    for directory in (PROMPTS_DIR, STATIC_DIR):
        candidate = directory / "curator_prompt.md"
        try:
            # Decode explicitly as UTF-8 so the result does not depend on the
            # process locale. Reading directly (instead of exists()-then-read)
            # avoids a race if the file disappears in between.
            return candidate.read_text(encoding="utf-8").strip()
        except (FileNotFoundError, NotADirectoryError):
            continue
    raise FileNotFoundError(f"curator_prompt.md not found in {PROMPTS_DIR} or {STATIC_DIR}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Curator:
    """Curate raw conversation memories with the help of an Ollama-hosted LLM.

    A run loads the points of the Qdrant collection, sends the raw turns
    (all of them on day 01 of the month in UTC, otherwise those from the last
    24 hours) to the LLM together with a sample of existing curated memories,
    and then applies the JSON answer: new curated turns are stored, permanent
    rules are appended to prompt files, the points listed by the LLM are
    deleted and, finally, the raw memories that were processed are deleted.
    """

    # Number of points requested per scroll call while paging the collection.
    _SCROLL_PAGE_SIZE = 10000

    def __init__(
        self,
        qdrant_service: "QdrantService",
        model: str = "gpt-oss:120b",
        ollama_host: str = "http://10.0.0.10:11434",
    ):
        """Store the collaborators and load the curator prompt from disk.

        Args:
            qdrant_service: Service wrapping the Qdrant client and collection.
            model: Name of the Ollama model used for curation.
            ollama_host: Base URL of the Ollama server.

        Raises:
            FileNotFoundError: if the curator prompt file cannot be found.
        """
        self.qdrant = qdrant_service
        self.model = model
        self.ollama_host = ollama_host
        self.curator_prompt = load_curator_prompt()

    async def run(self):
        """Run the curation process.

        Automatically detects day 01 for monthly mode (processes ALL raw
        memories). Otherwise runs daily mode (processes recent 24h only).
        The current date is also substituted into the prompt so the prompt
        can adapt its instructions.

        Raises:
            Exception: any error raised while fetching, storing or deleting
                is logged and re-raised. LLM transport errors and unparsable
                answers are logged and end the run without changes.
        """
        current_date = datetime.now(timezone.utc)
        is_monthly = current_date.day == 1
        mode = "MONTHLY" if is_monthly else "DAILY"

        logger.info(f"Starting memory curation ({mode} mode)...")
        try:
            current_date_str = current_date.strftime("%Y-%m-%d")

            memories = await self._fetch_memories()
            raw_memories = [m for m in memories if m["type"] == "raw"]
            curated_memories = [m for m in memories if m["type"] == "curated"]
            logger.info(f"Found {len(raw_memories)} raw, {len(curated_memories)} curated")

            # Filter by time for daily mode, process all for monthly mode
            if is_monthly:
                recent_raw = raw_memories
                logger.info(f"MONTHLY MODE: Processing all {len(recent_raw)} raw memories")
            else:
                recent_raw = [m for m in raw_memories if self._is_recent(m, hours=24)]
                logger.info(f"DAILY MODE: Processing {len(recent_raw)} recent raw memories")

            if not recent_raw:
                logger.info("No raw memories to process")
                return

            # Slicing already copes with lists shorter than 50 entries.
            existing_sample = curated_memories[-50:]
            full_prompt = self._build_prompt(
                recent_raw, existing_sample, current_date_str, is_monthly
            )

            logger.info(f"Sending {len(recent_raw)} raw turns to LLM...")
            response_text = await self._call_llm(full_prompt)

            result = self._parse_json_response(response_text)
            if not result:
                logger.error("Failed to parse JSON response from LLM")
                return

            await self._apply_curation_result(result, memories, recent_raw)
            logger.info(f"Memory curation completed successfully ({mode} mode)")
        except Exception as e:
            logger.error(f"Error during curation: {e}")
            raise

    async def _fetch_memories(self) -> List[Dict[str, Any]]:
        """Load every point of the collection as a plain memory dict.

        The collection is paged through with the offset returned by each
        scroll call, so collections larger than one page are not truncated.
        """
        records = []
        offset = None
        while True:
            batch, offset = await self.qdrant.client.scroll(
                collection_name=self.qdrant.collection,
                limit=self._SCROLL_PAGE_SIZE,
                offset=offset,
                with_payload=True,
                with_vectors=False,
            )
            records.extend(batch)
            # A missing next offset marks the last page; the empty-batch
            # check guards against looping forever on an unexpected reply.
            if offset is None or not batch:
                break

        memories = []
        for point in records:
            payload = point.payload or {}
            memories.append({
                "id": str(point.id),
                "text": payload.get("text", ""),
                "type": payload.get("type", "raw"),
                "timestamp": payload.get("timestamp", ""),
                "payload": payload,
            })
        return memories

    def _build_prompt(
        self,
        raw_turns: List[Dict],
        existing_sample: List[Dict],
        current_date_str: str,
        is_monthly: bool,
    ) -> str:
        """Assemble the full LLM prompt from the template and the memories."""
        prompt = self.curator_prompt.replace("{CURRENT_DATE}", current_date_str)
        raw_turns_text = self._format_raw_turns(raw_turns)
        existing_text = self._format_existing_memories(existing_sample)
        scope = "All" if is_monthly else "Recent"
        window = "full database" if is_monthly else "last 24 hours"
        return (
            f"{prompt}\n\n"
            f"## {scope} Raw Turns ({window}):\n"
            f"{raw_turns_text}\n\n"
            "## Existing Memories (sample):\n"
            f"{existing_text}\n\n"
            "Remember: Respond with ONLY valid JSON. No markdown, no explanations, just the JSON object."
        )

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return *value* if it is a list, otherwise an empty list.

        The LLM answer is untrusted: a field may be missing, null or of the
        wrong type, and none of these should crash the run half-way.
        """
        return value if isinstance(value, list) else []

    async def _apply_curation_result(
        self,
        result: Dict,
        memories: List[Dict],
        processed_raw: List[Dict],
    ) -> None:
        """Persist the LLM decisions and delete the processed raw memories."""
        new_turns = self._as_list(result.get("new_curated_turns"))
        permanent_rules = self._as_list(result.get("permanent_rules"))
        deletions = self._as_list(result.get("deletions"))
        summary = result.get("summary", "")

        logger.info(f"Parsed: {len(new_turns)} turns, {len(permanent_rules)} rules, {len(deletions)} deletions")
        logger.info(f"Summary: {summary}")

        for turn in new_turns:
            content = turn.get("content", "") if isinstance(turn, dict) else ""
            if isinstance(content, str) and content:
                await self.qdrant.store_turn(
                    role="curated",
                    content=content,
                    entry_type="curated",
                )
                logger.info(f"Stored curated turn: {content[:100]}...")

        for rule in permanent_rules:
            if not isinstance(rule, dict):
                continue
            rule_text = rule.get("rule", "")
            target_file = rule.get("target_file", "systemprompt.md")
            if isinstance(rule_text, str) and rule_text:
                # Only report success when the rule was really written.
                if await self._append_rule_to_file(target_file, rule_text):
                    logger.info(f"Appended rule to {target_file}: {rule_text[:80]}...")

        if deletions:
            # Build the lookup once; only IDs that exist in the collection
            # snapshot are honoured. The str check also keeps unhashable
            # entries from raising on the set lookup.
            known_ids = {m["id"] for m in memories}
            valid_deletions = [
                d for d in deletions if isinstance(d, str) and d in known_ids
            ]
            if valid_deletions:
                await self.qdrant.delete_points(valid_deletions)
                logger.info(f"Deleted {len(valid_deletions)} points")

        raw_ids_to_delete = [m["id"] for m in processed_raw]
        if raw_ids_to_delete:
            await self.qdrant.delete_points(raw_ids_to_delete)
            logger.info(f"Deleted {len(raw_ids_to_delete)} processed raw memories")

    def _is_recent(self, memory: Dict, hours: int = 24) -> bool:
        """Check if memory is within the specified hours.

        Timestamps carrying a UTC offset are compared as absolute instants;
        timestamps without one are interpreted as UTC. Memories whose
        timestamp is missing or cannot be parsed count as recent so that
        they are never skipped silently.
        """
        timestamp = memory.get("timestamp", "")
        if not timestamp:
            return True
        try:
            mem_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            # AttributeError covers non-string timestamps (no .replace()).
            logger.debug(f"Could not parse timestamp: {timestamp}")
            return True
        if mem_time.tzinfo is None:
            mem_time = mem_time.replace(tzinfo=timezone.utc)
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        # Aware datetimes compare by instant, so the offset is honoured
        # instead of being discarded.
        return mem_time > cutoff

    def _format_raw_turns(self, turns: List[Dict]) -> str:
        """Format raw turns for the LLM prompt, one numbered section each."""
        return "\n".join(
            f"--- RAW TURN {i} (ID: {turn.get('id', 'unknown')}) ---\n{turn.get('text', '')}\n"
            for i, turn in enumerate(turns, 1)
        )

    def _format_existing_memories(self, memories: List[Dict]) -> str:
        """Format existing curated memories for the LLM prompt.

        Only the last 20 entries of *memories* are included.
        """
        if not memories:
            return "No existing curated memories."
        return "\n".join(f"{mem.get('text', '')}\n" for mem in memories[-20:])

    async def _call_llm(self, prompt: str) -> str:
        """Call Ollama LLM with the prompt.

        Returns:
            The ``response`` field of the Ollama answer, or an empty string
            when the request fails for any reason (the error is logged).
        """
        try:
            async with httpx.AsyncClient(timeout=300.0) as client:
                response = await client.post(
                    f"{self.ollama_host}/api/generate",
                    json={
                        "model": self.model,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "temperature": 0.3,
                            "num_predict": 8192,
                        },
                    },
                )
                # Surface HTTP errors in the log instead of treating an
                # error body as an empty answer.
                response.raise_for_status()
                result = response.json()
                return result.get("response", "")
        except Exception as e:
            logger.error(f"LLM call failed: {e}", exc_info=True)
            return ""

    def _parse_json_response(self, response: str) -> Optional[Dict]:
        """Parse the JSON object contained in an LLM response.

        Tries, in order: the whole response, the span from the first ``{`` to
        the last ``}``, and the content of the first fenced code block.

        Returns:
            The parsed object, or ``None`` when nothing parses or the parsed
            value is not a JSON object.
        """
        if not response:
            return None

        try:
            parsed = json.loads(response)
        except json.JSONDecodeError:
            logger.debug("Direct JSON parse failed, trying brace extraction")
        else:
            if isinstance(parsed, dict):
                return parsed
            # Valid JSON of the wrong shape: callers rely on dict access, so
            # report it as a parse failure rather than salvaging a fragment.
            logger.error(f"LLM response is not a JSON object: {response[:500]}...")
            return None

        start = response.find("{")
        end = response.rfind("}") + 1
        if start >= 0 and end > start:
            try:
                parsed = json.loads(response[start:end])
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(parsed, dict):
                    return parsed

        # Try to find JSON in code blocks
        pattern = r'```(?:json)?\s*([\s\S]*?)```'
        json_match = re.search(pattern, response)
        if json_match:
            try:
                parsed = json.loads(json_match.group(1).strip())
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(parsed, dict):
                    return parsed

        logger.error(f"Could not parse JSON: {response[:500]}...")
        return None

    async def _append_rule_to_file(self, filename: str, rule: str) -> bool:
        """Append a permanent rule to a prompts file.

        ``filename`` comes from the LLM answer and is therefore untrusted:
        only a plain file name is accepted, so the write cannot escape the
        prompts directory through ``..`` segments or an absolute path.

        Returns:
            True if the rule was written, False if the name was rejected or
            the write failed (both cases are logged).
        """
        if (
            not isinstance(filename, str)
            or filename in ("", ".", "..")
            or Path(filename).name != filename
        ):
            logger.error(f"Refusing to append rule to unsafe target file: {filename!r}")
            return False

        prompts_path = PROMPTS_DIR / filename
        static_path = STATIC_DIR / filename

        # Prefer the prompts directory when it exists, else fall back to the
        # static directory.
        target_path = prompts_path if prompts_path.parent.exists() else static_path

        try:
            # Ensure parent directory exists
            target_path.parent.mkdir(parents=True, exist_ok=True)
            with open(target_path, "a", encoding="utf-8") as f:
                f.write(f"\n{rule}\n")
            logger.info(f"Appended rule to {target_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to append rule to {filename}: {e}")
            return False
|