#!/usr/bin/env python3
"""
True-Recall v2 Curator: Reads from Qdrant memories_tr

Reads one day of conversation turns from the Qdrant memories_tr collection,
extracts contextual gems using qwen3, and stores them to Qdrant gems_tr with
mxbai embeddings.

Usage:
    python curate_from_qdrant.py --user-id rob
    python curate_from_qdrant.py --user-id rob --date 2026-02-23
    python curate_from_qdrant.py --user-id rob --dry-run
"""

import json
import argparse
import requests
import urllib.error
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
import hashlib

# Configuration
QDRANT_URL = "http://10.0.0.40:6333"
SOURCE_COLLECTION = "memories_tr"
TARGET_COLLECTION = "gems_tr"
OLLAMA_URL = "http://10.0.0.10:11434"
EMBEDDING_MODEL = "mxbai-embed-large"
CURATION_MODEL = "qwen3:4b-instruct"

# Load curator prompt
CURATOR_PROMPT_PATH = "/root/.openclaw/workspace/.projects/true-recall/curator-prompt.md"
# Fallback used when CURATOR_PROMPT_PATH does not exist.
CURATOR_PROMPT_PATH_V2 = "/root/.openclaw/workspace/.projects/true-recall-v2/curator-prompt.md"

# HTTP timeouts in seconds. requests waits forever by default, so without
# these a stalled Ollama or Qdrant server would hang the curator indefinitely.
CURATION_TIMEOUT = 600  # one generation over a full day of turns can be slow
EMBEDDING_TIMEOUT = 60
QDRANT_TIMEOUT = 30

# Scroll paging for the source collection: at most
# SCROLL_PAGE_SIZE * SCROLL_MAX_PAGES points are read per run.
SCROLL_PAGE_SIZE = 100
SCROLL_MAX_PAGES = 100  # Safety limit

# Errors that fail a single gem in main() without aborting the whole batch.
# (HTTPError is a subclass of URLError; requests' JSON errors subclass ValueError.)
_GEM_STORE_ERRORS = (
    requests.RequestException,
    urllib.error.URLError,
    RuntimeError,
    KeyError,
    ValueError,
)


def load_curator_prompt() -> str:
    """Load the curator system prompt.

    Reads CURATOR_PROMPT_PATH as UTF-8, falling back to CURATOR_PROMPT_PATH_V2
    when the primary file does not exist.

    Raises:
        FileNotFoundError: if neither prompt file exists.
    """
    try:
        return Path(CURATOR_PROMPT_PATH).read_text(encoding="utf-8")
    except FileNotFoundError:
        # Fallback to v2 location
        return Path(CURATOR_PROMPT_PATH_V2).read_text(encoding="utf-8")


