#!/usr/bin/env python3
"""
Log Monitor & Auto-Repair Script

Scans system logs for errors and attempts safe auto-fixes.
Runs daily at 2 AM via cron.
"""

import os
import re
import shlex
import subprocess
import sys
from datetime import datetime, timedelta

# Config
LOG_HOURS = 24  # Check last 24 hours
REPORT_FILE = "/tmp/log_monitor_report.txt"
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Patterns to exclude (noise, not real errors)
EXCLUDE_PATTERNS = [
    r"sabnzbd",  # Download manager references (not errors)
    r"github\.com/sabnzbd",  # GitHub repo references
    r"functions\.(read|edit|exec) failed.*Missing required parameter",  # My own tool errors
    r"log_monitor\.py",  # Don't report on myself
    r"SyntaxWarning.*invalid escape sequence",  # My own script warnings
    r'"type":"thinking"',  # My internal thinking blocks
    r'"thinking":',  # More thinking content
    r"The user has pasted a log of errors",  # My own analysis text
    r"Let me respond appropriately",  # My response planning
    r"functions\.(read|edit|exec) failed",  # Tool failures in logs
    r"agent/embedded.*read tool called without path",  # Embedded session errors
    r"rs_\d+",  # Reasoning signature IDs
    r"encrypted_content",  # Encrypted thinking blocks
    r"Missing required parameter.*newText",  # My edit tool errors
    # Filter session log content showing file reads of this script
    r"content.*report\.append.*OpenClaw Logs: No errors found",  # My own code appearing in logs
    r"file_path.*log_monitor\.py",  # File operations on this script
    # Container-specific harmless errors
    r"nvidia",  # NVIDIA modules not available in container
    r"nvidia-uvm",  # NVIDIA UVM module
    r"nvidia-persistenced",  # NVIDIA persistence daemon
    r"Failed to find module 'nvidia",  # NVIDIA module load failure
    r"Failed to query NVIDIA devices",  # No GPU in container
    r"rsyslogd.*imklog",  # rsyslog kernel log issues (expected in container)
    r"imklog.*cannot open kernel log",  # Kernel log not available
    r"imklog.*failed",  # imklog activation failures
    r"activation of module imklog failed",  # imklog module activation
    r"pam_lastlog\.so",  # PAM module not in container
    r"PAM unable to dlopen",  # PAM module load failure
    r"PAM adding faulty module",  # PAM module error
    r"pam_systemd.*Failed to create session",  # Session creation (expected in container)
    r"Failed to start motd-news\.service",  # MOTD news (expected in container)
]

# Known error patterns and their fixes.
#
# Each key is a regex (matched case-insensitively). Named groups feed the
# "{name}" placeholders in "description" and "fix_cmd"; values substituted
# into "fix_cmd" are shell-quoted by scan_and_fix(). Entries with
# "alert": True are reported for human attention and never executed.
AUTO_FIXES = {
    # Python module missing.
    # The name may not start with "-" so log text cannot smuggle an option
    # into the pip command line.
    # NOTE(review): the import name is log-derived and is not always the PyPI
    # distribution name; confirm that auto-installing it is acceptable.
    r"ModuleNotFoundError: No module named '(?P<module>[^'\-][^']*)'": {
        "fix_cmd": "pip install {module}",
        "description": "Install missing Python module: {module}"
    },
    # Permission denied on common paths
    r"Permission denied: (?P<path>/tmp/[^\s]+)": {
        "fix_cmd": "chmod 755 {path}",
        "description": "Fix permissions on {path}"
    },
    # Disk space issues
    r"No space left on device": {
        "fix_cmd": None,  # Can't auto-fix, needs human
        "description": "CRITICAL: Disk full - manual cleanup required",
        "alert": True
    },
    # Connection refused (services down)
    r"Connection refused.*:(?P<port>\d+)": {
        "fix_cmd": None,
        "description": "Service on port {port} may be down - check status",
        "alert": True
    },
    # Ollama connection issues
    r"ollama.*connection.*refused": {
        "fix_cmd": "systemctl restart ollama",
        "description": "Restart ollama service"
    },
    # Redis connection issues
    r"redis.*connection.*refused": {
        "fix_cmd": "systemctl restart redis-server || docker restart redis",
        "description": "Restart Redis service"
    },
}

