Files
true-recall/tr-continuous/curator_turn_based.py

292 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""
Turn-Based Curator: Extract gems every N turns (instead of daily).
Usage:
python3 curator_turn_based.py --threshold 10 --dry-run
python3 curator_turn_based.py --threshold 10 --execute
python3 curator_turn_based.py --status # Show turn counts
This tracks turn count since last curation and runs when threshold is reached.
"""
import argparse
import json
import os
import requests
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
# Config
QDRANT_URL = "http://10.0.0.40:6333"
MEMORIES_COLLECTION = "memories_tr"
GEMS_COLLECTION = "gems_tr"
OLLAMA_URL = "http://10.0.0.10:11434"
CURATOR_MODEL = "ollama-remote/qwen3:30b-a3b-instruct-2507-q8_0"
# State file tracks last curation
STATE_FILE = Path("/tmp/curator_turn_state.json")
def get_curator_prompt(conversation_text: str) -> str:
"""Generate prompt for gem extraction."""
return f"""You are a memory curator. Extract only the most valuable gems (key insights) from this conversation.
Rules:
1. Extract only genuinely important information (decisions, preferences, key facts)
2. Skip transient/trivial content (greetings, questions, temporary requests)
3. Each gem should be self-contained and useful for future context
4. Format: concise, factual statements
5. Max 3-5 gems total
Conversation to curate:
---
{conversation_text}
---
Return ONLY a JSON array of gems like:
[{{"text": "User decided to use X approach for Y", "category": "decision"}}]
Categories: preference, fact, decision, entity, other
JSON:"""
def load_state() -> Dict[str, Any]:
"""Load curation state."""
if STATE_FILE.exists():
try:
with open(STATE_FILE) as f:
return json.load(f)
except:
pass
return {"last_turn": 0, "last_curation": None}
def save_state(state: Dict[str, Any]):
"""Save curation state."""
with open(STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
def get_point_count_since(last_time: str) -> int:
"""Get count of points since last curation time."""
try:
response = requests.post(
f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/count",
json={
"filter": {
"must": [
{
"key": "timestamp",
"range": {
"gt": last_time
}
}
]
}
},
timeout=30
)
response.raise_for_status()
return response.json().get("result", {}).get("count", 0)
except Exception as e:
print(f"Error getting count: {e}", file=sys.stderr)
return 0
def get_turns_since(last_turn: int, limit: int = 100) -> List[Dict[str, Any]]:
"""Get all turns since last curation."""
try:
response = requests.post(
f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/scroll",
json={"limit": limit, "with_payload": True},
timeout=30
)
response.raise_for_status()
data = response.json()
turns = []
for point in data.get("result", {}).get("points", []):
turn_num = point.get("payload", {}).get("turn", 0)
if turn_num > last_turn:
turns.append(point)
# Sort by turn number
turns.sort(key=lambda x: x.get("payload", {}).get("turn", 0))
return turns
except Exception as e:
print(f"Error fetching turns: {e}", file=sys.stderr)
return []
def extract_gems_with_llm(conversation_text: str) -> List[Dict[str, str]]:
"""Send conversation to LLM for gem extraction."""
prompt = get_curator_prompt(conversation_text)
try:
response = requests.post(
f"{OLLAMA_URL}/v1/chat/completions",
json={
"model": CURATOR_MODEL,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"max_tokens": 1000
},
timeout=120
)
response.raise_for_status()
data = response.json()
content = data.get("choices", [{}])[0].get("message", {}).get("content", "[]")
# Extract JSON from response
try:
# Try to find JSON array in response
start = content.find('[')
end = content.rfind(']')
if start != -1 and end != -1:
json_str = content[start:end+1]
gems = json.loads(json_str)
if isinstance(gems, list):
return gems
except:
pass
return []
except Exception as e:
print(f"Error calling LLM: {e}", file=sys.stderr)
return []
def store_gem(gem: Dict[str, str]) -> bool:
"""Store a single gem to gems_tr."""
try:
# Get embedding for gem
response = requests.post(
f"{OLLAMA_URL}/api/embeddings",
json={"model": "snowflake-arctic-embed2", "prompt": gem["text"]},
timeout=30
)
response.raise_for_status()
vector = response.json().get("embedding", [])
if not vector:
return False
# Store to gems_tr
response = requests.put(
f"{QDRANT_URL}/collections/{GEMS_COLLECTION}/points",
json={
"points": [{
"id": hash(gem["text"]) % (2**63),
"vector": vector,
"payload": {
"text": gem["text"],
"category": gem.get("category", "other"),
"createdAt": datetime.now(timezone.utc).isoformat(),
"source": "turn_based_curator"
}
}]
},
timeout=30
)
response.raise_for_status()
return True
except Exception as e:
print(f"Error storing gem: {e}", file=sys.stderr)
return False
def main():
parser = argparse.ArgumentParser(description="Turn-based curator")
parser.add_argument("--threshold", "-t", type=int, default=10,
help="Run curation every N turns (default: 10)")
parser.add_argument("--execute", "-e", action="store_true",
help="Execute curation")
parser.add_argument("--dry-run", "-n", action="store_true",
help="Preview what would be curated")
parser.add_argument("--status", "-s", action="store_true",
help="Show current turn status")
args = parser.parse_args()
# Load state
state = load_state()
current_turn = get_current_turn_count()
turns_since = current_turn - state["last_turn"]
if args.status:
print(f"Current turn: {current_turn}")
print(f"Last curation: {state['last_turn']}")
print(f"Turns since last curation: {turns_since}")
print(f"Threshold: {args.threshold}")
print(f"Ready to curate: {'YES' if turns_since >= args.threshold else 'NO'}")
return
print(f"Turn-based Curator")
print(f"Current turn: {current_turn}")
print(f"Last curation: {state['last_turn']}")
print(f"Turns since: {turns_since}")
print(f"Threshold: {args.threshold}")
print()
if turns_since < args.threshold:
print(f"Not enough turns. Need {args.threshold}, have {turns_since}")
return
# Get turns to process
print(f"Fetching {turns_since} turns...")
turns = get_turns_since(state["last_turn"], limit=turns_since + 10)
if not turns:
print("No new turns found")
return
# Build conversation text
conversation_parts = []
for turn in turns:
role = turn.get("payload", {}).get("role", "unknown")
content = turn.get("payload", {}).get("content", "")
conversation_parts.append(f"{role.upper()}: {content}")
conversation_text = "\n\n".join(conversation_parts)
print(f"Processing {len(turns)} turns ({len(conversation_text)} chars)")
print()
if args.dry_run:
print("=== CONVERSATION TEXT ===")
print(conversation_text[:500] + "..." if len(conversation_text) > 500 else conversation_text)
print()
print("[DRY RUN] Would extract gems and store to gems_tr")
return
if not args.execute:
print("Use --execute to run curation or --dry-run to preview")
return
# Extract gems
print("Extracting gems with LLM...")
gems = extract_gems_with_llm(conversation_text)
if not gems:
print("No gems extracted")
return
print(f"Extracted {len(gems)} gems:")
for i, gem in enumerate(gems, 1):
print(f" {i}. [{gem.get('category', 'other')}] {gem['text'][:80]}...")
print()
# Store gems
print("Storing gems...")
success = 0
for gem in gems:
if store_gem(gem):
success += 1
# Update state
state["last_turn"] = current_turn
state["last_curation"] = datetime.now(timezone.utc).isoformat()
save_state(state)
print(f"Done! Stored {success}/{len(gems)} gems")
if __name__ == "__main__":
main()