#!/usr/bin/env python3
"""
Backfill the memories_tr collection from memory markdown files.

Reads every .md file in MEMORY_DIR, splits each into per-session
entries, embeds each entry via Ollama, and upserts it into the Qdrant
"memories_tr" collection with a deterministic point ID so re-runs
update rather than duplicate.

Usage:
    python3 backfill_memory_to_q.py [--dry-run] [--limit N]
"""

import argparse
import hashlib
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Config (overridable via environment)
QDRANT_URL = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
COLLECTION_NAME = "memories_tr"
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://10.0.0.10:11434")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2")
MEMORY_DIR = Path("/root/.openclaw/workspace/memory")
USER_ID = "rob"


def get_embedding(text: str) -> Optional[List[float]]:
    """Return an embedding vector for *text* via Ollama, or None on failure.

    The prompt is truncated to 4000 characters to stay within model limits.
    """
    import requests  # lazy import: parsing/dry-run paths work without requests

    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={"model": EMBEDDING_MODEL, "prompt": text[:4000]},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["embedding"]
    except Exception as e:
        print(f"Error getting embedding: {e}", file=sys.stderr)
        return None


def clean_content(text: str) -> str:
    """Strip markdown formatting (bold/italic/code/fences/headers) for storage."""
    text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)   # **bold** -> bold
    text = re.sub(r'\*([^*]+)\*', r'\1', text)       # *italic* -> italic
    text = re.sub(r'`([^`]+)`', r'\1', text)         # `code` -> code
    text = re.sub(r'```[\s\S]*?```', '', text)       # drop fenced code blocks
    text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)  # drop headers
    text = re.sub(r'\n{3,}', '\n\n', text)           # collapse blank runs
    return text.strip()


def parse_memory_file(file_path: Path) -> List[Dict[str, Any]]:
    """Parse one memory markdown file into a list of entry dicts.

    The file is split on "## " headers (Session:/Update: blocks), and each
    session on "### " sub-headers.  Sections shorter than 20 cleaned
    characters are dropped.  Each entry carries content (capped at 2000
    chars), a date parsed from a YYYY-MM-DD in the filename (today as a
    fallback), the session title, the source filename, and fixed
    role/source markers.  Read errors yield an empty list.
    """
    entries: List[Dict[str, Any]] = []

    try:
        content = file_path.read_text(encoding='utf-8')
    except Exception as e:
        print(f"Error reading {file_path}: {e}", file=sys.stderr)
        return entries

    # Date comes from the filename when present (e.g. 2024-05-01-foo.md).
    date_match = re.search(r'(\d{4}-\d{2}-\d{2})', file_path.name)
    date_str = date_match.group(1) if date_match else datetime.now().strftime('%Y-%m-%d')

    # Split on "## " session headers.
    sessions = re.split(r'\n## ', content)

    for i, session in enumerate(sessions):
        if not session.strip():
            continue

        # Title from a leading "Session:" or "Update:" header, else a fallback.
        # (re.match anchors at the string start, so the former re.MULTILINE
        # flag had no effect and was dropped.)
        title_match = re.match(r'(?:Session|Update):\s*(.+)', session)
        session_title = title_match.group(1).strip() if title_match else f"Session {i}"

        for section in session.split('\n### '):
            cleaned = clean_content(section)
            if len(cleaned) < 20:  # skip empty / near-empty sections
                continue

            entries.append({
                'content': cleaned[:2000],
                'role': 'assistant',  # these are summaries, not raw user turns
                'date': date_str,
                'session_title': session_title,
                'file': file_path.name,
                'source': 'memory-backfill',
            })

    return entries