def get_turns_from_qdrant(user_id: str, date_str: str) -> List[Dict[str, Any]]:
    """
    Get all conversation turns from Qdrant for a specific user and date.

    Scrolls SOURCE_COLLECTION for points whose payload "user_id" and "date"
    both match exactly, converts each payload into a turn dict, and drops
    turns that have neither a user message nor an AI response.

    Args:
        user_id: Value matched against the payload "user_id" field.
        date_str: Value matched against the payload "date" field. The match
            is an exact string comparison, so it must use the stored format.

    Returns:
        Turns sorted by conversation_id, then turn number. An empty list if
        SOURCE_COLLECTION does not exist (HTTP 404).

    Raises:
        urllib.error.HTTPError: for any non-404 HTTP error from Qdrant.
        urllib.error.URLError: if Qdrant is unreachable.
    """
    # Build filter for user_id and date
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "date", "match": {"value": date_str}},
        ]
    }

    # Use scroll API to get all matching points, one page per request.
    all_points: List[Dict[str, Any]] = []
    offset = None
    for _ in range(SCROLL_MAX_PAGES):
        scroll_data: Dict[str, Any] = {
            "limit": SCROLL_PAGE_SIZE,
            "with_payload": True,
            "filter": filter_data,
        }
        # `is not None` rather than truthiness: the offset is a point ID.
        if offset is not None:
            scroll_data["offset"] = offset

        req = urllib.request.Request(
            f"{QDRANT_URL}/collections/{SOURCE_COLLECTION}/points/scroll",
            data=json.dumps(scroll_data).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=QDRANT_TIMEOUT) as response:
                result = json.loads(response.read().decode())
        except urllib.error.HTTPError as e:
            if e.code == 404:
                print(f"⚠️ Collection {SOURCE_COLLECTION} not found")
                return []
            raise

        page = result.get("result", {})
        points = page.get("points", [])
        if not points:
            break
        all_points.extend(points)

        # Check if there's more
        offset = page.get("next_page_offset")
        if offset is None:
            break
    else:
        # Only reached when every allowed page was consumed and Qdrant still
        # reported a next page, i.e. the turns below are incomplete.
        print(
            f"⚠️ Stopped after {SCROLL_MAX_PAGES} scroll pages "
            f"({len(all_points)} points); remaining turns for {date_str} were not fetched"
        )

    # Convert points to turn format (harvested summaries)
    turns = []
    for point in all_points:
        payload = point.get("payload", {})
        turn = {
            "turn": payload.get("turn_number", 0),
            "user_id": payload.get("user_id", user_id),
            "user": payload.get("user_message", ""),
            "ai": payload.get("ai_response", ""),
            "conversation_id": payload.get("conversation_id", ""),
            "session_id": payload.get("session_id", ""),
            # Timestamp comes from the payload's created_at field.
            "timestamp": payload.get("created_at", ""),
            "date": payload.get("date", date_str),
            "content_hash": payload.get("content_hash", ""),
        }
        # Skip if no content
        if turn["user"] or turn["ai"]:
            turns.append(turn)

    # Sort by conversation_id, then by turn number. `or` guards against null
    # payload values, which would not compare with str/int.
    turns.sort(key=lambda x: (x.get("conversation_id") or "", x.get("turn") or 0))
    return turns


def _parse_curator_output(output: str) -> List[Dict[str, Any]]:
    """Parse the curator model's raw text into a list of gem dicts.

    Strips a surrounding markdown code fence if present, then narrows to the
    span between the first '[' and the last ']'. A single non-list JSON value
    is wrapped in a list. Entries that are not JSON objects are dropped,
    because downstream code calls ``gem.get(...)`` on every gem.

    Returns:
        The parsed gems, or [] (after printing a diagnostic) if the text is
        not valid JSON.
    """
    output = output.strip()

    # Extract JSON from output (handle markdown code blocks)
    if '```json' in output:
        output = output.split('```json')[1].split('```')[0].strip()
    elif '```' in output:
        output = output.split('```')[1].split('```')[0].strip()

    # Extract JSON array - find first [ and last ]
    start_idx = output.find('[')
    end_idx = output.rfind(']')
    if 0 <= start_idx < end_idx:
        output = output[start_idx:end_idx + 1]

    try:
        gems = json.loads(output)
    except json.JSONDecodeError as e:
        print(f"Error parsing curator output: {e}")
        print(f"Raw output: {output[:500]}...")
        return []

    if not isinstance(gems, list):
        print(f"Warning: Curator returned non-list, wrapping: {type(gems)}")
        gems = [gems] if gems else []

    dict_gems = [g for g in gems if isinstance(g, dict)]
    if len(dict_gems) != len(gems):
        print(f"Warning: Dropped {len(gems) - len(dict_gems)} non-object gem(s) from curator output")
    return dict_gems


