#!/usr/bin/env python3
"""
TrueRecall v1.2 - Real-time Qdrant Watcher

Monitors OpenClaw sessions and stores new turns to memories_tr instantly.

This is the CAPTURE component. For curation and injection, install v2.

Changelog:
- v1.2: Fixed session rotation bug - added inactivity detection (30s threshold)
  and improved file scoring to properly detect new sessions on /new or /reset
- v1.1: Added 1-second mtime polling for session rotation
- v1.0: Initial release
"""

import argparse
import hashlib
import json
import os
import re
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests

# Config
QDRANT_URL = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "memories_tr")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2")
USER_ID = os.getenv("USER_ID", "rob")

# Paths
SESSIONS_DIR = Path(os.getenv("OPENCLAW_SESSIONS_DIR", "/root/.openclaw/agents/main/sessions"))

# State (module-level, mutated by the watcher functions below)
running = True        # cleared by signal_handler to stop the watch loops
last_position = 0     # byte offset in the session file up to which complete lines were consumed
current_file = None   # session file the daemon loop is currently following
turn_counter = 0      # number of accepted user/assistant turns in the current session

# Patterns used by clean_content(), compiled once instead of on every call.
_METADATA_BLOCK_RE = re.compile(
    r'Conversation info \(untrusted metadata\):\s*```json\s*\{[\s\S]*?\}\s*```'
)
_CODE_FENCE_RE = re.compile(r'```[\s\S]*?```')
_THINKING_RE = re.compile(r'\[thinking:[^\]]*\]')
_TIMESTAMP_RE = re.compile(r'\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [A-Z]{3}\]')
_TABLE_ROW_RE = re.compile(r'\|[^\n]*\|')
_TABLE_SEP_RE = re.compile(r'\|[-:]+\|')
_BOLD_RE = re.compile(r'\*\*([^*]+)\*\*')
_ITALIC_RE = re.compile(r'\*([^*]+)\*')
_INLINE_CODE_RE = re.compile(r'`([^`]+)`')
_HR_DASH_RE = re.compile(r'---+')
_HR_STAR_RE = re.compile(r'\*\*\*+')
_EXCESS_NEWLINES_RE = re.compile(r'\n{3,}')
_HSPACE_RE = re.compile(r'[ \t]+')


def signal_handler(signum, frame):
    """Request a clean shutdown of the watch loops (installed for SIGINT/SIGTERM)."""
    global running
    print(f"\nReceived signal {signum}, shutting down...", file=sys.stderr)
    running = False


def get_embedding(text: str) -> Optional[List[float]]:
    """Return the embedding vector for *text* from Ollama, or None on any failure.

    Errors are reported on stderr rather than raised so that one failed
    embedding does not stop the watcher.
    """
    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={"model": EMBEDDING_MODEL, "prompt": text},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["embedding"]
    except Exception as e:
        print(f"Error getting embedding: {e}", file=sys.stderr)
        return None


def clean_content(text: str) -> str:
    """Strip metadata, markup and formatting noise from a message body.

    Removes the untrusted-metadata JSON block, fenced code blocks, thinking
    tags, bracketed timestamps, markdown table lines, bold/italic/inline-code
    markers (keeping their text), horizontal rules, and excess whitespace.
    """
    # The metadata block is itself fenced, so it must go before generic fences.
    text = _METADATA_BLOCK_RE.sub('', text)
    # Fences must be removed BEFORE inline code: otherwise the inline-code
    # pattern eats the inner backticks of ``` and the fence is never matched.
    text = _CODE_FENCE_RE.sub('', text)
    # Remove thinking tags
    text = _THINKING_RE.sub('', text)
    # Remove timestamp lines
    text = _TIMESTAMP_RE.sub('', text)
    # Remove markdown tables
    text = _TABLE_ROW_RE.sub('', text)
    text = _TABLE_SEP_RE.sub('', text)
    # Remove markdown formatting, keeping the enclosed text
    text = _BOLD_RE.sub(r'\1', text)
    text = _ITALIC_RE.sub(r'\1', text)
    text = _INLINE_CODE_RE.sub(r'\1', text)
    # Remove horizontal rules
    text = _HR_DASH_RE.sub('', text)
    text = _HR_STAR_RE.sub('', text)
    # Remove excess whitespace
    text = _EXCESS_NEWLINES_RE.sub('\n', text)
    text = _HSPACE_RE.sub(' ', text)
    return text.strip()


def _point_id(payload: Dict[str, Any]) -> int:
    """Derive a stable Qdrant point ID from a turn payload.

    The ID depends only on who said what and when, so re-ingesting the same
    message upserts the same point instead of creating a duplicate, and two
    different messages never collide merely because they share a turn number
    and wall-clock second (which the previous HHMMSS-based scheme allowed).
    The value is kept below 2**63 so it fits a signed or unsigned 64-bit int.
    """
    key = f"{payload['user_id']}:{payload['role']}:{payload['timestamp']}:{payload['content']}"
    digest = hashlib.sha256(key.encode("utf-8")).digest()[:8]
    return int.from_bytes(digest, byteorder='big') % (2**63)


