- Add portalocker file locking to debug_log() to prevent interleaved entries
- Add exc_info=True to curator _call_llm error logging for stack traces
- Add a debug log message on the JSON parse fallback in _parse_json_response
- Warn when cloud is enabled but the API key env var is not set

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
218 lines
7.4 KiB
Python
218 lines
7.4 KiB
Python
# app/proxy_handler.py
|
|
from fastapi import Request
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
import httpx
|
|
import json
|
|
import re
|
|
import logging
|
|
import os
|
|
import portalocker
|
|
from pathlib import Path
|
|
from .config import config
|
|
from .singleton import get_qdrant_service
|
|
from .utils import count_tokens, build_augmented_messages
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Debug log directory (configurable via environment)
|
|
# Logs are written to VERA_LOG_DIR or /app/logs by default
|
|
DEBUG_LOG_DIR = Path(os.environ.get("VERA_LOG_DIR", "/app/logs"))
|
|
|
|
|
|
def clean_message_content(content: str) -> str:
    """Strip [Memory context] wrapper and extract actual user message."""
    if not content:
        return content

    # Prefer the message embedded in a memory-context wrapper, when present.
    wrapped = re.search(
        r'\[Memory context\].*?- user_msg:\s*(.+?)(?:\n\n|\Z)',
        content, re.DOTALL
    )
    if wrapped:
        return wrapped.group(1).strip()

    # Otherwise drop a leading [YYYY-MM-DD...] timestamp prefix, if any.
    prefix = re.match(r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*', content)
    return content[prefix.end():].strip() if prefix else content
|
|
|
|
|
|
def debug_log(category: str, message: str, data: dict = None):
    """Append a JSON-line debug entry to the daily debug log if debug mode is enabled.

    Logs are written to VERA_LOG_DIR/debug_YYYY-MM-DD.log (see DEBUG_LOG_DIR).
    This ensures logs are persisted and easily accessible.

    Args:
        category: Short tag for the entry (e.g. "INPUT", "OUTPUT", "STORAGE").
        message: Human-readable description of the event.
        data: Optional extra payload stored under the "data" key.
    """
    if not config.debug:
        return

    from datetime import datetime, timezone

    # Create the logs directory (it may be a fresh volume mount).
    log_dir = DEBUG_LOG_DIR
    log_dir.mkdir(parents=True, exist_ok=True)

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    log_path = log_dir / f"debug_{today}.log"

    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "category": category,
        "message": message
    }
    if data:
        entry["data"] = data

    # Exclusive lock prevents interleaved entries when multiple workers log
    # concurrently. Unlock in `finally` so a failed write never leaks the lock
    # for the lifetime of the handle.
    with open(log_path, "a") as f:
        portalocker.lock(f, portalocker.LOCK_EX)
        try:
            f.write(json.dumps(entry) + "\n")
        finally:
            portalocker.unlock(f)
|
|
|
|
|
|
async def handle_chat_non_streaming(body: dict):
    """Handle a non-streaming chat request.

    Cleans user messages, augments them with memory context, forwards the
    request to Ollama, stores the resulting Q&A turn best-effort, and returns
    Ollama's response with the originally requested model name restored.

    Args:
        body: Parsed JSON request body from the client.

    Returns:
        JSONResponse wrapping Ollama's chat completion result.
    """
    incoming_messages = body.get("messages", [])
    model = body.get("model", "")

    debug_log("INPUT", "Non-streaming chat request", {"messages": incoming_messages})

    # Clean messages: strip [Memory context] wrappers from user turns so
    # augmentation operates on the raw question.
    cleaned_messages = []
    for msg in incoming_messages:
        if msg.get("role") == "user":
            cleaned_content = clean_message_content(msg.get("content", ""))
            cleaned_messages.append({"role": "user", "content": cleaned_content})
        else:
            cleaned_messages.append(msg)

    # Build augmented messages (memory retrieval / context injection).
    augmented_messages = await build_augmented_messages(cleaned_messages)

    debug_log("THOUGHT", "Built augmented messages", {"augmented_count": len(augmented_messages)})

    # Forward to Ollama with the augmented conversation in place of the original.
    forwarded_body = body.copy()
    forwarded_body["messages"] = augmented_messages

    async with httpx.AsyncClient(timeout=300.0) as client:
        resp = await client.post(f"{config.ollama_host}/api/chat", json=forwarded_body)
        result = resp.json()

    debug_log("OUTPUT", "LLM non-streaming response", {"response": result})

    # Store the Q&A turn: most recent user question paired with the answer.
    user_question = ""
    for msg in reversed(incoming_messages):
        if msg.get("role") == "user":
            user_question = clean_message_content(msg.get("content", ""))
            break

    assistant_answer = result.get("message", {}).get("content", "")

    if user_question and assistant_answer:
        qdrant_service = get_qdrant_service()
        try:
            result_id = await qdrant_service.store_qa_turn(user_question, assistant_answer)
            # Success log mirrors the streaming handler for consistency.
            logger.info(f"[STORE] Success! ID={result_id[:8]}, Q={len(user_question)} chars")
            debug_log("STORAGE", "Non-streaming Q&A stored", {"question": user_question, "answer": assistant_answer})
        except Exception as e:
            # Storage is best-effort: log with type + stack trace, don't fail the request.
            logger.error(f"[STORE] FAILED: {type(e).__name__}: {e}", exc_info=True)

    # Echo back the model name the client asked for.
    result["model"] = model
    return JSONResponse(content=result)
