chore: Remove unnecessary files and folders

Removed:
- session.md (user-specific session notes)
- migrate_memories.py (one-time migration script)
- test_curator.py (test file)
- __pycache__/ (Python cache)
- tr-compact/ (v1 deprecated)
- tr-daily/ (v1 deprecated)
- tr-worker/ (empty)
- shared/ (empty)
- tr-continuous/migrate_add_curated.py
- tr-continuous/curator_by_count.py
- tr-continuous/curator_turn_based.py
- tr-continuous/curator_cron.sh
- tr-continuous/turn-curator.service
- tr-continuous/README.md (redundant)

Remaining core files:
- README.md, checklist.md, curator-prompt.md
- install.py, push-all.sh, .gitignore
- tr-continuous/curator_timer.py
- tr-continuous/curator_config.json
This commit is contained in:
root
2026-02-24 21:42:48 -06:00
parent 198334c0b4
commit a22e6f095a
13 changed files with 0 additions and 1905 deletions

View File

@@ -1,187 +0,0 @@
#!/usr/bin/env python3
"""
Migrate memories from kimi_memories to memories_tr
- Reads from kimi_memories (Qdrant)
- Cleans/strips noise (metadata, thinking tags)
- Stores to memories_tr (Qdrant)
- Keeps original kimi_memories intact
"""
import json
import urllib.request
import urllib.error
from datetime import datetime
from typing import List, Dict, Any
# Migration endpoints: every point is read from SOURCE_COLLECTION and its
# cleaned copy written to TARGET_COLLECTION on the same Qdrant instance.
QDRANT_URL = "http://10.0.0.40:6333"
SOURCE_COLLECTION = "kimi_memories"
TARGET_COLLECTION = "memories_tr"
def clean_content(text: str) -> str:
    """Strip capture noise (metadata blocks, thinking tags, timestamps) from text.

    Args:
        text: Raw memory content; may be empty or falsy.

    Returns:
        The cleaned string with blank-line runs collapsed, or "" for falsy input.
    """
    if not text:
        return ""
    import re  # local import, matching the original file's style

    # Drop the untrusted-metadata JSON block injected by the capture layer.
    result = re.sub(
        r'Conversation info \(untrusted metadata\):\s*```json\s*\{[\s\S]*?\}\s*```',
        '', text)
    # Drop inline [thinking: ...] tags.
    result = re.sub(r'\[thinking:[^\]]*\]', '', result)
    # Drop bracketed timestamp stamps such as "[Mon 2026-02-24 19:02 CST]".
    result = re.sub(r'\[\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2} [A-Z]{3}\]', '', result)
    # Collapse runs of 3+ newlines and trim the edges.
    result = re.sub(r'\n{3,}', '\n\n', result)
    return result.strip()
