Files
jarvis-memory/skills/qdrant-memory/scripts/memory_decay.py

213 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Memory decay system - handle expiration and cleanup
Usage: memory_decay.py check|cleanup|status [--no-dry-run] [--days N]
"""
import argparse
import json
import sys
import urllib.request
from datetime import datetime, timedelta
QDRANT_URL = "http://10.0.0.40:6333"
COLLECTION_NAME = "openclaw_memories"
def get_expired_memories():
    """Return every memory whose ``expires_at`` is on or before today.

    Scrolls the collection with a ``range``/``lte`` filter on the
    ``expires_at`` payload field, comparing against today's local date
    formatted as ``YYYY-MM-DD``. Results are fetched 100 at a time and the
    scroll is continued via ``next_page_offset`` so collections with more
    than 100 matches are not silently truncated.

    Returns:
        List of point dicts (with payload) as returned by Qdrant. On any
        request or decoding error the error is printed to stderr and an
        empty list is returned.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    # Search for memories with expires_at <= today
    search_body = {
        "filter": {
            "must": [
                {
                    "key": "expires_at",
                    "range": {
                        "lte": today,
                    },
                },
            ],
        },
        "limit": 100,
        "with_payload": True,
    }
    url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/scroll"
    points = []
    try:
        while True:
            req = urllib.request.Request(
                url,
                data=json.dumps(search_body).encode(),
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=10) as response:
                result = json.loads(response.read().decode())
            page = result.get("result") or {}
            page_points = page.get("points") or []
            points.extend(page_points)
            next_offset = page.get("next_page_offset")
            # Stop when the server reports no further page; the empty-page
            # check guards against looping forever on a repeated offset.
            if next_offset is None or not page_points:
                break
            search_body["offset"] = next_offset
    except Exception as e:
        # Best-effort lookup: report the failure and behave as "nothing found"
        # rather than handing a partial list to the cleanup path.
        print(f"Error finding expired memories: {e}", file=sys.stderr)
        return []
    return points
def get_stale_memories(days=90):
    """Return every low-importance memory not accessed within ``days`` days.

    Scrolls the collection for points whose ``last_accessed`` payload field
    is ``lte`` the ISO-format timestamp of ``now - days`` (local time) and
    whose ``importance`` payload field equals ``"low"``. Results are fetched
    100 at a time and the scroll is continued via ``next_page_offset`` so
    collections with more than 100 matches are not silently truncated.

    Args:
        days: Minimum age, in days since last access, for a memory to count
            as stale. Defaults to 90.

    Returns:
        List of point dicts (with payload) as returned by Qdrant. On any
        request or decoding error the error is printed to stderr and an
        empty list is returned.
    """
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    search_body = {
        "filter": {
            "must": [
                {
                    "key": "last_accessed",
                    "range": {
                        "lte": cutoff,
                    },
                },
                {
                    "key": "importance",
                    "match": {
                        "value": "low",
                    },
                },
            ],
        },
        "limit": 100,
        "with_payload": True,
    }
    url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/scroll"
    points = []
    try:
        while True:
            req = urllib.request.Request(
                url,
                data=json.dumps(search_body).encode(),
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=10) as response:
                result = json.loads(response.read().decode())
            page = result.get("result") or {}
            page_points = page.get("points") or []
            points.extend(page_points)
            next_offset = page.get("next_page_offset")
            # Stop when the server reports no further page; the empty-page
            # check guards against looping forever on a repeated offset.
            if next_offset is None or not page_points:
                break
            search_body["offset"] = next_offset
    except Exception as e:
        # Best-effort lookup: report the failure and behave as "nothing found"
        # rather than handing a partial list to the cleanup path.
        print(f"Error finding stale memories: {e}", file=sys.stderr)
        return []
    return points
