#!/usr/bin/env python3
"""
Turn-Based Curator: Extract gems every N turns (instead of daily).

Usage:
    python3 curator_turn_based.py --threshold 10 --dry-run
    python3 curator_turn_based.py --threshold 10 --execute
    python3 curator_turn_based.py --status  # Show turn counts

This tracks turn count since last curation and runs when threshold is reached.
"""

import argparse
import hashlib
import json
import os
import requests
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional

# Config
QDRANT_URL = "http://10.0.0.40:6333"
MEMORIES_COLLECTION = "memories_tr"
GEMS_COLLECTION = "gems_tr"
OLLAMA_URL = "http://10.0.0.10:11434"
CURATOR_MODEL = "ollama-remote/qwen3:30b-a3b-instruct-2507-q8_0"

# State file tracks last curation
STATE_FILE = Path("/tmp/curator_turn_state.json")


def get_curator_prompt(conversation_text: str) -> str:
    """Generate prompt for gem extraction."""
    return f"""You are a memory curator. Extract only the most valuable gems (key insights) from this conversation.

Rules:
1. Extract only genuinely important information (decisions, preferences, key facts)
2. Skip transient/trivial content (greetings, questions, temporary requests)
3. Each gem should be self-contained and useful for future context
4. Format: concise, factual statements
5. Max 3-5 gems total

Conversation to curate:
---
{conversation_text}
---

Return ONLY a JSON array of gems like:
[{{"text": "User decided to use X approach for Y", "category": "decision"}}]

Categories: preference, fact, decision, entity, other

JSON:"""


def load_state() -> Dict[str, Any]:
    """Load curation state from STATE_FILE.

    Returns a default state (turn 0, no prior curation) when the file is
    missing, unreadable, or contains invalid JSON.
    """
    if STATE_FILE.exists():
        try:
            with open(STATE_FILE) as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            # Corrupt or unreadable state: fall through to the default
            # rather than crashing; curation will simply start over.
            pass
    return {"last_turn": 0, "last_curation": None}


def save_state(state: Dict[str, Any]):
    """Persist curation state to STATE_FILE."""
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)


def get_point_count_since(last_time: str) -> int:
    """Get count of points since last curation time.

    Uses Qdrant's points/count endpoint with a range filter on the
    "timestamp" payload field. Returns 0 on any error.
    """
    try:
        response = requests.post(
            f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/count",
            json={
                "filter": {
                    "must": [
                        {
                            "key": "timestamp",
                            "range": {"gt": last_time}
                        }
                    ]
                }
            },
            timeout=30
        )
        response.raise_for_status()
        return response.json().get("result", {}).get("count", 0)
    except Exception as e:
        print(f"Error getting count: {e}", file=sys.stderr)
        return 0


def get_current_turn_count(limit: int = 1000) -> int:
    """Return the highest "turn" payload value stored in the memories collection.

    This was referenced by main() but previously undefined (NameError at
    runtime). Scrolls up to `limit` points and takes the max turn number;
    returns 0 on any error or when the collection is empty.

    NOTE(review): assumes the collection holds fewer than `limit` points;
    for larger collections this should paginate with the scroll offset.
    """
    try:
        response = requests.post(
            f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/scroll",
            json={"limit": limit, "with_payload": True},
            timeout=30
        )
        response.raise_for_status()
        points = response.json().get("result", {}).get("points", [])
        return max(
            (p.get("payload", {}).get("turn", 0) for p in points),
            default=0
        )
    except Exception as e:
        print(f"Error getting turn count: {e}", file=sys.stderr)
        return 0


