Initial commit: TrueRecall v2.2 with 30b curator and timer-based curation
This commit is contained in:
BIN
tr-daily/__pycache__/curate_from_qdrant.cpython-312.pyc
Normal file
BIN
tr-daily/__pycache__/curate_from_qdrant.cpython-312.pyc
Normal file
Binary file not shown.
358
tr-daily/curate_from_qdrant.py
Normal file
358
tr-daily/curate_from_qdrant.py
Normal file
@@ -0,0 +1,358 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
True-Recall v2 Curator: Reads from Qdrant kimi_memories
|
||||
|
||||
Reads 24 hours of conversation from Qdrant kimi_memories collection,
|
||||
extracts contextual gems using qwen3, stores to Qdrant gems_tr with mxbai embeddings.
|
||||
|
||||
Usage:
|
||||
python curate_from_qdrant.py --user-id rob
|
||||
python curate_from_qdrant.py --user-id rob --date 2026-02-23
|
||||
"""
|
||||
|
||||
import argparse
import hashlib
import json
import urllib.error
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
|
||||
|
||||
# --- Configuration ---

# Qdrant vector database endpoint and the two collections this script touches.
QDRANT_URL = "http://10.0.0.40:6333"
SOURCE_COLLECTION = "memories_tr"  # raw conversation turns (read)
TARGET_COLLECTION = "gems_tr"      # curated gems (write)

# Ollama endpoint plus the models used for embedding and curation.
OLLAMA_URL = "http://10.0.0.10:11434"
EMBEDDING_MODEL = "mxbai-embed-large"
CURATION_MODEL = "qwen3:4b-instruct"

# Primary location of the curator system prompt; load_curator_prompt() also
# tries a legacy v2 path if this file is missing.
CURATOR_PROMPT_PATH = "/root/.openclaw/workspace/.projects/true-recall/curator-prompt.md"
|
||||
|
||||
|
||||
def load_curator_prompt() -> str:
    """Load the curator system prompt from disk.

    Tries the primary path first, then falls back to the legacy v2
    project location.

    Returns:
        The prompt file contents as a string.

    Raises:
        FileNotFoundError: if the prompt exists at neither location.
    """
    fallback = "/root/.openclaw/workspace/.projects/true-recall-v2/curator-prompt.md"
    last_error: Optional[FileNotFoundError] = None
    for path in (CURATOR_PROMPT_PATH, fallback):
        try:
            # Explicit encoding avoids locale-dependent decoding of the prompt.
            with open(path, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError as e:
            last_error = e
    # Neither location had the file: raise one clear error instead of a
    # confusing nested traceback from the fallback open().
    raise FileNotFoundError(
        f"Curator prompt not found at {CURATOR_PROMPT_PATH} or {fallback}"
    ) from last_error
|
||||
|
||||
|
||||
def get_turns_from_qdrant(user_id: str, date_str: str) -> List[Dict[str, Any]]:
    """
    Get all conversation turns from Qdrant for a specific user and date.

    Pages through the source collection with the scroll API, converts each
    point's payload into a flat turn dict, and drops turns where both the
    user and AI messages are empty.

    Args:
        user_id: User whose turns should be fetched.
        date_str: Date to fetch, formatted YYYY-MM-DD.

    Returns:
        Turns sorted by conversation_id then turn number; empty list if the
        source collection does not exist (HTTP 404).
    """
    # Qdrant payload filter: both user_id and date must match exactly.
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "date", "match": {"value": date_str}},
        ]
    }

    all_points: List[Dict[str, Any]] = []
    offset = None
    max_iterations = 100  # Safety limit against a runaway pagination loop

    for _ in range(max_iterations):
        scroll_data: Dict[str, Any] = {
            "limit": 100,
            "with_payload": True,
            "filter": filter_data,
        }

        # Use `is not None`: a falsy-but-valid page cursor (e.g. point id 0)
        # must still be sent, otherwise pagination silently restarts.
        if offset is not None:
            scroll_data["offset"] = offset

        req = urllib.request.Request(
            f"{QDRANT_URL}/collections/{SOURCE_COLLECTION}/points/scroll",
            data=json.dumps(scroll_data).encode(),
            headers={"Content-Type": "application/json"},
            method="POST"
        )

        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                result = json.loads(response.read().decode())
                points = result.get("result", {}).get("points", [])

                if not points:
                    break

                all_points.extend(points)

                # next_page_offset is null/absent when there are no more pages.
                offset = result.get("result", {}).get("next_page_offset")
                if offset is None:
                    break
        except urllib.error.HTTPError as e:
            if e.code == 404:
                print(f"⚠️ Collection {SOURCE_COLLECTION} not found")
                return []
            raise

    # Convert points to turn format (harvested summaries)
    turns = []
    for point in all_points:
        payload = point.get("payload", {})

        # Extract user and AI messages plus the creation timestamp.
        user_msg = payload.get("user_message", "")
        ai_msg = payload.get("ai_response", "")
        created_at = payload.get("created_at", "")

        turn = {
            "turn": payload.get("turn_number", 0),
            "user_id": payload.get("user_id", user_id),
            "user": user_msg,
            "ai": ai_msg,
            "conversation_id": payload.get("conversation_id", ""),
            "session_id": payload.get("session_id", ""),
            "timestamp": created_at,
            "date": payload.get("date", date_str),
            "content_hash": payload.get("content_hash", "")
        }

        # Skip turns with no content on either side.
        if turn["user"] or turn["ai"]:
            turns.append(turn)

    # Stable chronological ordering within each conversation.
    turns.sort(key=lambda x: (x.get("conversation_id", ""), x.get("turn", 0)))

    return turns
|
||||
|
||||
|
||||
def _parse_curator_output(output: str) -> List[Dict[str, Any]]:
    """Parse the curator model's raw text into a list of gem dicts.

    Strips markdown code fences the model may emit, isolates the outermost
    JSON array, and returns [] when the text is not valid JSON.
    """
    # Strip markdown code fences (```json ... ``` or plain ``` ... ```).
    if '```json' in output:
        output = output.split('```json')[1].split('```')[0].strip()
    elif '```' in output:
        output = output.split('```')[1].split('```')[0].strip()

    try:
        # Isolate the JSON array: first '[' through last ']' discards any
        # commentary the model wrapped around it.
        start_idx = output.find('[')
        end_idx = output.rfind(']')
        if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
            output = output[start_idx:end_idx+1]

        gems = json.loads(output)
        if not isinstance(gems, list):
            # Tolerate a single-object response by wrapping it in a list.
            print(f"Warning: Curator returned non-list, wrapping: {type(gems)}")
            gems = [gems] if gems else []
        return gems
    except json.JSONDecodeError as e:
        print(f"Error parsing curator output: {e}")
        print(f"Raw output: {output[:500]}...")
        return []


def extract_gems_with_curator(turns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Use qwen3 to extract gems from conversation turns.

    Args:
        turns: Turn dicts as produced by get_turns_from_qdrant().

    Returns:
        A list of gem dicts; empty if there were no turns or the model's
        output could not be parsed.

    Raises:
        RuntimeError: if the Ollama generate call returns a non-200 status.
    """
    if not turns:
        return []

    prompt = load_curator_prompt()

    # Build the conversation input
    conversation_json = json.dumps(turns, indent=2)

    # Call Ollama with native system prompt; low temperature keeps extraction
    # stable, num_predict bounds the response length.
    response = requests.post(
        f"{OLLAMA_URL}/api/generate",
        json={
            "model": CURATION_MODEL,
            "system": prompt,
            "prompt": f"## Input Conversation\n\n```json\n{conversation_json}\n```\n\n## Output\n",
            "stream": False,
            "options": {
                "temperature": 0.1,
                "num_predict": 4000
            }
        },
        timeout=600,  # generation over a full day of turns can be slow; don't hang forever
    )

    if response.status_code != 200:
        raise RuntimeError(f"Curation failed: {response.text}")

    result = response.json()
    output = result.get('response', '').strip()
    return _parse_curator_output(output)
