#!/usr/bin/env python3
"""
TrueRecall Timer Curator: Runs every 30 minutes via cron.
- Queries all uncurated memories from memories_tr
- Sends batch to qwen3 for gem extraction
- Stores gems to gems_tr
- Marks processed memories as curated=true

Usage:
    python3 curator_timer.py --config curator_config.json
    python3 curator_timer.py --config curator_config.json --dry-run
"""
import os
import sys
import json
import re
import argparse
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional

import requests


def load_config(config_path: str) -> Dict[str, Any]:
    """Load and parse the JSON config file at *config_path*."""
    with open(config_path, 'r') as f:
        return json.load(f)


# Default paths
SCRIPT_DIR = Path(__file__).parent
DEFAULT_CONFIG = SCRIPT_DIR / "curator_config.json"

# Curator prompt path
CURATOR_PROMPT_PATH = Path("/root/.openclaw/workspace/.projects/true-recall-v2/curator-prompt.md")


def load_curator_prompt() -> str:
    """Load the curator system prompt, falling back to a built-in default.

    Returns the file contents of CURATOR_PROMPT_PATH if it exists; otherwise
    prints a warning and returns a minimal hard-coded prompt so the curator
    can still run.
    """
    try:
        with open(CURATOR_PROMPT_PATH, 'r') as f:
            return f.read()
    except FileNotFoundError:
        print(f"āš ļø Curator prompt not found at {CURATOR_PROMPT_PATH}")
        return """You are The Curator. Extract meaningful gems from conversation history. Extract facts, insights, decisions, preferences, and context that would be valuable to remember. 
Output a JSON array of gems with fields: gem, context, snippet, categories, importance (1-5), confidence (0-0.99)."""