def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
    """Embed one parsed turn and upsert it into the Qdrant collection.

    Args:
        turn: dict as produced by parse_turn() (keys 'turn', 'role',
            'content', and optionally 'timestamp' and 'user_id').
        dry_run: when True, only print what would be stored.

    Returns:
        True if the point was written (or dry_run), False on any failure.
    """
    if dry_run:
        print(f"[DRY RUN] Would store turn {turn['turn']} ({turn['role']}): {turn['content'][:60]}...")
        return True

    vector = get_embedding(turn['content'])
    if vector is None:
        print(f"Failed to get embedding for turn {turn['turn']}", file=sys.stderr)
        return False

    now = datetime.now(timezone.utc)
    payload = {
        "user_id": turn.get('user_id', USER_ID),
        "role": turn['role'],
        "content": turn['content'],
        "turn": turn['turn'],
        "timestamp": turn.get('timestamp', now.isoformat()),
        "date": now.strftime('%Y-%m-%d'),
        "source": "true-recall-base",
        "curated": False,
    }

    try:
        response = requests.put(
            f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points",
            json={
                "points": [{
                    "id": _point_id(payload),
                    "vector": vector,
                    "payload": payload,
                }]
            },
            timeout=30,
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error writing to Qdrant: {e}", file=sys.stderr)
        return False


def get_current_session_file() -> Optional[Path]:
    """Find the most recently active session file.

    Files are ranked by modification time, with a negligible bonus for size
    as a tie-breaker, so that a session that just rotated (/new or /reset)
    and started receiving writes wins over the previous one.

    Returns:
        The highest-scoring ``*.jsonl`` file in SESSIONS_DIR, or None if the
        directory is missing or empty.
    """
    if not SESSIONS_DIR.exists():
        return None

    files = list(SESSIONS_DIR.glob("*.jsonl"))
    if not files:
        return None

    def file_score(p: Path) -> float:
        try:
            stat = p.stat()
        except OSError:
            # File vanished between glob() and stat(): rank it last.
            return 0
        # size / 1e9 is tiny compared to an epoch mtime, so it only breaks ties.
        return stat.st_mtime + (stat.st_size / 1e9)

    return max(files, key=file_score)


def parse_turn(line: str, session_name: str) -> Optional[Dict[str, Any]]:
    """Parse one JSONL session line into a turn dict, or None if it is not one.

    Only ``type == "message"`` entries whose role is ``user`` or ``assistant``
    are kept (tool results, system and developer messages are dropped). Text
    is cleaned with clean_content() and truncated to 2000 characters; results
    shorter than 5 characters are discarded. Increments the global
    turn_counter for every accepted turn.

    ``session_name`` is currently unused.
    """
    global turn_counter

    try:
        entry = json.loads(line.strip())
    except json.JSONDecodeError:
        return None
    if not isinstance(entry, dict):
        return None

    if entry.get('type') != 'message' or 'message' not in entry:
        return None

    msg = entry['message']
    if not isinstance(msg, dict):
        return None

    role = msg.get('role')
    if role not in ('user', 'assistant'):
        return None

    raw_content = msg.get('content')
    if isinstance(raw_content, list):
        # Content blocks: concatenate every textual part, skip the rest.
        content = "".join(
            item['text']
            for item in raw_content
            if isinstance(item, dict) and isinstance(item.get('text'), str)
        )
    elif isinstance(raw_content, str):
        content = raw_content
    else:
        content = ""

    if not content:
        return None

    content = clean_content(content)
    if len(content) < 5:
        return None

    turn_counter += 1

    return {
        'turn': turn_counter,
        'role': role,
        'content': content[:2000],
        'timestamp': entry.get('timestamp', datetime.now(timezone.utc).isoformat()),
        'user_id': USER_ID,
    }


def _read_complete_line(f) -> Optional[str]:
    """Read the next fully written line from *f*.

    Returns the stripped line text ("" for blank lines), or None when no
    complete line is available yet. A final line without a trailing newline
    is accepted only if it is already valid JSON; otherwise the writer is
    assumed to be mid-write and *f* is rewound to the start of that line so
    it is re-read, whole, on a later poll.

    Works with files opened in binary (bytes decoded as UTF-8) or text mode.
    """
    start = f.tell()
    raw = f.readline()
    if not raw:
        return None
    text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else raw
    stripped = text.strip()
    if stripped and not text.endswith("\n"):
        try:
            json.loads(stripped)
        except json.JSONDecodeError:
            f.seek(start)
            return None
    return stripped


def process_new_lines(f, session_name: str, dry_run: bool = False):
    """Store every complete line appended to *f* since ``last_position``.

    Advances the global ``last_position`` past each complete line consumed;
    a partially written trailing line is left for the next call.
    """
    global last_position

    f.seek(last_position)
    while True:
        line = _read_complete_line(f)
        if line is None:
            break
        last_position = f.tell()
        if not line:
            continue
        turn = parse_turn(line, session_name)
        if turn and store_to_qdrant(turn, dry_run):
            print(f"✅ Turn {turn['turn']} ({turn['role']}) → Qdrant")


def watch_session(session_file: Path, dry_run: bool = False) -> Optional[Path]:
    """Tail *session_file*, storing each new turn, until something changes.

    Content already in the file when watching starts is only counted (to
    continue the turn numbering), not stored.

    Returns:
        A different session file when a newer one is detected; None when the
        watched file disappears or cannot be opened; *session_file* itself
        when the watcher is asked to stop (``running`` cleared).
    """
    global last_position, turn_counter

    session_name = session_file.name.replace('.jsonl', '')
    print(f"Watching session: {session_file.name}")

    # Count the real turns already present (same rules as live parsing) and
    # start tailing right after the last complete line.
    turn_counter = 0
    try:
        with open(session_file, 'rb') as existing:
            while True:
                line = _read_complete_line(existing)
                if line is None:
                    break
                if line:
                    parse_turn(line, session_name)
            last_position = existing.tell()
        print(f"Session has {turn_counter} existing turns, starting from position {last_position}")
    except Exception as e:
        print(f"Warning: Could not read existing turns: {e}", file=sys.stderr)
        last_position = 0

    INACTIVITY_THRESHOLD = 30  # seconds - if no data for 30s, check for new session
    last_session_check = time.time()
    last_data_time = time.time()  # when we last saw new data
    try:
        last_file_size = session_file.stat().st_size
    except OSError:
        last_file_size = 0

    try:
        tail = open(session_file, 'rb')
    except OSError as e:
        # The file can disappear between discovery and open (rotation race).
        print(f"Could not open session file: {e}", file=sys.stderr)
        return None

    with tail:
        while running:
            if not session_file.exists():
                print("Session file removed, looking for new session...")
                return None

            current_time = time.time()

            # Check for newer session every 1 second
            if current_time - last_session_check > 1.0:
                last_session_check = current_time
                newest_session = get_current_session_file()
                if newest_session and newest_session != session_file:
                    print(f"Newer session detected: {newest_session.name}")
                    return newest_session

            # Check if current file is stale (no new data for threshold)
            if current_time - last_data_time > INACTIVITY_THRESHOLD:
                try:
                    current_size = session_file.stat().st_size
                except OSError:
                    current_size = None
                if current_size == last_file_size:
                    # File hasn't grown: see whether another session is active.
                    newest_session = get_current_session_file()
                    if newest_session and newest_session != session_file:
                        print(f"Current session inactive, switching to: {newest_session.name}")
                        return newest_session
                elif current_size is not None:
                    last_file_size = current_size
                # Re-arm so this check runs once per threshold window rather
                # than on every 0.1s tick while the session stays idle.
                last_data_time = current_time

            # Process new lines and update activity tracking
            old_position = last_position
            process_new_lines(tail, session_name, dry_run)
            if last_position > old_position:
                last_data_time = current_time
                try:
                    last_file_size = session_file.stat().st_size
                except OSError:
                    pass

            time.sleep(0.1)

    return session_file


def watch_loop(dry_run: bool = False):
    """Follow the most recently active session until shutdown, switching on rotation."""
    global current_file, turn_counter, last_position

    while running:
        session_file = get_current_session_file()

        if session_file is None:
            print("No active session found, waiting...")
            time.sleep(1)
            continue

        if current_file != session_file:
            print(f"\nNew session detected: {session_file.name}")
            current_file = session_file
            turn_counter = 0
            last_position = 0

        result = watch_session(session_file, dry_run)

        if result is None:
            current_file = None

        time.sleep(0.5)


def main():
    """Parse CLI arguments, install signal handlers and start watching."""
    global USER_ID

    parser = argparse.ArgumentParser(description="TrueRecall v1.2 - Real-time Memory Capture")
    parser.add_argument("--daemon", "-d", action="store_true", help="Run as daemon")
    parser.add_argument("--once", "-o", action="store_true", help="Process once then exit")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write to Qdrant")
    parser.add_argument("--user-id", "-u", default=USER_ID, help=f"User ID (default: {USER_ID})")

    args = parser.parse_args()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    if args.user_id:
        USER_ID = args.user_id

    print("🔍 TrueRecall v1.2 - Real-time Memory Capture")
    print(f"📍 Qdrant: {QDRANT_URL}/{QDRANT_COLLECTION}")
    print(f"🧠 Ollama: {OLLAMA_URL}/{EMBEDDING_MODEL}")
    print(f"👤 User: {USER_ID}")
    print()

    if args.once:
        print("Running once...")
        session_file = get_current_session_file()
        if session_file:
            # NOTE(review): watch_session tails until rotation, file removal
            # or a signal, so --once does not exit on its own as the help text
            # suggests; decide what "once" should process before changing it.
            watch_session(session_file, args.dry_run)
        else:
            print("No session found")
    else:
        print("Running as daemon (Ctrl+C to stop)...")
        watch_loop(args.dry_run)


if __name__ == "__main__":
    main()