commit ae853172b7ad2eafde6ea0e3f08b697ca77ea46d Author: Pim van Pelt Date: Fri Aug 1 16:17:29 2025 +0200 initial checking; monitor list, maintenance {add,list,delete} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c4727b4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +bin +include +lib +lib64 +pyvenv.cfg +__pycache__ diff --git a/kuma_client.py b/kuma_client.py new file mode 100755 index 0000000..2be76ec --- /dev/null +++ b/kuma_client.py @@ -0,0 +1,564 @@ +#!/usr/bin/env python3 + +import argparse +import fnmatch +import os +import re +import sys +from datetime import datetime, timedelta +from uptime_kuma_api import UptimeKumaApi + + +class KumaClient: + def __init__(self, url, username=None, password=None): + self.url = url + self.username = username + self.password = password + self.api = None + + def parse_duration(self, duration_str): + """Parse duration string like '90m', '1h', '3600s' into seconds""" + if not duration_str: + return 90 * 60 # Default 90 minutes in seconds + + match = re.match(r"^(\d+)([smh])$", duration_str.lower()) + if not match: + raise ValueError( + f"Invalid duration format: {duration_str}. Use format like '90m', '1h', '3600s'" + ) + + value, unit = match.groups() + value = int(value) + + if unit == "s": + return value + elif unit == "m": + return value * 60 + elif unit == "h": + return value * 3600 + + raise ValueError(f"Invalid duration unit: {unit}") + + def parse_start_time(self, start_str): + """Parse start time string or use current time if None""" + if not start_str: + return datetime.utcnow() + + # Try to parse ISO format first + try: + return datetime.fromisoformat(start_str.replace("Z", "+00:00")).replace( + tzinfo=None + ) + except ValueError: + pass + + # Try common formats + formats = [ + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d", + ] + + for fmt in formats: + try: + return datetime.strptime(start_str, fmt) + except ValueError: + continue + + raise ValueError( + f"Invalid start time format: {start_str}. Use ISO format or YYYY-MM-DD HH:MM:SS" + ) + + def connect(self): + """Connect and login to Uptime Kuma""" + try: + self.api = UptimeKumaApi(self.url) + if self.username and self.password: + result = self.api.login(self.username, self.password) + print(f"Connected to {self.url}") + return True + except Exception as e: + print(f"Failed to connect: {e}") + return False + + def disconnect(self): + """Disconnect from Uptime Kuma""" + if self.api: + self.api.disconnect() + + def list_monitors(self, monitor_patterns=None, group_patterns=None): + """List all monitors or filter by patterns and/or groups""" + try: + all_monitors = self.api.get_monitors() + if not all_monitors: + print("No monitors found") + return + + monitors = all_monitors[:] # Start with all monitors + + # Filter by group if specified + if group_patterns: + group_monitors = self.get_monitors_in_groups(group_patterns) + if not group_monitors: + print("No monitors found in the specified groups") + return + monitor_ids = {m["id"] for m in group_monitors} + monitors = [m for m in monitors if m.get("id") in monitor_ids] + + # Filter monitors by name patterns if specified + if monitor_patterns: + matched_monitors = self.find_monitors_by_pattern(monitor_patterns) + if not matched_monitors: + print("No monitors found matching the specified patterns") + return + pattern_monitor_ids = {m["id"] for m in matched_monitors} + monitors = [m for m in monitors if m.get("id") in pattern_monitor_ids] + + # Display results with group information + print( + f"{'ID':<5} {'Name':<25} {'Type':<12} {'Group':<20} {'URL':<35} {'Status':<10}" + ) + print("-" * 112) + + for monitor in monitors: + monitor_id = monitor.get("id", "N/A") + name = monitor.get("name", "N/A") + monitor_type = monitor.get("type", "N/A") + url = monitor.get("url", "N/A") + active = "Active" if monitor.get("active") else "Inactive" + + # Find parent group name + parent_id = monitor.get("parent") + parent_name = "None" + if parent_id: + parent_monitor = next( + (m for m in all_monitors if m.get("id") == parent_id), None + ) + if parent_monitor: + parent_name = parent_monitor.get("name", f"Group {parent_id}") + + print( + f"{monitor_id:<5} {name:<25} {monitor_type:<12} {parent_name:<20} {url:<35} {active:<10}" + ) + + except Exception as e: + print(f"Error listing monitors: {e}") + + def list_maintenances(self): + """List all maintenances""" + try: + maintenances = self.api.get_maintenances() + if not maintenances: + print("No maintenances found") + return + + print( + f"{'ID':<5} {'Title':<30} {'Strategy':<15} {'Active':<10} {'Description':<50}" + ) + print("-" * 115) + + for maintenance in maintenances: + maintenance_id = maintenance.get("id", "N/A") + title = maintenance.get("title", "N/A") + strategy = maintenance.get("strategy", "N/A") + active = "Active" if maintenance.get("active") else "Inactive" + description = maintenance.get("description", "N/A") + + print( + f"{maintenance_id:<5} {title:<30} {strategy:<15} {active:<10} {description:<50}" + ) + + except Exception as e: + print(f"Error listing maintenances: {e}") + + def find_monitors_by_pattern(self, patterns): + """Find monitor IDs by name patterns (case insensitive, supports wildcards)""" + try: + monitors = self.api.get_monitors() + matched_monitors = [] + + for pattern in patterns: + pattern_lower = pattern.lower() + for monitor in monitors: + monitor_name = monitor.get("name", "").lower() + if fnmatch.fnmatch(monitor_name, pattern_lower): + matched_monitors.append( + {"id": monitor.get("id"), "name": monitor.get("name")} + ) + + # Remove duplicates while preserving order + seen = set() + unique_monitors = [] + for monitor in matched_monitors: + if monitor["id"] not in seen: + seen.add(monitor["id"]) + unique_monitors.append(monitor) + + return unique_monitors + + except Exception as e: + print(f"Error finding monitors: {e}") + return [] + + def find_groups_by_pattern(self, patterns): + """Find group IDs by name patterns (case insensitive, supports wildcards)""" + try: + monitors = self.api.get_monitors() + groups = [m for m in monitors if m.get("type") == "group"] + matched_groups = [] + + for pattern in patterns: + pattern_lower = pattern.lower() + for group in groups: + group_name = group.get("name", "").lower() + if fnmatch.fnmatch(group_name, pattern_lower): + matched_groups.append( + {"id": group.get("id"), "name": group.get("name")} + ) + + # Remove duplicates while preserving order + seen = set() + unique_groups = [] + for group in matched_groups: + if group["id"] not in seen: + seen.add(group["id"]) + unique_groups.append(group) + + return unique_groups + + except Exception as e: + print(f"Error finding groups: {e}") + return [] + + def get_monitors_in_groups(self, group_patterns): + """Get all monitors that belong to specified groups""" + try: + monitors = self.api.get_monitors() + matched_groups = self.find_groups_by_pattern(group_patterns) + + if not matched_groups: + return [] + + print(f"Found {len(matched_groups)} matching groups:") + for group in matched_groups: + print(f" - {group['name']} (ID: {group['id']})") + + group_ids = {g["id"] for g in matched_groups} + group_members = [] + + for monitor in monitors: + # Check if monitor's parent is in our group list + if monitor.get("parent") in group_ids: + group_members.append(monitor) + + return group_members + + except Exception as e: + print(f"Error getting group members: {e}") + return [] + + def add_maintenance( + self, + title, + description, + start_time=None, + duration="90m", + monitor_patterns=None, + group_patterns=None, + ): + """Add a new maintenance""" + try: + # Check if we have either monitor patterns or group patterns + if not monitor_patterns and not group_patterns: + print( + "Error: Either --monitor or --group flag is required. Specify at least one pattern." + ) + return + + matched_monitors = [] + + # Find monitors by patterns if specified + if monitor_patterns: + pattern_monitors = self.find_monitors_by_pattern(monitor_patterns) + matched_monitors.extend(pattern_monitors) + + # Find monitors by groups if specified + if group_patterns: + group_monitors = self.get_monitors_in_groups(group_patterns) + # Convert to the same format as find_monitors_by_pattern + group_monitor_objs = [ + {"id": m.get("id"), "name": m.get("name")} for m in group_monitors + ] + matched_monitors.extend(group_monitor_objs) + + # Remove duplicates while preserving order + seen = set() + unique_monitors = [] + for monitor in matched_monitors: + if monitor["id"] not in seen: + seen.add(monitor["id"]) + unique_monitors.append(monitor) + + matched_monitors = unique_monitors + + if not matched_monitors: + print( + "Error: No monitors found matching the specified patterns or groups" + ) + return + + print(f"Found {len(matched_monitors)} matching monitors:") + for monitor in matched_monitors: + print(f" - {monitor['name']} (ID: {monitor['id']})") + + # Set default description if not provided + if not description: + monitor_names = [monitor["name"] for monitor in matched_monitors] + description = "Maintenance on:\n" + "\n".join(monitor_names) + + # Parse start time and duration + start_dt = self.parse_start_time(start_time) + duration_seconds = self.parse_duration(duration) + end_dt = start_dt + timedelta(seconds=duration_seconds) + + print( + f"Maintenance window: {start_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} - {end_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} ({duration})" + ) + + # Create the maintenance with single strategy and date range + maintenance_data = { + "title": title, + "description": description, + "strategy": "single", + "active": True, + "dateRange": [ + start_dt.strftime("%Y-%m-%d %H:%M:%S"), + end_dt.strftime("%Y-%m-%d %H:%M:%S"), + ], + } + + result = self.api.add_maintenance(**maintenance_data) + maintenance_id = result.get("maintenanceID") + + # Add monitors to maintenance + if maintenance_id: + print(f"Maintenance added successfully: {result}") + + # Prepare monitors list in the correct format for the API + monitors_list = [{"id": monitor["id"]} for monitor in matched_monitors] + + try: + result = self.api.add_monitor_maintenance( + maintenance_id, monitors_list + ) + print( + f"Successfully added {len(matched_monitors)} monitors to maintenance" + ) + print(f"API response: {result}") + except Exception as e: + print(f"Error: Failed to add monitors to maintenance: {e}") + print( + "This might be due to API compatibility issues or server configuration" + ) + + except Exception as e: + print(f"Error adding maintenance: {e}") + + def delete_maintenance(self, maintenance_id=None, delete_all=False): + """Delete a specific maintenance or all maintenances""" + try: + if delete_all: + # Get all maintenances first + maintenances = self.api.get_maintenances() + if not maintenances: + print("No maintenances found to delete") + return + + print(f"Found {len(maintenances)} maintenances to delete:") + for maintenance in maintenances: + print( + f" - {maintenance.get('title', 'N/A')} (ID: {maintenance.get('id', 'N/A')})" + ) + + # Delete all maintenances + deleted_count = 0 + for maintenance in maintenances: + try: + result = self.api.delete_maintenance(maintenance.get("id")) + print( + f"Deleted maintenance '{maintenance.get('title', 'N/A')}' (ID: {maintenance.get('id')})" + ) + deleted_count += 1 + except Exception as e: + print( + f"Failed to delete maintenance '{maintenance.get('title', 'N/A')}': {e}" + ) + + print( + f"Successfully deleted {deleted_count} out of {len(maintenances)} maintenances" + ) + + elif maintenance_id: + # Delete specific maintenance + try: + # First get the maintenance info for confirmation + maintenance = self.api.get_maintenance(maintenance_id) + maintenance_title = maintenance.get("title", "N/A") + + result = self.api.delete_maintenance(maintenance_id) + print( + f"Successfully deleted maintenance '{maintenance_title}' (ID: {maintenance_id})" + ) + print(f"API response: {result}") + + except Exception as e: + print(f"Failed to delete maintenance ID {maintenance_id}: {e}") + else: + print( + "Error: Either --id or --all flag is required for delete operation" + ) + + except Exception as e: + print(f"Error during maintenance deletion: {e}") + + +def main(): + parser = argparse.ArgumentParser(description="Uptime Kuma CLI Client") + parser.add_argument( + "--url", help="Uptime Kuma server URL (can also use KUMA_URL env var)" + ) + parser.add_argument( + "--username", + help="Username for authentication (can also use KUMA_USERNAME env var)", + ) + parser.add_argument( + "--password", + help="Password for authentication (can also use KUMA_PASSWORD env var)", + ) + + subparsers = parser.add_subparsers(dest="resource", help="Resource to operate on") + + # Monitor commands + monitor_parser = subparsers.add_parser("monitor", help="Monitor operations") + monitor_subparsers = monitor_parser.add_subparsers( + dest="monitor_action", help="Monitor actions" + ) + list_monitors_parser = monitor_subparsers.add_parser( + "list", help="List all monitors" + ) + list_monitors_parser.add_argument( + "--monitor", + action="append", + help="Monitor name pattern to filter by (supports wildcards, can be repeated)", + ) + list_monitors_parser.add_argument( + "--group", + action="append", + help="Group name pattern to filter by (supports wildcards, can be repeated)", + ) + + # Maintenance commands + maintenance_parser = subparsers.add_parser( + "maintenance", help="Maintenance operations" + ) + maintenance_subparsers = maintenance_parser.add_subparsers( + dest="maintenance_action", help="Maintenance actions" + ) + maintenance_subparsers.add_parser("list", help="List all maintenances") + + # Delete maintenance command + delete_maintenance_parser = maintenance_subparsers.add_parser( + "delete", help="Delete maintenance(s)" + ) + delete_maintenance_parser.add_argument( + "--id", type=int, help="Maintenance ID to delete" + ) + delete_maintenance_parser.add_argument( + "--all", action="store_true", help="Delete all maintenances" + ) + + # Add maintenance command + add_maintenance_parser = maintenance_subparsers.add_parser( + "add", help="Add a new maintenance" + ) + add_maintenance_parser.add_argument( + "--title", + help='Maintenance title (defaults to "Update started on ")', + ) + add_maintenance_parser.add_argument("--description", help="Maintenance description") + add_maintenance_parser.add_argument( + "--start", + help="Start time (defaults to now, format: YYYY-MM-DD HH:MM:SS or ISO format)", + ) + add_maintenance_parser.add_argument( + "--duration", + default="90m", + help="Duration (default: 90m, format: 3600s, 1h, 60m)", + ) + add_maintenance_parser.add_argument( + "--monitor", + action="append", + help="Monitor name pattern to add to maintenance (supports wildcards like *NextCloud*, can be repeated)", + ) + add_maintenance_parser.add_argument( + "--group", + action="append", + help="Group name pattern to add all group members to maintenance (supports wildcards, can be repeated)", + ) + + args = parser.parse_args() + + if not args.resource: + parser.print_help() + sys.exit(1) + + # Get configuration from environment variables or command line args + url = args.url or os.getenv("KUMA_URL") + username = args.username or os.getenv("KUMA_USERNAME") + password = args.password or os.getenv("KUMA_PASSWORD") + + if not url: + print( + "Error: URL is required. Provide --url or set KUMA_URL environment variable." + ) + sys.exit(1) + + client = KumaClient(url, username, password) + + if not client.connect(): + sys.exit(1) + + try: + if args.resource == "monitor": + if args.monitor_action == "list": + client.list_monitors( + monitor_patterns=args.monitor, group_patterns=args.group + ) + else: + monitor_parser.print_help() + elif args.resource == "maintenance": + if args.maintenance_action == "list": + client.list_maintenances() + elif args.maintenance_action == "add": + title = ( + args.title + or f"Update started on {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}" + ) + client.add_maintenance( + title, + args.description, + args.start, + args.duration, + monitor_patterns=args.monitor, + group_patterns=args.group, + ) + elif args.maintenance_action == "delete": + client.delete_maintenance(maintenance_id=args.id, delete_all=args.all) + else: + maintenance_parser.print_help() + finally: + client.disconnect() + + +if __name__ == "__main__": + main()