def get_turns_since(last_turn: int, limit: int = 100) -> List[Dict[str, Any]]:
    """Get all turns with turn number > last_turn, sorted ascending by turn.

    Returns an empty list on any error.
    """
    try:
        response = requests.post(
            f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/scroll",
            json={"limit": limit, "with_payload": True},
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        turns = []
        for point in data.get("result", {}).get("points", []):
            turn_num = point.get("payload", {}).get("turn", 0)
            if turn_num > last_turn:
                turns.append(point)
        # Scroll order is not chronological; sort by turn number.
        turns.sort(key=lambda x: x.get("payload", {}).get("turn", 0))
        return turns
    except Exception as e:
        print(f"Error fetching turns: {e}", file=sys.stderr)
        return []


def extract_gems_with_llm(conversation_text: str) -> List[Dict[str, str]]:
    """Send conversation to LLM for gem extraction.

    Parses a JSON array out of the model's reply. Only dicts carrying a
    non-empty "text" key are kept, so downstream code (printing, embedding)
    can rely on gem["text"] existing. Returns [] on any failure.
    """
    prompt = get_curator_prompt(conversation_text)
    try:
        response = requests.post(
            f"{OLLAMA_URL}/v1/chat/completions",
            json={
                "model": CURATOR_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3,
                "max_tokens": 1000
            },
            timeout=120
        )
        response.raise_for_status()
        data = response.json()
        content = data.get("choices", [{}])[0].get("message", {}).get("content", "[]")

        # Extract JSON from response: the model may wrap the array in prose,
        # so take the outermost [...] span.
        try:
            start = content.find('[')
            end = content.rfind(']')
            if start != -1 and end != -1:
                gems = json.loads(content[start:end + 1])
                if isinstance(gems, list):
                    # Drop malformed entries (non-dicts, missing/empty "text").
                    return [
                        g for g in gems
                        if isinstance(g, dict) and g.get("text")
                    ]
        except (json.JSONDecodeError, ValueError):
            pass
        return []
    except Exception as e:
        print(f"Error calling LLM: {e}", file=sys.stderr)
        return []


def store_gem(gem: Dict[str, str]) -> bool:
    """Store a single gem to gems_tr.

    Embeds the gem text via Ollama, then upserts it into Qdrant with a
    deterministic ID derived from the text, so re-running the curator on
    the same gem overwrites rather than duplicates. Returns True on success.
    """
    try:
        # Get embedding for gem
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={"model": "snowflake-arctic-embed2", "prompt": gem["text"]},
            timeout=30
        )
        response.raise_for_status()
        vector = response.json().get("embedding", [])
        if not vector:
            return False

        # Stable content-derived ID. The previous hash(text) % 2**63 was
        # randomized per process (PYTHONHASHSEED), breaking cross-run dedup.
        digest = hashlib.sha256(gem["text"].encode("utf-8")).digest()
        point_id = int.from_bytes(digest[:8], "big") % (2**63)

        # Store to gems_tr
        response = requests.put(
            f"{QDRANT_URL}/collections/{GEMS_COLLECTION}/points",
            json={
                "points": [{
                    "id": point_id,
                    "vector": vector,
                    "payload": {
                        "text": gem["text"],
                        "category": gem.get("category", "other"),
                        "createdAt": datetime.now(timezone.utc).isoformat(),
                        "source": "turn_based_curator"
                    }
                }]
            },
            timeout=30
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error storing gem: {e}", file=sys.stderr)
        return False


def main():
    parser = argparse.ArgumentParser(description="Turn-based curator")
    parser.add_argument("--threshold", "-t", type=int, default=10,
                        help="Run curation every N turns (default: 10)")
    parser.add_argument("--execute", "-e", action="store_true",
                        help="Execute curation")
    parser.add_argument("--dry-run", "-n", action="store_true",
                        help="Preview what would be curated")
    parser.add_argument("--status", "-s", action="store_true",
                        help="Show current turn status")
    args = parser.parse_args()

    # Load state
    state = load_state()
    current_turn = get_current_turn_count()
    turns_since = current_turn - state["last_turn"]

    if args.status:
        print(f"Current turn: {current_turn}")
        print(f"Last curation: {state['last_turn']}")
        print(f"Turns since last curation: {turns_since}")
        print(f"Threshold: {args.threshold}")
        print(f"Ready to curate: {'YES' if turns_since >= args.threshold else 'NO'}")
        return

    print(f"Turn-based Curator")
    print(f"Current turn: {current_turn}")
    print(f"Last curation: {state['last_turn']}")
    print(f"Turns since: {turns_since}")
    print(f"Threshold: {args.threshold}")
    print()

    if turns_since < args.threshold:
        print(f"Not enough turns. Need {args.threshold}, have {turns_since}")
        return

    # Get turns to process
    print(f"Fetching {turns_since} turns...")
    turns = get_turns_since(state["last_turn"], limit=turns_since + 10)
    if not turns:
        print("No new turns found")
        return

    # Build conversation text
    conversation_parts = []
    for turn in turns:
        role = turn.get("payload", {}).get("role", "unknown")
        content = turn.get("payload", {}).get("content", "")
        conversation_parts.append(f"{role.upper()}: {content}")
    conversation_text = "\n\n".join(conversation_parts)

    print(f"Processing {len(turns)} turns ({len(conversation_text)} chars)")
    print()

    if args.dry_run:
        print("=== CONVERSATION TEXT ===")
        print(conversation_text[:500] + "..." if len(conversation_text) > 500 else conversation_text)
        print()
        print("[DRY RUN] Would extract gems and store to gems_tr")
        return

    if not args.execute:
        print("Use --execute to run curation or --dry-run to preview")
        return

    # Extract gems
    print("Extracting gems with LLM...")
    gems = extract_gems_with_llm(conversation_text)
    if not gems:
        print("No gems extracted")
        return

    print(f"Extracted {len(gems)} gems:")
    for i, gem in enumerate(gems, 1):
        print(f"  {i}. [{gem.get('category', 'other')}] {gem['text'][:80]}...")
    print()

    # Store gems
    print("Storing gems...")
    success = 0
    for gem in gems:
        if store_gem(gem):
            success += 1

    # Update state only after a curation attempt, so a failed run before
    # this point leaves the turns eligible for the next attempt.
    state["last_turn"] = current_turn
    state["last_curation"] = datetime.now(timezone.utc).isoformat()
    save_state(state)

    print(f"Done! Stored {success}/{len(gems)} gems")


if __name__ == "__main__":
    main()