fix: watcher crash on deleted session files, add chunking for long content
This commit is contained in:
@@ -27,7 +27,7 @@ from typing import Dict, Any, Optional, List
|
|||||||
# Config
|
# Config
|
||||||
QDRANT_URL = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
|
QDRANT_URL = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
|
||||||
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "memories_tr")
|
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "memories_tr")
|
||||||
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://10.0.0.10:11434")
|
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
|
||||||
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2")
|
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "snowflake-arctic-embed2")
|
||||||
USER_ID = os.getenv("USER_ID", "rob")
|
USER_ID = os.getenv("USER_ID", "rob")
|
||||||
|
|
||||||
@@ -94,30 +94,104 @@ def clean_content(text: str) -> str:
|
|||||||
return text.strip()
|
return text.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def chunk_text(text: str, max_chars: int = 6000, overlap: int = 200) -> list:
|
||||||
|
"""Split text into overlapping chunks for embedding.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Text to chunk
|
||||||
|
max_chars: Max chars per chunk (6000 = safe for 4K token limit)
|
||||||
|
overlap: Chars to overlap between chunks
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of chunk dicts with 'text' and 'chunk_index'
|
||||||
|
"""
|
||||||
|
if len(text) <= max_chars:
|
||||||
|
return [{'text': text, 'chunk_index': 0, 'total_chunks': 1}]
|
||||||
|
|
||||||
|
chunks = []
|
||||||
|
start = 0
|
||||||
|
chunk_num = 0
|
||||||
|
|
||||||
|
while start < len(text):
|
||||||
|
end = start + max_chars
|
||||||
|
|
||||||
|
# Try to break at sentence boundary
|
||||||
|
if end < len(text):
|
||||||
|
# Look for paragraph break first
|
||||||
|
para_break = text.rfind('\n\n', start, end)
|
||||||
|
if para_break > start + 500:
|
||||||
|
end = para_break
|
||||||
|
else:
|
||||||
|
# Look for sentence break
|
||||||
|
for delim in ['. ', '? ', '! ', '\n']:
|
||||||
|
sent_break = text.rfind(delim, start, end)
|
||||||
|
if sent_break > start + 500:
|
||||||
|
end = sent_break + 1
|
||||||
|
break
|
||||||
|
|
||||||
|
chunk_text = text[start:end].strip()
|
||||||
|
if len(chunk_text) > 100: # Skip tiny chunks
|
||||||
|
chunks.append(chunk_text)
|
||||||
|
chunk_num += 1
|
||||||
|
|
||||||
|
start = end - overlap if end < len(text) else len(text)
|
||||||
|
|
||||||
|
# Add metadata to each chunk
|
||||||
|
total = len(chunks)
|
||||||
|
return [{'text': c, 'chunk_index': i, 'total_chunks': total} for i, c in enumerate(chunks)]
|
||||||
|
|
||||||
|
|
||||||
def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
|
def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
|
||||||
|
"""Store a conversation turn to Qdrant, chunking if needed.
|
||||||
|
|
||||||
|
For long content, splits into multiple chunks (no data loss).
|
||||||
|
Each chunk gets its own point with chunk_index metadata.
|
||||||
|
"""
|
||||||
if dry_run:
|
if dry_run:
|
||||||
print(f"[DRY RUN] Would store turn {turn['turn']} ({turn['role']}): {turn['content'][:60]}...")
|
print(f"[DRY RUN] Would store turn {turn['turn']} ({turn['role']}): {turn['content'][:60]}...")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
vector = get_embedding(turn['content'])
|
content = turn['content']
|
||||||
if vector is None:
|
chunks = chunk_text(content)
|
||||||
print(f"Failed to get embedding for turn {turn['turn']}", file=sys.stderr)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
if len(chunks) > 1:
|
||||||
|
print(f" 📦 Chunking turn {turn['turn']}: {len(content)} chars → {len(chunks)} chunks", file=sys.stderr)
|
||||||
|
|
||||||
|
turn_id = turn.get('turn', 0)
|
||||||
|
base_time = datetime.now().strftime('%H%M%S')
|
||||||
|
all_success = True
|
||||||
|
|
||||||
|
for chunk_info in chunks:
|
||||||
|
chunk_text_content = chunk_info['text']
|
||||||
|
chunk_index = chunk_info['chunk_index']
|
||||||
|
total_chunks = chunk_info['total_chunks']
|
||||||
|
|
||||||
|
# Get embedding for this chunk
|
||||||
|
vector = get_embedding(chunk_text_content)
|
||||||
|
if vector is None:
|
||||||
|
print(f"Failed to get embedding for turn {turn['turn']} chunk {chunk_index}", file=sys.stderr)
|
||||||
|
all_success = False
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Payload includes full content reference, chunk metadata
|
||||||
payload = {
|
payload = {
|
||||||
"user_id": turn.get('user_id', USER_ID),
|
"user_id": turn.get('user_id', USER_ID),
|
||||||
"role": turn['role'],
|
"role": turn['role'],
|
||||||
"content": turn['content'],
|
"content": chunk_text_content, # Store chunk content (searchable)
|
||||||
|
"full_content_length": len(content), # Original length
|
||||||
"turn": turn['turn'],
|
"turn": turn['turn'],
|
||||||
"timestamp": turn.get('timestamp', datetime.now(timezone.utc).isoformat()),
|
"timestamp": turn.get('timestamp', datetime.now(timezone.utc).isoformat()),
|
||||||
"date": datetime.now(timezone.utc).strftime('%Y-%m-%d'),
|
"date": datetime.now(timezone.utc).strftime('%Y-%m-%d'),
|
||||||
"source": "openclaw-true-recall-base",
|
"source": "true-recall-base",
|
||||||
"curated": False
|
"curated": False,
|
||||||
|
"chunk_index": chunk_index,
|
||||||
|
"total_chunks": total_chunks
|
||||||
}
|
}
|
||||||
|
|
||||||
# Generate deterministic ID
|
# Generate unique ID for each chunk
|
||||||
turn_id = turn.get('turn', 0)
|
hash_bytes = hashlib.sha256(
|
||||||
hash_bytes = hashlib.sha256(f"{USER_ID}:turn:{turn_id}:{datetime.now().strftime('%H%M%S')}".encode()).digest()[:8]
|
f"{USER_ID}:turn:{turn_id}:chunk{chunk_index}:{base_time}".encode()
|
||||||
|
).digest()[:8]
|
||||||
point_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)
|
point_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -133,10 +207,11 @@ def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
|
|||||||
timeout=30
|
timeout=30
|
||||||
)
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return True
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error writing to Qdrant: {e}", file=sys.stderr)
|
print(f"Error writing chunk {chunk_index} to Qdrant: {e}", file=sys.stderr)
|
||||||
return False
|
all_success = False
|
||||||
|
|
||||||
|
return all_success
|
||||||
|
|
||||||
|
|
||||||
def is_lock_valid(lock_path: Path, max_age_seconds: int = 1800) -> bool:
|
def is_lock_valid(lock_path: Path, max_age_seconds: int = 1800) -> bool:
|
||||||
@@ -332,10 +407,24 @@ def watch_session(session_file: Path, dry_run: bool = False):
|
|||||||
|
|
||||||
INACTIVITY_THRESHOLD = 30 # seconds - if no data for 30s, check for new session
|
INACTIVITY_THRESHOLD = 30 # seconds - if no data for 30s, check for new session
|
||||||
|
|
||||||
with open(session_file, 'r') as f:
|
# Check file exists before opening (handles deleted sessions)
|
||||||
|
if not session_file.exists():
|
||||||
|
print(f"Session file gone: {session_file.name}, looking for new session...", file=sys.stderr)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Track file handle for re-opening
|
||||||
|
try:
|
||||||
|
f = open(session_file, 'r')
|
||||||
|
f.seek(last_position)
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"Session file removed during open: {session_file.name}", file=sys.stderr)
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
while running:
|
while running:
|
||||||
if not session_file.exists():
|
if not session_file.exists():
|
||||||
print("Session file removed, looking for new session...")
|
print("Session file removed, looking for new session...")
|
||||||
|
f.close()
|
||||||
return None
|
return None
|
||||||
|
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
@@ -346,6 +435,7 @@ def watch_session(session_file: Path, dry_run: bool = False):
|
|||||||
newest_session = get_current_session_file()
|
newest_session = get_current_session_file()
|
||||||
if newest_session and newest_session != session_file:
|
if newest_session and newest_session != session_file:
|
||||||
print(f"Newer session detected: {newest_session.name}")
|
print(f"Newer session detected: {newest_session.name}")
|
||||||
|
f.close()
|
||||||
return newest_session
|
return newest_session
|
||||||
|
|
||||||
# Check if current file is stale (no new data for threshold)
|
# Check if current file is stale (no new data for threshold)
|
||||||
@@ -357,6 +447,7 @@ def watch_session(session_file: Path, dry_run: bool = False):
|
|||||||
newest_session = get_current_session_file()
|
newest_session = get_current_session_file()
|
||||||
if newest_session and newest_session != session_file:
|
if newest_session and newest_session != session_file:
|
||||||
print(f"Current session inactive, switching to: {newest_session.name}")
|
print(f"Current session inactive, switching to: {newest_session.name}")
|
||||||
|
f.close()
|
||||||
return newest_session
|
return newest_session
|
||||||
else:
|
else:
|
||||||
# File grew, update tracking
|
# File grew, update tracking
|
||||||
@@ -365,19 +456,31 @@ def watch_session(session_file: Path, dry_run: bool = False):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Process new lines and update activity tracking
|
# Check if file has grown since last read
|
||||||
|
try:
|
||||||
|
current_size = session_file.stat().st_size
|
||||||
|
except Exception:
|
||||||
|
current_size = 0
|
||||||
|
|
||||||
|
# Only process if file has grown
|
||||||
|
if current_size > last_position:
|
||||||
old_position = last_position
|
old_position = last_position
|
||||||
process_new_lines(f, session_name, dry_run)
|
process_new_lines(f, session_name, dry_run)
|
||||||
|
|
||||||
# If we processed new data, update activity timestamp
|
# If we processed new data, update activity timestamp
|
||||||
if last_position > old_position:
|
if last_position > old_position:
|
||||||
last_data_time = current_time
|
last_data_time = current_time
|
||||||
try:
|
last_file_size = current_size
|
||||||
last_file_size = session_file.stat().st_size
|
else:
|
||||||
except Exception:
|
# Re-open file handle to detect new writes
|
||||||
pass
|
f.close()
|
||||||
|
time.sleep(0.05) # Brief pause before re-opening
|
||||||
|
f = open(session_file, 'r')
|
||||||
|
f.seek(last_position)
|
||||||
|
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
finally:
|
||||||
|
f.close()
|
||||||
|
|
||||||
return session_file
|
return session_file
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user