def get_uncurated_memories(qdrant_url: str, collection: str, user_id: str,
                           max_batch: int) -> List[Dict[str, Any]]:
    """Query Qdrant for up to *max_batch* memories where curated == False.

    Uses the Qdrant scroll API, paginating via next_page_offset. Pagination
    is capped at 10 iterations as a safety valve against a misbehaving
    server. On any request error the partial results gathered so far are
    returned.

    Returns a list of flat dicts: the point payload merged with defaults for
    content/role/timestamp/turn, plus the point "id".
    """
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "curated", "match": {"value": False}}
        ]
    }

    all_points = []
    offset = None
    iterations = 0
    max_iterations = 10  # hard cap so a bad next_page_offset can't loop forever

    while len(all_points) < max_batch and iterations < max_iterations:
        iterations += 1
        scroll_data = {
            "limit": min(100, max_batch - len(all_points)),
            "with_payload": True,
            "filter": filter_data
        }
        # NOTE: compare against None, not truthiness — a falsy-but-valid
        # offset (e.g. integer 0) must still be sent on the next page.
        if offset is not None:
            scroll_data["offset"] = offset

        try:
            response = requests.post(
                f"{qdrant_url}/collections/{collection}/points/scroll",
                json=scroll_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            points = result.get("result", {}).get("points", [])
            if not points:
                break
            all_points.extend(points)
            offset = result.get("result", {}).get("next_page_offset")
            if offset is None:
                break
        except Exception as e:
            # Best-effort: log and return whatever we already fetched.
            print(f"Error querying Qdrant: {e}", file=sys.stderr)
            break

    # Flatten: payload fields win over the defaults; the point id is added.
    memories = []
    for point in all_points:
        payload = point.get("payload", {})
        memory = {"content": "", "role": "", "timestamp": "", "turn": 0}
        memory.update(payload)
        memory["id"] = point.get("id")
        memories.append(memory)

    return memories[:max_batch]


def _parse_curator_output(output: str) -> List[Dict[str, Any]]:
    """Parse the curator LLM's text *output* into a list of gem dicts.

    Strategy, in order:
      1. Strip a ```json / ``` code fence if present.
      2. Slice from the first '[' to the last ']' to drop surrounding prose.
      3. json.loads the raw text. Do NOT pre-mangle escape sequences —
         replacing '\\n' with a literal newline would corrupt valid JSON,
         since strict JSON forbids raw control characters inside strings.
      4. Retry with strict=False, which tolerates raw control characters
         that some models emit inside string values.
      5. Regex fallback: salvage just the "gem" fields into minimal gem
         dicts with placeholder metadata.

    Returns [] if nothing parseable is found.
    """
    output = output.strip()
    if '```json' in output:
        output = output.split('```json')[1].split('```')[0].strip()
    elif '```' in output:
        output = output.split('```')[1].split('```')[0].strip()

    start_idx = output.find('[')
    end_idx = output.rfind(']')
    if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
        output = output[start_idx:end_idx + 1]

    last_err: Optional[json.JSONDecodeError] = None
    for strict in (True, False):
        try:
            gems = json.loads(output, strict=strict)
            if not isinstance(gems, list):
                gems = [gems] if gems else []
            return gems
        except json.JSONDecodeError as e:
            last_err = e

    # Fallback: pull out just the gem texts.
    gem_matches = re.findall(r'"gem"\s*:\s*"([^"]+)"', output)
    if gem_matches:
        gems = [{
            "gem": gem_text,
            "context": "Extracted via fallback",
            "categories": ["extracted"],
            "importance": 3,
            "confidence": 0.7
        } for gem_text in gem_matches]
        print(f"āš ļø Fallback extraction: {len(gems)} gems", file=sys.stderr)
        return gems

    print(f"Error parsing curator output: {last_err}", file=sys.stderr)
    print(f"Raw output: {repr(output[:500])}...", file=sys.stderr)
    return []


def extract_gems(memories: List[Dict[str, Any]],
                 ollama_url: str) -> Optional[List[Dict[str, Any]]]:
    """Send *memories* to the qwen3 model for gem extraction.

    Returns:
        A list of gem dicts (possibly empty if the model found nothing),
        or None if the Ollama request itself failed. Callers must treat
        None as "retry later" and NOT mark the batch as curated —
        otherwise a transient LLM outage permanently loses the batch.
    """
    if not memories:
        return []

    prompt = load_curator_prompt()

    # Render the batch as a simple "role: content" transcript.
    conversation_lines = []
    for mem in memories:
        role = mem.get("role", "unknown")
        content = mem.get("content", "")
        if content:
            conversation_lines.append(f"{role}: {content}")
    conversation_text = "\n".join(conversation_lines)

    try:
        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": "qwen3:30b-a3b-instruct-2507-q8_0",
                "system": prompt,
                "prompt": f"## Input Conversation\n\n{conversation_text}\n\n## Output\n",
                "stream": False,
                "options": {
                    "temperature": 0.1,
                    "num_predict": 4000
                }
            },
            timeout=120
        )
        response.raise_for_status()
    except Exception as e:
        print(f"Error calling Ollama: {e}", file=sys.stderr)
        return None  # distinguish "call failed" from "zero gems"

    result = response.json()
    return _parse_curator_output(result.get('response', ''))


def get_embedding(text: str, ollama_url: str) -> Optional[List[float]]:
    """Get an embedding vector for *text* from Ollama, or None on failure."""
    try:
        response = requests.post(
            f"{ollama_url}/api/embeddings",
            json={"model": "mxbai-embed-large", "prompt": text},
            timeout=30
        )
        response.raise_for_status()
        return response.json()['embedding']
    except Exception as e:
        print(f"Error getting embedding: {e}", file=sys.stderr)
        return None