def store_to_qdrant(entry: Dict[str, Any], dry_run: bool = False) -> bool:
    """Embed one entry and upsert it into Qdrant.

    Returns True on success (or in dry-run mode, which touches no service),
    False when embedding or the Qdrant request fails.
    """
    content = entry['content']

    if dry_run:
        print(f"[DRY RUN] Would store: {content[:60]}...")
        return True

    vector = get_embedding(content)
    if vector is None:
        return False

    # Deterministic ID so re-runs upsert instead of duplicating.
    # BUG FIX: hash the FULL content — the original first-100-chars scheme
    # silently collided for same-date entries sharing a prefix.
    hash_content = f"{USER_ID}:{entry['date']}:{content}"
    hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8]
    point_id = int.from_bytes(hash_bytes, byteorder='big') % (2 ** 63)

    payload = {
        'user_id': USER_ID,
        'role': entry.get('role', 'assistant'),
        'content': content,
        'date': entry['date'],
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'source': entry.get('source', 'memory-backfill'),
        'file': entry.get('file', ''),
        'session_title': entry.get('session_title', ''),
        'curated': True,  # backfilled entries are pre-processed summaries
    }

    import requests  # lazy import: keeps dry-run/parsing usable without requests

    try:
        response = requests.put(
            f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points",
            json={'points': [{'id': point_id, 'vector': vector, 'payload': payload}]},
            timeout=30,
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error storing to Qdrant: {e}", file=sys.stderr)
        return False


def main():
    """CLI entry point: enumerate memory files and backfill them into Qdrant."""
    parser = argparse.ArgumentParser(description='Backfill memory files to Qdrant')
    parser.add_argument('--dry-run', '-n', action='store_true', help='Dry run - do not write to Qdrant')
    parser.add_argument('--limit', '-l', type=int, default=None, help='Limit number of files to process')
    args = parser.parse_args()

    if not MEMORY_DIR.exists():
        print(f"Memory directory not found: {MEMORY_DIR}", file=sys.stderr)
        sys.exit(1)

    md_files = sorted(MEMORY_DIR.glob('*.md'))

    # BUG FIX: compare against None so "--limit 0" means zero files,
    # not "no limit" (0 is falsy and the old truthiness test ignored it).
    if args.limit is not None:
        md_files = md_files[:args.limit]

    print(f"Found {len(md_files)} memory files to process")
    print(f"Target collection: {COLLECTION_NAME}")
    print(f"Qdrant URL: {QDRANT_URL}")
    print(f"Ollama URL: {OLLAMA_URL}")
    print()

    total_entries = 0
    stored = 0
    failed = 0

    for file_path in md_files:
        print(f"Processing: {file_path.name}")
        for entry in parse_memory_file(file_path):
            total_entries += 1
            if store_to_qdrant(entry, args.dry_run):
                stored += 1
                print(f"  ✅ Stored entry {stored}")
            else:
                failed += 1
                print(f"  ❌ Failed entry {failed}")

    print()
    print(f"Done! Processed {len(md_files)} files")
    print(f"Total entries: {total_entries}")
    print(f"Stored: {stored}")
    print(f"Failed: {failed}")


if __name__ == '__main__':
    main()
def is_lock_valid(lock_path: Path, max_age_seconds: int = 1800) -> bool:
    """Return True if *lock_path* looks like a live session lock.

    A lock is valid when its JSON payload parses, its ``createdAt``
    timestamp is younger than *max_age_seconds* (default 30 minutes),
    and — when a ``pid`` is recorded — that PID still exists.  The PID
    check uses /proc, so it is Linux-only; on other platforms any lock
    with a pid is treated as stale.  Any read/parse error counts as
    invalid.
    """
    try:
        data = json.loads(lock_path.read_text())

        # Reject locks older than the threshold ("Z" suffix normalized for
        # datetime.fromisoformat on pre-3.11 Pythons).
        created = datetime.fromisoformat(data['createdAt'].replace('Z', '+00:00'))
        if (datetime.now(timezone.utc) - created).total_seconds() > max_age_seconds:
            return False

        # Reject locks whose owning process is gone.
        pid = data.get('pid')
        if pid and not os.path.exists(f"/proc/{pid}"):
            return False

        return True
    except Exception:
        return False


def get_current_session_file():
    """Locate the session file the agent is currently writing to.

    Resolution order (per subagent analysis consensus):
      1. Explicit ``agent:main:main`` entry in sessions.json.
      2. Newest lock file with a live PID and fresh timestamp.
      3. Any sessions.json entry with an existing ``sessionFile``
         (newest mtime wins).
      4. Newest ``*.jsonl`` by mtime, with size as a tie-breaker.

    Returns a Path, or None when SESSIONS_DIR is missing or empty.
    """
    if not SESSIONS_DIR.exists():
        return None

    sessions_json = SESSIONS_DIR / "sessions.json"

    # PRIORITY 1: explicit main-session lookup.
    if sessions_json.exists():
        try:
            with open(sessions_json, 'r') as f:
                sessions_data = json.load(f)

            # ROBUSTNESS FIX: guard against a non-dict value so one malformed
            # entry cannot abort this whole priority tier via AttributeError.
            main_session = sessions_data.get("agent:main:main") or {}
            if isinstance(main_session, dict):
                main_session_id = main_session.get('sessionId')
                if main_session_id:
                    main_file = SESSIONS_DIR / f"{main_session_id}.jsonl"
                    if main_file.exists():
                        return main_file
        except Exception as e:
            print(f"Warning: Failed to parse sessions.json for main session: {e}", file=sys.stderr)

    # PRIORITY 2: lock files whose owning process is still alive.
    valid_locks = [lf for lf in SESSIONS_DIR.glob("*.jsonl.lock") if is_lock_valid(lf)]
    if valid_locks:
        newest_lock = max(valid_locks, key=lambda p: p.stat().st_mtime)
        session_file = SESSIONS_DIR / newest_lock.name.replace('.jsonl.lock', '.jsonl')
        if session_file.exists():
            return session_file

    # PRIORITY 3: any sessions.json entry pointing at an existing file.
    if sessions_json.exists():
        try:
            with open(sessions_json, 'r') as f:
                sessions_data = json.load(f)

            active_session = None
            active_mtime = 0.0

            for session_info in sessions_data.values():
                # ROBUSTNESS FIX: skip malformed (non-dict) entries instead of
                # letting an AttributeError abandon every remaining entry.
                if not isinstance(session_info, dict):
                    continue
                # Inactive subagents have sessionFile == null: skip them.
                session_file_path = session_info.get('sessionFile')
                if not session_file_path:
                    continue

                candidate = Path(session_file_path)
                if candidate.exists():
                    mtime = candidate.stat().st_mtime
                    if mtime > active_mtime:
                        active_mtime = mtime
                        active_session = candidate

            if active_session:
                return active_session
        except Exception as e:
            print(f"Warning: Failed to parse sessions.json: {e}", file=sys.stderr)

    # PRIORITY 4: fall back to scoring files by recency (mtime) + size.
    files = list(SESSIONS_DIR.glob("*.jsonl"))
    if not files:
        return None

    def file_score(p: Path) -> float:
        # mtime dominates; size adds up to ~1s per GB as a tie-breaker so a
        # large active transcript beats an equally-recent empty stub.
        try:
            stat = p.stat()
            return stat.st_mtime + stat.st_size / 1e9
        except Exception:
            return 0.0

    return max(files, key=file_score)