docs: simplify README, update validation and curator docs

This commit is contained in:
root
2026-03-10 12:08:53 -05:00
parent 08aaddb4d0
commit 62953e9f39
6 changed files with 261 additions and 813 deletions

View File

@@ -1,7 +1,7 @@
{
"timer_minutes": 5,
"max_batch_size": 100,
"user_id": "rob",
"user_id": "<USER_ID>",
"source_collection": "memories_tr",
"target_collection": "gems_tr"
}

View File

@@ -1,144 +1,102 @@
#!/usr/bin/env python3
"""
TrueRecall Timer Curator: Runs every 30 minutes via cron.
TrueRecall v2 - Timer Curator
Runs every 5 minutes via cron
Extracts gems from uncurated memories and stores them in gems_tr
- Queries all uncurated memories from memories_tr
- Sends batch to qwen3 for gem extraction
- Stores gems to gems_tr
- Marks processed memories as curated=true
Usage:
python3 curator_timer.py --config curator_config.json
python3 curator_timer.py --config curator_config.json --dry-run
REQUIRES: TrueRecall v1 (provides memories_tr via watcher)
"""
import os
import sys
import json
import argparse
import hashlib
import requests
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional
import hashlib
# Load config
def load_config(config_path: str) -> Dict[str, Any]:
    """Read the curator's JSON configuration file.

    Args:
        config_path: Path to the JSON config file.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding, which differs between hosts (e.g. cron's minimal environment).
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f)
# Default paths
SCRIPT_DIR = Path(__file__).parent
DEFAULT_CONFIG = SCRIPT_DIR / "curator_config.json"
# Curator prompt path
CURATOR_PROMPT_PATH = Path("~/.openclaw/workspace/.local_projects/true-recall-v2/curator-prompt.md")
# Configuration - EDIT THESE for your environment
QDRANT_URL = "http://<QDRANT_IP>:6333"
OLLAMA_URL = "http://<OLLAMA_IP>:11434"
SOURCE_COLLECTION = "memories_tr"
TARGET_COLLECTION = "gems_tr"
EMBEDDING_MODEL = "snowflake-arctic-embed2"
MAX_BATCH = 100
USER_ID = "<USER_ID>"
def load_curator_prompt() -> str:
"""Load the curator system prompt."""
def get_uncurated_memories(qdrant_url: str, collection: str, user_id: str, max_batch: int = 100) -> List[Dict[str, Any]]:
"""Fetch uncurated memories from Qdrant."""
try:
with open(CURATOR_PROMPT_PATH, 'r') as f:
return f.read()
except FileNotFoundError:
print(f"⚠️ Curator prompt not found at {CURATOR_PROMPT_PATH}")
return """You are The Curator. Extract meaningful gems from conversation history.
Extract facts, insights, decisions, preferences, and context that would be valuable to remember.
Output a JSON array of gems with fields: gem, context, snippet, categories, importance (1-5), confidence (0-0.99)."""
def get_uncurated_memories(qdrant_url: str, collection: str, user_id: str, max_batch: int) -> List[Dict[str, Any]]:
    """Query Qdrant for uncurated memories.

    Scrolls ``collection`` for points whose payload has ``user_id`` equal to
    the given user and ``curated`` equal to ``False``, requesting at most 100
    points per call and issuing at most 10 calls.

    Args:
        qdrant_url: Base URL of the Qdrant server.
        collection: Name of the collection to scroll.
        user_id: Value the ``user_id`` payload field must match.
        max_batch: Upper bound on the number of memories returned.

    Returns:
        A list of flat dicts, one per point: all payload fields plus ``id``
        (the point id) and ``content``/``role``/``timestamp``/``turn`` with
        defaults when the payload lacks them. Any request or parsing error is
        printed to stderr and ends paging; points gathered so far are still
        returned.
    """
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "curated", "match": {"value": False}}
        ]
    }

    all_points = []
    offset = None
    iterations = 0
    max_iterations = 10  # hard cap on scroll requests per run

    while len(all_points) < max_batch and iterations < max_iterations:
        iterations += 1

        scroll_data = {
            "limit": min(100, max_batch - len(all_points)),
            "with_payload": True,
            "filter": filter_data
        }
        # Compare against None so a legitimate falsy offset (point id 0) is
        # still forwarded instead of silently restarting from the beginning.
        if offset is not None:
            scroll_data["offset"] = offset

        try:
            response = requests.post(
                f"{qdrant_url}/collections/{collection}/points/scroll",
                json=scroll_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            page = result.get("result", {})
            points = page.get("points", [])

            if not points:
                break

            all_points.extend(points)
            offset = page.get("next_page_offset")
            if offset is None:
                # No further pages.
                break
        except Exception as e:
            # Deliberate best-effort: report and keep whatever was fetched.
            print(f"Error querying Qdrant: {e}", file=sys.stderr)
            break

    # Convert to simple dicts
    memories = []
    for point in all_points:
        # A point may carry a null payload; treat it as empty.
        payload = point.get("payload") or {}
        memories.append({
            # Payload first, so a payload field named "id" can never replace
            # the real point id that callers use to mark the memory curated.
            **payload,
            "id": point.get("id"),
            "content": payload.get("content", ""),
            "role": payload.get("role", ""),
            "timestamp": payload.get("timestamp", ""),
            "turn": payload.get("turn", 0),
        })

    return memories[:max_batch]
response = requests.post(
f"{qdrant_url}/collections/{collection}/points/scroll",
json={
"limit": max_batch,
"filter": {
"must": [
{"key": "user_id", "match": {"value": user_id}},
{"key": "curated", "match": {"value": False}}
]
},
"with_payload": True
},
timeout=30
)
response.raise_for_status()
data = response.json()
return data.get("result", {}).get("points", [])
except Exception as e:
print(f"Error fetching memories: {e}", file=sys.stderr)
return []
def extract_gems(memories: List[Dict[str, Any]], ollama_url: str) -> List[Dict[str, Any]]:
"""Send memories to qwen3 for gem extraction."""
"""Send memories to LLM for gem extraction."""
if not memories:
return []
# Build conversation from memories (support both 'text' and 'content' fields)
SKIP_PATTERNS = [
"gems extracted", "curator", "curation complete",
"system is running", "validation round",
]
conversation_lines = []
for i, mem in enumerate(memories):
# Support both migrated memories (text) and watcher memories (content)
text = mem.get("text", "") or mem.get("content", "")
if text:
# Truncate very long texts
text = text[:500] if len(text) > 500 else text
conversation_lines.append(f"[{i+1}] {text}")
payload = mem.get("payload", {})
text = payload.get("text", "") or payload.get("content", "")
role = payload.get("role", "")
if not text:
continue
text = str(text)
if role == "assistant":
continue
text_lower = text.lower()
if len(text) < 20:
continue
if any(pattern in text_lower for pattern in SKIP_PATTERNS):
continue
text = text[:500] if len(text) > 500 else text
conversation_lines.append(f"[{i+1}] {text}")
if not conversation_lines:
return []
conversation_text = "\n\n".join(conversation_lines)
# Simple extraction prompt
prompt = """You are a memory curator. Extract atomic facts from the conversation below.
For each distinct fact/decision/preference, output a JSON object with:
- "text": the atomic fact (1-2 sentences)
- "text": the atomic fact (1-2 sentences) - use FIRST PERSON ("I" not "User")
- "category": one of [decision, preference, technical, project, knowledge, system]
- "importance": "high" or "medium"
Return ONLY a JSON array. Example:
[
{"text": "User decided to use Redis for caching", "category": "decision", "importance": "high"},
{"text": "User prefers dark mode", "category": "preference", "importance": "medium"}
{"text": "I decided to use Redis for caching", "category": "decision", "importance": "high"},
{"text": "I prefer dark mode", "category": "preference", "importance": "medium"}
]
If no extractable facts, return [].
@@ -152,7 +110,7 @@ CONVERSATION:
response = requests.post(
f"{ollama_url}/api/generate",
json={
"model": "qwen3:30b-a3b-instruct-2507-q8_0",
"model": "<CURATOR_MODEL>",
"system": prompt,
"prompt": full_prompt,
"stream": False,
@@ -169,28 +127,20 @@ CONVERSATION:
return []
result = response.json()
output = result.get('response', '').strip()
# Extract JSON from output
if '```json' in output:
output = output.split('```json')[1].split('```')[0].strip()
elif '```' in output:
output = output.split('```')[1].split('```')[0].strip()
response_text = result.get("response", "")
try:
# Find JSON array in output
start_idx = output.find('[')
end_idx = output.rfind(']')
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
output = output[start_idx:end_idx+1]
gems = json.loads(output)
start = response_text.find('[')
end = response_text.rfind(']')
if start == -1 or end == -1:
return []
json_str = response_text[start:end+1]
gems = json.loads(json_str)
if not isinstance(gems, list):
gems = [gems] if gems else []
return []
return gems
except json.JSONDecodeError as e:
print(f"Error parsing curator output: {e}", file=sys.stderr)
print(f"Raw output: {repr(output[:500])}...", file=sys.stderr)
print(f"JSON parse error: {e}", file=sys.stderr)
return []
@@ -199,50 +149,35 @@ def get_embedding(text: str, ollama_url: str) -> Optional[List[float]]:
try:
response = requests.post(
f"{ollama_url}/api/embeddings",
json={"model": "snowflake-arctic-embed2", "prompt": text},
json={
"model": EMBEDDING_MODEL,
"prompt": text
},
timeout=30
)
response.raise_for_status()
return response.json()['embedding']
data = response.json()
return data.get("embedding")
except Exception as e:
print(f"Error getting embedding: {e}", file=sys.stderr)
return None
def store_gem(gem: Dict[str, Any], user_id: str, qdrant_url: str, target_collection: str, ollama_url: str) -> bool:
"""Store a single gem to Qdrant."""
# Support both old format (gem, context, snippet) and new format (text, category, importance)
embedding_text = gem.get('text', '') or gem.get('gem', '')
if not embedding_text:
embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}".strip()
def store_gem(gem: Dict[str, Any], vector: List[float], qdrant_url: str, target_collection: str, user_id: str) -> bool:
"""Store a gem in Qdrant."""
embedding_text = gem.get("text", "") or gem.get("gem", "")
if not embedding_text:
print(f"⚠️ Empty embedding text for gem, skipping", file=sys.stderr)
return False
vector = get_embedding(embedding_text, ollama_url)
if vector is None:
print(f"⚠️ Failed to get embedding for gem", file=sys.stderr)
return False
# Generate ID
hash_content = f"{user_id}:{gem.get('conversation_id', '')}:{gem.get('turn_range', '')}:{gem.get('gem', '')[:50]}"
hash_content = f"{user_id}:{embedding_text[:100]}"
hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8]
gem_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)
# Normalize gem fields - ensure we have text field
payload = {
"text": embedding_text,
"category": gem.get("category", "fact"),
"importance": gem.get("importance", "medium"),
"user_id": user_id,
"text": gem.get('text', gem.get('gem', '')),
"category": gem.get('category', 'general'),
"importance": gem.get('importance', 'medium'),
"curated_at": datetime.now(timezone.utc).isoformat()
"created_at": datetime.now(timezone.utc).isoformat()
}
# Preserve any other fields from gem
for key in ['context', 'snippet', 'confidence', 'conversation_id', 'turn_range']:
if key in gem:
payload[key] = gem[key]
try:
response = requests.put(
@@ -264,7 +199,7 @@ def store_gem(gem: Dict[str, Any], user_id: str, qdrant_url: str, target_collect
def mark_curated(memory_ids: List, qdrant_url: str, collection: str) -> bool:
"""Mark memories as curated in Qdrant using POST /points/payload format."""
"""Mark memories as curated."""
if not memory_ids:
return True
@@ -288,79 +223,58 @@ def mark_curated(memory_ids: List, qdrant_url: str, collection: str) -> bool:
def main():
parser = argparse.ArgumentParser(description="TrueRecall Timer Curator")
parser.add_argument("--config", "-c", default=str(DEFAULT_CONFIG), help="Config file path")
parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write, just preview")
args = parser.parse_args()
print("TrueRecall v2 - Timer Curator")
print(f"User: {USER_ID}")
print(f"Source: {SOURCE_COLLECTION}")
print(f"Target: {TARGET_COLLECTION}")
print(f"Max batch: {MAX_BATCH}\n")
config = load_config(args.config)
qdrant_url = os.getenv("QDRANT_URL", "http://<QDRANT_IP>:6333")
ollama_url = os.getenv("OLLAMA_URL", "http://<OLLAMA_IP>:11434")
user_id = config.get("user_id", "rob")
source_collection = config.get("source_collection", "memories_tr")
target_collection = config.get("target_collection", "gems_tr")
max_batch = config.get("max_batch_size", 100)
print(f"🔍 TrueRecall Timer Curator")
print(f"👤 User: {user_id}")
print(f"📥 Source: {source_collection}")
print(f"💎 Target: {target_collection}")
print(f"📦 Max batch: {max_batch}")
if args.dry_run:
print("🏃 DRY RUN MODE")
print()
# Get uncurated memories
print("📥 Fetching uncurated memories...")
memories = get_uncurated_memories(qdrant_url, source_collection, user_id, max_batch)
print(f"✅ Found {len(memories)} uncurated memories")
print("Fetching uncurated memories...")
memories = get_uncurated_memories(QDRANT_URL, SOURCE_COLLECTION, USER_ID, MAX_BATCH)
print(f"Found {len(memories)} uncurated memories\n")
if not memories:
print("🤷 Nothing to curate. Exiting.")
print("Nothing to curate. Exiting.")
return
# Extract gems
print(f"\n🧠 Sending {len(memories)} memories to curator...")
gems = extract_gems(memories, ollama_url)
print(f"✅ Extracted {len(gems)} gems")
print("Sending memories to curator...")
gems = extract_gems(memories, OLLAMA_URL)
print(f"Extracted {len(gems)} gems\n")
if not gems:
print("⚠️ No gems extracted. Nothing to store.")
# Still mark as curated so we don't reprocess
memory_ids = [m["id"] for m in memories] # Keep as integers
mark_curated(memory_ids, qdrant_url, source_collection)
print("No gems extracted. Exiting.")
return
# Preview
print("\n💎 Gems preview:")
print("Gems preview:")
for i, gem in enumerate(gems[:3], 1):
print(f" {i}. {gem.get('gem', 'N/A')[:80]}...")
text = gem.get("text", "N/A")[:50]
print(f" {i}. {text}...")
if len(gems) > 3:
print(f" ... and {len(gems) - 3} more")
print()
if args.dry_run:
print("\n🏃 DRY RUN: Not storing gems or marking curated.")
return
# Store gems
print(f"\n💾 Storing {len(gems)} gems...")
print("Storing gems...")
stored = 0
for gem in gems:
if store_gem(gem, user_id, qdrant_url, target_collection, ollama_url):
stored += 1
print(f"✅ Stored: {stored}/{len(gems)}")
text = gem.get("text", "") or gem.get("gem", "")
if not text:
continue
vector = get_embedding(text, OLLAMA_URL)
if vector:
if store_gem(gem, vector, QDRANT_URL, TARGET_COLLECTION, USER_ID):
stored += 1
# Mark memories as curated
print("\n📝 Marking memories as curated...")
memory_ids = [m["id"] for m in memories] # Keep as integers
if mark_curated(memory_ids, qdrant_url, source_collection):
print(f"✅ Marked {len(memory_ids)} memories as curated")
print(f"Stored: {stored}/{len(gems)}\n")
print("Marking memories as curated...")
memory_ids = [mem.get("id") for mem in memories if mem.get("id")]
if mark_curated(memory_ids, QDRANT_URL, SOURCE_COLLECTION):
print(f"Marked {len(memory_ids)} memories as curated\n")
else:
print(f"⚠️ Failed to mark some memories as curated")
print("Failed to mark memories\n")
print("\n🎉 Curation complete!")
print("Curation complete!")
if __name__ == "__main__":