|
||||
|
||||
|
||||
def get_embedding(text: str) -> List[float]:
    """Get embedding vector from Ollama using mxbai-embed-large.

    Args:
        text: The text to embed.

    Returns:
        The embedding vector as a list of floats.

    Raises:
        RuntimeError: if the embeddings endpoint returns a non-200 status.
    """
    response = requests.post(
        f"{OLLAMA_URL}/api/embeddings",
        json={
            "model": EMBEDDING_MODEL,
            "prompt": text
        },
        timeout=60,  # fail fast instead of hanging on an unresponsive server
    )

    if response.status_code != 200:
        raise RuntimeError(f"Embedding failed: {response.text}")

    return response.json()['embedding']
|
||||
|
||||
|
||||
def get_gem_id(gem: Dict[str, Any], user_id: str) -> int:
    """Generate deterministic integer ID for a gem.

    The ID is derived from the user, conversation and turn range, so the
    same gem always maps to the same Qdrant point ID.
    """
    key = ":".join(
        [user_id, f"{gem.get('conversation_id', '')}", f"{gem.get('turn_range', '')}"]
    )
    # First 8 bytes of the SHA-256 digest (== first 16 hex chars, big-endian),
    # folded into the non-negative 63-bit range.
    digest_prefix = hashlib.sha256(key.encode()).hexdigest()[:16]
    return int(digest_prefix, 16) % (2**63)
