351 lines
12 KiB
Python
Executable File
351 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
TrueRecall Timer Curator: runs every 30 minutes via cron.

- Queries all uncurated memories from memories_tr
- Sends the batch to qwen3 for gem extraction
- Stores the extracted gems in gems_tr
- Marks the processed memories as curated=true

The collection names above are defaults; override them with
source_collection / target_collection in the config file.

Usage:
    python3 curator_timer.py --config curator_config.json
    python3 curator_timer.py --config curator_config.json --dry-run
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import requests
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
import hashlib
|
|
|
|
def load_config(config_path: str) -> Dict[str, Any]:
    """Load the curator's JSON configuration file.

    Args:
        config_path: Path to the JSON config (e.g. curator_config.json).

    Returns:
        The parsed JSON value (expected to be an object).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON text is UTF-8 by spec; don't depend on the host locale, which is
    # frequently a minimal C/POSIX locale when running under cron.
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
|
|
|
|
# Default paths
SCRIPT_DIR = Path(__file__).parent
DEFAULT_CONFIG = SCRIPT_DIR / "curator_config.json"

# Curator prompt path. Can be overridden with the CURATOR_PROMPT_PATH
# environment variable (the same way QDRANT_URL / OLLAMA_URL are read in
# main()); otherwise falls back to the original fixed location.
CURATOR_PROMPT_PATH = Path(
    os.getenv(
        "CURATOR_PROMPT_PATH",
        "/root/.openclaw/workspace/.projects/true-recall-v2/curator-prompt.md",
    )
)
|
|
|
|
|
|
def load_curator_prompt(path: Optional[Path] = None) -> str:
    """Load the curator system prompt sent to the extraction model.

    Args:
        path: Prompt file to read. Defaults to ``CURATOR_PROMPT_PATH``; pass a
            different file to use an alternate prompt.

    Returns:
        The prompt file's text, or a short built-in prompt when the file does
        not exist (a warning is printed to stderr in that case).
    """
    prompt_path = CURATOR_PROMPT_PATH if path is None else Path(path)
    try:
        # The prompt is a Markdown file that may contain non-ASCII text; read it
        # as UTF-8 instead of relying on the process locale.
        with open(prompt_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        # Report on stderr, like this module's other error messages.
        print(f"⚠️ Curator prompt not found at {prompt_path}", file=sys.stderr)
        return (
            "You are The Curator. Extract meaningful gems from conversation history.\n"
            "Extract facts, insights, decisions, preferences, and context that would be valuable to remember.\n"
            "Output a JSON array of gems with fields: gem, context, snippet, categories, importance (1-5), confidence (0-0.99)."
        )
|
|
|
|
|
|
def get_uncurated_memories(qdrant_url: str, collection: str, user_id: str, max_batch: int) -> List[Dict[str, Any]]:
    """Fetch up to ``max_batch`` uncurated memories for ``user_id`` from Qdrant.

    Pages through ``POST /collections/{collection}/points/scroll`` filtering on
    payload ``user_id == user_id`` and ``curated == False``. Each request asks
    for at most 100 points, and at most 10 requests are made per call.

    Args:
        qdrant_url: Base URL of the Qdrant HTTP API.
        collection: Collection holding the raw memories.
        user_id: Only this user's memories are returned.
        max_batch: Upper bound on the number of memories returned.

    Returns:
        One dict per point. Each is the point payload layered over defaults
        for ``content``, ``role``, ``timestamp`` and ``turn``, with ``id``
        always set to the Qdrant point id. On a request or decode error, the
        error is printed to stderr and the memories fetched so far are
        returned (best-effort).
    """
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "curated", "match": {"value": False}},
        ]
    }

    all_points: List[Dict[str, Any]] = []
    offset = None
    iterations = 0
    max_iterations = 10  # hard cap on scroll requests per run

    while len(all_points) < max_batch and iterations < max_iterations:
        iterations += 1

        scroll_data: Dict[str, Any] = {
            "limit": min(100, max_batch - len(all_points)),
            "with_payload": True,
            "filter": filter_data,
        }
        # Compare with None rather than truthiness: 0 is a valid point id/offset.
        if offset is not None:
            scroll_data["offset"] = offset

        try:
            response = requests.post(
                f"{qdrant_url}/collections/{collection}/points/scroll",
                json=scroll_data,
                headers={"Content-Type": "application/json"},
                timeout=30,
            )
            response.raise_for_status()
            result = response.json().get("result", {})
            points = result.get("points", [])

            if not points:
                break

            all_points.extend(points)
            # next_page_offset is null once the last page has been returned.
            offset = result.get("next_page_offset")
            if offset is None:
                break
        except Exception as e:
            print(f"Error querying Qdrant: {e}", file=sys.stderr)
            break

    # Flatten each point into a plain dict.
    memories = []
    for point in all_points:
        payload = point.get("payload") or {}
        memories.append({
            "content": "",
            "role": "",
            "timestamp": "",
            "turn": 0,
            **payload,
            # Written last so a payload field named "id" cannot replace the
            # Qdrant point id that main() later hands to mark_curated().
            "id": point.get("id"),
        })

    return memories[:max_batch]
|
|
|
|
|
|
def _build_conversation(memories: List[Dict[str, Any]]) -> str:
    """Render memories as newline-separated ``role: content`` lines.

    Memories with empty or missing ``content`` are skipped; a missing ``role``
    is rendered as ``unknown``.
    """
    return "\n".join(
        f"{mem.get('role', 'unknown')}: {mem['content']}"
        for mem in memories
        if mem.get("content")
    )


def _fallback_gems(text: str) -> List[Dict[str, Any]]:
    """Scrape ``"gem": "..."`` values out of text that is not valid JSON.

    Each hit becomes a minimal gem with fixed placeholder metadata.
    """
    import re

    gems = []
    # The value may contain backslash escapes such as \" ; decode them when possible.
    for raw in re.findall(r'"gem"\s*:\s*"((?:[^"\\]|\\.)+)"', text):
        try:
            gem_text = json.loads(f'"{raw}"', strict=False)
        except json.JSONDecodeError:
            gem_text = raw  # undecodable escape sequence: keep the raw text
        gems.append({
            "gem": gem_text,
            "context": "Extracted via fallback",
            "categories": ["extracted"],
            "importance": 3,
            "confidence": 0.7,
        })
    return gems


def _parse_gems(output: str) -> List[Dict[str, Any]]:
    """Parse the curator model's text reply into a list of gem dicts.

    The parser tolerates Markdown code fences, prose around the JSON array,
    raw newlines inside strings and ``\\'`` escapes. If the reply still is not
    valid JSON, it falls back to a regex scrape of ``"gem"`` values. Returns []
    (after logging to stderr) when nothing usable is found.
    """
    # Unwrap a ```json ... ``` (or bare ``` ... ```) Markdown fence.
    if '```json' in output:
        output = output.split('```json')[1].split('```')[0].strip()
    elif '```' in output:
        output = output.split('```')[1].split('```')[0].strip()

    # Drop any prose around the outermost [...] span.
    start_idx = output.find('[')
    end_idx = output.rfind(']')
    if start_idx != -1 and end_idx > start_idx:
        output = output[start_idx:end_idx + 1]

    # "\'" is not a legal JSON escape but models emit it; a bare "'" is fine.
    output = output.replace("\\'", "'")

    # Try the text as-is first. strict=False accepts raw newlines/tabs inside
    # strings, so legitimate "\n" escapes are left intact (rewriting them into
    # raw control characters made valid JSON unparseable). Only if that fails,
    # retry with literal "\n"/"\t" sequences turned into real whitespace, for
    # replies where the model escaped the JSON structure itself.
    candidates = (output, output.replace('\\n', '\n').replace('\\t', '\t'))
    error: Optional[json.JSONDecodeError] = None
    for candidate in candidates:
        try:
            parsed = json.loads(candidate, strict=False)
            break
        except json.JSONDecodeError as e:
            error = e
    else:  # no candidate parsed
        gems = _fallback_gems(output)
        if gems:
            print(f"⚠️ Fallback extraction: {len(gems)} gems", file=sys.stderr)
            return gems
        print(f"Error parsing curator output: {error}", file=sys.stderr)
        print(f"Raw output: {repr(output[:500])}...", file=sys.stderr)
        return []

    if not isinstance(parsed, list):
        parsed = [parsed] if parsed else []
    # Downstream code calls gem.get(...); drop entries that are not JSON objects.
    return [gem for gem in parsed if isinstance(gem, dict)]


def extract_gems(
    memories: List[Dict[str, Any]],
    ollama_url: str,
    *,
    model: str = "qwen3:30b-a3b-instruct-2507-q8_0",
    timeout: float = 120,
) -> List[Dict[str, Any]]:
    """Ask the curator model to extract gems from a batch of memories.

    Args:
        memories: Memory dicts; only their ``role`` and ``content`` are used.
        ollama_url: Base URL of the Ollama server.
        model: Ollama model used for extraction.
        timeout: HTTP timeout in seconds for the generate request.

    Returns:
        Gem dicts parsed from the model's reply. Returns [] when there is
        nothing to send, when the Ollama call fails, or when the reply yields
        no gems. Errors are printed to stderr.
    """
    if not memories:
        return []

    conversation_text = _build_conversation(memories)
    if not conversation_text:
        # Every memory was empty: don't ask the model to curate nothing.
        return []

    prompt = load_curator_prompt()

    try:
        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": model,
                "system": prompt,
                "prompt": f"## Input Conversation\n\n{conversation_text}\n\n## Output\n",
                "stream": False,
                "options": {
                    "temperature": 0.1,
                    "num_predict": 4000,
                },
            },
            timeout=timeout,
        )
        response.raise_for_status()
        # Decode inside the try so a non-JSON body (proxy error page, truncated
        # reply) is reported like any other Ollama failure instead of crashing.
        output = (response.json().get('response') or '').strip()
    except Exception as e:
        print(f"Error calling Ollama: {e}", file=sys.stderr)
        return []

    return _parse_gems(output)
|
|
|
|
|
|
def get_embedding(text: str, ollama_url: str) -> Optional[List[float]]:
    """Embed ``text`` with Ollama's ``mxbai-embed-large`` model.

    Returns:
        The vector found under the reply's ``embedding`` key, or None when
        the request fails or the reply lacks that key (the error is printed
        to stderr).
    """
    endpoint = f"{ollama_url}/api/embeddings"
    request_body = {"model": "mxbai-embed-large", "prompt": text}
    try:
        reply = requests.post(endpoint, json=request_body, timeout=30)
        reply.raise_for_status()
        vector = reply.json()['embedding']
    except Exception as err:
        print(f"Error getting embedding: {err}", file=sys.stderr)
        return None
    return vector
|
|
|
|
|
|
def store_gem(gem: Dict[str, Any], user_id: str, qdrant_url: str, target_collection: str, ollama_url: str) -> bool:
    """Embed a single gem and upsert it into the target Qdrant collection.

    The vector is computed from the gem's ``gem``, ``context`` and ``snippet``
    fields. The point id is derived deterministically from the user id,
    ``conversation_id``, ``turn_range`` and the first 50 characters of the gem
    text. Storing the same gem again therefore writes to the same point id.

    Args:
        gem: Gem dict as produced by the curator model.
        user_id: Owner recorded in the point payload (always wins over any
            ``user_id`` key the model put in ``gem``).
        qdrant_url: Base URL of the Qdrant HTTP API.
        target_collection: Collection that receives the gem.
        ollama_url: Base URL of the Ollama server used for embeddings.

    Returns:
        True if the Qdrant request succeeded; False if embedding or storing
        failed.
    """
    embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}"
    vector = get_embedding(embedding_text, ollama_url)
    if vector is None:
        return False

    # Tolerate a null/non-string gem value instead of raising on the slice.
    raw_gem = gem.get('gem')
    gem_text = '' if raw_gem is None else str(raw_gem)
    hash_content = f"{user_id}:{gem.get('conversation_id', '')}:{gem.get('turn_range', '')}:{gem_text[:50]}"
    hash_bytes = hashlib.sha256(hash_content.encode()).digest()[:8]
    # Reducing the 64-bit prefix mod 2**63 keeps the id non-negative and
    # within a signed 64-bit range.
    gem_id = int.from_bytes(hash_bytes, byteorder='big') % (2**63)

    payload = {
        **gem,
        # Set after **gem so fields in the model's output cannot replace the
        # owning user id or the curation timestamp.
        "user_id": user_id,
        "curated_at": datetime.now(timezone.utc).isoformat(),
    }

    try:
        response = requests.put(
            f"{qdrant_url}/collections/{target_collection}/points",
            json={"points": [{"id": gem_id, "vector": vector, "payload": payload}]},
            timeout=30,
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error storing gem: {e}", file=sys.stderr)
        return False
|
|
|
|
|
|
def mark_curated(memory_ids: List, qdrant_url: str, collection: str) -> bool:
    """Flag the given points as curated via Qdrant's set-payload endpoint.

    Sends ``POST /collections/{collection}/points/payload`` with
    ``curated=True`` and a UTC ``curated_at`` timestamp for every id in
    ``memory_ids``.

    Returns:
        True when there was nothing to mark or the request succeeded; False
        if the request failed (the error is printed to stderr).
    """
    if memory_ids:
        body = {
            "points": memory_ids,
            "payload": {
                "curated": True,
                "curated_at": datetime.now(timezone.utc).isoformat(),
            },
        }
        try:
            resp = requests.post(
                f"{qdrant_url}/collections/{collection}/points/payload",
                json=body,
                timeout=30,
            )
            resp.raise_for_status()
        except Exception as exc:
            print(f"Error marking curated: {exc}", file=sys.stderr)
            return False
    return True
|
|
|
|
|
|
def main() -> int:
    """Run one curation pass and return a process exit status.

    Steps:
      1. Load the config and fetch up to ``max_batch_size`` uncurated memories.
      2. Extract gems with the curator model.
      3. Store the gems in the target collection.
      4. Mark the fetched memories as curated.

    With ``--dry-run`` nothing is written to Qdrant. Qdrant/Ollama base URLs
    come from the QDRANT_URL / OLLAMA_URL environment variables; everything
    else comes from the JSON config.

    Returns:
        0 on success or when there is nothing to do. 1 when gems were
        extracted but none could be stored, or when marking memories as
        curated failed.
    """
    parser = argparse.ArgumentParser(description="TrueRecall Timer Curator")
    parser.add_argument("--config", "-c", default=str(DEFAULT_CONFIG), help="Config file path")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Don't write, just preview")
    args = parser.parse_args()

    config = load_config(args.config)

    qdrant_url = os.getenv("QDRANT_URL", "http://10.0.0.40:6333")
    ollama_url = os.getenv("OLLAMA_URL", "http://10.0.0.10:11434")

    user_id = config.get("user_id", "rob")
    source_collection = config.get("source_collection", "memories_tr")
    target_collection = config.get("target_collection", "gems_tr")
    max_batch = config.get("max_batch_size", 100)

    print("🔍 TrueRecall Timer Curator")
    print(f"👤 User: {user_id}")
    print(f"📥 Source: {source_collection}")
    print(f"💎 Target: {target_collection}")
    print(f"📦 Max batch: {max_batch}")
    if args.dry_run:
        print("🏃 DRY RUN MODE")
    print()

    # Get uncurated memories
    print("📥 Fetching uncurated memories...")
    memories = get_uncurated_memories(qdrant_url, source_collection, user_id, max_batch)
    print(f"✅ Found {len(memories)} uncurated memories")

    if not memories:
        print("🤷 Nothing to curate. Exiting.")
        return 0

    # Qdrant point ids, passed back to Qdrant unchanged.
    memory_ids = [m["id"] for m in memories]

    # Extract gems
    print(f"\n🧠 Sending {len(memories)} memories to curator...")
    gems = extract_gems(memories, ollama_url)
    print(f"✅ Extracted {len(gems)} gems")

    if not gems:
        print("⚠️ No gems extracted. Nothing to store.")
        if args.dry_run:
            # A dry run must not write anything, including the curated flag.
            print("\n🏃 DRY RUN: Not marking curated.")
            return 0
        # Still mark as curated so we don't reprocess.
        # NOTE(review): extract_gems() also returns [] when the Ollama call or
        # reply parsing fails, so an outage marks this batch curated without
        # storing any gems. Consider distinguishing "no gems" from "failed".
        if not mark_curated(memory_ids, qdrant_url, source_collection):
            print("⚠️ Failed to mark some memories as curated")
            return 1
        return 0

    # Preview
    print("\n💎 Gems preview:")
    for i, gem in enumerate(gems[:3], 1):
        print(f" {i}. {str(gem.get('gem', 'N/A'))[:80]}...")
    if len(gems) > 3:
        print(f" ... and {len(gems) - 3} more")

    if args.dry_run:
        print("\n🏃 DRY RUN: Not storing gems or marking curated.")
        return 0

    # Store gems
    print(f"\n💾 Storing {len(gems)} gems...")
    stored = sum(
        1 for gem in gems
        if store_gem(gem, user_id, qdrant_url, target_collection, ollama_url)
    )
    print(f"✅ Stored: {stored}/{len(gems)}")

    if stored == 0:
        # Nothing reached the target collection (e.g. embedding service down).
        # Leave the memories uncurated so the next run retries them instead
        # of silently dropping this batch.
        print("⚠️ No gems stored; leaving memories uncurated for the next run.")
        return 1

    # Mark memories as curated
    print("\n📝 Marking memories as curated...")
    if not mark_curated(memory_ids, qdrant_url, source_collection):
        print("⚠️ Failed to mark some memories as curated")
        return 1
    print(f"✅ Marked {len(memory_ids)} memories as curated")

    print("\n🎉 Curation complete!")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status so cron can see
    # failures; sys.exit(None) exits with status 0.
    sys.exit(main())
|