#!/usr/bin/env python3 import argparse import fnmatch import os import re import sys from datetime import datetime, timedelta from uptime_kuma_api import UptimeKumaApi class KumaClient: def __init__(self, url, username=None, password=None): self.url = url self.username = username self.password = password self.api = None def parse_duration(self, duration_str): """Parse duration string like '90m', '1h', '3600s' into seconds""" if not duration_str: return 90 * 60 # Default 90 minutes in seconds match = re.match(r"^(\d+)([smh])$", duration_str.lower()) if not match: raise ValueError( f"Invalid duration format: {duration_str}. Use format like '90m', '1h', '3600s'" ) value, unit = match.groups() value = int(value) if unit == "s": return value elif unit == "m": return value * 60 elif unit == "h": return value * 3600 raise ValueError(f"Invalid duration unit: {unit}") def parse_start_time(self, start_str): """Parse start time string or use current time if None""" if not start_str: return datetime.utcnow() # Try to parse ISO format first try: return datetime.fromisoformat(start_str.replace("Z", "+00:00")).replace( tzinfo=None ) except ValueError: pass # Try common formats formats = [ "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d", ] for fmt in formats: try: return datetime.strptime(start_str, fmt) except ValueError: continue raise ValueError( f"Invalid start time format: {start_str}. Use ISO format or YYYY-MM-DD HH:MM:SS" ) def connect(self): """Connect and login to Uptime Kuma""" try: self.api = UptimeKumaApi(self.url) if self.username and self.password: result = self.api.login(self.username, self.password) print(f"Connected to {self.url}") return True except Exception as e: print(f"Failed to connect: {e}") return False def disconnect(self): """Disconnect from Uptime Kuma""" if self.api: self.api.disconnect() def list_monitors(self, monitor_patterns=None, group_patterns=None): """List all monitors or filter by patterns and/or groups""" try: all_monitors = self.api.get_monitors() if not all_monitors: print("No monitors found") return monitors = all_monitors[:] # Start with all monitors # Filter by group if specified if group_patterns: group_monitors = self.get_monitors_in_groups(group_patterns) if not group_monitors: print("No monitors found in the specified groups") return monitor_ids = {m["id"] for m in group_monitors} monitors = [m for m in monitors if m.get("id") in monitor_ids] # Filter monitors by name patterns if specified if monitor_patterns: matched_monitors = self.find_monitors_by_pattern(monitor_patterns) if not matched_monitors: print("No monitors found matching the specified patterns") return pattern_monitor_ids = {m["id"] for m in matched_monitors} monitors = [m for m in monitors if m.get("id") in pattern_monitor_ids] # Display results with group information print( f"{'ID':<5} {'Name':<25} {'Type':<12} {'Group':<20} {'URL':<35} {'Status':<10}" ) print("-" * 112) for monitor in monitors: monitor_id = monitor.get("id", "N/A") name = monitor.get("name", "N/A") monitor_type = monitor.get("type", "N/A") url = monitor.get("url", "N/A") active = "Active" if monitor.get("active") else "Inactive" # Find parent group name parent_id = monitor.get("parent") parent_name = "None" if parent_id: parent_monitor = next( (m for m in all_monitors if m.get("id") == parent_id), None ) if parent_monitor: parent_name = parent_monitor.get("name", f"Group {parent_id}") print( f"{monitor_id:<5} {name:<25} {monitor_type:<12} {parent_name:<20} {url:<35} {active:<10}" ) except Exception as e: print(f"Error listing monitors: {e}") def list_maintenances(self): """List all maintenances""" try: maintenances = self.api.get_maintenances() if not maintenances: print("No maintenances found") return print( f"{'ID':<5} {'Title':<30} {'Strategy':<15} {'Active':<10} {'Description':<50}" ) print("-" * 115) for maintenance in maintenances: maintenance_id = maintenance.get("id", "N/A") title = maintenance.get("title", "N/A") strategy = maintenance.get("strategy", "N/A") active = "Active" if maintenance.get("active") else "Inactive" description = maintenance.get("description", "N/A") print( f"{maintenance_id:<5} {title:<30} {strategy:<15} {active:<10} {description:<50}" ) except Exception as e: print(f"Error listing maintenances: {e}") def find_monitors_by_pattern(self, patterns): """Find monitor IDs by name patterns (case insensitive, supports wildcards)""" try: monitors = self.api.get_monitors() matched_monitors = [] for pattern in patterns: pattern_lower = pattern.lower() for monitor in monitors: monitor_name = monitor.get("name", "").lower() if fnmatch.fnmatch(monitor_name, pattern_lower): matched_monitors.append( {"id": monitor.get("id"), "name": monitor.get("name")} ) # Remove duplicates while preserving order seen = set() unique_monitors = [] for monitor in matched_monitors: if monitor["id"] not in seen: seen.add(monitor["id"]) unique_monitors.append(monitor) return unique_monitors except Exception as e: print(f"Error finding monitors: {e}") return [] def find_groups_by_pattern(self, patterns): """Find group IDs by name patterns (case insensitive, supports wildcards)""" try: monitors = self.api.get_monitors() groups = [m for m in monitors if m.get("type") == "group"] matched_groups = [] for pattern in patterns: pattern_lower = pattern.lower() for group in groups: group_name = group.get("name", "").lower() if fnmatch.fnmatch(group_name, pattern_lower): matched_groups.append( {"id": group.get("id"), "name": group.get("name")} ) # Remove duplicates while preserving order seen = set() unique_groups = [] for group in matched_groups: if group["id"] not in seen: seen.add(group["id"]) unique_groups.append(group) return unique_groups except Exception as e: print(f"Error finding groups: {e}") return [] def get_monitors_in_groups(self, group_patterns): """Get all monitors that belong to specified groups""" try: monitors = self.api.get_monitors() matched_groups = self.find_groups_by_pattern(group_patterns) if not matched_groups: return [] print(f"Found {len(matched_groups)} matching groups:") for group in matched_groups: print(f" - {group['name']} (ID: {group['id']})") group_ids = {g["id"] for g in matched_groups} group_members = [] for monitor in monitors: # Check if monitor's parent is in our group list if monitor.get("parent") in group_ids: group_members.append(monitor) return group_members except Exception as e: print(f"Error getting group members: {e}") return [] def add_maintenance( self, title, description, start_time=None, duration="90m", monitor_patterns=None, group_patterns=None, ): """Add a new maintenance""" try: # Check if we have either monitor patterns or group patterns if not monitor_patterns and not group_patterns: print( "Error: Either --monitor or --group flag is required. Specify at least one pattern." ) return matched_monitors = [] # Find monitors by patterns if specified if monitor_patterns: pattern_monitors = self.find_monitors_by_pattern(monitor_patterns) matched_monitors.extend(pattern_monitors) # Find monitors by groups if specified if group_patterns: group_monitors = self.get_monitors_in_groups(group_patterns) # Convert to the same format as find_monitors_by_pattern group_monitor_objs = [ {"id": m.get("id"), "name": m.get("name")} for m in group_monitors ] matched_monitors.extend(group_monitor_objs) # Remove duplicates while preserving order seen = set() unique_monitors = [] for monitor in matched_monitors: if monitor["id"] not in seen: seen.add(monitor["id"]) unique_monitors.append(monitor) matched_monitors = unique_monitors if not matched_monitors: print( "Error: No monitors found matching the specified patterns or groups" ) return print(f"Found {len(matched_monitors)} matching monitors:") for monitor in matched_monitors: print(f" - {monitor['name']} (ID: {monitor['id']})") # Set default description if not provided if not description: monitor_names = [monitor["name"] for monitor in matched_monitors] description = "Maintenance on:\n" + "\n".join(monitor_names) # Parse start time and duration start_dt = self.parse_start_time(start_time) duration_seconds = self.parse_duration(duration) end_dt = start_dt + timedelta(seconds=duration_seconds) print( f"Maintenance window: {start_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} - {end_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} ({duration})" ) # Create the maintenance with single strategy and date range maintenance_data = { "title": title, "description": description, "strategy": "single", "active": True, "dateRange": [ start_dt.strftime("%Y-%m-%d %H:%M:%S"), end_dt.strftime("%Y-%m-%d %H:%M:%S"), ], } result = self.api.add_maintenance(**maintenance_data) maintenance_id = result.get("maintenanceID") # Add monitors to maintenance if maintenance_id: print(f"Maintenance added successfully: {result}") # Prepare monitors list in the correct format for the API monitors_list = [{"id": monitor["id"]} for monitor in matched_monitors] try: result = self.api.add_monitor_maintenance( maintenance_id, monitors_list ) print( f"Successfully added {len(matched_monitors)} monitors to maintenance" ) print(f"API response: {result}") except Exception as e: print(f"Error: Failed to add monitors to maintenance: {e}") print( "This might be due to API compatibility issues or server configuration" ) except Exception as e: print(f"Error adding maintenance: {e}") def delete_maintenance(self, maintenance_id=None, delete_all=False): """Delete a specific maintenance or all maintenances""" try: if delete_all: # Get all maintenances first maintenances = self.api.get_maintenances() if not maintenances: print("No maintenances found to delete") return print(f"Found {len(maintenances)} maintenances to delete:") for maintenance in maintenances: print( f" - {maintenance.get('title', 'N/A')} (ID: {maintenance.get('id', 'N/A')})" ) # Delete all maintenances deleted_count = 0 for maintenance in maintenances: try: result = self.api.delete_maintenance(maintenance.get("id")) print( f"Deleted maintenance '{maintenance.get('title', 'N/A')}' (ID: {maintenance.get('id')})" ) deleted_count += 1 except Exception as e: print( f"Failed to delete maintenance '{maintenance.get('title', 'N/A')}': {e}" ) print( f"Successfully deleted {deleted_count} out of {len(maintenances)} maintenances" ) elif maintenance_id: # Delete specific maintenance try: # First get the maintenance info for confirmation maintenance = self.api.get_maintenance(maintenance_id) maintenance_title = maintenance.get("title", "N/A") result = self.api.delete_maintenance(maintenance_id) print( f"Successfully deleted maintenance '{maintenance_title}' (ID: {maintenance_id})" ) print(f"API response: {result}") except Exception as e: print(f"Failed to delete maintenance ID {maintenance_id}: {e}") else: print( "Error: Either --id or --all flag is required for delete operation" ) except Exception as e: print(f"Error during maintenance deletion: {e}") def main(): parser = argparse.ArgumentParser(description="Uptime Kuma CLI Client") parser.add_argument( "--url", help="Uptime Kuma server URL (can also use KUMA_URL env var)" ) parser.add_argument( "--username", help="Username for authentication (can also use KUMA_USERNAME env var)", ) parser.add_argument( "--password", help="Password for authentication (can also use KUMA_PASSWORD env var)", ) subparsers = parser.add_subparsers(dest="resource", help="Resource to operate on") # Monitor commands monitor_parser = subparsers.add_parser("monitor", help="Monitor operations") monitor_subparsers = monitor_parser.add_subparsers( dest="monitor_action", help="Monitor actions" ) list_monitors_parser = monitor_subparsers.add_parser( "list", help="List all monitors" ) list_monitors_parser.add_argument( "--monitor", action="append", help="Monitor name pattern to filter by (supports wildcards, can be repeated)", ) list_monitors_parser.add_argument( "--group", action="append", help="Group name pattern to filter by (supports wildcards, can be repeated)", ) # Maintenance commands maintenance_parser = subparsers.add_parser( "maintenance", help="Maintenance operations" ) maintenance_subparsers = maintenance_parser.add_subparsers( dest="maintenance_action", help="Maintenance actions" ) maintenance_subparsers.add_parser("list", help="List all maintenances") # Delete maintenance command delete_maintenance_parser = maintenance_subparsers.add_parser( "delete", help="Delete maintenance(s)" ) delete_maintenance_parser.add_argument( "--id", type=int, help="Maintenance ID to delete" ) delete_maintenance_parser.add_argument( "--all", action="store_true", help="Delete all maintenances" ) # Add maintenance command add_maintenance_parser = maintenance_subparsers.add_parser( "add", help="Add a new maintenance" ) add_maintenance_parser.add_argument( "--title", help='Maintenance title (defaults to "Update started on ")', ) add_maintenance_parser.add_argument("--description", help="Maintenance description") add_maintenance_parser.add_argument( "--start", help="Start time (defaults to now, format: YYYY-MM-DD HH:MM:SS or ISO format)", ) add_maintenance_parser.add_argument( "--duration", default="90m", help="Duration (default: 90m, format: 3600s, 1h, 60m)", ) add_maintenance_parser.add_argument( "--monitor", action="append", help="Monitor name pattern to add to maintenance (supports wildcards like *NextCloud*, can be repeated)", ) add_maintenance_parser.add_argument( "--group", action="append", help="Group name pattern to add all group members to maintenance (supports wildcards, can be repeated)", ) args = parser.parse_args() if not args.resource: parser.print_help() sys.exit(1) # Get configuration from environment variables or command line args url = args.url or os.getenv("KUMA_URL") username = args.username or os.getenv("KUMA_USERNAME") password = args.password or os.getenv("KUMA_PASSWORD") if not url: print( "Error: URL is required. Provide --url or set KUMA_URL environment variable." ) sys.exit(1) client = KumaClient(url, username, password) if not client.connect(): sys.exit(1) try: if args.resource == "monitor": if args.monitor_action == "list": client.list_monitors( monitor_patterns=args.monitor, group_patterns=args.group ) else: monitor_parser.print_help() elif args.resource == "maintenance": if args.maintenance_action == "list": client.list_maintenances() elif args.maintenance_action == "add": title = ( args.title or f"Update started on {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}" ) client.add_maintenance( title, args.description, args.start, args.duration, monitor_patterns=args.monitor, group_patterns=args.group, ) elif args.maintenance_action == "delete": client.delete_maintenance(maintenance_id=args.id, delete_all=args.all) else: maintenance_parser.print_help() finally: client.disconnect() if __name__ == "__main__": main()