"""Tests for utility functions.""" import pytest from unittest.mock import AsyncMock, MagicMock, patch from app.utils import count_tokens, truncate_by_tokens, parse_curated_turn, build_augmented_messages, count_messages_tokens class TestCountTokens: """Tests for count_tokens function.""" def test_empty_string(self): """Empty string should return 0 tokens.""" assert count_tokens("") == 0 def test_simple_text(self): """Simple text should count tokens correctly.""" text = "Hello, world!" assert count_tokens(text) > 0 def test_longer_text(self): """Longer text should have more tokens.""" short = "Hello" long = "Hello, this is a longer sentence with more words." assert count_tokens(long) > count_tokens(short) class TestTruncateByTokens: """Tests for truncate_by_tokens function.""" def test_no_truncation_needed(self): """Text shorter than limit should not be truncated.""" text = "Short text" result = truncate_by_tokens(text, max_tokens=100) assert result == text def test_truncation_applied(self): """Text longer than limit should be truncated.""" text = "This is a longer piece of text that will need to be truncated" result = truncate_by_tokens(text, max_tokens=5) assert count_tokens(result) <= 5 def test_empty_string(self): """Empty string should return empty string.""" assert truncate_by_tokens("", max_tokens=10) == "" class TestParseCuratedTurn: """Tests for parse_curated_turn function.""" def test_empty_string(self): """Empty string should return empty list.""" assert parse_curated_turn("") == [] def test_single_turn(self): """Single Q&A turn should parse correctly.""" text = "User: What is Python?\nAssistant: A programming language." result = parse_curated_turn(text) assert len(result) == 2 assert result[0]["role"] == "user" assert result[0]["content"] == "What is Python?" assert result[1]["role"] == "assistant" assert result[1]["content"] == "A programming language." def test_multiple_turns(self): """Multiple Q&A turns should parse correctly.""" text = """User: What is Python? Assistant: A programming language. User: Is it popular? Assistant: Yes, very popular.""" result = parse_curated_turn(text) assert len(result) == 4 def test_timestamp_ignored(self): """Timestamp lines should be ignored.""" text = "User: Question?\nAssistant: Answer.\nTimestamp: 2024-01-01T00:00:00Z" result = parse_curated_turn(text) assert len(result) == 2 for msg in result: assert "Timestamp" not in msg["content"] def test_multiline_content(self): """Multiline content should be preserved.""" text = "User: Line 1\nLine 2\nLine 3\nAssistant: Response" result = parse_curated_turn(text) assert "Line 1" in result[0]["content"] assert "Line 2" in result[0]["content"] assert "Line 3" in result[0]["content"] class TestCountMessagesTokens: """Tests for count_messages_tokens function.""" def test_empty_list(self): """Empty message list returns 0.""" assert count_messages_tokens([]) == 0 def test_single_message(self): """Single message counts tokens of its content.""" msgs = [{"role": "user", "content": "Hello world"}] result = count_messages_tokens(msgs) assert result > 0 def test_multiple_messages(self): """Multiple messages sum up their token counts.""" msgs = [ {"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi there, how can I help you today?"}, ] result = count_messages_tokens(msgs) assert result > count_messages_tokens([msgs[0]]) def test_message_without_content(self): """Message without content field contributes 0 tokens.""" msgs = [{"role": "system"}] assert count_messages_tokens(msgs) == 0 class TestLoadSystemPrompt: """Tests for load_system_prompt function.""" def test_loads_from_prompts_dir(self, tmp_path): """Loads systemprompt.md from PROMPTS_DIR.""" import app.utils as utils_module prompts_dir = tmp_path / "prompts" prompts_dir.mkdir() (prompts_dir / "systemprompt.md").write_text("You are Vera.") with patch.object(utils_module, "PROMPTS_DIR", prompts_dir): result = utils_module.load_system_prompt() assert result == "You are Vera." def test_falls_back_to_static_dir(self, tmp_path): """Falls back to STATIC_DIR when PROMPTS_DIR has no file.""" import app.utils as utils_module prompts_dir = tmp_path / "no_prompts" # does not exist static_dir = tmp_path / "static" static_dir.mkdir() (static_dir / "systemprompt.md").write_text("Static Vera.") with patch.object(utils_module, "PROMPTS_DIR", prompts_dir), \ patch.object(utils_module, "STATIC_DIR", static_dir): result = utils_module.load_system_prompt() assert result == "Static Vera." def test_returns_empty_when_not_found(self, tmp_path): """Returns empty string when systemprompt.md not found anywhere.""" import app.utils as utils_module with patch.object(utils_module, "PROMPTS_DIR", tmp_path / "nope"), \ patch.object(utils_module, "STATIC_DIR", tmp_path / "also_nope"): result = utils_module.load_system_prompt() assert result == "" class TestFilterMemoriesByTime: """Tests for filter_memories_by_time function.""" def test_includes_recent_memory(self): """Memory with timestamp in the last 24h should be included.""" from datetime import datetime, timedelta, timezone from app.utils import filter_memories_by_time ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)).isoformat() memories = [{"timestamp": ts, "text": "recent"}] result = filter_memories_by_time(memories, hours=24) assert len(result) == 1 def test_excludes_old_memory(self): """Memory older than cutoff should be excluded.""" from datetime import datetime, timedelta, timezone from app.utils import filter_memories_by_time ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=48)).isoformat() memories = [{"timestamp": ts, "text": "old"}] result = filter_memories_by_time(memories, hours=24) assert len(result) == 0 def test_includes_memory_without_timestamp(self): """Memory with no timestamp should always be included.""" from app.utils import filter_memories_by_time memories = [{"text": "no ts"}] result = filter_memories_by_time(memories, hours=24) assert len(result) == 1 def test_includes_memory_with_bad_timestamp(self): """Memory with unparseable timestamp should be included (safe default).""" from app.utils import filter_memories_by_time memories = [{"timestamp": "not-a-date", "text": "bad ts"}] result = filter_memories_by_time(memories, hours=24) assert len(result) == 1 def test_z_suffix_old_timestamp_excluded(self): """Regression: chained .replace() was not properly handling Z suffix on old timestamps.""" from datetime import datetime, timedelta, timezone from app.utils import filter_memories_by_time old_ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=48)).isoformat() + "Z" memories = [{"timestamp": old_ts, "text": "old with Z"}] result = filter_memories_by_time(memories, hours=24) assert len(result) == 0, f"Old Z-suffixed timestamp should be excluded but wasn't: {old_ts}" def test_empty_list(self): """Empty input returns empty list.""" from app.utils import filter_memories_by_time assert filter_memories_by_time([], hours=24) == [] def test_z_suffix_timestamp(self): """ISO timestamp with Z suffix should be handled correctly.""" from datetime import datetime, timedelta, timezone from app.utils import filter_memories_by_time ts = (datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)).isoformat() + "Z" memories = [{"timestamp": ts, "text": "recent with Z"}] result = filter_memories_by_time(memories, hours=24) assert len(result) == 1 class TestMergeMemories: """Tests for merge_memories function.""" def test_empty_list(self): """Empty list returns empty text and ids.""" from app.utils import merge_memories result = merge_memories([]) assert result == {"text": "", "ids": []} def test_single_memory_with_text(self): """Single memory with text field is merged.""" from app.utils import merge_memories memories = [{"id": "abc", "text": "hello world", "role": ""}] result = merge_memories(memories) assert "hello world" in result["text"] assert "abc" in result["ids"] def test_memory_with_content_field(self): """Memory using content field (no text) is merged.""" from app.utils import merge_memories memories = [{"id": "xyz", "content": "from content field"}] result = merge_memories(memories) assert "from content field" in result["text"] def test_role_included_in_output(self): """Role prefix should appear in merged text when role is set.""" from app.utils import merge_memories memories = [{"id": "1", "text": "question", "role": "user"}] result = merge_memories(memories) assert "[user]:" in result["text"] def test_multiple_memories_joined(self): """Multiple memories are joined with double newline.""" from app.utils import merge_memories memories = [ {"id": "1", "text": "first"}, {"id": "2", "text": "second"}, ] result = merge_memories(memories) assert "first" in result["text"] assert "second" in result["text"] assert len(result["ids"]) == 2 class TestBuildAugmentedMessages: """Tests for build_augmented_messages function (mocked I/O).""" def _make_qdrant_mock(self): """Return an AsyncMock QdrantService.""" from unittest.mock import AsyncMock, MagicMock mock_qdrant = MagicMock() mock_qdrant.semantic_search = AsyncMock(return_value=[]) mock_qdrant.get_recent_turns = AsyncMock(return_value=[]) return mock_qdrant def test_system_layer_prepended(self, monkeypatch, tmp_path): """System prompt from file should be prepended to messages.""" import asyncio from unittest.mock import patch import app.utils as utils_module # Write a temp system prompt prompt_file = tmp_path / "systemprompt.md" prompt_file.write_text("You are Vera.") mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value="You are Vera."), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): result = asyncio.get_event_loop().run_until_complete( utils_module.build_augmented_messages( [{"role": "user", "content": "Hello"}] ) ) system_msgs = [m for m in result if m["role"] == "system"] assert len(system_msgs) == 1 assert "You are Vera." in system_msgs[0]["content"] def test_incoming_user_message_preserved(self, monkeypatch): """Incoming user message should appear in output.""" import asyncio from unittest.mock import patch import app.utils as utils_module mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value=""), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): result = asyncio.get_event_loop().run_until_complete( utils_module.build_augmented_messages( [{"role": "user", "content": "What is 2+2?"}] ) ) user_msgs = [m for m in result if m.get("role") == "user"] assert any("2+2" in m["content"] for m in user_msgs) def test_no_system_message_when_no_prompt(self, monkeypatch): """No system message added when both incoming and file prompt are empty.""" import asyncio from unittest.mock import patch import app.utils as utils_module mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value=""), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): result = asyncio.get_event_loop().run_until_complete( utils_module.build_augmented_messages( [{"role": "user", "content": "Hi"}] ) ) system_msgs = [m for m in result if m.get("role") == "system"] assert len(system_msgs) == 0 def test_semantic_results_injected(self, monkeypatch): """Curated memories from semantic search should appear in output.""" import asyncio from unittest.mock import patch, AsyncMock, MagicMock import app.utils as utils_module mock_qdrant = MagicMock() mock_qdrant.semantic_search = AsyncMock(return_value=[ {"payload": {"text": "User: Old question?\nAssistant: Old answer."}} ]) mock_qdrant.get_recent_turns = AsyncMock(return_value=[]) with patch.object(utils_module, "load_system_prompt", return_value=""), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): result = asyncio.get_event_loop().run_until_complete( utils_module.build_augmented_messages( [{"role": "user", "content": "Tell me"}] ) ) contents = [m["content"] for m in result] assert any("Old question" in c or "Old answer" in c for c in contents) @pytest.mark.asyncio async def test_system_prompt_appends_to_caller_system(self): """systemprompt.md content appends to caller's system message.""" import app.utils as utils_module mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value="Vera memory context"), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): incoming = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Hello"} ] result = await build_augmented_messages(incoming) system_msg = result[0] assert system_msg["role"] == "system" assert system_msg["content"] == "You are a helpful assistant.\n\nVera memory context" @pytest.mark.asyncio async def test_empty_system_prompt_passthrough(self): """When systemprompt.md is empty, only caller's system message passes through.""" import app.utils as utils_module mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value=""), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): incoming = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Hello"} ] result = await build_augmented_messages(incoming) system_msg = result[0] assert system_msg["content"] == "You are a helpful assistant." @pytest.mark.asyncio async def test_no_caller_system_with_vera_prompt(self): """When caller sends no system message but systemprompt.md exists, use vera prompt.""" import app.utils as utils_module mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value="Vera memory context"), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): incoming = [{"role": "user", "content": "Hello"}] result = await build_augmented_messages(incoming) system_msg = result[0] assert system_msg["role"] == "system" assert system_msg["content"] == "Vera memory context" @pytest.mark.asyncio async def test_no_system_anywhere(self): """When neither caller nor systemprompt.md provides system content, no system message.""" import app.utils as utils_module mock_qdrant = self._make_qdrant_mock() with patch.object(utils_module, "load_system_prompt", return_value=""), \ patch.object(utils_module, "get_qdrant_service", return_value=mock_qdrant): incoming = [{"role": "user", "content": "Hello"}] result = await build_augmented_messages(incoming) # First message should be user, not system assert result[0]["role"] == "user"