|
||||
|
||||
|
||||
def check_duplicate(gem: Dict[str, Any], user_id: str) -> bool:
    """Return True if a gem with the same deterministic ID already exists.

    Performs a point lookup against the target collection; an HTTP 404
    means the gem has not been stored yet.
    """
    point_url = (
        f"{QDRANT_URL}/collections/{TARGET_COLLECTION}/points/"
        f"{get_gem_id(gem, user_id)}"
    )
    request = urllib.request.Request(
        point_url,
        headers={"Content-Type": "application/json"},
        method="GET",
    )
    try:
        with urllib.request.urlopen(request, timeout=10):
            return True  # lookup succeeded -> the point already exists
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False  # no such point -> not a duplicate
        raise  # any other HTTP error is a real failure; propagate it
|
||||
|
||||
|
||||
def store_gem_to_qdrant(gem: Dict[str, Any], user_id: str) -> bool:
    """Embed a gem and upsert it into the target Qdrant collection.

    The point ID is deterministic (see get_gem_id), so re-running curation
    for the same day upserts rather than duplicates.

    Args:
        gem: Gem dict produced by the curator.
        user_id: Owner of the gem; stored in the payload and mixed into the ID.

    Returns:
        True if Qdrant acknowledged the upsert (HTTP 200).

    Raises:
        RuntimeError: propagated from get_embedding() on embedding failure.
    """
    # Embed the human-readable parts of the gem together so retrieval can
    # match on any of them.
    embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}"
    vector = get_embedding(embedding_text)

    # user_id first so a user_id key inside the gem (if present) wins --
    # preserves the original spread semantics.
    payload = {
        "user_id": user_id,
        **gem
    }

    # Generate deterministic integer ID
    gem_id = get_gem_id(gem, user_id)

    # Store to Qdrant
    response = requests.put(
        f"{QDRANT_URL}/collections/{TARGET_COLLECTION}/points",
        json={
            "points": [{
                "id": gem_id,
                "vector": vector,
                "payload": payload
            }]
        },
        timeout=30,  # avoid hanging the nightly job on an unresponsive Qdrant
    )

    return response.status_code == 200
