diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..684e707 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +*.egg-info/ +dist/ +build/ + +# Environment +.env +.env.* +.venv/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Session notes (local only) +session.md +*.session.md + +# Logs +*.log +logs/ diff --git a/README.md b/README.md index 33aae7e..bdd56cc 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,17 @@ In this version, we use a **local Qdrant database** (`http://:6333`). --- +## Gotchas & Known Limitations + +> ⚠️ **Embedding Dimensions:** `snowflake-arctic-embed2` outputs **1024 dimensions**, not 768. Ensure your Qdrant collection is configured with `"size": 1024`. + +> ⚠️ **Hardcoded Sessions Path:** `SESSIONS_DIR` is hardcoded to `/root/.openclaw/agents/main/sessions`. To use a different path, modify `realtime_qdrant_watcher.py` to read from an environment variable: +> ```python +> SESSIONS_DIR = Path(os.getenv("OPENCLAW_SESSIONS_DIR", "/root/.openclaw/agents/main/sessions")) +> ``` + +--- + ## Three-Tier Architecture ``` @@ -246,6 +257,8 @@ The watcher monitors OpenClaw session files in real-time: SESSIONS_DIR = Path("/root/.openclaw/agents/main/sessions") ``` +> ⚠️ **Known Limitation:** `SESSIONS_DIR` is currently hardcoded. To use a different path, patch the watcher script to read from an environment variable (e.g., `os.getenv("OPENCLAW_SESSIONS_DIR", "/root/.openclaw/agents/main/sessions")`). 
+ **What happens:** - Uses `inotify` or polling to watch the sessions directory - Automatically detects the most recently modified `.jsonl` file @@ -327,7 +340,7 @@ def get_embedding(text: str) -> List[float]: **What happens:** - Sends text to Ollama API (10.0.0.10:11434) - Uses `snowflake-arctic-embed2` model -- Returns 768-dimensional vector +- Returns **1024-dimensional vector** (not 768) - Falls back gracefully if Ollama is unavailable #### Step 5: Qdrant Storage @@ -404,7 +417,7 @@ When OpenClaw starts a new session: { "name": "memories_tr", "vectors": { - "size": 768, # snowflake-arctic-embed2 dimension + "size": 1024, # snowflake-arctic-embed2 dimension (1024, not 768) "distance": "Cosine" # Similarity metric }, "payload_schema": { @@ -550,4 +563,94 @@ memories_tr → Topic Engine → topic_blocks_tr → Retrieval → Context --- +## Updating / Patching + +If you already have TrueRecall Base installed and need to apply a bug fix or update: + +### Quick Update (v1.2 Patch) + +**Applies to:** Session file detection fix (picks wrong file when multiple sessions active) + +```bash +# 1. Backup current watcher +cp /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py \ + /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py.bak.$(date +%Y%m%d) + +# 2. Download latest watcher (choose one source) + +# Option A: From GitHub +curl -o /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py \ + https://raw.githubusercontent.com/speedyfoxai/openclaw-true-recall-base/master/watcher/realtime_qdrant_watcher.py + +# Option B: From GitLab +curl -o /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py \ + https://gitlab.com/mdkrush/true-recall-base/-/raw/master/watcher/realtime_qdrant_watcher.py + +# Option C: From local git (if cloned) +cp /path/to/true-recall-base/watcher/realtime_qdrant_watcher.py \ + /root/.openclaw/workspace/skills/qdrant-memory/scripts/ + +# 3. 
Stop old watcher +pkill -f realtime_qdrant_watcher + +# 4. Start new watcher +python3 /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py --daemon + +# 5. Verify +ps aux | grep watcher +lsof -p $(pgrep -f realtime_qdrant_watcher) | grep jsonl +``` + +### Update with Git (If Cloned) + +```bash +cd /path/to/true-recall-base +git pull origin master + +# Copy updated files +cp watcher/realtime_qdrant_watcher.py \ + /root/.openclaw/workspace/skills/qdrant-memory/scripts/ + +# Copy optional: backfill script +cp scripts/backfill_memory_to_q.py \ + /root/.openclaw/workspace/skills/qdrant-memory/scripts/ 2>/dev/null || true + +# Restart watcher +sudo systemctl restart mem-qdrant-watcher +# OR manually: +pkill -f realtime_qdrant_watcher +python3 /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py --daemon +``` + +### Verify Update Applied + +```bash +# Check version in file +grep "v1.2" /root/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py + +# Verify watcher is running +ps aux | grep realtime_qdrant_watcher + +# Confirm watching main session (not subagent) +lsof -p $(pgrep -f realtime_qdrant_watcher) | grep jsonl + +# Check recent captures in Qdrant +curl -s "http://10.0.0.40:6333/collections/memories_tr/points/scroll" \ + -H "Content-Type: application/json" \ + -d '{"limit": 3, "with_payload": true}' | jq -r '.result.points[].payload.timestamp' +``` + +### What's New in v1.2 + +| Feature | Benefit | +|---------|---------| +| **Priority-based session detection** | Always picks `agent:main:main` first | +| **Lock file validation** | Ignores stale/crashed session locks via PID check | +| **Inactive subagent filtering** | Skips sessions with `sessionFile=null` | +| **Backfill script** | Import historical memories from markdown files | + +**No config changes required** - existing `config.json` works unchanged. 
+ +--- + **Prerequisite for:** TrueRecall Gems, TrueRecall Blocks diff --git a/config.json b/config.json index 931a2c9..a643e31 100644 --- a/config.json +++ b/config.json @@ -1,12 +1,12 @@ { "version": "1.0", - "description": "TrueRecall v1 - Memory capture only", + "description": "TrueRecall Base - Memory capture", "components": ["watcher"], "collections": { "memories": "memories_tr" }, - "qdrant_url": "http://10.0.0.40:6333", - "ollama_url": "http://10.0.0.10:11434", + "qdrant_url": "http://:6333", + "ollama_url": "http://:11434", "embedding_model": "snowflake-arctic-embed2", - "user_id": "rob" + "user_id": "" } diff --git a/scripts/backfill_memory_to_q.py b/scripts/backfill_memory_to_q.py new file mode 100644 index 0000000..6322479 --- /dev/null +++ b/scripts/backfill_memory_to_q.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +""" +Backfill memories_tr collection from memory markdown files. + +Processes all .md files in /root/.openclaw/workspace/memory/ +and stores them to Qdrant memories_tr collection. 
+ +Usage: + python3 backfill_memory_to_q.py [--dry-run] +""" + +import argparse +import hashlib +import json +import os +import re +import sys +from pathlib import Path +from datetime import datetime, timezone +from typing import List, Optional, Dict, Any + +import requests + +# Config +QDRANT_URL = os.getenv("QDRANT_URL", "http://10.0.0.40:6333") +COLLECTION_NAME = "memories_tr" +OLLAMA_URL = os.getenv("OLLAMA_URL", "http://10.0.0.10:11434") +EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2") +MEMORY_DIR = Path("/root/.openclaw/workspace/memory") +USER_ID = "rob" + +def get_embedding(text: str) -> Optional[List[float]]: + """Generate embedding using Ollama""" + try: + response = requests.post( + f"{OLLAMA_URL}/api/embeddings", + json={"model": EMBEDDING_MODEL, "prompt": text[:4000]}, + timeout=30 + ) + response.raise_for_status() + return response.json()["embedding"] + except Exception as e: + print(f"Error getting embedding: {e}", file=sys.stderr) + return None + +def clean_content(text: str) -> str: + """Clean markdown content for storage""" + # Remove markdown formatting + text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) + text = re.sub(r'\*([^*]+)\*', r'\1', text) + text = re.sub(r'`([^`]+)`', r'\1', text) + text = re.sub(r'```[\s\S]*?```', '', text) + # Remove headers + text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE) + # Remove excess whitespace + text = re.sub(r'\n{3,}', '\n\n', text) + return text.strip() + +def parse_memory_file(file_path: Path) -> List[Dict[str, Any]]: + """Parse a memory markdown file into entries""" + entries = [] + + try: + content = file_path.read_text(encoding='utf-8') + except Exception as e: + print(f"Error reading {file_path}: {e}", file=sys.stderr) + return entries + + # Extract date from filename + date_match = re.search(r'(\d{4}-\d{2}-\d{2})', file_path.name) + date_str = date_match.group(1) if date_match else datetime.now().strftime('%Y-%m-%d') + + # Split by session headers (## Session: or ## 
Update:) + sessions = re.split(r'\n## ', content) + + for i, session in enumerate(sessions): + if not session.strip(): + continue + + # Extract session title if present + title_match = re.match(r'Session:\s*(.+)', session, re.MULTILINE) + if not title_match: + title_match = re.match(r'Update:\s*(.+)', session, re.MULTILINE) + session_title = title_match.group(1).strip() if title_match else f"Session {i}" + + # Extract key events, decisions, and content + # Look for bullet points and content + sections = session.split('\n### ') + + for section in sections: + if not section.strip(): + continue + + # Clean the content + cleaned = clean_content(section) + if len(cleaned) < 20: # Skip very short sections + continue + + entry = { + 'content': cleaned[:2000], + 'role': 'assistant', # These are summaries + 'date': date_str, + 'session_title': session_title, + 'file': file_path.name, + 'source': 'memory-backfill' + } + entries.append(entry) + + return entries + +def store_to_qdrant(entry: Dict[str, Any], dry_run: bool = False) -> bool: + """Store a memory entry to Qdrant""" + content = entry['content'] + + if dry_run: + print(f"[DRY RUN] Would store: {content[:60]}...") + return True + + vector = get_embedding(content) + if vector is None: + return False + + # Generate deterministic ID + hash_content = f"{USER_ID}:{entry['date']}:{content[:100]}" + hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8] + point_id = abs(int.from_bytes(hash_bytes, byteorder='big') % (2**63)) + + payload = { + 'user_id': USER_ID, + 'role': entry.get('role', 'assistant'), + 'content': content, + 'date': entry['date'], + 'timestamp': datetime.now(timezone.utc).isoformat(), + 'source': entry.get('source', 'memory-backfill'), + 'file': entry.get('file', ''), + 'session_title': entry.get('session_title', ''), + 'curated': True # Mark as curated since these are processed + } + + try: + response = requests.put( + f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points", + json={'points': [{'id': 
point_id, 'vector': vector, 'payload': payload}]}, + timeout=30 + ) + response.raise_for_status() + return True + except Exception as e: + print(f"Error storing to Qdrant: {e}", file=sys.stderr) + return False + +def main(): + parser = argparse.ArgumentParser(description='Backfill memory files to Qdrant') + parser.add_argument('--dry-run', '-n', action='store_true', help='Dry run - do not write to Qdrant') + parser.add_argument('--limit', '-l', type=int, default=None, help='Limit number of files to process') + args = parser.parse_args() + + if not MEMORY_DIR.exists(): + print(f"Memory directory not found: {MEMORY_DIR}", file=sys.stderr) + sys.exit(1) + + # Get all markdown files + md_files = sorted(MEMORY_DIR.glob('*.md')) + + if args.limit: + md_files = md_files[:args.limit] + + print(f"Found {len(md_files)} memory files to process") + print(f"Target collection: {COLLECTION_NAME}") + print(f"Qdrant URL: {QDRANT_URL}") + print(f"Ollama URL: {OLLAMA_URL}") + print() + + total_entries = 0 + stored = 0 + failed = 0 + + for file_path in md_files: + print(f"Processing: {file_path.name}") + entries = parse_memory_file(file_path) + + for entry in entries: + total_entries += 1 + if store_to_qdrant(entry, args.dry_run): + stored += 1 + print(f" ✅ Stored entry {stored}") + else: + failed += 1 + print(f" ❌ Failed entry {failed}") + + print() + print(f"Done! 
Processed {len(md_files)} files") + print(f"Total entries: {total_entries}") + print(f"Stored: {stored}") + print(f"Failed: {failed}") + +if __name__ == '__main__': + main() diff --git a/watcher/mem-qdrant-watcher.service b/watcher/mem-qdrant-watcher.service index 57fcf43..a5c566a 100644 --- a/watcher/mem-qdrant-watcher.service +++ b/watcher/mem-qdrant-watcher.service @@ -4,14 +4,14 @@ After=network.target [Service] Type=simple -User=root -WorkingDirectory=/root/.openclaw/workspace/.local_projects/true-recall-base/watcher -Environment="QDRANT_URL=http://10.0.0.40:6333" +User= +WorkingDirectory=/true-recall-base/watcher +Environment="QDRANT_URL=http://:6333" Environment="QDRANT_COLLECTION=memories_tr" -Environment="OLLAMA_URL=http://10.0.0.10:11434" +Environment="OLLAMA_URL=http://:11434" Environment="EMBEDDING_MODEL=snowflake-arctic-embed2" -Environment="USER_ID=rob" -ExecStart=/usr/bin/python3 /root/.openclaw/workspace/.local_projects/true-recall-base/watcher/realtime_qdrant_watcher.py --daemon +Environment="USER_ID=" +ExecStart=/usr/bin/python3 /true-recall-base/watcher/realtime_qdrant_watcher.py --daemon Restart=always RestartSec=5 diff --git a/watcher/realtime_qdrant_watcher.py b/watcher/realtime_qdrant_watcher.py index f3963d2..bee8961 100644 --- a/watcher/realtime_qdrant_watcher.py +++ b/watcher/realtime_qdrant_watcher.py @@ -1,9 +1,15 @@ #!/usr/bin/env python3 """ -TrueRecall v1 - Real-time Qdrant Watcher +TrueRecall v1.2 - Real-time Qdrant Watcher Monitors OpenClaw sessions and stores to memories_tr instantly. This is the CAPTURE component. For curation and injection, install v2. 
+ +Changelog: +- v1.2: Fixed session rotation bug - added inactivity detection (30s threshold) + and improved file scoring to properly detect new sessions on /new or /reset +- v1.1: Added 1-second mtime polling for session rotation +- v1.0: Initial release """ import os @@ -26,7 +32,7 @@ EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2") USER_ID = os.getenv("USER_ID", "rob") # Paths -SESSIONS_DIR = Path("/root/.openclaw/agents/main/sessions") +SESSIONS_DIR = Path(os.getenv("OPENCLAW_SESSIONS_DIR", "/root/.openclaw/agents/main/sessions")) # State running = True @@ -133,15 +139,111 @@ def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool: return False +def is_lock_valid(lock_path: Path, max_age_seconds: int = 1800) -> bool: + """Check if lock file is valid (not stale, PID exists).""" + try: + with open(lock_path, 'r') as f: + data = json.load(f) + + # Check lock file age + created = datetime.fromisoformat(data['createdAt'].replace('Z', '+00:00')) + if (datetime.now(timezone.utc) - created).total_seconds() > max_age_seconds: + return False + + # Check PID exists + pid = data.get('pid') + if pid and not os.path.exists(f"/proc/{pid}"): + return False + + return True + except Exception: + return False + + def get_current_session_file(): + """Find the most recently active session file. + + Priority (per subagent analysis consensus): + 1. Explicit agent:main:main lookup from sessions.json (highest priority) + 2. Lock files with valid PID + recent timestamp + 3. Parse sessions.json for other active sessions + 4. 
File scoring by mtime + size (fallback) + """ if not SESSIONS_DIR.exists(): return None + sessions_json = SESSIONS_DIR / "sessions.json" + + # PRIORITY 1: Explicit main session lookup + if sessions_json.exists(): + try: + with open(sessions_json, 'r') as f: + sessions_data = json.load(f) + + # Look up agent:main:main explicitly + main_session = sessions_data.get("agent:main:main", {}) + main_session_id = main_session.get('sessionId') + + if main_session_id: + main_file = SESSIONS_DIR / f"{main_session_id}.jsonl" + if main_file.exists(): + return main_file + except Exception as e: + print(f"Warning: Failed to parse sessions.json for main session: {e}", file=sys.stderr) + + # PRIORITY 2: Lock files with PID validation + lock_files = list(SESSIONS_DIR.glob("*.jsonl.lock")) + valid_locks = [lf for lf in lock_files if is_lock_valid(lf)] + + if valid_locks: + # Get the most recent valid lock file + newest_lock = max(valid_locks, key=lambda p: p.stat().st_mtime) + session_file = SESSIONS_DIR / newest_lock.name.replace('.jsonl.lock', '.jsonl') + if session_file.exists(): + return session_file + + # PRIORITY 3: Parse sessions.json for other sessions with sessionFile + if sessions_json.exists(): + try: + with open(sessions_json, 'r') as f: + sessions_data = json.load(f) + + active_session = None + active_mtime = 0 + + for session_key, session_info in sessions_data.items(): + # Skip if no sessionFile (inactive subagents have null) + session_file_path = session_info.get('sessionFile') + if not session_file_path: + continue + + session_file = Path(session_file_path) + if session_file.exists(): + mtime = session_file.stat().st_mtime + if mtime > active_mtime: + active_mtime = mtime + active_session = session_file + + if active_session: + return active_session + except Exception as e: + print(f"Warning: Failed to parse sessions.json: {e}", file=sys.stderr) + + # PRIORITY 4: Score files by recency (mtime) + size files = list(SESSIONS_DIR.glob("*.jsonl")) if not files: return None 
- return max(files, key=lambda p: p.stat().st_mtime) + def file_score(p: Path) -> float: + try: + stat = p.stat() + mtime = stat.st_mtime + size = stat.st_size + return mtime + (size / 1e9) + except Exception: + return 0 + + return max(files, key=file_score) def parse_turn(line: str, session_name: str) -> Optional[Dict[str, Any]]: @@ -224,13 +326,57 @@ def watch_session(session_file: Path, dry_run: bool = False): print(f"Warning: Could not read existing turns: {e}", file=sys.stderr) last_position = 0 + last_session_check = time.time() + last_data_time = time.time() # Track when we last saw new data + last_file_size = session_file.stat().st_size if session_file.exists() else 0 + + INACTIVITY_THRESHOLD = 30 # seconds - if no data for 30s, check for new session + with open(session_file, 'r') as f: while running: if not session_file.exists(): print("Session file removed, looking for new session...") return None + current_time = time.time() + + # Check for newer session every 1 second + if current_time - last_session_check > 1.0: + last_session_check = current_time + newest_session = get_current_session_file() + if newest_session and newest_session != session_file: + print(f"Newer session detected: {newest_session.name}") + return newest_session + + # Check if current file is stale (no new data for threshold) + if current_time - last_data_time > INACTIVITY_THRESHOLD: + try: + current_size = session_file.stat().st_size + # If file hasn't grown, check if another session is active + if current_size == last_file_size: + newest_session = get_current_session_file() + if newest_session and newest_session != session_file: + print(f"Current session inactive, switching to: {newest_session.name}") + return newest_session + else: + # File grew, update tracking + last_file_size = current_size + last_data_time = current_time + except Exception: + pass + + # Process new lines and update activity tracking + old_position = last_position process_new_lines(f, session_name, dry_run) + + # 
If we processed new data, update activity timestamp + if last_position > old_position: + last_data_time = current_time + try: + last_file_size = session_file.stat().st_size + except Exception: + pass + time.sleep(0.1) return session_file @@ -263,7 +409,7 @@ def watch_loop(dry_run: bool = False): def main(): global USER_ID - parser = argparse.ArgumentParser(description="TrueRecall v1 - Real-time Memory Capture") + parser = argparse.ArgumentParser(description="TrueRecall v1.2 - Real-time Memory Capture") parser.add_argument("--daemon", "-d", action="store_true", help="Run as daemon") parser.add_argument("--once", "-o", action="store_true", help="Process once then exit") parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write to Qdrant") @@ -277,7 +423,7 @@ def main(): if args.user_id: USER_ID = args.user_id - print(f"🔍 TrueRecall v1 - Real-time Memory Capture") + print(f"🔍 TrueRecall v1.2 - Real-time Memory Capture") print(f"📍 Qdrant: {QDRANT_URL}/{QDRANT_COLLECTION}") print(f"🧠 Ollama: {OLLAMA_URL}/{EMBEDDING_MODEL}") print(f"👤 User: {USER_ID}")