|
|
|
|
|
|
async def handle_chat(request: Request):
    """Handle a streaming chat request.

    Cleans and augments the messages, streams Ollama's NDJSON response back to
    the client unchanged, reassembles the full assistant reply on the side, and
    stores the Q&A turn best-effort once the stream completes.

    Args:
        request: Incoming FastAPI request whose body is an Ollama chat payload.

    Returns:
        StreamingResponse relaying Ollama's NDJSON stream.
    """
    body = await request.json()
    incoming_messages = body.get("messages", [])
    model = body.get("model", "")

    debug_log("INPUT", "Streaming chat request", {"messages": incoming_messages})

    # Clean messages: strip [Memory context] wrappers from user turns.
    cleaned_messages = []
    for msg in incoming_messages:
        if msg.get("role") == "user":
            cleaned_content = clean_message_content(msg.get("content", ""))
            cleaned_messages.append({"role": "user", "content": cleaned_content})
        else:
            cleaned_messages.append(msg)

    # Build augmented messages (memory retrieval / context injection).
    augmented_messages = await build_augmented_messages(cleaned_messages)

    debug_log("THOUGHT", "Built augmented messages for streaming", {
        "original_count": len(incoming_messages),
        "augmented_count": len(augmented_messages)
    })

    # Forward to Ollama with streaming.
    forwarded_body = body.copy()
    forwarded_body["messages"] = augmented_messages

    # Drop headers that no longer match the rewritten body; httpx recomputes them.
    headers = dict(request.headers)
    headers.pop("content-length", None)
    headers.pop("Content-Length", None)
    headers.pop("content-type", None)
    headers.pop("Content-Type", None)

    def _extract_content(line: str) -> str:
        """Return the message content carried by one NDJSON line, or ""."""
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            return ""
        if "message" in data and "content" in data["message"]:
            return data["message"]["content"]
        return ""

    async def stream_response():
        async with httpx.AsyncClient(timeout=300.0) as client:
            resp = await client.post(
                f"{config.ollama_host}/api/chat",
                json=forwarded_body,
                headers=headers
            )

            full_content = ""
            # Buffer partial lines: a single NDJSON line can be split across
            # chunk boundaries, and parsing each chunk in isolation would
            # silently drop that line's content from the stored answer.
            pending = ""
            async for chunk in resp.aiter_bytes():
                yield chunk

                pending += chunk.decode(errors="replace")
                lines = pending.split("\n")
                # The final element is an incomplete line (or "") — keep it.
                pending = lines.pop()
                for line in lines:
                    line = line.strip()
                    if line:
                        full_content += _extract_content(line)

            # Flush a trailing line that had no terminating newline.
            tail = pending.strip()
            if tail:
                full_content += _extract_content(tail)

            debug_log("OUTPUT", "LLM streaming response complete", {
                "content_length": len(full_content)
            })

            # Store the Q&A turn: most recent user question + assembled answer.
            user_question = ""
            for msg in reversed(incoming_messages):
                if msg.get("role") == "user":
                    user_question = clean_message_content(msg.get("content", ""))
                    break

            if user_question and full_content:
                qdrant_service = get_qdrant_service()
                try:
                    result_id = await qdrant_service.store_qa_turn(user_question, full_content)
                    logger.info(f"[STORE] Success! ID={result_id[:8]}, Q={len(user_question)} chars")
                except Exception as e:
                    # Storage is best-effort; never break the client stream.
                    logger.error(f"[STORE] FAILED: {type(e).__name__}: {e}", exc_info=True)

    return StreamingResponse(stream_response(), media_type="application/x-ndjson")
|
|
|
|
|
|
async def forward_to_ollama(request: Request, path: str):
    """Forward request to Ollama transparently.

    Args:
        request: Incoming FastAPI request to mirror (method, body, headers).
        path: Backend path (including leading slash) appended to the Ollama host.

    Returns:
        The fully-read httpx.Response from Ollama.
        NOTE(review): presumably a caller converts this into a FastAPI
        response — confirm, since an httpx.Response is not directly returnable
        from a route handler.
    """
    payload = await request.body()

    # Drop content-length so httpx recomputes it for the forwarded body.
    outbound_headers = dict(request.headers)
    for key in ("content-length", "Content-Length"):
        outbound_headers.pop(key, None)

    async with httpx.AsyncClient(timeout=300.0) as client:
        upstream = await client.request(
            method=request.method,
            url=f"{config.ollama_host}{path}",
            content=payload,
            headers=outbound_headers
        )
    return upstream