# Placeholder names assigned by position to *unnamed* capture groups, kept so
# AUTO_FIXES entries written without named groups continue to work.
_LEGACY_PLACEHOLDERS = ("module", "path", "port", "service")


def should_exclude(line):
    """Return True if *line* matches any EXCLUDE_PATTERNS regex (case-insensitive)."""
    return any(re.search(pattern, line, re.IGNORECASE) for pattern in EXCLUDE_PATTERNS)


def _run_cmd_status(cmd, timeout=30):
    """Run *cmd* through the shell and return ``(returncode, output)``.

    ``output`` is stdout followed by stderr. When the command cannot be run
    or exceeds *timeout* seconds, ``returncode`` is None and ``output`` is a
    "Command failed: ..." message.
    """
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",  # a stray undecodable byte in a log must not abort the scan
            timeout=timeout,
        )
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        return None, f"Command failed: {e}"
    return result.returncode, result.stdout + result.stderr


def run_cmd(cmd, timeout=30):
    """Run shell command and return its combined stdout + stderr as text.

    Never raises for a failing command: launch errors and timeouts are
    returned as a "Command failed: ..." string.
    """
    return _run_cmd_status(cmd, timeout)[1]


def check_redis(host="10.0.0.36", port=6379):
    """Check Redis health using Python (redis-cli not available in container).

    Returns a one-line, human-readable status string; never raises.
    """
    try:
        import redis
    except ImportError:
        return "Redis: ⚠️ redis module not installed, cannot check"

    try:
        client = redis.Redis(host=host, port=port, socket_timeout=5, decode_responses=True)
        if client.ping():
            return f"Redis: ✅ Connected ({host}:{port})"
        return "Redis: ❌ Ping failed"
    except Exception as e:
        # Health probe boundary: any client/network failure becomes a status line.
        return f"Redis: ❌ Error - {str(e)[:50]}"


def get_journal_errors():
    """Get errors from the systemd journal for the last LOG_HOURS hours.

    Returns the non-empty, non-excluded lines joined by newlines, or "" when
    nothing remains.
    """
    since = (datetime.now() - timedelta(hours=LOG_HOURS)).strftime("%Y-%m-%d %H:%M:%S")
    cmd = f"journalctl --since='{since}' --priority=err --no-pager -q"
    output = run_cmd(cmd)

    # Filter out noise
    lines = output.strip().split('\n')
    filtered = [line for line in lines if line.strip() and not should_exclude(line)]
    return '\n'.join(filtered) if filtered else ""


def get_cron_errors():
    """Get cron-related errors from the common syslog locations.

    Each existing log contributes at most its last 20 matching lines, under an
    "=== <path> ===" header. Returns "" when nothing is found.
    """
    cron_logs = []

    # Try common cron log locations
    for log_path in ["/var/log/cron", "/var/log/syslog", "/var/log/messages"]:
        if not os.path.exists(log_path):
            continue
        cmd = (
            r"grep -iE 'cron.*error|CRON.*FAILED| exited with ' "
            f"{shlex.quote(log_path)} 2>/dev/null | tail -20"
        )
        output = run_cmd(cmd)
        if not output.strip():
            continue
        # Filter noise
        filtered = [line for line in output.strip().split('\n') if not should_exclude(line)]
        if filtered:
            cron_logs.append(f"=== {log_path} ===\n" + '\n'.join(filtered))

    return "\n\n".join(cron_logs) if cron_logs else ""


