fix: parse curated turns into proper user/assistant roles
- Added parse_curated_turn() function to correctly parse stored memories - Fixed build_augmented_messages() to use proper message roles - Layer 2 (semantic) and Layer 3 (context) now correctly parse User: X / Assistant: Y format into separate messages - Resolves context corruption where turns were dumped as single user message v2.0.2
This commit is contained in:
136
app/utils.py
136
app/utils.py
@@ -2,7 +2,7 @@
|
|||||||
from .config import config
|
from .config import config
|
||||||
import tiktoken
|
import tiktoken
|
||||||
import os
|
import os
|
||||||
from typing import List, Dict
|
from typing import List, Dict, Optional
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -127,10 +127,70 @@ def load_system_prompt() -> str:
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def parse_curated_turn(text: str) -> List[Dict]:
|
||||||
|
"""Parse a curated turn into alternating user/assistant messages.
|
||||||
|
|
||||||
|
Input format:
|
||||||
|
User: [question]
|
||||||
|
Assistant: [answer]
|
||||||
|
Timestamp: ISO datetime
|
||||||
|
|
||||||
|
Returns list of message dicts with role and content.
|
||||||
|
Returns empty list if parsing fails.
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return []
|
||||||
|
|
||||||
|
messages = []
|
||||||
|
lines = text.strip().split("\n")
|
||||||
|
|
||||||
|
current_role = None
|
||||||
|
current_content = []
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
line = line.strip()
|
||||||
|
if line.startswith("User:"):
|
||||||
|
# Save previous content if exists
|
||||||
|
if current_role and current_content:
|
||||||
|
messages.append({
|
||||||
|
"role": current_role,
|
||||||
|
"content": "\n".join(current_content).strip()
|
||||||
|
})
|
||||||
|
current_role = "user"
|
||||||
|
current_content = [line[5:].strip()] # Remove "User:" prefix
|
||||||
|
elif line.startswith("Assistant:"):
|
||||||
|
# Save previous content if exists
|
||||||
|
if current_role and current_content:
|
||||||
|
messages.append({
|
||||||
|
"role": current_role,
|
||||||
|
"content": "\n".join(current_content).strip()
|
||||||
|
})
|
||||||
|
current_role = "assistant"
|
||||||
|
current_content = [line[10:].strip()] # Remove "Assistant:" prefix
|
||||||
|
elif line.startswith("Timestamp:"):
|
||||||
|
# Ignore timestamp line
|
||||||
|
continue
|
||||||
|
elif current_role:
|
||||||
|
# Continuation of current message
|
||||||
|
current_content.append(line)
|
||||||
|
|
||||||
|
# Save last message
|
||||||
|
if current_role and current_content:
|
||||||
|
messages.append({
|
||||||
|
"role": current_role,
|
||||||
|
"content": "\n".join(current_content).strip()
|
||||||
|
})
|
||||||
|
|
||||||
|
return messages
|
||||||
|
|
||||||
|
|
||||||
async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
|
async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
|
||||||
"""Build 4-layer augmented messages from incoming messages.
|
"""Build 4-layer augmented messages from incoming messages.
|
||||||
|
|
||||||
This is a standalone version that can be used by proxy_handler.py.
|
Layer 1: System prompt (preserved from incoming + vera context)
|
||||||
|
Layer 2: Semantic memories (curated, parsed into proper roles)
|
||||||
|
Layer 3: Recent context (raw turns, parsed into proper roles)
|
||||||
|
Layer 4: Current conversation (passed through)
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -153,6 +213,10 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
|
|||||||
search_context += msg.get("content", "") + " "
|
search_context += msg.get("content", "") + " "
|
||||||
|
|
||||||
messages = []
|
messages = []
|
||||||
|
token_budget = {
|
||||||
|
"semantic": config.semantic_token_budget,
|
||||||
|
"context": config.context_token_budget
|
||||||
|
}
|
||||||
|
|
||||||
# === LAYER 1: System Prompt ===
|
# === LAYER 1: System Prompt ===
|
||||||
system_content = ""
|
system_content = ""
|
||||||
@@ -166,6 +230,7 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
|
|||||||
|
|
||||||
if system_content:
|
if system_content:
|
||||||
messages.append({"role": "system", "content": system_content})
|
messages.append({"role": "system", "content": system_content})
|
||||||
|
logger.info(f"Layer 1 (system): {count_tokens(system_content)} tokens")
|
||||||
|
|
||||||
# === LAYER 2: Semantic (curated memories) ===
|
# === LAYER 2: Semantic (curated memories) ===
|
||||||
qdrant = get_qdrant_service()
|
qdrant = get_qdrant_service()
|
||||||
@@ -176,28 +241,71 @@ async def build_augmented_messages(incoming_messages: List[Dict]) -> List[Dict]:
|
|||||||
entry_type="curated"
|
entry_type="curated"
|
||||||
)
|
)
|
||||||
|
|
||||||
semantic_tokens = 0
|
semantic_messages = []
|
||||||
|
semantic_tokens_used = 0
|
||||||
|
|
||||||
for result in semantic_results:
|
for result in semantic_results:
|
||||||
payload = result.get("payload", {})
|
payload = result.get("payload", {})
|
||||||
text = payload.get("text", "")
|
text = payload.get("text", "")
|
||||||
if text and semantic_tokens < config.semantic_token_budget:
|
if text:
|
||||||
messages.append({"role": "user", "content": text}) # Add as context
|
# Parse curated turn into proper user/assistant messages
|
||||||
semantic_tokens += count_tokens(text)
|
parsed = parse_curated_turn(text)
|
||||||
|
for msg in parsed:
|
||||||
|
msg_tokens = count_tokens(msg.get("content", ""))
|
||||||
|
if semantic_tokens_used + msg_tokens <= token_budget["semantic"]:
|
||||||
|
semantic_messages.append(msg)
|
||||||
|
semantic_tokens_used += msg_tokens
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
if semantic_tokens_used >= token_budget["semantic"]:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Add parsed messages to context
|
||||||
|
for msg in semantic_messages:
|
||||||
|
messages.append(msg)
|
||||||
|
|
||||||
|
if semantic_messages:
|
||||||
|
logger.info(f"Layer 2 (semantic): {len(semantic_messages)} messages, ~{semantic_tokens_used} tokens")
|
||||||
|
|
||||||
# === LAYER 3: Context (recent turns) ===
|
# === LAYER 3: Context (recent turns) ===
|
||||||
recent_turns = await qdrant.get_recent_turns(limit=20)
|
recent_turns = await qdrant.get_recent_turns(limit=50)
|
||||||
|
|
||||||
context_tokens = 0
|
context_messages = []
|
||||||
|
context_tokens_used = 0
|
||||||
|
|
||||||
|
# Process oldest first for chronological order
|
||||||
for turn in reversed(recent_turns):
|
for turn in reversed(recent_turns):
|
||||||
payload = turn.get("payload", {})
|
payload = turn.get("payload", {})
|
||||||
text = payload.get("text", "")
|
text = payload.get("text", "")
|
||||||
if text and context_tokens < config.context_token_budget:
|
entry_type = payload.get("type", "raw")
|
||||||
messages.append({"role": "user", "content": text}) # Add as context
|
|
||||||
context_tokens += count_tokens(text)
|
|
||||||
|
|
||||||
# === LAYER 4: Current messages (passed through) ===
|
if text:
|
||||||
for msg in incoming_messages:
|
# Parse turn into messages
|
||||||
if msg.get("role") != "system": # Do not duplicate system
|
parsed = parse_curated_turn(text)
|
||||||
|
|
||||||
|
for msg in parsed:
|
||||||
|
msg_tokens = count_tokens(msg.get("content", ""))
|
||||||
|
if context_tokens_used + msg_tokens <= token_budget["context"]:
|
||||||
|
context_messages.append(msg)
|
||||||
|
context_tokens_used += msg_tokens
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
if context_tokens_used >= token_budget["context"]:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Add context messages (oldest first maintains conversation order)
|
||||||
|
for msg in context_messages:
|
||||||
messages.append(msg)
|
messages.append(msg)
|
||||||
|
|
||||||
|
if context_messages:
|
||||||
|
logger.info(f"Layer 3 (context): {len(context_messages)} messages, ~{context_tokens_used} tokens")
|
||||||
|
|
||||||
|
# === LAYER 4: Current conversation ===
|
||||||
|
for msg in incoming_messages:
|
||||||
|
if msg.get("role") != "system": # System already handled in Layer 1
|
||||||
|
messages.append(msg)
|
||||||
|
|
||||||
|
logger.info(f"Layer 4 (current): {len([m for m in incoming_messages if m.get('role') != 'system'])} messages")
|
||||||
|
|
||||||
return messages
|
return messages
|
||||||
Reference in New Issue
Block a user