#!/usr/bin/env python3
"""
Turn-Based Curator: Extract gems every N new memories (turns).

Usage:
    python3 curator_by_count.py --threshold 10 --dry-run
    python3 curator_by_count.py --threshold 10 --execute
    python3 curator_by_count.py --status
"""

import argparse
import hashlib
import json
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path

import requests

QDRANT_URL = "http://10.0.0.40:6333"
MEMORIES = "memories_tr"
GEMS = "gems_tr"
OLLAMA = "http://10.0.0.10:11434"
MODEL = "ollama-remote/qwen3:30b-a3b-instruct-2507-q8_0"
STATE_FILE = Path("/tmp/curator_count_state.json")

# Number of points requested per Qdrant scroll page.
_SCROLL_PAGE_SIZE = 1000
# Prompt budget: at most this many messages, each cut to this many characters.
_MAX_MESSAGES = 20
_MAX_MESSAGE_CHARS = 500


def _warn(message):
    """Report a non-fatal problem on stderr, keeping stdout for normal output."""
    print(f"[curator] {message}", file=sys.stderr)


def load_state():
    """Load the curator state from STATE_FILE.

    Returns:
        The stored state dict, or ``{"last_count": 0, "last_time": None}``
        when the file is missing, unreadable, or does not hold a JSON object.
    """
    default = {"last_count": 0, "last_time": None}
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return default
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        _warn(f"ignoring unreadable state file {STATE_FILE}: {exc}")
        return default
    if not isinstance(state, dict):
        _warn(f"ignoring state file {STATE_FILE}: expected a JSON object")
        return default
    return state


def save_state(state):
    """Write ``state`` to STATE_FILE as JSON, replacing previous content."""
    with open(STATE_FILE, "w", encoding="utf-8") as f:
        json.dump(state, f)


def get_total_count():
    """Return the point count of the MEMORIES collection.

    Best effort: returns 0 (after a warning on stderr) when the request
    fails or the response lacks a usable ``result.points_count``.
    """
    try:
        r = requests.get(f"{QDRANT_URL}/collections/{MEMORIES}", timeout=10)
        r.raise_for_status()
        return int(r.json()["result"]["points_count"])
    except (requests.RequestException, KeyError, TypeError, ValueError) as exc:
        _warn(f"could not read point count for {MEMORIES}: {exc}")
        return 0


def get_recent_memories(hours=1):
    """Get memories from last N hours.

    Scrolls through the whole MEMORIES collection page by page and keeps the
    points whose payload ``timestamp`` is newer than the cut-off.

    Args:
        hours: Size of the look-back window in hours.

    Returns:
        Matching points (dicts as returned by Qdrant), oldest first, or an
        empty list if any request fails.
    """
    since = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    recent = []
    offset = None
    try:
        while True:
            body = {"limit": _SCROLL_PAGE_SIZE, "with_payload": True}
            if offset is not None:
                body["offset"] = offset
            r = requests.post(
                f"{QDRANT_URL}/collections/{MEMORIES}/points/scroll",
                json=body,
                timeout=30,
            )
            r.raise_for_status()
            result = r.json()["result"]
            for point in result["points"]:
                stamp = (point.get("payload") or {}).get("timestamp", "")
                # Lexicographic comparison; assumes stored timestamps are
                # ISO-8601 strings in the same UTC format as `since` —
                # TODO confirm against the writer of this collection.
                if isinstance(stamp, str) and stamp > since:
                    recent.append(point)
            # A null next_page_offset marks the last page.
            offset = result.get("next_page_offset")
            if offset is None:
                break
    except (
        requests.RequestException,
        AttributeError,
        KeyError,
        TypeError,
        ValueError,
    ) as exc:
        _warn(f"could not scroll {MEMORIES}: {exc}")
        return []
    # Scroll order is not chronological; sort so the prompt reads as a
    # conversation.
    recent.sort(key=lambda p: p["payload"]["timestamp"])
    return recent


def _parse_gems(content):
    """Pull the JSON array out of an LLM reply and keep only well-formed gems.

    A gem is kept when it is a dict whose ``text`` is a non-empty string.
    Returns an empty list when no parsable array is found.
    """
    if not isinstance(content, str):
        return []
    # The model may wrap the array in prose or code fences, so take the
    # outermost [...] span.
    start = content.find("[")
    end = content.rfind("]")
    if start < 0 or end <= start:
        return []
    try:
        parsed = json.loads(content[start:end + 1])
    except ValueError as exc:
        _warn(f"LLM reply was not valid JSON: {exc}")
        return []
    return [
        item
        for item in parsed
        if isinstance(item, dict)
        and isinstance(item.get("text"), str)
        and item["text"].strip()
    ]