def get_all_points(collection: str) -> List[Dict]:
    """Scroll through *collection* and return every point (payload + vector).

    Pages through Qdrant's scroll API 100 points at a time, following
    next_page_offset until exhausted. Returns whatever was collected so far
    if an HTTP error occurs mid-scroll.
    """
    collected: List[Dict] = []
    cursor = None
    # Hard cap on pages so a misbehaving server can never loop us forever.
    for _ in range(1000):
        body: Dict[str, Any] = {
            "limit": 100,
            "with_payload": True,
            "with_vector": True,
        }
        if cursor:
            body["offset"] = cursor
        request = urllib.request.Request(
            f"{QDRANT_URL}/collections/{collection}/points/scroll",
            data=json.dumps(body).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(request, timeout=60) as response:
                payload = json.loads(response.read().decode())
                page = payload.get("result", {}).get("points", [])
                if not page:
                    break
                collected.extend(page)
                cursor = payload.get("result", {}).get("next_page_offset")
                if not cursor:
                    break
        except urllib.error.HTTPError as e:
            print(f"Error: {e}")
            break
    return collected
def store_points(collection: str, points: List[Dict]) -> int:
    """Upsert *points* into *collection* in batches; return how many stored.

    Batches of 100 are PUT to Qdrant; a failed batch is logged and skipped
    so one bad batch does not abort the whole upload.
    """
    if not points:
        return 0
    batch_size = 100
    total_stored = 0
    for start in range(0, len(points), batch_size):
        chunk = points[start:start + batch_size]
        request = urllib.request.Request(
            f"{QDRANT_URL}/collections/{collection}/points",
            data=json.dumps({"points": chunk}).encode(),
            headers={"Content-Type": "application/json"},
            method="PUT",
        )
        try:
            with urllib.request.urlopen(request, timeout=60) as response:
                if response.status == 200:
                    total_stored += len(chunk)
        except urllib.error.HTTPError as e:
            print(f"Error storing batch: {e}")
    return total_stored
def migrate_point(point: Dict) -> Dict:
    """Return a copy of *point* with cleaned messages and migration markers.

    Both message fields are scrubbed via clean_content; all other payload
    fields pass through untouched. The id and vector are preserved.
    """
    original_payload = point.get("payload", {})
    migrated_payload = dict(original_payload)
    migrated_payload.update(
        user_message=clean_content(original_payload.get("user_message", "")),
        ai_response=clean_content(original_payload.get("ai_response", "")),
        migrated_from="kimi_memories",
        migrated_at=datetime.now().isoformat(),
    )
    return {
        "id": point.get("id"),
        "vector": point.get("vector"),
        "payload": migrated_payload,
    }
def main():
    """Run the full migration: read → clean → store → verify → report."""
    banner = "=" * 60
    print(banner)
    print("Memory Migration: kimi_memories → memories_tr")
    print(banner)
    print()
    # 1. Pull everything out of the source collection.
    print(f"📥 Reading from {SOURCE_COLLECTION}...")
    source_points = get_all_points(SOURCE_COLLECTION)
    print(f" Found {len(source_points)} points")
    if not source_points:
        print("❌ No points to migrate")
        return
    # 2. Scrub noise from every point.
    print(f"\n🧹 Cleaning {len(source_points)} points...")
    cleaned_points = [migrate_point(p) for p in source_points]
    print(" ✓ Cleaned")
    # 3. Write the cleaned points into the target collection.
    print(f"\n💾 Storing to {TARGET_COLLECTION}...")
    stored = store_points(TARGET_COLLECTION, cleaned_points)
    print(f" ✓ Stored {stored} points")
    # 4. Re-read the target to confirm the writes landed.
    print(f"\n🔍 Verifying...")
    target_points = get_all_points(TARGET_COLLECTION)
    print(f" Target now has {len(target_points)} points")
    # 5. Summary.
    print()
    print(banner)
    print("Migration Summary:")
    print(f" Source ({SOURCE_COLLECTION}): {len(source_points)} points")
    print(f" Target ({TARGET_COLLECTION}): {len(target_points)} points")
    print(f" Cleaned & migrated: {stored} points")
    print(banner)
    if stored == len(source_points):
        print("\n✅ Migration complete!")
    else:
        print(f"\n⚠️ Warning: Only migrated {stored}/{len(source_points)} points")
if __name__ == "__main__":
    main()

View File

@@ -1,494 +0,0 @@
# TrueRecall v2 - Session Notes
**Last Updated:** 2026-02-24 19:02 CST
**Status:** ✅ Active & Verified
**Version:** v2.2 (Timer-based curation deployed)
---
## Session End (18:09 CST)
**Reason:** User starting new session
**Current State:**
- Real-time watcher: ✅ Active (capturing live)
- Timer curator: ✅ Deployed (every 30 min via cron)
- Daily curator: ❌ Removed (replaced by timer)
- Total memories: 12,378 (all tagged with `curated: false`)
- Gems: 5 (from Feb 18 test)
**Next session start:** Read this file, then check:
```bash
# Quick status
python3 ~/.openclaw/workspace/.projects/true-recall-v2/tr-continuous/curator_by_count.py --status
sudo systemctl status mem-qdrant-watcher
curl -s http://<QDRANT_IP>:6333/collections/memories_tr | jq '.result.points_count'
```
---
## Executive Summary
TrueRecall v2 is a complete memory system with real-time capture, timer-based curation (every 30 minutes), and context injection. All components are operational.
---
## Current State (Verified 18:09 CST)
### Qdrant Collections
| Collection | Points | Purpose | Status |
|------------|--------|---------|--------|
| `memories_tr` | **12,378** | Full text (live capture) | ✅ Active |
| `gems_tr` | **5** | Curated gems (injection) | ✅ Active |
| `true_recall` | existing | Legacy archive | 📦 Preserved |
| `kimi_memories` | 12,223 | Original backup | 📦 Preserved |
**Note:** All memories tagged with `curated: false` for timer curator.
### Services
| Service | Status | Uptime |
|---------|--------|--------|
| `mem-qdrant-watcher` | ✅ Active | 30+ min |
| OpenClaw Gateway | ✅ Running | 2026.2.23 |
| memory-qdrant plugin | ✅ Loaded | recall: gems_tr, capture: memories_tr |
---
## Architecture
### v2.2: Timer-Based Curation (DEPLOYED)
**Data Flow:**
```
┌─────────────────┐ ┌──────────────────────┐ ┌─────────────┐
│ OpenClaw Chat │────▶│ Real-Time Watcher │────▶│ memories_tr │
│ (Session JSONL)│ │ (Python daemon) │ │ (Qdrant) │
└─────────────────┘ └──────────────────────┘ └──────┬──────┘
│ Every 30 min
┌──────────────────┐
│ Timer Curator │
│ (cron/qwen3) │
└────────┬─────────┘
┌──────────────────┐
│ gems_tr │
│ (Qdrant) │
└────────┬─────────┘
Per turn │
┌──────────────────┐
│ memory-qdrant │
│ plugin │
└──────────────────┘
```
**Key Changes:**
- ✅ Replaced daily 2:45 AM batch with 30-minute timer
- ✅ All memories tagged `curated: false` on write
- ✅ Migration completed for 12,378 existing memories
- ✅ No Redis dependency (direct Qdrant only)
---
## Components
### Curation Mode: Timer-Based (DEPLOYED v2.2)
| Setting | Value | Adjustable |
|---------|-------|------------|
| **Trigger** | Cron timer | ✅ |
| **Interval** | 30 minutes | ✅ Config file |
| **Batch size** | 100 memories max | ✅ Config file |
| **Minimum** | None (0 is OK) | — |
**Config:** `/tr-continuous/curator_config.json`
```json
{
"timer_minutes": 30,
"max_batch_size": 100,
"user_id": "rob",
"source_collection": "memories_tr",
"target_collection": "gems_tr"
}
```
**Cron:**
```
*/30 * * * * cd .../tr-continuous && python3 curator_timer.py
```
**Old modes deprecated:**
- ❌ Turn-based (every N turns)
- ❌ Hybrid (timer + turn)
- ❌ Daily batch (2:45 AM)
### 1. Real-Time Watcher (Primary Capture)
**Location:** `~/.openclaw/workspace/skills/qdrant-memory/scripts/realtime_qdrant_watcher.py`
**Function:**
- Watches `~/.openclaw/agents/main/sessions/*.jsonl`
- Parses every conversation turn in real-time
- Embeds with `snowflake-arctic-embed2` (Ollama @ <OLLAMA_IP>)
- Stores directly to `memories_tr` (no Redis)
- **Cleans content:** Removes markdown, tables, metadata, thinking tags
**Service:** `mem-qdrant-watcher.service`
- **Status:** Active since 16:46:53 CST
- **Systemd:** Enabled, auto-restart
**Log:** `journalctl -u mem-qdrant-watcher -f`
---
### 2. Content Cleaner (Existing Data)
**Location:** `~/.openclaw/workspace/skills/qdrant-memory/scripts/clean_memories_tr.py`
**Function:**
- Batch-cleans existing `memories_tr` points
- Removes: `**bold**`, `|tables|`, `` `code` ``, `---` rules, `# headers`
- Flattens nested content dicts
- Rate-limited to prevent Qdrant overload
**Usage:**
```bash
# Dry run (preview)
python3 clean_memories_tr.py --dry-run
# Clean all
python3 clean_memories_tr.py --execute
# Clean limited (test)
python3 clean_memories_tr.py --execute --limit 100
```
---
### 3. Timer Curator (v2.2 - DEPLOYED)
**Replaces:** Daily curator (2:45 AM batch) and turn-based curator
**Location:** `~/.openclaw/workspace/.projects/true-recall-v2/tr-continuous/curator_timer.py`
**Schedule:** Every 30 minutes (cron)
**Flow:**
1. Query uncurated memories (`curated: false`)
2. Send batch to qwen3 (max 100)
3. Extract gems using curator prompt
4. Store gems to `gems_tr`
5. Mark processed memories as `curated: true`
**Files:**
| File | Purpose |
|------|---------|
| `curator_timer.py` | Main curator script |
| `curator_config.json` | Adjustable settings |
| `migrate_add_curated.py` | One-time migration (completed) |
**Usage:**
```bash
# Dry run (preview)
python3 curator_timer.py --dry-run
# Manual run
python3 curator_timer.py --config curator_config.json
```
**Status:** ✅ Deployed, first run will process ~12,378 existing memories
### 5. Silent Compacting (NEW - Concept)
**Idea:** Automatically remove old context from prompt when token limit approached.
**Behavior:**
- Trigger: Context window > 80% full
- Action: Remove oldest messages (silently)
- Preserve: Gems always kept, recent N turns kept
- Result: Seamless conversation without "compacting" notification
**Config:**
```json
{
"compacting": {
"enabled": true,
"triggerAtPercent": 80,
"keepRecentTurns": 20,
"preserveGems": true,
"silent": true
}
}
```
**Status:** ⏳ Concept only - requires OpenClaw core changes
### 6. memory-qdrant Plugin
**Location:** `~/.openclaw/extensions/memory-qdrant/`
**Config:**
```json
{
"collectionName": "gems_tr",
"captureCollection": "memories_tr",
"autoRecall": true,
"autoCapture": true
}
```
**Function:**
- **Recall:** Searches `gems_tr`, injects as context (hidden)
- **Capture:** Session-level capture to `memories_tr` (backup)
**Status:** Loaded, dual collection support working
---
## Files & Locations
### Core Project Files
```
~/.openclaw/workspace/.projects/true-recall-v2/
├── README.md # Architecture docs
├── session.md # This file
├── curator-prompt.md # Gem extraction prompt
├── tr-daily/ # Daily batch curation
│ └── curate_from_qdrant.py # Daily curator (2:45 AM)
├── tr-continuous/ # Real-time curation (NEW)
│ ├── curator_by_count.py # Turn-based curator
│ ├── curator_turn_based.py # Alternative approach
│ ├── curator_cron.sh # Cron wrapper
│ ├── turn-curator.service # Systemd service
│ └── README.md # Documentation
└── shared/
└── (shared resources)
```
### New Files (2026-02-24 19:00)
| File | Purpose |
|------|---------|
| `tr-continuous/curator_timer.py` | Timer-based curator (deployed) |
| `tr-continuous/curator_config.json` | Curator settings |
| `tr-continuous/migrate_add_curated.py` | Migration script (completed) |
### Legacy Files (Pre-v2.2)
| File | Status | Note |
|------|--------|------|
| `tr-daily/curate_from_qdrant.py` | 📦 Archived | Replaced by timer |
| `tr-continuous/curator_by_count.py` | 📦 Archived | Replaced by timer |
| `tr-continuous/curator_turn_based.py` | 📦 Archived | Replaced by timer |
### System Locations
| File | Purpose |
|------|---------|
| `~/.openclaw/extensions/memory-qdrant/` | Plugin code |
| `~/.openclaw/openclaw.json` | Plugin configuration |
| `/etc/systemd/system/mem-qdrant-watcher.service` | Systemd service |
---
## Changes Made Today (2026-02-24 19:00)
### 1. Timer Curator Deployed (v2.2)
- Created `curator_timer.py` — simplified timer-based curation
- Created `curator_config.json` — adjustable settings
- Removed daily 2:45 AM cron job
- Added `*/30 * * * *` cron timer
- **Status:** ✅ Deployed, logs to `/var/log/true-recall-timer.log`
### 2. Migration Completed
- Created `migrate_add_curated.py`
- Tagged 12,378 existing memories with `curated: false`
- Updated watcher to add `curated: false` to new memories
- **Status:** ✅ Complete
### 3. Simplified Architecture
- ❌ Removed turn-based curator complexity
- ❌ Removed daily batch processing
- ✅ Single timer trigger every 30 minutes
- ✅ No minimum threshold (processes 0-N memories)
---
## Configuration
### memory-qdrant Plugin
**File:** `~/.openclaw/openclaw.json`
```json
{
"memory-qdrant": {
"config": {
"autoCapture": true,
"autoRecall": true,
"collectionName": "gems_tr",
"captureCollection": "memories_tr",
"embeddingModel": "snowflake-arctic-embed2",
"maxRecallResults": 2,
"minRecallScore": 0.7,
"ollamaUrl": "http://<OLLAMA_IP>:11434",
"qdrantUrl": "http://<QDRANT_IP>:6333"
},
"enabled": true
}
}
```
### Gateway (OpenClaw Update Fix)
```json
{
"gateway": {
"controlUi": {
"allowedOrigins": ["*"],
"allowInsecureAuth": false,
"dangerouslyDisableDeviceAuth": true
}
}
}
```
---
## Validation Commands
### Check Collections
```bash
# Points count
curl -s http://<QDRANT_IP>:6333/collections/memories_tr | jq '.result.points_count'
curl -s http://<QDRANT_IP>:6333/collections/gems_tr | jq '.result.points_count'
# Recent points
curl -s -X POST http://<QDRANT_IP>:6333/collections/memories_tr/points/scroll \
-H "Content-Type: application/json" \
-d '{"limit": 5, "with_payload": true}' | jq '.result.points[].payload.content'
```
### Check Services
```bash
# Watcher status
sudo systemctl status mem-qdrant-watcher
# Watcher logs
sudo journalctl -u mem-qdrant-watcher -n 20
# OpenClaw status
openclaw status
```
---
## Troubleshooting
### Issue: Watcher Not Capturing
**Check:**
1. Service running? `systemctl status mem-qdrant-watcher`
2. Logs: `journalctl -u mem-qdrant-watcher -f`
3. Qdrant accessible? `curl http://<QDRANT_IP>:6333/`
4. Ollama accessible? `curl http://<OLLAMA_IP>:11434/api/tags`
### Issue: Cleaner Fails
**Common causes:**
- Qdrant connection timeout (add `time.sleep(0.1)` between batches)
- Nested content dicts (handled in updated script)
- Type errors (non-string content — handled)
### Issue: Plugin Not Loading
**Check:**
1. `openclaw.json` syntax valid? `openclaw config validate`
2. Plugin compiled? `cd ~/.openclaw/extensions/memory-qdrant && npx tsc`
3. Gateway logs: `tail /tmp/openclaw/openclaw-$(date +%Y-%m-%d).log`
---
## Cron Schedule (Updated v2.2)
| Time | Job | Script | Status |
|------|-----|--------|--------|
| Every 30 min | Timer curator | `tr-continuous/curator_timer.py` | ✅ Active |
| Per turn | Capture | `mem-qdrant-watcher` | ✅ Daemon |
| Per turn | Injection | `memory-qdrant` plugin | ✅ Active |
**Removed:**
- ❌ 2:45 AM daily curator
- ❌ Every-minute turn curator check
---
## Next Steps
### Immediate
- ⏳ Monitor first timer run (logs: `/var/log/true-recall-timer.log`)
- ⏳ Validate gem extraction quality from timer curator
- ⏳ Archive old curator scripts if timer works
### Completed ✅
- **Compactor config** — Minimal overhead: `mode: default`, `reserveTokensFloor: 0`, `memoryFlush: false`
### Future
- ⏳ Curator tuning based on timer results
- ⏳ Silent compacting (requires OpenClaw core changes)
### Planned Features (Backlog)
- **Interactive install script** — Prompts for embedding model, timer interval, batch size, endpoints
- **Single embedding model option** — Use one model for both collections
- **Configurable thresholds** — Per-user customization via prompts
**Compactor Settings (Applied):**
```json5
{
agents: {
defaults: {
compaction: {
mode: "default",
reserveTokensFloor: 0,
memoryFlush: { enabled: false }
}
}
}
}
```
**Note:** Only `mode`, `reserveTokensFloor`, and `memoryFlush` are valid under `agents.defaults.compaction`. Other settings are Pi runtime parameters.
**Install script prompts:**
1. Embedding model (snowflake vs mxbai)
2. Timer interval (5 min / 30 min / hourly)
3. Batch size (50 / 100 / 500)
4. Qdrant/Ollama URLs
5. User ID
---
## Session Recovery
If starting fresh:
1. Read `README.md` for architecture overview
2. Check service status: `sudo systemctl status mem-qdrant-watcher`
3. Check timer curator: `tail /var/log/true-recall-timer.log`
4. Verify collections: `curl http://<QDRANT_IP>:6333/collections`
---
*Last Verified: 2026-02-24 19:29 CST*
*Version: v2.2 (30b curator, install script planned)*

View File

@@ -1,64 +0,0 @@
#!/usr/bin/env python3
"""Quick test of curator with simple input"""
import json
import requests
# Load prompt from v1 — the v1 gem-extraction prompt doubles as the system prompt here.
with open('/root/.openclaw/workspace/.projects/true-recall-v1/curator-prompt.md') as f:
    prompt = f.read()
# Test with a simple conversation — a minimal two-turn fixture in the capture schema.
test_turns = [
    {
        'turn': 1,
        'user_id': 'rob',
        'user': 'I want to switch from Redis to Qdrant for memory storage',
        'ai': 'Got it - Qdrant is a good choice for vector storage.',
        'conversation_id': 'test123',
        'timestamp': '2026-02-23T10:00:00',
        'date': '2026-02-23'
    },
    {
        'turn': 2,
        'user_id': 'rob',
        'user': 'Yes, and I want the curator to read from Qdrant directly',
        'ai': 'Makes sense - we can modify the curator to query Qdrant instead of Redis.',
        'conversation_id': 'test123',
        'timestamp': '2026-02-23T10:01:00',
        'date': '2026-02-23'
    }
]
conversation_json = json.dumps(test_turns, indent=2)
# The user prompt embeds the fixture as a fenced JSON block, mirroring production input.
prompt_text = f"""## Input Conversation
```json
{conversation_json}
```
## Output
"""
# One-shot, non-streaming generation; low temperature keeps output comparable run-to-run.
response = requests.post(
    'http://10.0.0.10:11434/api/generate',
    json={
        'model': 'qwen3:4b-instruct',
        'system': prompt,
        'prompt': prompt_text,
        'stream': False,
        'options': {'temperature': 0.1, 'num_predict': 2000}
    },
    timeout=120
)
result = response.json()
output = result.get('response', '').strip()
print('=== RAW OUTPUT ===')
print(output[:2000])
print()
print('=== PARSED ===')
# Try to extract JSON — the model typically wraps its array in a ```json fence.
if '```json' in output:
    parsed = output.split('```json')[1].split('```')[0].strip()
    print(parsed)

View File

@@ -1,105 +0,0 @@
#!/usr/bin/env python3
"""
TrueRecall v2 - Compaction Hook
Fast Redis queue push for compaction events
Called by OpenClaw session_before_compact hook
"""
import json
import sys
import redis
from datetime import datetime
from typing import List, Dict, Any
# Redis config — host/port/db of the queue instance.
REDIS_HOST = "10.0.0.36"
REDIS_PORT = 6379
REDIS_DB = 0
# List key the background worker consumes compaction batches from.
QUEUE_KEY = "tr:compact_queue"
# Prefix for per-turn "already processed" marker keys (24h TTL, see tag_turns).
TAG_PREFIX = "tr:processed"
def get_redis_client():
    """Build a Redis client for the queue instance with string decoding on."""
    return redis.Redis(
        host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True
    )
def tag_turns(messages: List[Dict], user_id: str = "rob"):
    """Mark each turn as processed so the v1 daily curator skips it.

    Marker keys expire after 24 hours; all writes go through one pipelined
    round trip.
    """
    pipeline = get_redis_client().pipeline()
    for message in messages:
        conversation = message.get("conversation_id", "unknown")
        turn_number = message.get("turn", 0)
        pipeline.setex(f"{TAG_PREFIX}:{conversation}:{turn_number}", 86400, "1")  # 24h TTL
    pipeline.execute()
def queue_messages(messages: List[Dict], user_id: str = "rob"):
    """Push a batch of messages onto the Redis work queue; return batch size."""
    envelope = json.dumps({
        "user_id": user_id,
        "timestamp": datetime.now().isoformat(),
        "message_count": len(messages),
        "messages": messages,
    })
    # LPUSH keeps the newest batch at the head of the list.
    get_redis_client().lpush(QUEUE_KEY, envelope)
    return len(messages)
def process_compaction_event(event_data: Dict):
    """Handle one session_before_compact event: tag the turns, then queue them.

    Expected event_data:
    {
        "session_id": "uuid",
        "user_id": "rob",
        "messages_being_compacted": [
            {"role": "user", "content": "...", "turn": 1, "conversation_id": "..."},
            ...
        ],
        "compaction_reason": "context_limit"
    }

    Returns a small status dict describing what was queued.
    """
    user_id = event_data.get("user_id", "rob")
    messages = event_data.get("messages_being_compacted", [])
    if not messages:
        return {"status": "ok", "queued": 0, "reason": "no_messages"}
    # Coordinate with the v1 curator first, then hand off to the worker queue.
    tag_turns(messages, user_id)
    queued = queue_messages(messages, user_id)
    return {
        "status": "ok",
        "queued": queued,
        "user_id": user_id,
        "queue_key": QUEUE_KEY,
    }
def main():
    """CLI entry point: read one JSON event from stdin, emit a JSON result.

    Exits 0 with a result object on success; exits 1 with an error object on
    any failure (including malformed stdin JSON).
    """
    try:
        outcome = process_compaction_event(json.load(sys.stdin))
        print(json.dumps(outcome))
        sys.exit(0)
    except Exception as e:
        print(json.dumps({"status": "error", "error": str(e)}))
        sys.exit(1)
if __name__ == "__main__":
    main()

View File

@@ -1,101 +0,0 @@
# Turn-Based Curator
Extract gems every N turns instead of waiting for daily curation.
## Files
| File | Purpose |
|------|---------|
| `curator_turn_based.py` | Main script - checks turn count, extracts gems |
| `curator_cron.sh` | Cron wrapper to run every minute |
| `turn-curator.service` | Alternative systemd service (runs on-demand) |
## Usage
### Manual Run
```bash
# Check current status
python3 curator_turn_based.py --status
# Preview what would be curated
python3 curator_turn_based.py --threshold 10 --dry-run
# Execute curation
python3 curator_turn_based.py --threshold 10 --execute
```
### Automatic (Cron)
Add to crontab:
```bash
* * * * * /root/.openclaw/workspace/.projects/true-recall-v2/tr-continuous/curator_cron.sh
```
Or use systemd timer:
```bash
sudo cp turn-curator.service /etc/systemd/system/
sudo systemctl enable turn-curator.timer # If you create a timer
```
### Automatic (Integrated)
Alternative: Modify `realtime_qdrant_watcher.py` to trigger curation every 10 turns.
## How It Works
1. **Tracks turn count** - Stores last curation turn in `/tmp/curator_turn_state.json`
2. **Monitors delta** - Compares current turn count vs last curation
3. **Triggers at threshold** - When 10+ new turns exist, runs curation
4. **Extracts gems** - Sends conversation to qwen3, gets gems
5. **Stores results** - Saves gems to `gems_tr` collection
## State File
`/tmp/curator_turn_state.json`:
```json
{
"last_turn": 150,
"last_curation": "2026-02-24T17:00:00Z"
}
```
## Comparison with Daily Curator
| Feature | Daily Curator | Turn-Based Curator |
|---------|--------------|-------------------|
| Schedule | 2:45 AM daily | Every 10 turns (dynamic) |
| Time window | 24 hours | Variable (depends on chat frequency) |
| Trigger | Cron | Turn threshold |
| Use case | Nightly batch | Real-time-ish extraction |
| Overlap | Low | Possible with daily curator |
## Recommendation
Use **BOTH**:
- **Turn-based**: Every 10 turns for active conversations
- **Daily**: 2:45 AM as backup/catch-all
They'll deduplicate automatically (same embeddings → skipped).
## Testing
```bash
# Simulate 10 turns
for i in {1..10}; do
echo "Test message $i" > /dev/null
done
# Check status
python3 curator_turn_based.py --status
# Run manually
python3 curator_turn_based.py --threshold 10 --execute
```
## Status
- ✅ Script created: `curator_turn_based.py`
- ✅ Cron wrapper: `curator_cron.sh`
- ⏳ Deployment: Optional (manual or cron)
- ⏳ Testing: Pending

View File

@@ -1,194 +0,0 @@
#!/usr/bin/env python3
"""
Turn-Based Curator: Extract gems every N new memories (turns).
Usage:
python3 curator_by_count.py --threshold 10 --dry-run
python3 curator_by_count.py --threshold 10 --execute
python3 curator_by_count.py --status
"""
import argparse
import json
import requests
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Qdrant instance holding both collections.
QDRANT_URL = "http://10.0.0.40:6333"
MEMORIES = "memories_tr"  # raw capture collection (input)
GEMS = "gems_tr"  # curated gems collection (output)
# Ollama endpoint and the model used for gem extraction.
OLLAMA = "http://10.0.0.10:11434"
MODEL = "ollama-remote/qwen3:30b-a3b-instruct-2507-q8_0"
# Remembers the point count observed at the last curation run.
STATE_FILE = Path("/tmp/curator_count_state.json")
def load_state():
    """Return persisted curation state, or a fresh default if none exists."""
    if not STATE_FILE.exists():
        return {"last_count": 0, "last_time": None}
    with open(STATE_FILE) as handle:
        return json.load(handle)
def save_state(state):
    """Persist curation state to STATE_FILE as compact JSON."""
    STATE_FILE.write_text(json.dumps(state))
def get_total_count():
    """Return the number of points in the memories collection, or 0 on failure.

    Best-effort by design: any network, JSON, or response-shape problem maps
    to 0 so the caller simply sees "no new memories".
    """
    try:
        r = requests.get(f"{QDRANT_URL}/collections/{MEMORIES}", timeout=10)
        return r.json()["result"]["points_count"]
    except Exception:
        # Narrowed from a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit and make the script unkillable.
        return 0
def get_recent_memories(hours=1):
    """Get memories from last N hours (ISO-timestamp comparison), [] on failure.

    NOTE(review): only a single scroll page of 1000 points is fetched with no
    server-side timestamp filter, then filtered client-side — recent points
    beyond the first page would be missed; confirm collection growth makes
    this acceptable or add pagination.
    """
    since = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    try:
        r = requests.post(
            f"{QDRANT_URL}/collections/{MEMORIES}/points/scroll",
            json={"limit": 1000, "with_payload": True},
            timeout=30
        )
        points = r.json()["result"]["points"]
        # Lexicographic compare is valid because both sides are ISO-8601 strings.
        return [p for p in points if p.get("payload", {}).get("timestamp", "") > since]
    except Exception:
        # Narrowed from a bare `except:` so Ctrl-C still interrupts the run.
        return []
def extract_gems(memories):
    """Ask the LLM to extract gems from *memories*; return a list of gem dicts.

    Returns [] for empty input, on any network failure, or when the model's
    reply contains no parseable JSON array.
    """
    if not memories:
        return []
    # Build a compact transcript: at most 20 messages, 500 chars each.
    parts = []
    for m in memories:
        role = m["payload"].get("role", "unknown")
        content = m["payload"].get("content", "")[:500]  # Limit per message
        parts.append(f"{role.upper()}: {content}")
    conversation = "\n\n".join(parts[:20])  # Max 20 messages
    prompt = f"""Extract 3-5 key gems (insights, decisions, facts) from this conversation.
Conversation:
{conversation}
Return JSON: [{{"text": "gem", "category": "decision|fact|preference"}}]"""
    try:
        r = requests.post(
            f"{OLLAMA}/v1/chat/completions",
            json={
                "model": MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3
            },
            timeout=120
        )
        content = r.json()["choices"][0]["message"]["content"]
        # The model may wrap the array in prose; slice out the outermost [...].
        start = content.find('[')
        end = content.rfind(']')
        if start >= 0 and end > start:
            return json.loads(content[start:end + 1])
    except Exception:
        # Narrowed from a bare `except:` — best-effort extraction still returns
        # [], but KeyboardInterrupt/SystemExit now propagate.
        pass
    return []
def store_gem(gem):
    """Embed gem["text"] and upsert it into gems_tr; return True on success.

    The point id is derived from a SHA-256 of the text so the same gem always
    maps to the same id: builtin hash() is salted per process
    (PYTHONHASHSEED), so the previous abs(hash(...)) id differed on every run
    and identical gems were re-inserted instead of overwritten.
    """
    import hashlib  # local import: keeps the deterministic-id fix self-contained

    try:
        # Get embedding
        r = requests.post(
            f"{OLLAMA}/api/embeddings",
            json={"model": "snowflake-arctic-embed2", "prompt": gem["text"]},
            timeout=30
        )
        vector = r.json()["embedding"]
        # Stable 63-bit id from the gem text (fits Qdrant's unsigned-int ids).
        point_id = int.from_bytes(
            hashlib.sha256(gem["text"].encode("utf-8")).digest()[:8], "big"
        ) % (2**63)
        # Store
        r = requests.put(
            f"{QDRANT_URL}/collections/{GEMS}/points",
            json={
                "points": [{
                    "id": point_id,
                    "vector": vector,
                    "payload": {
                        "text": gem["text"],
                        "category": gem.get("category", "other"),
                        "createdAt": datetime.now(timezone.utc).isoformat(),
                        "source": "turn_curator"
                    }
                }]
            },
            timeout=30
        )
        return r.status_code == 200
    except Exception:
        # Narrowed from a bare `except:` so interrupts are not swallowed.
        return False
def main():
    """CLI driver: report status, or curate when enough new memories exist."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--threshold", "-t", type=int, default=10)
    parser.add_argument("--execute", "-e", action="store_true")
    parser.add_argument("--dry-run", "-n", action="store_true")
    parser.add_argument("--status", "-s", action="store_true")
    args = parser.parse_args()

    state = load_state()
    current = get_total_count()
    new_points = current - state.get("last_count", 0)

    # --status: report and exit without touching anything.
    if args.status:
        ready = "YES" if new_points >= args.threshold else "NO"
        print(f"Total memories: {current}")
        print(f"Last curated: {state.get('last_count', 0)}")
        print(f"New since last: {new_points}")
        print(f"Threshold: {args.threshold}")
        print(f"Ready: {ready}")
        return

    print(f"Curator: {new_points} new / {args.threshold} threshold")
    if new_points < args.threshold:
        print("Not enough new memories")
        return

    # The last hour of memories should cover the newly arrived points.
    memories = get_recent_memories(hours=1)
    print(f"Fetched {len(memories)} recent memories")
    if not memories:
        print("No memories to process")
        return
    if args.dry_run:
        print(f"[DRY RUN] Would process {len(memories)} memories")
        return
    if not args.execute:
        print("Use --execute to run or --dry-run to preview")
        return

    # Extract and store gems, counting successful writes.
    print("Extracting gems...")
    gems = extract_gems(memories)
    print(f"Extracted {len(gems)} gems")
    stored = 0
    for gem in gems:
        if store_gem(gem):
            stored += 1
            print(f" Stored: {gem['text'][:60]}...")

    # Advance the high-water mark only after a completed run.
    state["last_count"] = current
    state["last_time"] = datetime.now(timezone.utc).isoformat()
    save_state(state)
    print(f"Done: {stored}/{len(gems)} gems stored")
if __name__ == "__main__":
    main()

View File

@@ -1,12 +0,0 @@
#!/bin/bash
# Turn-based curator cron - runs every minute to check if 10 turns reached
SCRIPT_DIR="/root/.openclaw/workspace/.projects/true-recall-v2/tr-continuous"

# Run curation only when the status check reports enough accumulated turns.
# Testing the pipeline directly replaces the fragile `$?` round-trip, which
# could be clobbered by any command inserted between the check and the test.
# NOTE(review): the grep string must match the --status output of
# curator_turn_based.py exactly — verify (curator_by_count.py prints
# "Ready: YES", not "Ready to curate: YES").
if /usr/bin/python3 "${SCRIPT_DIR}/curator_turn_based.py" --threshold 10 --status 2>/dev/null \
        | grep -q "Ready to curate: YES"; then
    # Run curation
    /usr/bin/python3 "${SCRIPT_DIR}/curator_turn_based.py" --threshold 10 --execute 2>&1 | logger -t turn-curator
fi

View File

@@ -1,291 +0,0 @@
#!/usr/bin/env python3
"""
Turn-Based Curator: Extract gems every N turns (instead of daily).
Usage:
python3 curator_turn_based.py --threshold 10 --dry-run
python3 curator_turn_based.py --threshold 10 --execute
python3 curator_turn_based.py --status # Show turn counts
This tracks turn count since last curation and runs when threshold is reached.
"""
import argparse
import json
import os
import requests
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
# Config — Qdrant instance and the two collections this curator bridges.
QDRANT_URL = "http://10.0.0.40:6333"
MEMORIES_COLLECTION = "memories_tr"  # raw capture collection (input)
GEMS_COLLECTION = "gems_tr"  # curated gems collection (output)
# Ollama endpoint and the model used for gem extraction.
OLLAMA_URL = "http://10.0.0.10:11434"
CURATOR_MODEL = "ollama-remote/qwen3:30b-a3b-instruct-2507-q8_0"
# State file tracks last curation (highest turn seen + timestamp).
STATE_FILE = Path("/tmp/curator_turn_state.json")
def get_curator_prompt(conversation_text: str) -> str:
    """Build the gem-extraction prompt wrapping *conversation_text*.

    The instructions, example output, and category list are fixed; only the
    conversation body varies between calls.
    """
    template = """You are a memory curator. Extract only the most valuable gems (key insights) from this conversation.
Rules:
1. Extract only genuinely important information (decisions, preferences, key facts)
2. Skip transient/trivial content (greetings, questions, temporary requests)
3. Each gem should be self-contained and useful for future context
4. Format: concise, factual statements
5. Max 3-5 gems total
Conversation to curate:
---
{conversation_text}
---
Return ONLY a JSON array of gems like:
[{{"text": "User decided to use X approach for Y", "category": "decision"}}]
Categories: preference, fact, decision, entity, other
JSON:"""
    # str.format applies the same {{ }} escaping the original f-string used,
    # so the rendered prompt is byte-identical.
    return template.format(conversation_text=conversation_text)
def load_state() -> Dict[str, Any]:
    """Load curation state; fall back to defaults if missing or unreadable."""
    if STATE_FILE.exists():
        try:
            with open(STATE_FILE) as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            # Narrowed from a bare `except:` — only an I/O or parse failure
            # means "treat as no state"; anything else should surface.
            pass
    return {"last_turn": 0, "last_curation": None}
def save_state(state: Dict[str, Any]):
    """Write curation state to STATE_FILE as pretty-printed JSON."""
    with STATE_FILE.open("w") as handle:
        json.dump(state, handle, indent=2)
def get_point_count_since(last_time: str) -> int:
    """Count memories with a timestamp strictly after *last_time* (0 on error).

    Uses Qdrant's server-side count endpoint with a range filter so no point
    payloads are transferred.
    """
    count_filter = {
        "filter": {
            "must": [
                {"key": "timestamp", "range": {"gt": last_time}}
            ]
        }
    }
    try:
        response = requests.post(
            f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/count",
            json=count_filter,
            timeout=30,
        )
        response.raise_for_status()
        return response.json().get("result", {}).get("count", 0)
    except Exception as e:
        print(f"Error getting count: {e}", file=sys.stderr)
        return 0
def get_turns_since(last_turn: int, limit: int = 100) -> List[Dict[str, Any]]:
    """Fetch points whose payload turn number exceeds *last_turn*, sorted by turn.

    NOTE(review): only one scroll page of *limit* points is inspected, with no
    server-side filter — turns beyond the first page are silently missed;
    confirm this is acceptable for the expected collection size.
    """
    try:
        response = requests.post(
            f"{QDRANT_URL}/collections/{MEMORIES_COLLECTION}/points/scroll",
            json={"limit": limit, "with_payload": True},
            timeout=30,
        )
        response.raise_for_status()
        candidates = response.json().get("result", {}).get("points", [])
        newer = [
            p for p in candidates
            if p.get("payload", {}).get("turn", 0) > last_turn
        ]
        # Present turns in chronological order for the prompt builder.
        newer.sort(key=lambda p: p.get("payload", {}).get("turn", 0))
        return newer
    except Exception as e:
        print(f"Error fetching turns: {e}", file=sys.stderr)
        return []
def extract_gems_with_llm(conversation_text: str) -> List[Dict[str, str]]:
    """Ask the curator model to extract gems from a conversation.

    Returns:
        A list of gem dicts (e.g. {"text": ..., "category": ...}), or
        [] when the LLM call fails or its output contains no parseable
        JSON array.
    """
    prompt = get_curator_prompt(conversation_text)
    try:
        response = requests.post(
            f"{OLLAMA_URL}/v1/chat/completions",
            json={
                "model": CURATOR_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3,  # low temperature: factual extraction
                "max_tokens": 1000
            },
            timeout=120
        )
        response.raise_for_status()
        data = response.json()
        content = data.get("choices", [{}])[0].get("message", {}).get("content", "[]")
        # The model may wrap the JSON in prose; grab the outermost [...].
        start = content.find('[')
        end = content.rfind(']')
        if start != -1 and end != -1:
            try:
                gems = json.loads(content[start:end + 1])
            # Narrowed from a bare `except:`: only malformed JSON is
            # expected here, anything else should surface.
            except json.JSONDecodeError:
                gems = None
            if isinstance(gems, list):
                return gems
        return []
    except Exception as e:
        print(f"Error calling LLM: {e}", file=sys.stderr)
        return []
def store_gem(gem: Dict[str, str]) -> bool:
    """Embed a gem's text and upsert it into the gems collection.

    Returns:
        True on success, False on any failure (no embedding returned,
        network error, non-2xx response).
    """
    import hashlib  # local import keeps this fix self-contained

    try:
        # Get an embedding vector for the gem text.
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={"model": "snowflake-arctic-embed2", "prompt": gem["text"]},
            timeout=30
        )
        response.raise_for_status()
        vector = response.json().get("embedding", [])
        if not vector:
            return False
        # Bug fix: built-in hash() is randomized per process
        # (PYTHONHASHSEED), so the same gem text got a different point
        # ID on every run and upserts never deduplicated. sha256 gives
        # a stable, deterministic 63-bit ID.
        gem_id = int.from_bytes(
            hashlib.sha256(gem["text"].encode("utf-8")).digest()[:8],
            byteorder="big"
        ) % (2**63)
        # Upsert the point into gems_tr.
        response = requests.put(
            f"{QDRANT_URL}/collections/{GEMS_COLLECTION}/points",
            json={
                "points": [{
                    "id": gem_id,
                    "vector": vector,
                    "payload": {
                        "text": gem["text"],
                        "category": gem.get("category", "other"),
                        "createdAt": datetime.now(timezone.utc).isoformat(),
                        "source": "turn_based_curator"
                    }
                }]
            },
            timeout=30
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error storing gem: {e}", file=sys.stderr)
        return False
def main():
    """CLI entry point for the turn-based curator.

    Flow: parse args -> load state -> compare the live turn counter
    against the threshold -> (optionally) fetch the new turns, extract
    gems with the LLM, store them, and persist the updated state.
    """
    parser = argparse.ArgumentParser(description="Turn-based curator")
    parser.add_argument("--threshold", "-t", type=int, default=10,
                        help="Run curation every N turns (default: 10)")
    parser.add_argument("--execute", "-e", action="store_true",
                        help="Execute curation")
    parser.add_argument("--dry-run", "-n", action="store_true",
                        help="Preview what would be curated")
    parser.add_argument("--status", "-s", action="store_true",
                        help="Show current turn status")
    args = parser.parse_args()
    # Load state
    state = load_state()
    current_turn = get_current_turn_count()
    turns_since = current_turn - state["last_turn"]
    if args.status:
        # Status mode: report counters only, never curate.
        print(f"Current turn: {current_turn}")
        print(f"Last curation: {state['last_turn']}")
        print(f"Turns since last curation: {turns_since}")
        print(f"Threshold: {args.threshold}")
        print(f"Ready to curate: {'YES' if turns_since >= args.threshold else 'NO'}")
        return
    print(f"Turn-based Curator")
    print(f"Current turn: {current_turn}")
    print(f"Last curation: {state['last_turn']}")
    print(f"Turns since: {turns_since}")
    print(f"Threshold: {args.threshold}")
    print()
    if turns_since < args.threshold:
        # Not enough new conversation since the last run; nothing to do.
        print(f"Not enough turns. Need {args.threshold}, have {turns_since}")
        return
    # Get turns to process (over-fetch margin of 10 for safety)
    print(f"Fetching {turns_since} turns...")
    turns = get_turns_since(state["last_turn"], limit=turns_since + 10)
    if not turns:
        print("No new turns found")
        return
    # Build conversation text ("ROLE: content" blocks separated by blank lines)
    conversation_parts = []
    for turn in turns:
        role = turn.get("payload", {}).get("role", "unknown")
        content = turn.get("payload", {}).get("content", "")
        conversation_parts.append(f"{role.upper()}: {content}")
    conversation_text = "\n\n".join(conversation_parts)
    print(f"Processing {len(turns)} turns ({len(conversation_text)} chars)")
    print()
    if args.dry_run:
        # Preview mode: show (truncated) input, never call the LLM.
        print("=== CONVERSATION TEXT ===")
        print(conversation_text[:500] + "..." if len(conversation_text) > 500 else conversation_text)
        print()
        print("[DRY RUN] Would extract gems and store to gems_tr")
        return
    if not args.execute:
        # Safety: require an explicit flag before doing real work.
        print("Use --execute to run curation or --dry-run to preview")
        return
    # Extract gems
    print("Extracting gems with LLM...")
    gems = extract_gems_with_llm(conversation_text)
    if not gems:
        print("No gems extracted")
        return
    print(f"Extracted {len(gems)} gems:")
    for i, gem in enumerate(gems, 1):
        print(f"  {i}. [{gem.get('category', 'other')}] {gem['text'][:80]}...")
    print()
    # Store gems
    print("Storing gems...")
    success = 0
    for gem in gems:
        if store_gem(gem):
            success += 1
    # Update state only after attempting storage, so a crash before this
    # point re-curates the same window on the next run.
    state["last_turn"] = current_turn
    state["last_curation"] = datetime.now(timezone.utc).isoformat()
    save_state(state)
    print(f"Done! Stored {success}/{len(gems)} gems")

if __name__ == "__main__":
    main()

View File

@@ -1,85 +0,0 @@
#!/usr/bin/env python3
"""
Migration: Add 'curated: false' to existing memories_tr entries.
Run once to update all existing memories for the new timer curator.
Uses POST /collections/{name}/points/payload with {"points": [ids], "payload": {...}}
"""
import requests
import time
import sys
QDRANT_URL = "http://10.0.0.40:6333"  # Qdrant vector-store endpoint (LAN host)
COLLECTION = "memories_tr"  # collection whose points receive the curated flag
def update_existing_memories():
    """Add curated=False to every memories_tr point missing the field.

    Scrolls the collection in batches and patches payloads in place via
    POST /points/payload. Best-effort: the loop aborts on the first
    request error after printing a traceback.
    """
    print("🔧 Migrating existing memories...")
    offset = None
    updated = 0
    batch_size = 100
    max_iterations = 200  # safety valve: scan at most 200 * 100 points
    iterations = 0
    while iterations < max_iterations:
        iterations += 1
        scroll_data = {
            "limit": batch_size,
            "with_payload": True
        }
        # Bug fix: the original tested `if offset:`, which treats a
        # falsy next_page_offset (e.g. numeric point ID 0) as "no more
        # pages" and silently truncated the migration.
        if offset is not None:
            scroll_data["offset"] = offset
        try:
            response = requests.post(
                f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll",
                json=scroll_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            points = result.get("result", {}).get("points", [])
            if not points:
                break
            # Only patch points that don't already carry the flag.
            ids_to_update = [
                point["id"]
                for point in points
                if "curated" not in point.get("payload", {})
            ]
            if ids_to_update:
                # POST /points/payload with {"points": [ids], "payload": {...}}
                update_response = requests.post(
                    f"{QDRANT_URL}/collections/{COLLECTION}/points/payload",
                    json={
                        "points": ids_to_update,
                        "payload": {"curated": False}
                    },
                    timeout=30
                )
                update_response.raise_for_status()
                updated += len(ids_to_update)
                print(f"  Updated batch: {len(ids_to_update)} memories (total: {updated})")
                time.sleep(0.05)  # be gentle with the Qdrant server
            offset = result.get("result", {}).get("next_page_offset")
            if offset is None:  # same falsy-offset fix as above
                break
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            import traceback
            traceback.print_exc()
            break
    print(f"✅ Migration complete: {updated} memories updated with curated=false")

if __name__ == "__main__":
    update_existing_memories()

View File

@@ -1,14 +0,0 @@
# systemd unit for the TrueRecall turn-based curator.
[Unit]
Description=TrueRecall Turn-Based Curator (every 10 turns)
# Start only after networking and the memory watcher are up.
# NOTE(review): assumes mem-qdrant-watcher.service exists on this host — confirm.
After=network.target mem-qdrant-watcher.service
[Service]
# NOTE(review): the curator script appears to run once and exit;
# Type=oneshot paired with a systemd timer may fit better than
# Type=simple + Restart — confirm intent.
Type=simple
User=root
WorkingDirectory=/root/.openclaw/workspace/.projects/true-recall-v2/tr-continuous
ExecStart=/usr/bin/python3 /root/.openclaw/workspace/.projects/true-recall-v2/tr-continuous/curator_turn_based.py --threshold 10 --execute
# Retry a failed run after 60 seconds (successful exits are not restarted).
Restart=on-failure
RestartSec=60
[Install]
WantedBy=multi-user.target

View File

@@ -1,358 +0,0 @@
#!/usr/bin/env python3
"""
True-Recall v2 Curator: Reads from Qdrant kimi_memories
Reads 24 hours of conversation from Qdrant kimi_memories collection,
extracts contextual gems using qwen3, stores to Qdrant gems_tr with mxbai embeddings.
Usage:
python curate_from_qdrant.py --user-id rob
python curate_from_qdrant.py --user-id rob --date 2026-02-23
"""
import json
import argparse
import requests
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
import hashlib
# Configuration
QDRANT_URL = "http://10.0.0.40:6333"    # Qdrant vector store (LAN host)
SOURCE_COLLECTION = "memories_tr"       # raw harvested conversation turns
TARGET_COLLECTION = "gems_tr"           # destination for curated gems
OLLAMA_URL = "http://10.0.0.10:11434"   # Ollama inference server
EMBEDDING_MODEL = "mxbai-embed-large"   # embedding model for gem vectors
CURATION_MODEL = "qwen3:4b-instruct"    # LLM used for gem extraction
# Load curator prompt (v1 path; load_curator_prompt() falls back to the v2 copy)
CURATOR_PROMPT_PATH = "/root/.openclaw/workspace/.projects/true-recall/curator-prompt.md"
def load_curator_prompt() -> str:
    """Return the curator system prompt text, preferring the v1 location.

    Falls back to the true-recall-v2 copy when the v1 file is absent;
    raises FileNotFoundError if neither file exists.
    """
    try:
        return Path(CURATOR_PROMPT_PATH).read_text()
    except FileNotFoundError:
        # Fallback to the v2 project location.
        fallback = "/root/.openclaw/workspace/.projects/true-recall-v2/curator-prompt.md"
        return Path(fallback).read_text()
def get_turns_from_qdrant(user_id: str, date_str: str) -> List[Dict[str, Any]]:
    """Get all conversation turns from Qdrant for one user on one date.

    Pages through the scroll API with a payload filter, reshapes each
    point into a turn dict, and returns turns sorted by
    (conversation_id, turn number). Returns [] when the source
    collection does not exist.
    """
    # Payload filter: only this user's turns for the requested day.
    filter_data = {
        "must": [
            {"key": "user_id", "match": {"value": user_id}},
            {"key": "date", "match": {"value": date_str}}
        ]
    }
    # Use scroll API to page through all matching points.
    all_points = []
    offset = None
    max_iterations = 100  # safety limit: at most 100 pages of 100 points
    iterations = 0
    while iterations < max_iterations:
        iterations += 1
        scroll_data = {
            "limit": 100,
            "with_payload": True,
            "filter": filter_data
        }
        # Bug fix: was `if offset:`, which drops a falsy
        # next_page_offset (e.g. numeric point ID 0) and silently
        # truncates pagination.
        if offset is not None:
            scroll_data["offset"] = offset
        req = urllib.request.Request(
            f"{QDRANT_URL}/collections/{SOURCE_COLLECTION}/points/scroll",
            data=json.dumps(scroll_data).encode(),
            headers={"Content-Type": "application/json"},
            method="POST"
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                result = json.loads(response.read().decode())
                points = result.get("result", {}).get("points", [])
                if not points:
                    break
                all_points.extend(points)
                # Continue while Qdrant reports another page (same
                # falsy-offset fix as above).
                offset = result.get("result", {}).get("next_page_offset")
                if offset is None:
                    break
        except urllib.error.HTTPError as e:
            if e.code == 404:
                print(f"⚠️ Collection {SOURCE_COLLECTION} not found")
                return []
            raise
    # Convert points to turn format (harvested summaries).
    turns = []
    for point in all_points:
        payload = point.get("payload", {})
        # Extract user and AI messages.
        user_msg = payload.get("user_message", "")
        ai_msg = payload.get("ai_response", "")
        # Get timestamp from created_at.
        created_at = payload.get("created_at", "")
        turn = {
            "turn": payload.get("turn_number", 0),
            "user_id": payload.get("user_id", user_id),
            "user": user_msg,
            "ai": ai_msg,
            "conversation_id": payload.get("conversation_id", ""),
            "session_id": payload.get("session_id", ""),
            "timestamp": created_at,
            "date": payload.get("date", date_str),
            "content_hash": payload.get("content_hash", "")
        }
        # Skip entries with no content on either side.
        if turn["user"] or turn["ai"]:
            turns.append(turn)
    # Sort by conversation_id, then by turn number.
    turns.sort(key=lambda x: (x.get("conversation_id", ""), x.get("turn", 0)))
    return turns
def extract_gems_with_curator(turns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Use the curator model (qwen3) to extract gems from conversation turns.

    Returns:
        A list of gem dicts; [] when there are no turns or the model
        output cannot be parsed as JSON.

    Raises:
        RuntimeError: on a non-200 response from Ollama.
    """
    if not turns:
        return []
    prompt = load_curator_prompt()
    # Build the conversation input as pretty-printed JSON.
    conversation_json = json.dumps(turns, indent=2)
    # Call Ollama with a native system prompt.
    response = requests.post(
        f"{OLLAMA_URL}/api/generate",
        json={
            "model": CURATION_MODEL,
            "system": prompt,
            "prompt": f"## Input Conversation\n\n```json\n{conversation_json}\n```\n\n## Output\n",
            "stream": False,
            "options": {
                "temperature": 0.1,  # near-deterministic extraction
                "num_predict": 4000
            }
        },
        timeout=600  # bug fix: no timeout meant a hung Ollama blocked forever
    )
    if response.status_code != 200:
        raise RuntimeError(f"Curation failed: {response.text}")
    result = response.json()
    output = result.get('response', '').strip()
    # Extract JSON from output (handle markdown code blocks).
    if '```json' in output:
        output = output.split('```json')[1].split('```')[0].strip()
    elif '```' in output:
        output = output.split('```')[1].split('```')[0].strip()
    try:
        # Extract JSON array - find first [ and last ].
        start_idx = output.find('[')
        end_idx = output.rfind(']')
        if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
            output = output[start_idx:end_idx + 1]
        gems = json.loads(output)
        if not isinstance(gems, list):
            print(f"Warning: Curator returned non-list, wrapping: {type(gems)}")
            gems = [gems] if gems else []
        return gems
    except json.JSONDecodeError as e:
        print(f"Error parsing curator output: {e}")
        print(f"Raw output: {output[:500]}...")
        return []
def get_embedding(text: str) -> List[float]:
    """Get an embedding vector from Ollama using mxbai-embed-large.

    Raises:
        RuntimeError: on a non-200 response from the embedding endpoint.
    """
    response = requests.post(
        f"{OLLAMA_URL}/api/embeddings",
        json={
            "model": EMBEDDING_MODEL,
            "prompt": text
        },
        timeout=60  # bug fix: no timeout meant a hung server blocked forever
    )
    if response.status_code != 200:
        raise RuntimeError(f"Embedding failed: {response.text}")
    return response.json()['embedding']
def get_gem_id(gem: Dict[str, Any], user_id: str) -> int:
    """Derive a deterministic 63-bit integer ID for a gem.

    The ID is a truncated SHA-256 of user_id, conversation_id, and
    turn_range, so the same gem always maps to the same Qdrant point.
    """
    key = f"{user_id}:{gem.get('conversation_id', '')}:{gem.get('turn_range', '')}"
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], byteorder='big') % (2**63)
def check_duplicate(gem: Dict[str, Any], user_id: str) -> bool:
    """Return True when a gem with the same deterministic ID already exists.

    Probes the point by ID with a GET; 404 means no duplicate, any
    other HTTP error propagates.
    """
    point_id = get_gem_id(gem, user_id)
    probe = urllib.request.Request(
        f"{QDRANT_URL}/collections/{TARGET_COLLECTION}/points/{point_id}",
        headers={"Content-Type": "application/json"},
        method="GET"
    )
    try:
        with urllib.request.urlopen(probe, timeout=10):
            return True  # fetch succeeded, so the point exists
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return False  # no such point stored yet
        raise
