feat: update watcher with priority-based session file detection

This commit is contained in:
root
2026-03-04 10:03:13 -06:00
parent e2ba91cbea
commit 23d9f3b36b
2 changed files with 355 additions and 11 deletions

View File

@@ -1,9 +1,15 @@
#!/usr/bin/env python3
"""
TrueRecall Base - Real-time Qdrant Watcher
TrueRecall v1.2 - Real-time Qdrant Watcher
Monitors OpenClaw sessions and stores to memories_tr instantly.
This is the CAPTURE component. For curation and injection, install Gems or Blocks addon.
This is the CAPTURE component. For curation and injection, install v2.
Changelog:
- v1.2: Fixed session rotation bug - added inactivity detection (30s threshold)
and improved file scoring to properly detect new sessions on /new or /reset
- v1.1: Added 1-second mtime polling for session rotation
- v1.0: Initial release
"""
import os
@@ -18,15 +24,15 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional, List
# Config - each value can be overridden through the environment variable named
# in its os.getenv() call; the literals are only the fallback defaults.
QDRANT_URL = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "memories_tr")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://10.0.0.10:11434")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2")
USER_ID = os.getenv("USER_ID", "rob")

# Paths - OPENCLAW_SESSIONS_DIR overrides the default. "~" is expanded so the
# default follows the invoking user's home directory instead of being pinned
# to /root (for root the resulting path is the same as before).
SESSIONS_DIR = Path(
    os.path.expanduser(
        os.getenv("OPENCLAW_SESSIONS_DIR", "~/.openclaw/agents/main/sessions")
    )
)

# State - loop flag polled by watch_session(); the watch loop exits once this
# becomes False.
running = True
@@ -133,15 +139,111 @@ def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
return False
def is_lock_valid(lock_path: Path, max_age_seconds: int = 1800) -> bool:
    """Return True when *lock_path* describes a live, non-stale session lock.

    The lock file is read as a JSON object holding a ``createdAt`` ISO-8601
    timestamp and, optionally, a ``pid``. A lock is rejected when it cannot
    be read or parsed, when it is older than *max_age_seconds*, or when its
    ``pid`` has no ``/proc/<pid>`` entry.

    Args:
        lock_path: Path of the lock file to inspect.
        max_age_seconds: Maximum accepted age of the lock, in seconds.

    Returns:
        True if the lock passes every check, False otherwise. Unreadable or
        malformed lock files are reported as invalid instead of raising.
    """
    try:
        with open(lock_path, 'r') as f:
            data = json.load(f)
        # fromisoformat() on older Pythons does not accept a trailing "Z".
        created = datetime.fromisoformat(data['createdAt'].replace('Z', '+00:00'))
        pid = data.get('pid')
    except (OSError, ValueError, KeyError, TypeError, AttributeError):
        # Missing/unreadable file, invalid JSON, wrong JSON shape, or a
        # missing/ill-typed createdAt: treat the lock as invalid.
        return False

    # A timestamp without an offset cannot be subtracted from an aware "now";
    # interpret it as UTC rather than discarding an otherwise good lock.
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)

    # Check lock file age
    if (datetime.now(timezone.utc) - created).total_seconds() > max_age_seconds:
        return False

    # Check PID exists (relies on a /proc filesystem; skipped when no pid is recorded)
    if pid and not os.path.exists(f"/proc/{pid}"):
        return False

    return True
