#!/usr/bin/env python3
"""
TrueRecall v2 - Timer Curator

Runs every 5 minutes via cron.  Extracts "gems" (atomic facts) from
uncurated memories and stores them in gems_tr.

REQUIRES: TrueRecall v1 (provides memories_tr via watcher).
"""

import sys
import json
import hashlib
import requests
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional

# Configuration - EDIT THESE for your environment
QDRANT_URL = "http://:6333"    # e.g. http://localhost:6333
OLLAMA_URL = "http://:11434"   # e.g. http://localhost:11434
SOURCE_COLLECTION = "memories_tr"
TARGET_COLLECTION = "gems_tr"
EMBEDDING_MODEL = "snowflake-arctic-embed2"
# LLM used for gem extraction (previously hard-coded inline in extract_gems).
CURATOR_MODEL = ""
MAX_BATCH = 100
USER_ID = ""


def get_uncurated_memories(qdrant_url: str, collection: str, user_id: str,
                           max_batch: int = 100) -> List[Dict[str, Any]]:
    """Fetch up to ``max_batch`` uncurated memories for ``user_id`` from Qdrant.

    Uses the Qdrant scroll API with a filter on ``user_id`` and
    ``curated == False``.

    NOTE(review): points that lack a ``curated`` payload field entirely will
    NOT match this filter — assumes the v1 watcher always writes
    ``curated: false``; confirm against the writer.

    Returns:
        List of Qdrant point dicts (each with ``id`` and ``payload``),
        or an empty list on any request/decoding error.
    """
    try:
        response = requests.post(
            f"{qdrant_url}/collections/{collection}/points/scroll",
            json={
                "limit": max_batch,
                "filter": {
                    "must": [
                        {"key": "user_id", "match": {"value": user_id}},
                        {"key": "curated", "match": {"value": False}},
                    ]
                },
                "with_payload": True,
            },
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        return data.get("result", {}).get("points", [])
    except (requests.RequestException, ValueError) as e:
        # Best-effort: a failed fetch just means nothing to curate this run.
        print(f"Error fetching memories: {e}", file=sys.stderr)
        return []


def extract_gems(memories: List[Dict[str, Any]], ollama_url: str) -> List[Dict[str, Any]]:
    """Send memory texts to the curator LLM and parse the returned JSON gems.

    Filters out assistant turns, very short texts, and curator/system chatter
    before building the conversation, then asks the model for a JSON array of
    atomic facts.

    Returns:
        List of gem dicts (``text``/``category``/``importance``), or an empty
        list when there is nothing to send or on any error.
    """
    if not memories:
        return []

    # Lines matching these substrings are curator/system noise, not user facts.
    SKIP_PATTERNS = [
        "gems extracted",
        "curator",
        "curation complete",
        "system is running",
        "validation round",
    ]

    conversation_lines = []
    for i, mem in enumerate(memories):
        payload = mem.get("payload", {})
        text = payload.get("text", "") or payload.get("content", "")
        role = payload.get("role", "")
        if not text:
            continue
        text = str(text)
        # Only user turns carry extractable personal facts.
        if role == "assistant":
            continue
        text_lower = text.lower()
        if len(text) < 20:
            continue
        if any(pattern in text_lower for pattern in SKIP_PATTERNS):
            continue
        text = text[:500]  # cap each memory to keep the prompt bounded
        conversation_lines.append(f"[{i+1}] {text}")

    if not conversation_lines:
        return []

    conversation_text = "\n\n".join(conversation_lines)

    prompt = """You are a memory curator. Extract atomic facts from the conversation below. For each distinct fact/decision/preference, output a JSON object with: - "text": the atomic fact (1-2 sentences) - use FIRST PERSON ("I" not "User") - "category": one of [decision, preference, technical, project, knowledge, system] - "importance": "high" or "medium" Return ONLY a JSON array. Example: [ {"text": "I decided to use Redis for caching", "category": "decision", "importance": "high"}, {"text": "I prefer dark mode", "category": "preference", "importance": "medium"} ] If no extractable facts, return []. CONVERSATION: """

    try:
        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": CURATOR_MODEL,
                # FIX: the original sent the instructions twice (as "system"
                # AND prepended into "prompt").  Send them once as the system
                # message; the prompt carries only the conversation.
                "system": prompt,
                "prompt": f"{conversation_text}\n\nJSON:",
                "stream": False,
                "options": {
                    "temperature": 0.1,
                    "num_predict": 4000,
                },
            },
            timeout=120,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error calling Ollama: {e}", file=sys.stderr)
        return []

    result = response.json()
    response_text = result.get("response", "")

    # The model may wrap the JSON array in prose; slice out the outermost [...].
    try:
        start = response_text.find('[')
        end = response_text.rfind(']')
        if start == -1 or end == -1:
            return []
        json_str = response_text[start:end + 1]
        gems = json.loads(json_str)
        if not isinstance(gems, list):
            return []
        return gems
    except json.JSONDecodeError as e:
        print(f"JSON parse error: {e}", file=sys.stderr)
        return []