|
||||
|
||||
|
||||
def main():
    """CLI entry point: fetch a day of turns, curate gems, store to Qdrant."""
    parser = argparse.ArgumentParser(description="True-Recall Curator v2 - Reads from Qdrant")
    parser.add_argument("--user-id", required=True, help="User ID to process")
    parser.add_argument("--date", help="Specific date to process (YYYY-MM-DD), defaults to yesterday")
    parser.add_argument("--dry-run", action="store_true", help="Don't store, just preview")
    args = parser.parse_args()

    # Determine date (yesterday by default -- the nightly job curates the
    # previous day's conversations).
    if args.date:
        date_str = args.date
    else:
        yesterday = datetime.now() - timedelta(days=1)
        date_str = yesterday.strftime("%Y-%m-%d")

    print(f"🔍 True-Recall Curator v2 for {args.user_id}")
    print(f"📅 Processing date: {date_str}")
    print(f"🧠 Embedding model: {EMBEDDING_MODEL}")
    print(f"💎 Target collection: {TARGET_COLLECTION}")
    print()

    # Get turns from Qdrant
    print(f"📥 Fetching conversation turns from {SOURCE_COLLECTION}...")
    turns = get_turns_from_qdrant(args.user_id, date_str)
    print(f"✅ Found {len(turns)} turns")

    if not turns:
        print("⚠️ No turns to process. Exiting.")
        return

    # Show a short preview of what will be curated.
    print("\n📄 Sample turns:")
    for turn in turns[:3]:
        user_msg = turn.get("user", "")[:60]
        ai_msg = turn.get("ai", "")[:60]
        print(f" Turn {turn.get('turn')}: User: {user_msg}...")
        print(f" AI: {ai_msg}...")
    if len(turns) > 3:
        print(f" ... and {len(turns) - 3} more")

    # Extract gems
    print("\n🧠 Extracting gems with The Curator (qwen3)...")
    gems = extract_gems_with_curator(turns)
    print(f"✅ Extracted {len(gems)} gems")

    if not gems:
        print("⚠️ No gems extracted. Exiting.")
        return

    # Preview gems
    print("\n💎 Preview of extracted gems:")
    for i, gem in enumerate(gems[:3], 1):
        print(f"\n--- Gem {i} ---")
        print(f"Gem: {gem.get('gem', 'N/A')[:100]}...")
        print(f"Categories: {gem.get('categories', [])}")
        print(f"Importance: {gem.get('importance', 'N/A')}")
        print(f"Confidence: {gem.get('confidence', 'N/A')}")

    if len(gems) > 3:
        print(f"\n... and {len(gems) - 3} more gems")

    if args.dry_run:
        print("\n🏃 DRY RUN: Not storing gems.")
        return

    # Check for duplicates and store
    print("\n💾 Storing gems to Qdrant...")
    stored = 0
    skipped = 0
    failed = 0

    for gem in gems:
        # Skip gems whose deterministic ID is already in the collection.
        if check_duplicate(gem, args.user_id):
            print(f" ⏭️ Skipping duplicate: {gem.get('gem', 'N/A')[:50]}...")
            skipped += 1
            continue

        # A failed embedding call (RuntimeError) should not abort the whole
        # batch -- count it as failed and keep going.
        try:
            ok = store_gem_to_qdrant(gem, args.user_id)
        except RuntimeError as e:
            print(f" ⚠️ Failed to store gem: {gem.get('gem', 'N/A')[:50]}... ({e})")
            failed += 1
            continue

        if ok:
            stored += 1
        else:
            print(f" ⚠️ Failed to store gem: {gem.get('gem', 'N/A')[:50]}...")
            failed += 1

    print(f"\n✅ Stored: {stored}")
    print(f"⏭️ Skipped (duplicates): {skipped}")
    print(f"❌ Failed: {failed}")

    print("\n🎉 Curation complete!")
|
||||
Reference in New Issue
Block a user