def store_gem_to_qdrant(gem: Dict[str, Any], user_id: str) -> bool:
    """Store a gem to Qdrant with its embedding.

    Returns:
        True when the upsert responded with HTTP 200, False otherwise.

    Raises:
        Whatever get_embedding raises on embedding failure.
    """
    # Embed gem text plus context/snippet for richer retrieval.
    embedding_text = f"{gem.get('gem', '')} {gem.get('context', '')} {gem.get('snippet', '')}"
    vector = get_embedding(embedding_text)
    # Prepare payload: tag the gem with its owner.
    payload = {
        "user_id": user_id,
        **gem
    }
    # Deterministic integer ID so re-runs upsert in place.
    gem_id = get_gem_id(gem, user_id)
    # Store to Qdrant.
    response = requests.put(
        f"{QDRANT_URL}/collections/{TARGET_COLLECTION}/points",
        json={
            "points": [{
                "id": gem_id,
                "vector": vector,
                "payload": payload
            }]
        },
        timeout=30  # bug fix: request previously had no timeout
    )
    return response.status_code == 200
def main():
    """CLI entry point: curate one day of conversation for one user.

    Flow: parse args -> resolve the target date (yesterday by default)
    -> fetch turns from Qdrant -> extract gems with the curator model
    -> preview -> (unless --dry-run) dedupe by deterministic ID and
    store to gems_tr.
    """
    parser = argparse.ArgumentParser(description="True-Recall Curator v2 - Reads from Qdrant")
    parser.add_argument("--user-id", required=True, help="User ID to process")
    parser.add_argument("--date", help="Specific date to process (YYYY-MM-DD), defaults to yesterday")
    parser.add_argument("--dry-run", action="store_true", help="Don't store, just preview")
    args = parser.parse_args()
    # Determine date (yesterday by default)
    if args.date:
        date_str = args.date
    else:
        yesterday = datetime.now() - timedelta(days=1)
        date_str = yesterday.strftime("%Y-%m-%d")
    print(f"🔍 True-Recall Curator v2 for {args.user_id}")
    print(f"📅 Processing date: {date_str}")
    print(f"🧠 Embedding model: {EMBEDDING_MODEL}")
    print(f"💎 Target collection: {TARGET_COLLECTION}")
    print()
    # Get turns from Qdrant
    print(f"📥 Fetching conversation turns from {SOURCE_COLLECTION}...")
    turns = get_turns_from_qdrant(args.user_id, date_str)
    print(f"✅ Found {len(turns)} turns")
    if not turns:
        print("⚠️ No turns to process. Exiting.")
        return
    # Show sample (first three turns, truncated to 60 chars each side)
    print("\n📄 Sample turns:")
    for i, turn in enumerate(turns[:3], 1):
        user_msg = turn.get("user", "")[:60]
        ai_msg = turn.get("ai", "")[:60]
        print(f"  Turn {turn.get('turn')}: User: {user_msg}...")
        print(f"    AI: {ai_msg}...")
    if len(turns) > 3:
        print(f"  ... and {len(turns) - 3} more")
    # Extract gems
    print("\n🧠 Extracting gems with The Curator (qwen3)...")
    gems = extract_gems_with_curator(turns)
    print(f"✅ Extracted {len(gems)} gems")
    if not gems:
        print("⚠️ No gems extracted. Exiting.")
        return
    # Preview gems (first three)
    print("\n💎 Preview of extracted gems:")
    for i, gem in enumerate(gems[:3], 1):
        print(f"\n--- Gem {i} ---")
        print(f"Gem: {gem.get('gem', 'N/A')[:100]}...")
        print(f"Categories: {gem.get('categories', [])}")
        print(f"Importance: {gem.get('importance', 'N/A')}")
        print(f"Confidence: {gem.get('confidence', 'N/A')}")
    if len(gems) > 3:
        print(f"\n... and {len(gems) - 3} more gems")
    if args.dry_run:
        # Preview mode: stop before any writes.
        print("\n🏃 DRY RUN: Not storing gems.")
        return
    # Check for duplicates and store
    print("\n💾 Storing gems to Qdrant...")
    stored = 0
    skipped = 0
    failed = 0
    for gem in gems:
        # Deterministic-ID dedupe: skip gems already present in gems_tr.
        if check_duplicate(gem, args.user_id):
            print(f"  ⏭️ Skipping duplicate: {gem.get('gem', 'N/A')[:50]}...")
            skipped += 1
            continue
        if store_gem_to_qdrant(gem, args.user_id):
            stored += 1
        else:
            print(f"  ⚠️ Failed to store gem: {gem.get('gem', 'N/A')[:50]}...")
            failed += 1
    print(f"\n✅ Stored: {stored}")
    print(f"⏭️ Skipped (duplicates): {skipped}")
    print(f"❌ Failed: {failed}")
    print("\n🎉 Curation complete!")

if __name__ == "__main__":
    main()