def get_current_session_file():
    """Find the most recently active session file.

    Candidates are tried in priority order and the first hit wins:
    1. The ``agent:main:main`` entry of sessions.json, resolved as
       ``<sessionId>.jsonl`` inside SESSIONS_DIR (highest priority)
    2. The newest ``*.jsonl.lock`` file accepted by is_lock_valid()
    3. The most recently modified ``sessionFile`` listed in sessions.json
    4. The ``*.jsonl`` file with the highest score (mtime + size / 1e9)

    Returns:
        Path of the selected session file, or None when SESSIONS_DIR does not
        exist or no candidate is found.
    """
    if not SESSIONS_DIR.exists():
        return None

    def safe_mtime(path: Path) -> float:
        # Files may vanish between listing and stat(); rank those last
        # instead of letting the exception escape.
        try:
            return path.stat().st_mtime
        except OSError:
            return 0.0

    # Load sessions.json once; priorities 1 and 3 both read from it.
    sessions_json = SESSIONS_DIR / "sessions.json"
    sessions_data = {}
    if sessions_json.exists():
        try:
            with open(sessions_json, 'r') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Warning: Failed to parse sessions.json: {e}", file=sys.stderr)
        else:
            if isinstance(loaded, dict):
                sessions_data = loaded
            else:
                print("Warning: Failed to parse sessions.json: top-level value is not an object",
                      file=sys.stderr)

    # PRIORITY 1: Explicit main session lookup
    main_session = sessions_data.get("agent:main:main")
    if isinstance(main_session, dict):
        main_session_id = main_session.get('sessionId')
        if main_session_id:
            main_file = SESSIONS_DIR / f"{main_session_id}.jsonl"
            if main_file.exists():
                return main_file

    # PRIORITY 2: Lock files with PID validation
    valid_locks = [lf for lf in SESSIONS_DIR.glob("*.jsonl.lock") if is_lock_valid(lf)]
    if valid_locks:
        newest_lock = max(valid_locks, key=safe_mtime)
        # "<id>.jsonl.lock" -> "<id>.jsonl": strip only the trailing ".lock".
        session_file = SESSIONS_DIR / newest_lock.name[:-len('.lock')]
        if session_file.exists():
            return session_file

    # PRIORITY 3: Other sessions in sessions.json that carry a sessionFile
    active_session = None
    active_mtime = 0
    for session_info in sessions_data.values():
        if not isinstance(session_info, dict):
            continue
        # Skip if no sessionFile (inactive subagents have null)
        session_file_path = session_info.get('sessionFile')
        if not session_file_path:
            continue
        session_file = Path(session_file_path)
        if not session_file.exists():
            continue
        mtime = safe_mtime(session_file)
        if mtime > active_mtime:
            active_mtime = mtime
            active_session = session_file
    if active_session:
        return active_session

    # PRIORITY 4: Score files by recency (mtime) + size
    files = list(SESSIONS_DIR.glob("*.jsonl"))
    if not files:
        return None

    def file_score(p: Path) -> float:
        # mtime dominates; size / 1e9 only separates files with near-equal mtimes.
        try:
            stat = p.stat()
        except OSError:
            return 0
        return stat.st_mtime + (stat.st_size / 1e9)

    return max(files, key=file_score)
def parse_turn(line: str, session_name: str) -> Optional[Dict[str, Any]]:
@@ -224,13 +326,57 @@ def watch_session(session_file: Path, dry_run: bool = False):
print(f"Warning: Could not read existing turns: {e}", file=sys.stderr)
last_position = 0
last_session_check = time.time()
last_data_time = time.time() # Track when we last saw new data
last_file_size = session_file.stat().st_size if session_file.exists() else 0
INACTIVITY_THRESHOLD = 30 # seconds - if no data for 30s, check for new session
with open(session_file, 'r') as f:
while running:
if not session_file.exists():
print("Session file removed, looking for new session...")
return None
current_time = time.time()
# Check for newer session every 1 second
if current_time - last_session_check > 1.0:
last_session_check = current_time
newest_session = get_current_session_file()
if newest_session and newest_session != session_file:
print(f"Newer session detected: {newest_session.name}")
return newest_session
# Check if current file is stale (no new data for threshold)
if current_time - last_data_time > INACTIVITY_THRESHOLD:
try:
current_size = session_file.stat().st_size
# If file hasn't grown, check if another session is active
if current_size == last_file_size:
newest_session = get_current_session_file()
if newest_session and newest_session != session_file:
print(f"Current session inactive, switching to: {newest_session.name}")
return newest_session
else:
# File grew, update tracking
last_file_size = current_size
last_data_time = current_time
except Exception:
pass
# Process new lines and update activity tracking
old_position = last_position
process_new_lines(f, session_name, dry_run)
# If we processed new data, update activity timestamp
if last_position > old_position:
last_data_time = current_time
try:
last_file_size = session_file.stat().st_size
except Exception:
pass
time.sleep(0.1)
return session_file
@@ -263,7 +409,7 @@ def watch_loop(dry_run: bool = False):
def main():
global USER_ID
parser = argparse.ArgumentParser(description="TrueRecall Base - Real-time Memory Capture")
parser = argparse.ArgumentParser(description="TrueRecall v1.1 - Real-time Memory Capture")
parser.add_argument("--daemon", "-d", action="store_true", help="Run as daemon")
parser.add_argument("--once", "-o", action="store_true", help="Process once then exit")
parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write to Qdrant")
@@ -277,7 +423,7 @@ def main():
if args.user_id:
USER_ID = args.user_id
print(f"🔍 TrueRecall Base - Real-time Memory Capture")
print(f"🔍 TrueRecall v1.1 - Real-time Memory Capture")
print(f"📍 Qdrant: {QDRANT_URL}/{QDRANT_COLLECTION}")
print(f"🧠 Ollama: {OLLAMA_URL}/{EMBEDDING_MODEL}")
print(f"👤 User: {USER_ID}")