def chunk_text(text: str, max_chars: int = 6000, overlap: int = 200) -> list:
    """Split text into overlapping chunks for embedding.

    Prefers to end each chunk on a paragraph break, then a sentence break,
    so embeddings stay semantically coherent. No content is silently
    dropped: every non-empty range is emitted (the previous version
    discarded chunks under 100 chars, losing the tail of some turns).

    Args:
        text: Text to chunk.
        max_chars: Max chars per chunk (6000 is safe for a 4K-token limit).
        overlap: Chars of overlap carried between consecutive chunks.

    Returns:
        List of dicts with keys 'text', 'chunk_index', 'total_chunks'.
    """
    if len(text) <= max_chars:
        return [{'text': text, 'chunk_index': 0, 'total_chunks': 1}]

    pieces = []  # plain strings; metadata is attached once the count is known
    start = 0

    while start < len(text):
        end = start + max_chars

        if end < len(text):
            # Look for a paragraph break first, then a sentence break.
            # The `> start + 500` guard keeps chunks from collapsing to
            # near-nothing when a break sits right at the chunk's start.
            para_break = text.rfind('\n\n', start, end)
            if para_break > start + 500:
                end = para_break
            else:
                for delim in ('. ', '? ', '! ', '\n'):
                    sent_break = text.rfind(delim, start, end)
                    if sent_break > start + 500:
                        end = sent_break + 1
                        break

        piece = text[start:end].strip()
        if piece:  # keep every non-empty chunk -- no data loss
            pieces.append(piece)

        # Advance with overlap; `end - start >= 500` is guaranteed above,
        # and overlap < 500, so the loop always makes forward progress.
        start = end - overlap if end < len(text) else len(text)

    total = len(pieces)
    return [{'text': p, 'chunk_index': i, 'total_chunks': total}
            for i, p in enumerate(pieces)]
def store_to_qdrant(turn: Dict[str, Any], dry_run: bool = False) -> bool:
    """Store a conversation turn to Qdrant, chunking long content.

    Long content is split into overlapping chunks (no data loss); each
    chunk becomes its own Qdrant point carrying chunk_index/total_chunks
    metadata so the full turn can be reassembled at query time.

    Args:
        turn: Dict with 'role' and 'content', optionally 'turn', 'user_id'
            and 'timestamp'.
        dry_run: When True, only print what would be stored.

    Returns:
        True if every chunk was stored (or dry_run), False if any failed.
    """
    # The turn number may be absent; default to 0 everywhere. (Previously
    # the prints used turn['turn'] while the ID path used .get(), so a
    # missing key raised KeyError in one place and not the other.)
    turn_id = turn.get('turn', 0)

    if dry_run:
        print(f"[DRY RUN] Would store turn {turn_id} ({turn['role']}): {turn['content'][:60]}...")
        return True

    content = turn['content']
    chunks = chunk_text(content)

    if len(chunks) > 1:
        print(f" 📦 Chunking turn {turn_id}: {len(content)} chars → {len(chunks)} chunks", file=sys.stderr)

    # Shared metadata computed once so every chunk of this turn agrees even
    # if the loop straddles a second/day boundary. UTC throughout, matching
    # the rest of the file (the old code salted IDs with naive local time).
    now = datetime.now(timezone.utc)
    timestamp = turn.get('timestamp', now.isoformat())
    date_str = now.strftime('%Y-%m-%d')
    base_time = now.strftime('%H%M%S')
    all_success = True

    for chunk_info in chunks:
        piece = chunk_info['text']
        chunk_index = chunk_info['chunk_index']

        # Each chunk gets its own embedding; a failed chunk is skipped but
        # the remaining chunks are still written (best-effort, no abort).
        vector = get_embedding(piece)
        if vector is None:
            print(f"Failed to get embedding for turn {turn_id} chunk {chunk_index}", file=sys.stderr)
            all_success = False
            continue

        payload = {
            "user_id": turn.get('user_id', USER_ID),
            "role": turn['role'],
            "content": piece,  # chunk text (searchable)
            "full_content_length": len(content),  # original, pre-chunk length
            "turn": turn_id,
            "timestamp": timestamp,
            "date": date_str,
            "source": "true-recall-base",
            "curated": False,
            "chunk_index": chunk_index,
            "total_chunks": chunk_info['total_chunks']
        }

        # Unique ID per chunk, salted with the wall-clock second so re-runs
        # create new points rather than overwriting earlier ones.
        digest = hashlib.sha256(
            f"{USER_ID}:turn:{turn_id}:chunk{chunk_index}:{base_time}".encode()
        ).digest()[:8]
        # % 2**63 already yields a non-negative value; no abs() needed.
        point_id = int.from_bytes(digest, byteorder='big') % (2**63)

        try:
            response = requests.put(
                f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points",
                json={
                    "points": [{
                        "id": point_id,
                        "vector": vector,
                        "payload": payload
                    }]
                },
                timeout=30
            )
            response.raise_for_status()
        except Exception as e:
            print(f"Error writing chunk {chunk_index} to Qdrant: {e}", file=sys.stderr)
            all_success = False

    return all_success
'r') as f: + # Check file exists before opening (handles deleted sessions) + if not session_file.exists(): + print(f"Session file gone: {session_file.name}, looking for new session...", file=sys.stderr) + return None + + # Track file handle for re-opening + try: + f = open(session_file, 'r') + f.seek(last_position) + except FileNotFoundError: + print(f"Session file removed during open: {session_file.name}", file=sys.stderr) + return None + + try: while running: if not session_file.exists(): print("Session file removed, looking for new session...") + f.close() return None current_time = time.time() @@ -346,6 +435,7 @@ def watch_session(session_file: Path, dry_run: bool = False): newest_session = get_current_session_file() if newest_session and newest_session != session_file: print(f"Newer session detected: {newest_session.name}") + f.close() return newest_session # Check if current file is stale (no new data for threshold) @@ -357,6 +447,7 @@ def watch_session(session_file: Path, dry_run: bool = False): newest_session = get_current_session_file() if newest_session and newest_session != session_file: print(f"Current session inactive, switching to: {newest_session.name}") + f.close() return newest_session else: # File grew, update tracking @@ -365,19 +456,31 @@ def watch_session(session_file: Path, dry_run: bool = False): except Exception: pass - # Process new lines and update activity tracking - old_position = last_position - process_new_lines(f, session_name, dry_run) + # Check if file has grown since last read + try: + current_size = session_file.stat().st_size + except Exception: + current_size = 0 - # If we processed new data, update activity timestamp - if last_position > old_position: - last_data_time = current_time - try: - last_file_size = session_file.stat().st_size - except Exception: - pass + # Only process if file has grown + if current_size > last_position: + old_position = last_position + process_new_lines(f, session_name, dry_run) + + # If we processed 
new data, update activity timestamp + if last_position > old_position: + last_data_time = current_time + last_file_size = current_size + else: + # Re-open file handle to detect new writes + f.close() + time.sleep(0.05) # Brief pause before re-opening + f = open(session_file, 'r') + f.seek(last_position) time.sleep(0.1) + finally: + f.close() return session_file