def get_openclaw_errors():
    """Check OpenClaw session logs for errors.

    Looks at *.jsonl files under /root/.openclaw/agents modified within the
    last LOG_HOURS hours, takes the last 5 error-looking lines of each, and
    drops noise. Returns "" when nothing remains.
    """
    # grep needs -E here: in a basic regex "|" is a literal character, so the
    # alternation would match almost nothing.
    cmd = (
        f"find /root/.openclaw/agents -name '*.jsonl' -mmin -{LOG_HOURS * 60} "
        r"-exec grep -lE 'error|Error|FAILED|Traceback' {} \; 2>/dev/null"
    )
    files = run_cmd(cmd).strip().split("\n")

    errors = []
    for f in files:
        if not f or SCRIPT_DIR in f:  # Skip my own script's logs
            continue
        # Get recent errors from each file; the path is quoted because it is
        # interpolated into a shell command.
        cmd = f"grep -iE 'error|traceback|failed' {shlex.quote(f)} 2>/dev/null | tail -5"
        output = run_cmd(cmd)
        if not output.strip():
            continue
        # Filter noise aggressively for OpenClaw logs
        filtered = [line for line in output.strip().split('\n') if not should_exclude(line)]
        # Additional filter: skip lines that are just me analyzing errors
        filtered = [line for line in filtered if not re.search(r'I (can )?see', line, re.IGNORECASE)]
        filtered = [line for line in filtered if not re.search(r'meta and kind of funny', line, re.IGNORECASE)]
        # Filter very long content blocks (file reads)
        filtered = [line for line in filtered if len(line) < 500]
        if filtered:
            errors.append(f"=== {os.path.basename(f)} ===\n" + '\n'.join(filtered))

    return "\n\n".join(errors) if errors else ""


def _placeholder_values(match):
    """Map placeholder names to the text captured by *match*.

    Named groups are used when the pattern has any; otherwise unnamed groups
    are named by position via _LEGACY_PLACEHOLDERS. Groups that did not
    participate in the match are skipped.
    """
    named = {name: value for name, value in match.groupdict().items() if value is not None}
    if named:
        return named

    values = {}
    for i, group in enumerate(match.groups()):
        if group is None:
            continue
        name = _LEGACY_PLACEHOLDERS[i] if i < len(_LEGACY_PLACEHOLDERS) else f"group{i}"
        values[name] = group
    return values


def scan_and_fix(log_content, source_name, *, runner=None):
    """Scan log content for known errors and attempt fixes.

    Args:
        log_content: Log text to match against the AUTO_FIXES patterns.
        source_name: Label stored in each alert's "source" field.
        runner: Optional callable ``runner(cmd) -> (returncode, output)`` used
            to execute fix commands; defaults to running them in the shell.

    Returns:
        ``(fixes_applied, alerts_needed)``. Each fix is a dict with keys
        "description", "command", "success" (exit status was 0) and "result"
        (first 200 characters of output, or "OK" when empty). Each alert is a
        dict with keys "error", "description" and "source".
    """
    if runner is None:
        runner = _run_cmd_status

    fixes_applied = []
    alerts_needed = []

    # Track which fixes we've already tried (avoid duplicates)
    tried_fixes = set()

    for pattern, fix_info in AUTO_FIXES.items():
        needs_alert = fix_info.get("alert", False)

        for match in re.finditer(pattern, log_content, re.IGNORECASE):
            description = fix_info["description"]
            fix_cmd = fix_info.get("fix_cmd")

            # Fill "{name}" placeholders with the captured values. Captured
            # text comes from logs, so it is shell-quoted before it reaches
            # the command line.
            for name, value in _placeholder_values(match).items():
                token = "{" + name + "}"
                description = description.replace(token, value)
                if fix_cmd:
                    fix_cmd = fix_cmd.replace(token, shlex.quote(value))

            # Skip if we already tried this exact fix
            fix_key = f"{description}:{fix_cmd}"
            if fix_key in tried_fixes:
                continue
            tried_fixes.add(fix_key)

            if needs_alert:
                alerts_needed.append({
                    "error": match.group(0),
                    "description": description,
                    "source": source_name
                })
            elif fix_cmd:
                # Attempt the fix
                print(f"[FIXING] {description}")
                returncode, output = runner(fix_cmd)
                fixes_applied.append({
                    "description": description,
                    "command": fix_cmd,
                    # Exit status, not the wording of the output, decides success.
                    "success": returncode == 0,
                    "result": output[:200] if output else "OK"
                })

    return fixes_applied, alerts_needed


