#!/usr/bin/env python3
"""
Real-time Qdrant Watcher: Monitors OpenClaw session JSONL and stores to Qdrant instantly.

This daemon watches the active session file, embeds each conversation turn, and
stores it directly in the Qdrant memories_tr collection (real-time, no Redis).

Usage:
    # Run as daemon (follows the newest session file, switching on rotation)
    python3 realtime_qdrant_watcher.py --daemon

    # Run once (process the whole current session from the start, then exit)
    python3 realtime_qdrant_watcher.py --once

    # Test mode (print to stdout, don't write to Qdrant)
    python3 realtime_qdrant_watcher.py --dry-run

Systemd service:
    # Copy to /etc/systemd/system/mem-qdrant-watcher.service
    # systemctl enable --now mem-qdrant-watcher
"""

import argparse
import hashlib
import json
import os
import re
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

try:
    import requests
except ImportError:  # only required for real writes; main() refuses to run without it unless --dry-run
    requests = None

# Config - Set via environment variables or use placeholders
QDRANT_URL = os.getenv("QDRANT_URL", "http://:6333")
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "memories_tr")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://:11434")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2")
USER_ID = os.getenv("USER_ID", "")

# Paths
SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "~/.openclaw/agents/main/sessions")).expanduser()

# Polling cadence (seconds): how often the session file is re-read, and how often
# SESSIONS_DIR is re-scanned for a newer session file.
POLL_INTERVAL = 0.1
ROTATION_CHECK_INTERVAL = 1.0

# State
running = True
last_position = 0  # byte offset just past the last fully processed line
current_file = None
turn_counter = 0

# (pattern, replacement) pairs applied in order by clean_content().
# Order matters: the metadata block contains a ```json fence, so it is removed
# first; fenced code blocks must be removed before the inline-code rule runs,
# otherwise that rule consumes the inner backticks of a ``` fence and the fence
# can never be matched.
_CLEAN_RULES = [
    # Metadata JSON blocks
    (re.compile(r'Conversation info \(untrusted metadata\):\s*```json\s*\{[\s\S]*?\}\s*```'), ''),
    (re.compile(r'```[\s\S]*?```'), ''),                                   # Code blocks
    (re.compile(r'\[thinking:[^\]]*\]'), ''),                               # Thinking tags
    (re.compile(r'\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [A-Z]{3}\]'), ''),  # Timestamp lines
    (re.compile(r'\|[^\n]*\|'), ''),                                        # Table rows
    (re.compile(r'\|[-:]+\|'), ''),                                         # Table separators
    (re.compile(r'\*\*([^*]+)\*\*'), r'\1'),                                # Bold **text**
    (re.compile(r'\*([^*]+)\*'), r'\1'),                                    # Italic *text*
    (re.compile(r'`([^`]+)`'), r'\1'),                                      # Inline code `text`
    (re.compile(r'---+'), ''),                                              # Horizontal rules
    (re.compile(r'\*\*\*+'), ''),
    (re.compile(r'\n{3,}'), '\n'),                                          # Excess blank lines
    (re.compile(r'[ \t]+'), ' '),                                           # Multiple spaces -> single
]


def signal_handler(signum, frame):
    """Handle shutdown gracefully by clearing the global ``running`` flag."""
    global running
    print(f"\nReceived signal {signum}, shutting down...", file=sys.stderr)
    running = False


def get_embedding(text: str) -> Optional[List[float]]:
    """Return the embedding vector for *text* from Ollama, or None on any error (logged)."""
    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={"model": EMBEDDING_MODEL, "prompt": text},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["embedding"]
    except Exception as e:  # best-effort boundary: log and let the caller skip this turn
        print(f"Error getting embedding: {e}", file=sys.stderr)
        return None


def clean_content(text: str) -> str:
    """Clean content - remove metadata, markdown, keep only plain text."""
    for pattern, replacement in _CLEAN_RULES:
        text = pattern.sub(replacement, text)
    return text.strip()


def point_id_for(turn: Dict[str, Any]) -> int:
    """Return a deterministic Qdrant point ID for *turn*.

    The ID depends only on the user, the session name and the turn number, so
    re-processing the same session (``--once`` twice, a daemon restart) upserts
    the existing points instead of creating duplicates.
    """
    key = f"{turn.get('user_id', USER_ID)}:{turn.get('session', '')}:turn:{turn.get('turn', 0)}"
    digest = hashlib.sha256(key.encode("utf-8")).digest()[:8]
    # Qdrant accepts unsigned 64-bit integer IDs; mod 2**63 also keeps the value
    # inside signed int64 range, as the previous scheme did.
    return int.from_bytes(digest, byteorder="big") % (2 ** 63)


