def parse_curated_turn(text: str) -> List[Dict]:
    """Parse a curated turn into alternating user/assistant messages.

    Expected input format::

        User: [question]
        Assistant: [answer]
        Timestamp: ISO datetime

    A line whose stripped form starts with ``User:`` or ``Assistant:`` opens
    a new message with that role. Lines starting with ``Timestamp:`` are
    discarded. Any other line is appended to the message currently being
    built; lines seen before the first role marker are ignored.

    Args:
        text: Raw curated-turn text. May be empty or ``None``.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts in input order,
        where role is ``"user"`` or ``"assistant"``. Messages whose content
        is empty after stripping are omitted. Returns an empty list when
        ``text`` is falsy or contains no role marker.
    """
    if not text:
        return []

    # (line prefix, role emitted for it) — checked in order for each line.
    role_prefixes = (("User:", "user"), ("Assistant:", "assistant"))

    messages: List[Dict] = []
    current_role = None
    current_lines: List[str] = []

    def flush() -> None:
        """Append the message being built, unless it carries no content."""
        if current_role is None:
            return
        content = "\n".join(current_lines).strip()
        # Empty messages add nothing to the prompt, so they are dropped.
        if content:
            messages.append({"role": current_role, "content": content})

    for raw_line in text.strip().split("\n"):
        stripped = raw_line.strip()

        if stripped.startswith("Timestamp:"):
            # Metadata line, not part of any message.
            continue

        for prefix, role in role_prefixes:
            if stripped.startswith(prefix):
                flush()
                current_role = role
                current_lines = [stripped[len(prefix):].strip()]
                break
        else:
            if current_role is not None:
                # Keep leading indentation so code blocks and nested lists
                # inside a message survive; only trailing whitespace
                # (including a stray "\r") is removed.
                current_lines.append(raw_line.rstrip())

    flush()
    return messages
- This is a standalone version that can be used by proxy_handler.py. + Layer 1: System prompt (preserved from incoming + vera context) + Layer 2: Semantic memories (curated, parsed into proper roles) + Layer 3: Recent context (raw turns, parsed into proper roles) + Layer 4: Current conversation (passed through) """ import logging @@ -153,6 +213,10 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]: search_context += msg.get("content", "") + " " messages = [] + token_budget = { + "semantic": config.semantic_token_budget, + "context": config.context_token_budget + } # === LAYER 1: System Prompt === system_content = "" @@ -166,6 +230,7 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]: if system_content: messages.append({"role": "system", "content": system_content}) + logger.info(f"Layer 1 (system): {count_tokens(system_content)} tokens") # === LAYER 2: Semantic (curated memories) === qdrant = get_qdrant_service() @@ -176,28 +241,71 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]: entry_type="curated" ) - semantic_tokens = 0 + semantic_messages = [] + semantic_tokens_used = 0 + for result in semantic_results: payload = result.get("payload", {}) text = payload.get("text", "") - if text and semantic_tokens < config.semantic_token_budget: - messages.append({"role": "user", "content": text}) # Add as context - semantic_tokens += count_tokens(text) + if text: + # Parse curated turn into proper user/assistant messages + parsed = parse_curated_turn(text) + for msg in parsed: + msg_tokens = count_tokens(msg.get("content", "")) + if semantic_tokens_used + msg_tokens <= token_budget["semantic"]: + semantic_messages.append(msg) + semantic_tokens_used += msg_tokens + else: + break + if semantic_tokens_used >= token_budget["semantic"]: + break + + # Add parsed messages to context + for msg in semantic_messages: + messages.append(msg) + + if semantic_messages: + logger.info(f"Layer 2 
(semantic): {len(semantic_messages)} messages, ~{semantic_tokens_used} tokens") # === LAYER 3: Context (recent turns) === - recent_turns = await qdrant.get_recent_turns(limit=20) + recent_turns = await qdrant.get_recent_turns(limit=50) - context_tokens = 0 + context_messages = [] + context_tokens_used = 0 + + # Process oldest first for chronological order for turn in reversed(recent_turns): payload = turn.get("payload", {}) text = payload.get("text", "") - if text and context_tokens < config.context_token_budget: - messages.append({"role": "user", "content": text}) # Add as context - context_tokens += count_tokens(text) + entry_type = payload.get("type", "raw") + + if text: + # Parse turn into messages + parsed = parse_curated_turn(text) + + for msg in parsed: + msg_tokens = count_tokens(msg.get("content", "")) + if context_tokens_used + msg_tokens <= token_budget["context"]: + context_messages.append(msg) + context_tokens_used += msg_tokens + else: + break + + if context_tokens_used >= token_budget["context"]: + break - # === LAYER 4: Current messages (passed through) === + # Add context messages (oldest first maintains conversation order) + for msg in context_messages: + messages.append(msg) + + if context_messages: + logger.info(f"Layer 3 (context): {len(context_messages)} messages, ~{context_tokens_used} tokens") + + # === LAYER 4: Current conversation === for msg in incoming_messages: - if msg.get("role") != "system": # Do not duplicate system + if msg.get("role") != "system": # System already handled in Layer 1 messages.append(msg) - return messages \ No newline at end of file + logger.info(f"Layer 4 (current): {len([m for m in incoming_messages if m.get('role') != 'system'])} messages") + + return messages