def main():
    """Run all checks, write the report to REPORT_FILE and print it.

    Returns the process exit code: 1 when any alert needs human attention or
    any auto-fix failed, otherwise 0.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    report = [f"=== Log Monitor Report: {timestamp} ===\n"]

    all_fixes = []
    all_alerts = []

    # Check service health (parallel-style in Python)
    print("Checking service health...")
    redis_status = check_redis()
    report.append(f"\n--- Service Health ---\n{redis_status}")

    # Check systemd journal
    print("Checking systemd journal...")
    journal_errors = get_journal_errors()
    if journal_errors:
        report.append(f"\n--- Systemd Journal Errors ---\n{journal_errors[:2000]}")
        fixes, alerts = scan_and_fix(journal_errors, "journal")
        all_fixes.extend(fixes)
        all_alerts.extend(alerts)
    else:
        report.append("\n--- Systemd Journal: No errors found ---")

    # Check cron logs
    print("Checking cron logs...")
    cron_errors = get_cron_errors()
    if cron_errors:
        report.append(f"\n--- Cron Errors ---\n{cron_errors[:2000]}")
        fixes, alerts = scan_and_fix(cron_errors, "cron")
        all_fixes.extend(fixes)
        all_alerts.extend(alerts)
    else:
        report.append("\n--- Cron Logs: No errors found ---")

    # Check OpenClaw logs
    print("Checking OpenClaw logs...")
    oc_errors = get_openclaw_errors()
    if oc_errors:
        report.append(f"\n--- OpenClaw Errors ---\n{oc_errors[:2000]}")
        fixes, alerts = scan_and_fix(oc_errors, "openclaw")
        all_fixes.extend(fixes)
        all_alerts.extend(alerts)
    else:
        report.append("\n--- OpenClaw Logs: No errors found ---")

    # Summarize fixes
    report.append(f"\n\n=== FIXES APPLIED: {len(all_fixes)} ===")
    for fix in all_fixes:
        status = "✅" if fix["success"] else "❌"
        report.append(f"\n{status} {fix['description']}")
        report.append(f" Command: {fix['command']}")
        if not fix["success"]:
            report.append(f" Result: {fix['result']}")

    # Summarize alerts (need human attention)
    if all_alerts:
        report.append(f"\n\n=== ALERTS NEEDING ATTENTION: {len(all_alerts)} ===")
        for alert in all_alerts:
            report.append(f"\n⚠️ {alert['description']}")
            report.append(f" Source: {alert['source']}")
            report.append(f" Error: {alert['error'][:100]}")

    # Save report; it contains non-ASCII status symbols, so pin the encoding
    # instead of depending on the locale cron happens to provide.
    report_text = "\n".join(report)
    with open(REPORT_FILE, "w", encoding="utf-8") as f:
        f.write(report_text)

    # Print summary
    print(f"\n{report_text}")

    # Return non-zero if anything is left unresolved (for cron notification):
    # alerts that need a human, or auto-fixes that did not succeed.
    failed_fixes = [fix for fix in all_fixes if not fix["success"]]
    if all_alerts:
        print(f"\n⚠️ {len(all_alerts)} issue(s) need human attention")
    if failed_fixes:
        print(f"\n❌ {len(failed_fixes)} auto-fix(es) failed")
    if all_alerts or failed_fixes:
        return 1

    print("\n✅ Log check complete. All issues resolved or no errors found.")
    return 0


if __name__ == "__main__":
    sys.exit(main())