def extract_gems(memories):
    """Send to LLM for gem extraction.

    Args:
        memories: Points carrying ``role`` and ``content`` in their payload.
            Only the first 20 are used, each cut to 500 characters.

    Returns:
        A list of gem dicts (each with a string ``text``), or an empty list
        when there is nothing to send, the request fails, or the reply
        cannot be parsed.
    """
    if not memories:
        return []

    # Build conversation
    parts = []
    for m in memories:
        if len(parts) == _MAX_MESSAGES:
            break
        payload = m.get("payload") or {}
        role = str(payload.get("role") or "unknown")
        content = str(payload.get("content") or "")[:_MAX_MESSAGE_CHARS]
        parts.append(f"{role.upper()}: {content}")

    conversation = "\n\n".join(parts)

    prompt = f"""Extract 3-5 key gems (insights, decisions, facts) from this conversation.

Conversation:
{conversation}

Return JSON: [{{"text": "gem", "category": "decision|fact|preference"}}]"""

    try:
        r = requests.post(
            f"{OLLAMA}/v1/chat/completions",
            json={
                "model": MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3,
            },
            timeout=120,
        )
        r.raise_for_status()
        content = r.json()["choices"][0]["message"]["content"]
    except (
        requests.RequestException,
        KeyError,
        IndexError,
        TypeError,
        ValueError,
    ) as exc:
        _warn(f"gem extraction request failed: {exc}")
        return []

    return _parse_gems(content)


def _gem_id(text):
    """Derive a point ID in [0, 2**63) from the gem text.

    Uses SHA-256 rather than the built-in hash(), whose str results are
    salted per process, so the same text maps to the same ID on every run.
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % (2**63)


def store_gem(gem):
    """Store gem to gems_tr.

    Embeds ``gem["text"]`` through the Ollama embeddings endpoint and upserts
    a single point into the GEMS collection.

    Args:
        gem: Dict with a string ``text`` and an optional ``category``.

    Returns:
        True if Qdrant answered the upsert with HTTP 200, otherwise False.
    """
    try:
        text = gem["text"]
        if not isinstance(text, str):
            _warn("skipping gem whose text is not a string")
            return False

        # Get embedding
        r = requests.post(
            f"{OLLAMA}/api/embeddings",
            json={"model": "snowflake-arctic-embed2", "prompt": text},
            timeout=30,
        )
        r.raise_for_status()
        vector = r.json()["embedding"]

        # Store
        r = requests.put(
            f"{QDRANT_URL}/collections/{GEMS}/points",
            json={
                "points": [{
                    "id": _gem_id(text),
                    "vector": vector,
                    "payload": {
                        "text": text,
                        "category": gem.get("category", "other"),
                        "createdAt": datetime.now(timezone.utc).isoformat(),
                        "source": "turn_curator",
                    },
                }]
            },
            timeout=30,
        )
        return r.status_code == 200
    except (
        requests.RequestException,
        AttributeError,
        KeyError,
        TypeError,
        ValueError,
    ) as exc:
        _warn(f"could not store gem: {exc}")
        return False


def _window_hours(last_time, minimum=1):
    """Return the hours elapsed since ``last_time``, never below ``minimum``.

    ``last_time`` is the ISO-8601 string written by main(); a missing or
    unparsable value yields ``minimum``.
    """
    if not last_time:
        return minimum
    try:
        then = datetime.fromisoformat(last_time)
    except (TypeError, ValueError):
        return minimum
    if then.tzinfo is None:
        then = then.replace(tzinfo=timezone.utc)
    elapsed = (datetime.now(timezone.utc) - then).total_seconds() / 3600
    return max(minimum, elapsed)


def main():
    """Command-line entry point.

    With --status, prints the counters and exits. Otherwise, once the number
    of new memories reaches --threshold, fetches the memories added since the
    last curation, and (with --execute) extracts gems, stores them and
    records the new baseline in STATE_FILE. --dry-run only reports what would
    be processed and takes precedence over --execute.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--threshold", "-t", type=int, default=10)
    parser.add_argument("--execute", "-e", action="store_true")
    parser.add_argument("--dry-run", "-n", action="store_true")
    parser.add_argument("--status", "-s", action="store_true")
    args = parser.parse_args()

    state = load_state()
    current = get_total_count()
    new_points = current - state.get("last_count", 0)

    if args.status:
        print(f"Total memories: {current}")
        print(f"Last curated: {state.get('last_count', 0)}")
        print(f"New since last: {new_points}")
        print(f"Threshold: {args.threshold}")
        print(f"Ready: {'YES' if new_points >= args.threshold else 'NO'}")
        return

    print(f"Curator: {new_points} new / {args.threshold} threshold")

    if new_points < args.threshold:
        print("Not enough new memories")
        return

    # Look back to the previous curation (at least one hour) so memories that
    # accumulated slowly are not missed.
    memories = get_recent_memories(hours=_window_hours(state.get("last_time")))
    print(f"Fetched {len(memories)} recent memories")

    if not memories:
        print("No memories to process")
        return

    if args.dry_run:
        print(f"[DRY RUN] Would process {len(memories)} memories")
        return

    if not args.execute:
        print("Use --execute to run or --dry-run to preview")
        return

    # Extract gems
    print("Extracting gems...")
    gems = extract_gems(memories)
    print(f"Extracted {len(gems)} gems")

    # Store
    success = 0
    for gem in gems:
        if store_gem(gem):
            success += 1
            print(f" Stored: {gem['text'][:60]}...")

    if gems and success == 0:
        # Storage is failing; keep the old baseline so the next run retries
        # these memories instead of skipping them.
        _warn("no gems could be stored; state left unchanged")
        print(f"Done: {success}/{len(gems)} gems stored")
        return

    # Update state
    state["last_count"] = current
    state["last_time"] = datetime.now(timezone.utc).isoformat()
    save_state(state)

    print(f"Done: {success}/{len(gems)} gems stored")


if __name__ == "__main__":
    main()