def get_embedding(text: str, ollama_url: str) -> Optional[List[float]]:
    """Return the embedding vector for ``text`` from Ollama, or None on error."""
    try:
        response = requests.post(
            f"{ollama_url}/api/embeddings",
            json={
                "model": EMBEDDING_MODEL,
                "prompt": text,
            },
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        return data.get("embedding")
    except (requests.RequestException, ValueError) as e:
        print(f"Error getting embedding: {e}", file=sys.stderr)
        return None


def store_gem(gem: Dict[str, Any], vector: List[float], qdrant_url: str,
              target_collection: str, user_id: str) -> bool:
    """Upsert one gem point into Qdrant.

    The point id is derived deterministically from ``user_id`` plus the first
    100 chars of the gem text, so re-running the curator on the same fact
    overwrites rather than duplicates.

    Returns:
        True on success, False on any request error.
    """
    embedding_text = gem.get("text", "") or gem.get("gem", "")

    # Deterministic 63-bit id: 8 bytes of SHA-256, reduced mod 2**63.
    # The result is already non-negative, so no abs() is needed.
    hash_content = f"{user_id}:{embedding_text[:100]}"
    hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8]
    gem_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)

    payload = {
        "text": embedding_text,
        "category": gem.get("category", "fact"),
        "importance": gem.get("importance", "medium"),
        "user_id": user_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    try:
        response = requests.put(
            f"{qdrant_url}/collections/{target_collection}/points",
            json={
                "points": [{
                    "id": gem_id,
                    "vector": vector,
                    "payload": payload,
                }]
            },
            timeout=30,
        )
        response.raise_for_status()
        return True
    except requests.RequestException as e:
        print(f"Error storing gem: {e}", file=sys.stderr)
        return False


def mark_curated(memory_ids: List, qdrant_url: str, collection: str) -> bool:
    """Set ``curated: true`` (plus a timestamp) on the given source points.

    Returns:
        True on success or when ``memory_ids`` is empty; False on error.
    """
    if not memory_ids:
        return True
    try:
        response = requests.post(
            f"{qdrant_url}/collections/{collection}/points/payload",
            json={
                "points": memory_ids,
                "payload": {
                    "curated": True,
                    "curated_at": datetime.now(timezone.utc).isoformat(),
                },
            },
            timeout=30,
        )
        response.raise_for_status()
        return True
    except requests.RequestException as e:
        print(f"Error marking curated: {e}", file=sys.stderr)
        return False


def main():
    """Run one curation pass: fetch, extract, embed, store, mark curated."""
    print("TrueRecall v2 - Timer Curator")
    print(f"User: {USER_ID}")
    print(f"Source: {SOURCE_COLLECTION}")
    print(f"Target: {TARGET_COLLECTION}")
    print(f"Max batch: {MAX_BATCH}\n")

    print("Fetching uncurated memories...")
    memories = get_uncurated_memories(QDRANT_URL, SOURCE_COLLECTION, USER_ID, MAX_BATCH)
    print(f"Found {len(memories)} uncurated memories\n")
    if not memories:
        print("Nothing to curate. Exiting.")
        return

    print("Sending memories to curator...")
    gems = extract_gems(memories, OLLAMA_URL)
    print(f"Extracted {len(gems)} gems\n")
    if not gems:
        print("No gems extracted. Exiting.")
        return

    print("Gems preview:")
    for i, gem in enumerate(gems[:3], 1):
        text = gem.get("text", "N/A")[:50]
        print(f" {i}. {text}...")
    if len(gems) > 3:
        print(f" ... and {len(gems) - 3} more")
    print()

    print("Storing gems...")
    stored = 0
    for gem in gems:
        text = gem.get("text", "") or gem.get("gem", "")
        if not text:
            continue
        vector = get_embedding(text, OLLAMA_URL)
        if vector:
            if store_gem(gem, vector, QDRANT_URL, TARGET_COLLECTION, USER_ID):
                stored += 1
    print(f"Stored: {stored}/{len(gems)}\n")

    print("Marking memories as curated...")
    # FIX: `if mem.get("id")` silently dropped a legitimate point id of 0;
    # test for presence, not truthiness.
    memory_ids = [mem.get("id") for mem in memories if mem.get("id") is not None]
    if mark_curated(memory_ids, QDRANT_URL, SOURCE_COLLECTION):
        print(f"Marked {len(memory_ids)} memories as curated\n")
    else:
        print("Failed to mark memories\n")

    print("Curation complete!")


if __name__ == "__main__":
    main()