#!/usr/bin/env python3
"""
TrueRecall Timer Curator.

Runs every 30 minutes via cron.
- Queries all uncurated memories from memories_tr
- Sends batch to qwen3 for gem extraction
- Stores gems to gems_tr
- Marks processed memories as curated=true

Usage:
    python3 curator_timer.py --config curator_config.json
    python3 curator_timer.py --config curator_config.json --dry-run
"""

import os
import sys
import json
import argparse
import requests
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional
import hashlib


def load_config(config_path: str) -> Dict[str, Any]:
    """Load and return the JSON configuration at *config_path*."""
    with open(config_path, 'r') as f:
        return json.load(f)


# Default paths
SCRIPT_DIR = Path(__file__).parent
DEFAULT_CONFIG = SCRIPT_DIR / "curator_config.json"

# Curator prompt path.
# FIX: open() does not expand "~", so the original Path("~/...") could never
# be found and the hard-coded fallback prompt was silently used on every run.
# expanduser() resolves "~" to the real home directory.
CURATOR_PROMPT_PATH = Path(
    "~/.openclaw/workspace/.local_projects/true-recall-v2/curator-prompt.md"
).expanduser()


def load_curator_prompt() -> str:
    """Load the curator system prompt, falling back to a built-in default.

    NOTE(review): this helper is not called anywhere in this file's main
    flow — confirm whether extract_gems() was meant to use it.
    """
    try:
        with open(CURATOR_PROMPT_PATH, 'r') as f:
            return f.read()
    except FileNotFoundError:
        print(f"⚠️ Curator prompt not found at {CURATOR_PROMPT_PATH}")
        return """You are The Curator. Extract meaningful gems from conversation history. Extract facts, insights, decisions, preferences, and context that would be valuable to remember. 
Output a JSON array of gems with fields: gem, context, snippet, categories, importance (1-5), confidence (0-0.99)."""


def get_uncurated_memories(qdrant_url: str, collection: str, user_id: str,
                           max_batch: int) -> List[Dict[str, Any]]:
    """Fetch up to *max_batch* uncurated memories for *user_id* from Qdrant.

    Uses the scroll API with pagination. Network errors are logged to stderr
    and end the scroll early, returning whatever was fetched so far.

    Returns a list of flat dicts: point id plus selected payload fields, with
    the full payload merged in.
    """
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "curated", "match": {"value": False}}
        ]
    }
    all_points = []
    offset = None
    iterations = 0
    max_iterations = 10  # hard cap so a bad next_page_offset cannot loop forever
    while len(all_points) < max_batch and iterations < max_iterations:
        iterations += 1
        scroll_data = {
            "limit": min(100, max_batch - len(all_points)),
            "with_payload": True,
            "filter": filter_data
        }
        if offset:
            scroll_data["offset"] = offset
        try:
            response = requests.post(
                f"{qdrant_url}/collections/{collection}/points/scroll",
                json=scroll_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            points = result.get("result", {}).get("points", [])
            if not points:
                break
            all_points.extend(points)
            offset = result.get("result", {}).get("next_page_offset")
            if not offset:
                break
        except Exception as e:
            print(f"Error querying Qdrant: {e}", file=sys.stderr)
            break

    # Flatten points into simple dicts (id + common fields + full payload).
    memories = []
    for point in all_points:
        payload = point.get("payload", {})
        memories.append({
            "id": point.get("id"),
            "content": payload.get("content", ""),
            "role": payload.get("role", ""),
            "timestamp": payload.get("timestamp", ""),
            "turn": payload.get("turn", 0),
            **payload
        })
    return memories[:max_batch]


def extract_gems(memories: List[Dict[str, Any]], ollama_url: str) -> List[Dict[str, Any]]:
    """Send *memories* to qwen3 (via Ollama) and parse the extracted gems.

    Returns a list of gem dicts (keys: text, category, importance), or []
    on any network or parse failure.
    """
    if not memories:
        return []

    # Build conversation from memories (support both 'text' and 'content' fields)
    conversation_lines = []
    for i, mem in enumerate(memories):
        # Support both migrated memories (text) and watcher memories (content)
        text = mem.get("text", "") or mem.get("content", "")
        if text:
            # Truncate very long texts
            conversation_lines.append(f"[{i+1}] {text[:500]}")
    conversation_text = "\n\n".join(conversation_lines)

    # Simple extraction prompt
    prompt = """You are a memory curator. Extract atomic facts from the conversation below.

For each distinct fact/decision/preference, output a JSON object with:
- "text": the atomic fact (1-2 sentences)
- "category": one of [decision, preference, technical, project, knowledge, system]
- "importance": "high" or "medium"

Return ONLY a JSON array. Example:
[
  {"text": "User decided to use Redis for caching", "category": "decision", "importance": "high"},
  {"text": "User prefers dark mode", "category": "preference", "importance": "medium"}
]

If no extractable facts, return [].

CONVERSATION:
"""
    full_prompt = f"{prompt}{conversation_text}\n\nJSON:"
    try:
        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": "qwen3:30b-a3b-instruct-2507-q8_0",
                # NOTE(review): the instructions are sent both as "system" and
                # prepended inside "prompt"; the duplication looks unintentional
                # but is preserved to avoid changing model behavior — confirm.
                "system": prompt,
                "prompt": full_prompt,
                "stream": False,
                "options": {
                    "temperature": 0.1,
                    "num_predict": 4000
                }
            },
            timeout=120
        )
        response.raise_for_status()
    except Exception as e:
        print(f"Error calling Ollama: {e}", file=sys.stderr)
        return []

    result = response.json()
    output = result.get('response', '').strip()

    # Strip markdown code fences if the model wrapped its JSON in one.
    if '```json' in output:
        output = output.split('```json')[1].split('```')[0].strip()
    elif '```' in output:
        output = output.split('```')[1].split('```')[0].strip()

    try:
        # Narrow to the outermost JSON array in the output before parsing.
        start_idx = output.find('[')
        end_idx = output.rfind(']')
        if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
            output = output[start_idx:end_idx+1]
        gems = json.loads(output)
        if not isinstance(gems, list):
            gems = [gems] if gems else []
        return gems
    except json.JSONDecodeError as e:
        print(f"Error parsing curator output: {e}", file=sys.stderr)
        print(f"Raw output: {repr(output[:500])}...", file=sys.stderr)
        return []