def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
    """Embed *turn* and upsert it into the Qdrant collection.

    Args:
        turn: dict produced by parse_turn() (keys 'turn', 'role', 'content',
            and optionally 'timestamp', 'user_id', 'session').
        dry_run: print what would be stored instead of calling Ollama/Qdrant.

    Returns:
        True if the point was written (or would be, in dry-run mode); False if
        the embedding or the Qdrant write failed (the error is logged).
    """
    if dry_run:
        print(f"[DRY RUN] Would store turn {turn['turn']} ({turn['role']}): {turn['content'][:60]}...")
        return True

    vector = get_embedding(turn['content'])
    if vector is None:
        print(f"Failed to get embedding for turn {turn['turn']}", file=sys.stderr)
        return False

    now = datetime.now(timezone.utc)
    payload = {
        "user_id": turn.get('user_id', USER_ID),
        "role": turn['role'],
        "content": turn['content'],
        "turn": turn['turn'],
        "timestamp": turn.get('timestamp', now.isoformat()),
        "date": now.strftime('%Y-%m-%d'),
        "source": "realtime_watcher",
        "curated": False,
    }

    try:
        response = requests.put(
            f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points",
            json={
                "points": [{
                    "id": point_id_for(turn),
                    "vector": vector,
                    "payload": payload,
                }]
            },
            timeout=30,
        )
        response.raise_for_status()
        return True
    except Exception as e:  # best-effort boundary: log and report failure
        print(f"Error writing to Qdrant: {e}", file=sys.stderr)
        return False


def get_current_session_file() -> Optional[Path]:
    """Find the most recently modified session JSONL file, or None if there is none."""
    if not SESSIONS_DIR.exists():
        return None
    candidates = []
    for path in SESSIONS_DIR.glob("*.jsonl"):
        try:
            candidates.append((path.stat().st_mtime, path))
        except OSError:
            continue  # file vanished between glob() and stat()
    if not candidates:
        return None
    return max(candidates, key=lambda item: item[0])[1]


def _extract_message(line: str) -> Optional[Dict[str, Any]]:
    """Decode one JSONL line into role/content/timestamp, or None if it is not a storable turn."""
    try:
        entry = json.loads(line.strip())
    except json.JSONDecodeError:
        return None

    # OpenClaw format: {"type": "message", "message": {...}}
    if not isinstance(entry, dict) or entry.get('type') != 'message':
        return None
    msg = entry.get('message')
    if not isinstance(msg, dict):
        return None

    # Only user/assistant turns are stored; toolResult/system/developer etc. are skipped.
    role = msg.get('role')
    if role not in ('user', 'assistant'):
        return None

    raw = msg.get('content')
    if isinstance(raw, list):
        # Separate text parts so adjacent parts don't run words together.
        content = "\n".join(
            item['text'] for item in raw
            if isinstance(item, dict) and isinstance(item.get('text'), str)
        )
    elif isinstance(raw, str):
        content = raw
    else:
        return None

    content = clean_content(content)
    if len(content) < 5:
        return None

    return {
        'role': role,
        'content': content[:2000],  # Limit size
        'timestamp': entry.get('timestamp', datetime.now(timezone.utc).isoformat()),
    }


def parse_turn(line: str, session_name: str) -> Optional[Dict[str, Any]]:
    """Parse a single JSONL line into a turn dict, numbering it via the global turn counter.

    Returns None (without consuming a turn number) for lines that are not
    user/assistant messages with at least 5 characters of cleaned text.
    """
    global turn_counter
    message = _extract_message(line)
    if message is None:
        return None
    turn_counter += 1
    return {
        'turn': turn_counter,
        'role': message['role'],
        'content': message['content'],
        'timestamp': message['timestamp'],
        'user_id': USER_ID,
        'session': session_name,
    }


def _iter_complete_lines(f) -> Iterator[Tuple[str, int]]:
    """Yield (line, end_position) for each newline-terminated line from f's current position.

    Stops at EOF or at a trailing partial line (one still being written),
    rewinding *f* to the start of that partial line so it is read again once
    complete. Accepts binary (decoded as UTF-8) or text file objects.
    """
    while True:
        start = f.tell()
        line = f.readline()
        if not line:
            return
        terminator = b"\n" if isinstance(line, bytes) else "\n"
        if not line.endswith(terminator):
            f.seek(start)
            return
        if isinstance(line, bytes):
            line = line.decode("utf-8", errors="replace")
        yield line, f.tell()


def _skip_existing_turns(f) -> int:
    """Move the global position past every complete line in *f*; return how many turns they hold."""
    global last_position
    f.seek(0)
    last_position = 0
    count = 0
    for line, end_pos in _iter_complete_lines(f):
        last_position = end_pos
        if line.strip() and _extract_message(line) is not None:
            count += 1
    return count