def delete_memory(point_id):
    """Remove a single point from the Qdrant collection.

    Sends a POST to the collection's ``points/delete`` endpoint with
    ``wait=true`` in the query string.

    Args:
        point_id: ID of the point to delete.

    Returns:
        True when the response JSON carries ``"status": "ok"``. False when
        it does not, or when the request/decoding raised (the error is
        printed to stderr).
    """
    endpoint = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points/delete?wait=true"
    encoded_body = json.dumps({"points": [point_id]}).encode()
    request = urllib.request.Request(
        endpoint,
        data=encoded_body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            raw = response.read()
            reply = json.loads(raw.decode())
            succeeded = reply.get("status") == "ok"
            return succeeded
    except Exception as e:
        print(f"Error deleting memory {point_id}: {e}", file=sys.stderr)
        return False
def update_access_count(point_id):
    """Placeholder for access tracking; currently a no-op.

    Args:
        point_id: ID of the memory whose access would be recorded. Unused.

    Returns:
        None. Nothing is read from or written to Qdrant.
    """
    # Not implemented: incrementing a counter would need a read of the point
    # followed by a write; the simplified plan was to only touch
    # last_accessed, but neither is done here yet.
    return None
def _preview(memory, width):
    """Return the first ``width`` characters of a point's ``text`` payload.

    Tolerates a missing or ``None`` payload and a missing or ``None`` text
    so that one malformed point cannot abort a whole report.
    """
    payload = memory.get("payload") or {}
    return str(payload.get("text") or "")[:width]


def _purge(memories, dry_run):
    """Delete each memory (or only list it in a dry run); return deletions made."""
    deleted = 0
    for m in memories:
        text = _preview(m, 40)
        if dry_run:
            print(f" [would delete] {text}...")
        elif delete_memory(m["id"]):
            print(f" ✅ Deleted: {text}...")
            deleted += 1
        else:
            print(f" ❌ Failed: {text}...")
    return deleted


def check_decay(days=90):
    """Print a report of expired and stale memories and return both lists.

    Args:
        days: Stale threshold in days, forwarded to ``get_stale_memories``.
            Defaults to 90, the value that was previously hard-coded.

    Returns:
        Tuple ``(expired, stale)`` holding the point lists returned by
        ``get_expired_memories()`` and ``get_stale_memories(days)``.
    """
    print("🔍 Memory Decay Check")
    print("=" * 40)
    expired = get_expired_memories()
    print(f"\n📅 Expired memories: {len(expired)}")
    for m in expired:
        text = _preview(m, 60)
        expires = (m.get("payload") or {}).get("expires_at", "unknown")
        print(f" [{expires}] {text}...")
    stale = get_stale_memories(days)
    print(f"\n🕐 Stale memories ({days}+ days): {len(stale)}")
    for m in stale:
        text = _preview(m, 60)
        # A payload key that is present but None would break the [:10] slice,
        # so fall back to "unknown" for any falsy value and coerce to str.
        last_access = str((m.get("payload") or {}).get("last_accessed") or "unknown")
        print(f" [{last_access[:10]}] {text}...")
    return expired, stale


def cleanup_memories(dry_run=True):
    """Remove expired memories and very stale low-importance memories.

    Expired memories are processed first; afterwards memories returned by
    ``get_stale_memories(180)`` are processed. In a dry run nothing is
    deleted and each candidate is only listed.

    Args:
        dry_run: When True (the default), only print what would be deleted.

    Returns:
        Number of memories actually deleted (always 0 in a dry run).
    """
    print("🧹 Memory Cleanup")
    print("=" * 40)
    if dry_run:
        print("(DRY RUN - no actual deletions)")
    expired = get_expired_memories()
    print(f"\nDeleting {len(expired)} expired memories...")
    deleted = _purge(expired, dry_run)
    # Only delete very stale (180 days) low-importance memories
    very_stale = get_stale_memories(180)
    print(f"\nDeleting {len(very_stale)} very stale (180+ days) low-importance memories...")
    deleted += _purge(very_stale, dry_run)
    if dry_run:
        print("\n⚠️ This was a dry run. Use --no-dry-run to actually delete.")
    else:
        print(f"\n✅ Deleted {deleted} memories")
    return deleted


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Memory decay management")
    parser.add_argument("action", choices=["check", "cleanup", "status"])
    parser.add_argument("--no-dry-run", action="store_true", help="Actually delete (default is dry run)")
    parser.add_argument("--days", type=int, default=90, help="Days for stale threshold")
    args = parser.parse_args()
    if args.action == "check":
        # Forward --days so the reported stale set matches the requested threshold.
        expired, stale = check_decay(args.days)
        total = len(expired) + len(stale)
        print(f"\n📊 Total decayed memories: {total}")
        # Non-zero exit signals that decayed memories exist.
        sys.exit(0 if total == 0 else 1)
    elif args.action == "cleanup":
        # Cleanup uses its own fixed 180-day threshold; --days does not apply.
        cleanup_memories(dry_run=not args.no_dry_run)
        sys.exit(0)
    elif args.action == "status":
        # Forward --days so the label below describes the counts actually shown.
        expired, stale = check_decay(args.days)
        print("\n📊 Decay Status")
        print(f" Expired: {len(expired)}")
        print(f" Stale ({args.days}+ days): {len(stale)}")
        print(f" Total decayed: {len(expired) + len(stale)}")