def get_embedding(text: str, ollama_url: str) -> Optional[List[float]]:
    """Get an embedding vector for *text* from Ollama, or None on failure."""
    try:
        response = requests.post(
            f"{ollama_url}/api/embeddings",
            json={"model": "snowflake-arctic-embed2", "prompt": text},
            timeout=30
        )
        response.raise_for_status()
        return response.json()['embedding']
    except Exception as e:
        print(f"Error getting embedding: {e}", file=sys.stderr)
        return None


def store_gem(gem: Dict[str, Any], user_id: str, qdrant_url: str,
              target_collection: str, ollama_url: str) -> bool:
    """Embed and upsert a single gem into Qdrant.

    Supports both the old gem format (gem/context/snippet) and the new one
    (text/category/importance). Returns True on success.
    """
    embedding_text = gem.get('text', '') or gem.get('gem', '')
    if not embedding_text:
        embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}".strip()
    if not embedding_text:
        print("⚠️ Empty embedding text for gem, skipping", file=sys.stderr)
        return False

    vector = get_embedding(embedding_text, ollama_url)
    if vector is None:
        print("⚠️ Failed to get embedding for gem", file=sys.stderr)
        return False

    # Generate a deterministic 63-bit point ID.
    # FIX: the original hashed gem.get('gem', '')[:50], which is empty for
    # new-format gems ('text' only) — every new-format gem in a batch got the
    # SAME id and each upsert overwrote the previous one. Hash the resolved
    # embedding text instead (identical to the old value for old-format gems).
    hash_content = f"{user_id}:{gem.get('conversation_id', '')}:{gem.get('turn_range', '')}:{embedding_text[:50]}"
    hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8]
    gem_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)  # already non-negative

    # Normalize gem fields - ensure we have text field
    payload = {
        "user_id": user_id,
        "text": gem.get('text', gem.get('gem', '')),
        "category": gem.get('category', 'general'),
        "importance": gem.get('importance', 'medium'),
        "curated_at": datetime.now(timezone.utc).isoformat()
    }
    # Preserve any other fields from gem
    for key in ['context', 'snippet', 'confidence', 'conversation_id', 'turn_range']:
        if key in gem:
            payload[key] = gem[key]

    try:
        response = requests.put(
            f"{qdrant_url}/collections/{target_collection}/points",
            json={
                "points": [{
                    "id": gem_id,
                    "vector": vector,
                    "payload": payload
                }]
            },
            timeout=30
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error storing gem: {e}", file=sys.stderr)
        return False


def mark_curated(memory_ids: List[Any], qdrant_url: str, collection: str) -> bool:
    """Mark memories as curated in Qdrant using POST /points/payload format."""
    if not memory_ids:
        return True
    try:
        response = requests.post(
            f"{qdrant_url}/collections/{collection}/points/payload",
            json={
                "points": memory_ids,
                "payload": {
                    "curated": True,
                    "curated_at": datetime.now(timezone.utc).isoformat()
                }
            },
            timeout=30
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error marking curated: {e}", file=sys.stderr)
        return False


def main():
    """Entry point: fetch uncurated memories, extract gems, store, mark done."""
    parser = argparse.ArgumentParser(description="TrueRecall Timer Curator")
    parser.add_argument("--config", "-c", default=str(DEFAULT_CONFIG), help="Config file path")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write, just preview")
    args = parser.parse_args()

    config = load_config(args.config)
    # FIX: the original defaults were "http://:6333" / "http://:11434" — an
    # empty host that requests cannot connect to. Default to localhost; set
    # QDRANT_URL / OLLAMA_URL env vars to point elsewhere.
    qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
    ollama_url = os.getenv("OLLAMA_URL", "http://localhost:11434")
    user_id = config.get("user_id", "rob")
    source_collection = config.get("source_collection", "memories_tr")
    target_collection = config.get("target_collection", "gems_tr")
    max_batch = config.get("max_batch_size", 100)

    print("🔍 TrueRecall Timer Curator")
    print(f"👤 User: {user_id}")
    print(f"📥 Source: {source_collection}")
    print(f"💎 Target: {target_collection}")
    print(f"📦 Max batch: {max_batch}")
    if args.dry_run:
        print("🏃 DRY RUN MODE")
    print()

    # Get uncurated memories
    print("📥 Fetching uncurated memories...")
    memories = get_uncurated_memories(qdrant_url, source_collection, user_id, max_batch)
    print(f"✅ Found {len(memories)} uncurated memories")
    if not memories:
        print("🤷 Nothing to curate. Exiting.")
        return

    # Extract gems
    print(f"\n🧠 Sending {len(memories)} memories to curator...")
    gems = extract_gems(memories, ollama_url)
    print(f"✅ Extracted {len(gems)} gems")
    if not gems:
        print("⚠️ No gems extracted. Nothing to store.")
        # Still mark as curated so we don't reprocess
        memory_ids = [m["id"] for m in memories]  # Keep as integers
        mark_curated(memory_ids, qdrant_url, source_collection)
        return

    # Preview
    print("\n💎 Gems preview:")
    for i, gem in enumerate(gems[:3], 1):
        # FIX: new-format gems store their text under "text", not "gem"; the
        # original always previewed "N/A" for them.
        preview_text = gem.get('text') or gem.get('gem') or 'N/A'
        print(f"  {i}. {preview_text[:80]}...")
    if len(gems) > 3:
        print(f"  ... and {len(gems) - 3} more")

    if args.dry_run:
        print("\n🏃 DRY RUN: Not storing gems or marking curated.")
        return

    # Store gems
    print(f"\n💾 Storing {len(gems)} gems...")
    stored = 0
    for gem in gems:
        if store_gem(gem, user_id, qdrant_url, target_collection, ollama_url):
            stored += 1
    print(f"✅ Stored: {stored}/{len(gems)}")

    # Mark memories as curated
    print("\n📝 Marking memories as curated...")
    memory_ids = [m["id"] for m in memories]  # Keep as integers
    if mark_curated(memory_ids, qdrant_url, source_collection):
        print(f"✅ Marked {len(memory_ids)} memories as curated")
    else:
        print("⚠️ Failed to mark some memories as curated")

    print("\n🎉 Curation complete!")


if __name__ == "__main__":
    main()