def extract_gems_with_curator(turns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Use qwen3 to extract gems from conversation turns.

    Sends the turns as a JSON document to Ollama's /api/generate endpoint,
    with the curator prompt as the system prompt, and parses the reply.

    Args:
        turns: Turn dicts as returned by get_turns_from_qdrant.

    Returns:
        Gem dicts extracted by the model; [] for empty input or unparseable
        model output.

    Raises:
        RuntimeError: if Ollama responds with a non-200 status.
        requests.RequestException: on connection failure or timeout.
        FileNotFoundError: if no curator prompt file exists.
    """
    if not turns:
        return []

    prompt = load_curator_prompt()

    # Build the conversation input
    conversation_json = json.dumps(turns, indent=2)

    # Call Ollama with native system prompt.
    # NOTE(review): Ollama truncates prompts longer than the model's context
    # window ("num_ctx" option, not set here), so a busy day's turns may be
    # cut off silently — confirm the server-side default is large enough.
    response = requests.post(
        f"{OLLAMA_URL}/api/generate",
        json={
            "model": CURATION_MODEL,
            "system": prompt,
            "prompt": f"## Input Conversation\n\n```json\n{conversation_json}\n```\n\n## Output\n",
            "stream": False,
            "options": {
                "temperature": 0.1,
                "num_predict": 4000,
            },
        },
        timeout=CURATION_TIMEOUT,
    )

    if response.status_code != 200:
        raise RuntimeError(f"Curation failed: {response.text}")

    return _parse_curator_output(response.json().get('response', ''))


def get_embedding(text: str) -> List[float]:
    """Get embedding vector from Ollama using mxbai-embed-large.

    Raises:
        RuntimeError: if Ollama responds with a non-200 status.
        KeyError: if the response JSON has no "embedding" field.
        requests.RequestException: on connection failure or timeout.
    """
    response = requests.post(
        f"{OLLAMA_URL}/api/embeddings",
        json={
            "model": EMBEDDING_MODEL,
            "prompt": text,
        },
        timeout=EMBEDDING_TIMEOUT,
    )

    if response.status_code != 200:
        raise RuntimeError(f"Embedding failed: {response.text}")

    return response.json()['embedding']


def get_gem_id(gem: Dict[str, Any], user_id: str) -> int:
    """Generate a deterministic, non-negative 63-bit integer ID for a gem.

    The ID hashes user_id with the gem's "conversation_id" and "turn_range",
    so re-curating the same turns yields the same ID; check_duplicate relies
    on this. When a gem carries neither field, the gem text is hashed
    instead. Otherwise every such gem for a user would share one ID, and all
    but the first would be skipped as duplicates.
    """
    conversation_id = gem.get('conversation_id', '')
    turn_range = gem.get('turn_range', '')
    if conversation_id or turn_range:
        key = f"{user_id}:{conversation_id}:{turn_range}"
    else:
        key = f"{user_id}::gem:{gem.get('gem', '')}"
    hash_bytes = hashlib.sha256(key.encode()).digest()[:8]
    # Modulo keeps the value within the signed 64-bit range.
    return int.from_bytes(hash_bytes, byteorder='big') % (2**63)


def check_duplicate(gem: Dict[str, Any], user_id: str) -> bool:
    """Return True if a point with this gem's ID already exists in gems_tr.

    This is an exact-ID lookup (see get_gem_id), not a similarity search.
    A 404 is treated as "not present".

    Raises:
        urllib.error.HTTPError: for any HTTP error other than 404.
        urllib.error.URLError: if Qdrant is unreachable.
    """
    gem_id = get_gem_id(gem, user_id)

    req = urllib.request.Request(
        f"{QDRANT_URL}/collections/{TARGET_COLLECTION}/points/{gem_id}",
        headers={"Content-Type": "application/json"},
        method="GET",
    )
    try:
        with urllib.request.urlopen(req, timeout=10):
            return True  # Point exists
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return False  # Point doesn't exist
        raise


def store_gem_to_qdrant(gem: Dict[str, Any], user_id: str) -> bool:
    """Embed a gem and upsert it into gems_tr.

    The embedding is computed from the gem's "gem", "context" and "snippet"
    fields. The payload is the gem dict plus "user_id". The user_id argument
    takes precedence over any "user_id" the curator echoed into the gem, so
    the stored owner always matches the ID computed by get_gem_id.

    Returns:
        True if Qdrant answered HTTP 200, False otherwise.

    Raises:
        RuntimeError, KeyError: propagated from get_embedding.
        requests.RequestException: on connection failure or timeout.
    """
    # Create embedding from gem text
    embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}"
    vector = get_embedding(embedding_text)

    # Prepare payload; the explicit user_id must win over the model's output.
    payload = {**gem, "user_id": user_id}

    # Generate deterministic integer ID
    gem_id = get_gem_id(gem, user_id)

    # Store to Qdrant
    response = requests.put(
        f"{QDRANT_URL}/collections/{TARGET_COLLECTION}/points",
        json={
            "points": [{
                "id": gem_id,
                "vector": vector,
                "payload": payload,
            }]
        },
        timeout=QDRANT_TIMEOUT,
    )
    return response.status_code == 200


def _gem_preview(gem: Dict[str, Any], width: int) -> str:
    """Return the first `width` characters of a gem's text for log output."""
    return str(gem.get('gem', 'N/A'))[:width]


def main():
    """CLI entry point: fetch one day's turns, curate gems, and store them."""
    parser = argparse.ArgumentParser(description="True-Recall Curator v2 - Reads from Qdrant")
    parser.add_argument("--user-id", required=True, help="User ID to process")
    parser.add_argument("--date", help="Specific date to process (YYYY-MM-DD), defaults to yesterday")
    parser.add_argument("--dry-run", action="store_true", help="Don't store, just preview")
    args = parser.parse_args()

    # Determine date (yesterday by default). The Qdrant filter is an exact
    # string match, so normalise --date to zero-padded YYYY-MM-DD and reject
    # unparseable input instead of silently matching nothing.
    if args.date:
        try:
            date_str = datetime.strptime(args.date, "%Y-%m-%d").strftime("%Y-%m-%d")
        except ValueError:
            parser.error(f"--date must be in YYYY-MM-DD format, got {args.date!r}")
    else:
        yesterday = datetime.now() - timedelta(days=1)
        date_str = yesterday.strftime("%Y-%m-%d")

    print(f"🔍 True-Recall Curator v2 for {args.user_id}")
    print(f"📅 Processing date: {date_str}")
    print(f"🧠 Embedding model: {EMBEDDING_MODEL}")
    print(f"💎 Target collection: {TARGET_COLLECTION}")
    print()

    # Get turns from Qdrant
    print(f"📄 Fetching conversation turns from {SOURCE_COLLECTION}...")
    turns = get_turns_from_qdrant(args.user_id, date_str)
    print(f"✅ Found {len(turns)} turns")

    if not turns:
        print("⚠️ No turns to process. Exiting.")
        return

    # Show sample
    print("\n📄 Sample turns:")
    for turn in turns[:3]:
        user_msg = turn.get("user", "")[:60]
        ai_msg = turn.get("ai", "")[:60]
        print(f" Turn {turn.get('turn')}: User: {user_msg}...")
        print(f" AI: {ai_msg}...")
    if len(turns) > 3:
        print(f" ... and {len(turns) - 3} more")

    # Extract gems
    print("\n🧠 Extracting gems with The Curator (qwen3)...")
    gems = extract_gems_with_curator(turns)
    print(f"✅ Extracted {len(gems)} gems")

    if not gems:
        print("⚠️ No gems extracted. Exiting.")
        return

    # Preview gems
    print("\n💎 Preview of extracted gems:")
    for i, gem in enumerate(gems[:3], 1):
        print(f"\n--- Gem {i} ---")
        print(f"Gem: {_gem_preview(gem, 100)}...")
        print(f"Categories: {gem.get('categories', [])}")
        print(f"Importance: {gem.get('importance', 'N/A')}")
        print(f"Confidence: {gem.get('confidence', 'N/A')}")
    if len(gems) > 3:
        print(f"\n... and {len(gems) - 3} more gems")

    if args.dry_run:
        print("\n🏃 DRY RUN: Not storing gems.")
        return

    # Check for duplicates and store
    print("\n💾 Storing gems to Qdrant...")
    stored = 0
    skipped = 0
    failed = 0

    for gem in gems:
        preview = _gem_preview(gem, 50)
        try:
            if check_duplicate(gem, args.user_id):
                print(f" ⏭️ Skipping duplicate: {preview}...")
                skipped += 1
                continue
            ok = store_gem_to_qdrant(gem, args.user_id)
        except _GEM_STORE_ERRORS as e:
            # One bad gem (or a transient network error) shouldn't abort the
            # rest of the batch; report it and count it as failed.
            print(f" ⚠️ Failed to store gem: {preview}... ({type(e).__name__}: {e})")
            failed += 1
            continue

        if ok:
            stored += 1
        else:
            print(f" ⚠️ Failed to store gem: {preview}...")
            failed += 1

    print(f"\n✅ Stored: {stored}")
    print(f"⏭️ Skipped (duplicates): {skipped}")
    print(f"❌ Failed: {failed}")
    print("\n🎉 Curation complete!")


if __name__ == "__main__":
    main()