def store_gem(gem: Dict[str, Any], user_id: str, qdrant_url: str,
              target_collection: str, ollama_url: str) -> bool:
    """Embed and upsert a single gem into the target Qdrant collection.

    The point id is derived deterministically from user_id, conversation_id,
    turn_range and the first 50 chars of the gem text, so re-running the
    curator on the same content upserts rather than duplicates.

    Returns True on success, False on embedding or upsert failure.
    """
    embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}"
    vector = get_embedding(embedding_text, ollama_url)
    if vector is None:
        return False

    # Deterministic 63-bit id from a SHA-256 prefix (already non-negative).
    hash_content = f"{user_id}:{gem.get('conversation_id', '')}:{gem.get('turn_range', '')}:{gem.get('gem', '')[:50]}"
    hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8]
    gem_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)

    payload = {
        "user_id": user_id,
        **gem,
        "curated_at": datetime.now(timezone.utc).isoformat()
    }

    try:
        response = requests.put(
            f"{qdrant_url}/collections/{target_collection}/points",
            json={
                "points": [{
                    "id": gem_id,
                    "vector": vector,
                    "payload": payload
                }]
            },
            timeout=30
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error storing gem: {e}", file=sys.stderr)
        return False


def mark_curated(memory_ids: List, qdrant_url: str, collection: str) -> bool:
    """Mark memories as curated in Qdrant using POST /points/payload format.

    Returns True if the update succeeded (or there was nothing to do).
    """
    if not memory_ids:
        return True
    try:
        response = requests.post(
            f"{qdrant_url}/collections/{collection}/points/payload",
            json={
                "points": memory_ids,
                "payload": {
                    "curated": True,
                    "curated_at": datetime.now(timezone.utc).isoformat()
                }
            },
            timeout=30
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error marking curated: {e}", file=sys.stderr)
        return False


def main():
    """CLI entry point: fetch uncurated memories, extract gems, store, mark."""
    parser = argparse.ArgumentParser(description="TrueRecall Timer Curator")
    parser.add_argument("--config", "-c", default=str(DEFAULT_CONFIG),
                        help="Config file path")
    parser.add_argument("--dry-run", "-n", action="store_true",
                        help="Don't write, just preview")
    args = parser.parse_args()

    config = load_config(args.config)
    qdrant_url = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
    ollama_url = os.getenv("OLLAMA_URL", "http://10.0.0.10:11434")
    user_id = config.get("user_id", "rob")
    source_collection = config.get("source_collection", "memories_tr")
    target_collection = config.get("target_collection", "gems_tr")
    max_batch = config.get("max_batch_size", 100)

    print(f"šŸ” TrueRecall Timer Curator")
    print(f"šŸ‘¤ User: {user_id}")
    print(f"šŸ“„ Source: {source_collection}")
    print(f"šŸ’Ž Target: {target_collection}")
    print(f"šŸ“¦ Max batch: {max_batch}")
    if args.dry_run:
        print("šŸƒ DRY RUN MODE")
    print()

    # Get uncurated memories
    print("šŸ“„ Fetching uncurated memories...")
    memories = get_uncurated_memories(qdrant_url, source_collection, user_id, max_batch)
    print(f"āœ… Found {len(memories)} uncurated memories")

    if not memories:
        print("🤷 Nothing to curate. Exiting.")
        return

    # Extract gems
    print(f"\n🧠 Sending {len(memories)} memories to curator...")
    gems = extract_gems(memories, ollama_url)

    if gems is None:
        # LLM call failed — leave the batch uncurated so the next run
        # retries it, instead of silently losing it.
        print("āš ļø Curator call failed. Leaving memories uncurated for retry.",
              file=sys.stderr)
        return

    print(f"āœ… Extracted {len(gems)} gems")

    if not gems:
        print("āš ļø No gems extracted. Nothing to store.")
        # Still mark as curated so we don't reprocess
        memory_ids = [m["id"] for m in memories]  # Keep as integers
        mark_curated(memory_ids, qdrant_url, source_collection)
        return

    # Preview
    print("\nšŸ’Ž Gems preview:")
    for i, gem in enumerate(gems[:3], 1):
        print(f"  {i}. {gem.get('gem', 'N/A')[:80]}...")
    if len(gems) > 3:
        print(f"  ... and {len(gems) - 3} more")

    if args.dry_run:
        print("\nšŸƒ DRY RUN: Not storing gems or marking curated.")
        return

    # Store gems
    print(f"\nšŸ’¾ Storing {len(gems)} gems...")
    stored = 0
    for gem in gems:
        if store_gem(gem, user_id, qdrant_url, target_collection, ollama_url):
            stored += 1
    print(f"āœ… Stored: {stored}/{len(gems)}")

    # Mark memories as curated
    print("\nšŸ“ Marking memories as curated...")
    memory_ids = [m["id"] for m in memories]  # Keep as integers
    if mark_curated(memory_ids, qdrant_url, source_collection):
        print(f"āœ… Marked {len(memory_ids)} memories as curated")
    else:
        print(f"āš ļø Failed to mark some memories as curated")

    print("\nšŸŽ‰ Curation complete!")


if __name__ == "__main__":
    main()