def process_new_lines(f, session_name: str, dry_run: bool = False):
    """Process complete lines appended to *f* since the global ``last_position``.

    A trailing line without a newline is left for the next call rather than
    being consumed half-written.
    """
    global last_position
    f.seek(last_position)
    for line, end_pos in _iter_complete_lines(f):
        # Advance before storing so a failing line is never retried forever.
        last_position = end_pos
        line = line.strip()
        if not line:
            continue
        turn = parse_turn(line, session_name)
        if turn and store_to_qdrant(turn, dry_run):
            print(f"✅ Turn {turn['turn']} ({turn['role']}) → Qdrant")


def watch_session(
    session_file: Path,
    dry_run: bool = False,
    *,
    from_start: bool = False,
    once: bool = False,
) -> Optional[Path]:
    """Follow *session_file* and store each new turn.

    Args:
        session_file: the session JSONL file to follow.
        dry_run: passed through to store_to_qdrant().
        from_start: process turns already in the file instead of skipping them
            (existing turns are still counted so numbering stays consistent).
        once: process the whole file from the start a single time, then return.

    Returns:
        None if the file was removed or could not be read; otherwise
        *session_file* (on shutdown, after a ``once`` pass, or when a newer
        session file appears in SESSIONS_DIR).
    """
    global last_position, turn_counter
    session_name = session_file.stem
    print(f"Watching session: {session_file.name}")

    try:
        # Binary mode: tell()/seek() are plain byte offsets and lines are
        # decoded as UTF-8 explicitly, independent of the system locale.
        with open(session_file, 'rb') as f:
            if from_start or once:
                turn_counter = 0
                last_position = 0
            else:
                turn_counter = _skip_existing_turns(f)
                print(f"Session has {turn_counter} existing turns, starting from position {last_position}")

            next_rotation_check = time.monotonic() + ROTATION_CHECK_INTERVAL
            while running:
                if not session_file.exists():
                    print("Session file removed, looking for new session...")
                    return None
                process_new_lines(f, session_name, dry_run)
                if once:
                    break
                if time.monotonic() >= next_rotation_check:
                    next_rotation_check = time.monotonic() + ROTATION_CHECK_INTERVAL
                    newest = get_current_session_file()
                    if newest is not None and newest != session_file:
                        print(f"Newer session found: {newest.name}")
                        break
                time.sleep(POLL_INTERVAL)
    except OSError as e:
        print(f"Error reading session {session_file.name}: {e}", file=sys.stderr)
        return None

    return session_file


def watch_loop(dry_run: bool = False):
    """Main watch loop - follows the newest session file and switches on rotation."""
    global current_file, turn_counter, last_position
    seen = set()
    startup = True
    while running:
        session_file = get_current_session_file()
        if session_file is None:
            startup = False
            print("No active session found, waiting...")
            time.sleep(1)
            continue

        if current_file != session_file:
            print(f"\nNew session detected: {session_file.name}")
            current_file = session_file
            turn_counter = 0
            last_position = 0

        # The session that is already active at startup (and any file watched
        # before) resumes at its end; a session first seen later is read from
        # the beginning so turns written before the switch was noticed are kept.
        from_start = not startup and session_file not in seen
        startup = False
        seen.add(session_file)

        result = watch_session(session_file, dry_run, from_start=from_start)
        if result is None:
            current_file = None
        time.sleep(0.5)


def main():
    """Parse CLI arguments, install signal handlers and run the watcher."""
    global USER_ID

    parser = argparse.ArgumentParser(
        description="Real-time OpenClaw session watcher → Qdrant"
    )
    parser.add_argument("--daemon", "-d", action="store_true",
                        help="Run as daemon (the default when --once is not given)")
    parser.add_argument("--once", "-o", action="store_true", help="Process once then exit")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write to Qdrant")
    parser.add_argument("--user-id", "-u", default=USER_ID, help=f"User ID (default: {USER_ID})")
    args = parser.parse_args()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    if args.user_id:
        USER_ID = args.user_id

    if requests is None and not args.dry_run:
        print("Error: the 'requests' package is required unless --dry-run is used", file=sys.stderr)
        sys.exit(1)

    print("🔍 Real-time Qdrant Watcher")
    print(f"📍 Qdrant: {QDRANT_URL}/{QDRANT_COLLECTION}")
    print(f"🧠 Ollama: {OLLAMA_URL}/{EMBEDDING_MODEL}")
    print(f"👤 User: {USER_ID}")
    print(f"📝 Sessions: {SESSIONS_DIR}")
    print()

    if args.once:
        print("Running once...")
        session_file = get_current_session_file()
        if session_file:
            watch_session(session_file, args.dry_run, once=True)
        else:
            print("No session found")
    else:
        print("Running as daemon (Ctrl+C to stop)...")
        watch_loop(args.dry_run)


if __name__ == "__main__":
    main()