Compare commits
19 Commits
5824d1d0a7
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
cf6d8712fb | ||
|
17c31ce6c9 | ||
|
d6ecb9e4c9 | ||
|
65f732e497 | ||
|
4690d13cb4 | ||
|
41692b7dc1 | ||
|
7eafb1e68e | ||
|
11aad79720 | ||
|
64e1ca124f | ||
|
f1d10458c6 | ||
|
1e3999eee2 | ||
|
2ddcc00cda | ||
|
c1d70cd9b6 | ||
|
81c7f9c9bd | ||
|
e2e65add2e | ||
|
5eb76736cc | ||
|
3000b0799b | ||
|
3b61b13eef | ||
|
e3c6dd5513 |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -1,8 +1,4 @@
|
||||
bin
|
||||
include
|
||||
lib
|
||||
lib64
|
||||
pyvenv.cfg
|
||||
__pycache__
|
||||
dist/
|
||||
kumacli.egg-info
|
||||
.pytest_cache
|
||||
|
23
Makefile
23
Makefile
@@ -1,4 +1,4 @@
|
||||
.PHONY: clean build install test help
|
||||
.PHONY: clean build install test test-deps help
|
||||
|
||||
# Default target
|
||||
help:
|
||||
@@ -6,19 +6,15 @@ help:
|
||||
@echo " clean - Remove build artifacts and cache files"
|
||||
@echo " build - Build the wheel package"
|
||||
@echo " install - Install the package in development mode"
|
||||
@echo " test - Run tests (if available)"
|
||||
@echo " test - Run the test suite"
|
||||
@echo " test-deps - Install test dependencies"
|
||||
@echo " help - Show this help message"
|
||||
|
||||
# Clean build artifacts
|
||||
clean:
|
||||
@echo "Cleaning build artifacts..."
|
||||
rm -rf build/
|
||||
rm -rf dist/
|
||||
rm -rf src/kumacli.egg-info/
|
||||
rm -rf src/kumacli/__pycache__/
|
||||
rm -rf src/kumacli/cmd/__pycache__/
|
||||
find . -name "*.pyc" -delete
|
||||
find . -name "*.pyo" -delete
|
||||
rm -rf build/ dist/ src/kumacli.egg-info/
|
||||
find . -name "*.pyc" -o -name "*.pyo" -delete
|
||||
find . -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true
|
||||
@echo "Clean complete."
|
||||
|
||||
@@ -33,10 +29,15 @@ install:
|
||||
@echo "Installing package in development mode..."
|
||||
pip install -e .
|
||||
|
||||
# Test the package (placeholder for when tests are added)
|
||||
# Install test dependencies
|
||||
test-deps:
|
||||
@echo "Installing test dependencies..."
|
||||
pip install -e ".[test]"
|
||||
|
||||
# Test the package
|
||||
test:
|
||||
@echo "Running tests..."
|
||||
@echo "No tests configured yet."
|
||||
python3 -m pytest tests/ -v --tb=short
|
||||
|
||||
# Rebuild and reinstall (useful during development)
|
||||
dev: clean build
|
||||
|
15
README.md
15
README.md
@@ -29,6 +29,9 @@ kumacli --url http://localhost:3001 --username admin --password password monitor
|
||||
### Monitor Commands
|
||||
|
||||
```bash
|
||||
# Show available subcommands
|
||||
kumacli monitor
|
||||
|
||||
# List all monitors
|
||||
kumacli monitor list
|
||||
|
||||
@@ -40,11 +43,23 @@ kumacli monitor list --group "production*"
|
||||
|
||||
# Combine filters
|
||||
kumacli monitor list --monitor "*api*" --group "web*"
|
||||
|
||||
# Pause monitors
|
||||
kumacli monitor pause --monitor "*api*"
|
||||
kumacli monitor pause --group "production*"
|
||||
|
||||
# Resume monitors
|
||||
kumacli monitor resume --monitor "*api*"
|
||||
kumacli monitor resume --group "production*"
|
||||
kumacli monitor resume --all
|
||||
```
|
||||
|
||||
### Maintenance Commands
|
||||
|
||||
```bash
|
||||
# Show available subcommands
|
||||
kumacli maintenance
|
||||
|
||||
# Create maintenance for specific monitors (90 minutes, starting now)
|
||||
kumacli maintenance add --monitor "*nextcloud*"
|
||||
|
||||
|
26
setup.py
26
setup.py
@@ -1,13 +1,27 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import re
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
# Read version directly from version.py file without importing
|
||||
def get_version():
|
||||
version_file = os.path.join(os.path.dirname(__file__), 'src', 'kumacli', 'cmd', 'version.py')
|
||||
with open(version_file, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", content, re.M)
|
||||
if version_match:
|
||||
return version_match.group(1)
|
||||
raise RuntimeError("Unable to find version string.")
|
||||
|
||||
__version__ = get_version()
|
||||
|
||||
with open("README.md", "r", encoding="utf-8") as fh:
|
||||
long_description = fh.read()
|
||||
|
||||
setup(
|
||||
name="kumacli",
|
||||
version="1.1.0",
|
||||
version=__version__,
|
||||
author="Uptime Kuma CLI",
|
||||
description="A command-line interface for Uptime Kuma",
|
||||
long_description=long_description,
|
||||
@@ -34,6 +48,16 @@ setup(
|
||||
install_requires=[
|
||||
"uptime-kuma-api>=1.0.0",
|
||||
],
|
||||
extras_require={
|
||||
"dev": [
|
||||
"pytest>=6.0",
|
||||
"pytest-cov>=2.0",
|
||||
],
|
||||
"test": [
|
||||
"pytest>=6.0",
|
||||
"pytest-cov>=2.0",
|
||||
],
|
||||
},
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"kumacli=kumacli.kumacli:main",
|
||||
|
@@ -2,10 +2,10 @@
|
||||
KumaCLI - A command-line interface for Uptime Kuma
|
||||
"""
|
||||
|
||||
__version__ = "1.0.0"
|
||||
from kumacli.kumacli import main
|
||||
from kumacli.cmd.version import __version__
|
||||
|
||||
__author__ = "KumaCLI Team"
|
||||
__email__ = "info@kumacli.com"
|
||||
|
||||
from kumacli.kumacli import main
|
||||
|
||||
__all__ = ["main"]
|
||||
|
@@ -1,12 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Uptime Kuma client wrapper for API operations."""
|
||||
|
||||
import fnmatch
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from uptime_kuma_api import UptimeKumaApi
|
||||
from datetime import datetime
|
||||
from uptime_kuma_api import UptimeKumaApi, UptimeKumaException
|
||||
|
||||
|
||||
class KumaClient:
|
||||
"""Client wrapper for Uptime Kuma API operations."""
|
||||
|
||||
def __init__(self, url, username=None, password=None):
|
||||
self.url = url
|
||||
self.username = username
|
||||
@@ -29,9 +32,9 @@ class KumaClient:
|
||||
|
||||
if unit == "s":
|
||||
return value
|
||||
elif unit == "m":
|
||||
if unit == "m":
|
||||
return value * 60
|
||||
elif unit == "h":
|
||||
if unit == "h":
|
||||
return value * 3600
|
||||
|
||||
raise ValueError(f"Invalid duration unit: {unit}")
|
||||
@@ -71,10 +74,10 @@ class KumaClient:
|
||||
try:
|
||||
self.api = UptimeKumaApi(self.url)
|
||||
if self.username and self.password:
|
||||
result = self.api.login(self.username, self.password)
|
||||
self.api.login(self.username, self.password)
|
||||
print(f"Connected to {self.url}")
|
||||
return True
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Failed to connect: {e}")
|
||||
return False
|
||||
|
||||
@@ -108,7 +111,7 @@ class KumaClient:
|
||||
|
||||
return unique_monitors
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error finding monitors: {e}")
|
||||
return []
|
||||
|
||||
@@ -138,7 +141,7 @@ class KumaClient:
|
||||
|
||||
return unique_groups
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error finding groups: {e}")
|
||||
return []
|
||||
|
||||
@@ -165,6 +168,108 @@ class KumaClient:
|
||||
|
||||
return group_members
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error getting group members: {e}")
|
||||
return []
|
||||
|
||||
def find_monitors_by_globs(self, monitor_patterns=None, group_patterns=None):
|
||||
"""Find monitor IDs by name patterns and/or group patterns.
|
||||
|
||||
Args:
|
||||
monitor_patterns: List of monitor name patterns (supports wildcards)
|
||||
group_patterns: List of group name patterns (supports wildcards)
|
||||
|
||||
Returns:
|
||||
List of monitor IDs (integers) that match the criteria
|
||||
"""
|
||||
try:
|
||||
# Check if we have either monitor patterns or group patterns
|
||||
if not monitor_patterns and not group_patterns:
|
||||
print(
|
||||
"Error: Either monitor or group patterns required. "
|
||||
"Specify at least one pattern."
|
||||
)
|
||||
return []
|
||||
|
||||
matched_monitors = []
|
||||
|
||||
# Find monitors by patterns if specified
|
||||
if monitor_patterns:
|
||||
pattern_monitors = self.find_monitors_by_pattern(monitor_patterns)
|
||||
matched_monitors.extend(pattern_monitors)
|
||||
|
||||
# Find monitors by groups if specified
|
||||
if group_patterns:
|
||||
group_monitors = self.get_monitors_in_groups(group_patterns)
|
||||
# Convert to same format as find_monitors_by_pattern
|
||||
group_monitor_objs = [
|
||||
{"id": m.get("id"), "name": m.get("name")} for m in group_monitors
|
||||
]
|
||||
matched_monitors.extend(group_monitor_objs)
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
seen = set()
|
||||
unique_monitors = []
|
||||
for monitor in matched_monitors:
|
||||
if monitor["id"] not in seen:
|
||||
seen.add(monitor["id"])
|
||||
unique_monitors.append(monitor)
|
||||
|
||||
matched_monitors = unique_monitors
|
||||
|
||||
if not matched_monitors:
|
||||
print(
|
||||
"Error: No monitors found matching the specified patterns or groups"
|
||||
)
|
||||
return []
|
||||
|
||||
# Return list of monitor IDs
|
||||
return [monitor["id"] for monitor in matched_monitors]
|
||||
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error finding monitors by globs: {e}")
|
||||
return []
|
||||
|
||||
def get_monitor_details(self, monitor_ids):
|
||||
"""Get monitor details for display purposes.
|
||||
|
||||
Args:
|
||||
monitor_ids: List of monitor IDs
|
||||
|
||||
Returns:
|
||||
List of dicts with 'id' and 'name' keys
|
||||
"""
|
||||
try:
|
||||
all_monitors = self.api.get_monitors()
|
||||
return [
|
||||
{
|
||||
"id": mid,
|
||||
"name": next(
|
||||
(
|
||||
m.get("name", f"Monitor {mid}")
|
||||
for m in all_monitors
|
||||
if m.get("id") == mid
|
||||
),
|
||||
f"Monitor {mid}",
|
||||
),
|
||||
}
|
||||
for mid in monitor_ids
|
||||
]
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error getting monitor details: {e}")
|
||||
return []
|
||||
|
||||
def find_and_get_monitors(self, monitor_patterns=None, group_patterns=None):
|
||||
"""Find monitors by patterns/groups and return detailed info.
|
||||
|
||||
Args:
|
||||
monitor_patterns: List of monitor name patterns (supports wildcards)
|
||||
group_patterns: List of group name patterns (supports wildcards)
|
||||
|
||||
Returns:
|
||||
List of dicts with 'id' and 'name' keys, or empty list if none found
|
||||
"""
|
||||
monitor_ids = self.find_monitors_by_globs(monitor_patterns, group_patterns)
|
||||
if not monitor_ids:
|
||||
return []
|
||||
return self.get_monitor_details(monitor_ids)
|
||||
|
40
src/kumacli/cmd/info.py
Normal file
40
src/kumacli/cmd/info.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Info command implementations for Uptime Kuma CLI."""
|
||||
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
from ..client import KumaClient
|
||||
|
||||
|
||||
class InfoCommands:
|
||||
"""Commands for retrieving server information."""
|
||||
|
||||
def __init__(self, client: KumaClient):
|
||||
self.client = client
|
||||
|
||||
def get_info(self):
|
||||
"""Get server info"""
|
||||
try:
|
||||
info = self.client.api.info()
|
||||
if not info:
|
||||
print("No server info available")
|
||||
return
|
||||
|
||||
print("Server Information:")
|
||||
for key, value in info.items():
|
||||
print(f" {key}: {value}")
|
||||
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error getting server info: {e}")
|
||||
|
||||
|
||||
def setup_info_parser(subparsers):
|
||||
"""Setup info command parser"""
|
||||
info_parser = subparsers.add_parser("info", help="Get server information")
|
||||
return info_parser
|
||||
|
||||
|
||||
def handle_info_command(args, client): # pylint: disable=unused-argument
|
||||
"""Handle info command execution"""
|
||||
info_commands = InfoCommands(client)
|
||||
info_commands.get_info()
|
||||
return True
|
@@ -1,10 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Maintenance command implementations for Uptime Kuma CLI."""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
from ..client import KumaClient
|
||||
|
||||
|
||||
class MaintenanceCommands:
|
||||
"""Commands for managing maintenance windows."""
|
||||
|
||||
def __init__(self, client: KumaClient):
|
||||
self.client = client
|
||||
|
||||
@@ -32,7 +36,7 @@ class MaintenanceCommands:
|
||||
f"{maintenance_id:<5} {title:<30} {strategy:<15} {active:<10} {description:<50}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error listing maintenances: {e}")
|
||||
|
||||
def add_maintenance(
|
||||
@@ -46,45 +50,10 @@ class MaintenanceCommands:
|
||||
):
|
||||
"""Add a new maintenance"""
|
||||
try:
|
||||
# Check if we have either monitor patterns or group patterns
|
||||
if not monitor_patterns and not group_patterns:
|
||||
print(
|
||||
"Error: Either --monitor or --group flag is required. Specify at least one pattern."
|
||||
)
|
||||
return
|
||||
|
||||
matched_monitors = []
|
||||
|
||||
# Find monitors by patterns if specified
|
||||
if monitor_patterns:
|
||||
pattern_monitors = self.client.find_monitors_by_pattern(
|
||||
monitor_patterns
|
||||
)
|
||||
matched_monitors.extend(pattern_monitors)
|
||||
|
||||
# Find monitors by groups if specified
|
||||
if group_patterns:
|
||||
group_monitors = self.client.get_monitors_in_groups(group_patterns)
|
||||
# Convert to the same format as find_monitors_by_pattern
|
||||
group_monitor_objs = [
|
||||
{"id": m.get("id"), "name": m.get("name")} for m in group_monitors
|
||||
]
|
||||
matched_monitors.extend(group_monitor_objs)
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
seen = set()
|
||||
unique_monitors = []
|
||||
for monitor in matched_monitors:
|
||||
if monitor["id"] not in seen:
|
||||
seen.add(monitor["id"])
|
||||
unique_monitors.append(monitor)
|
||||
|
||||
matched_monitors = unique_monitors
|
||||
|
||||
matched_monitors = self.client.find_and_get_monitors(
|
||||
monitor_patterns, group_patterns
|
||||
)
|
||||
if not matched_monitors:
|
||||
print(
|
||||
"Error: No monitors found matching the specified patterns or groups"
|
||||
)
|
||||
return
|
||||
|
||||
print(f"Found {len(matched_monitors)} matching monitors:")
|
||||
@@ -102,7 +71,8 @@ class MaintenanceCommands:
|
||||
end_dt = start_dt + timedelta(seconds=duration_seconds)
|
||||
|
||||
print(
|
||||
f"Maintenance window: {start_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} - {end_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} ({duration})"
|
||||
f"Maintenance window: {start_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} - "
|
||||
f"{end_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} ({duration})"
|
||||
)
|
||||
|
||||
# Create the maintenance with single strategy and date range
|
||||
@@ -135,13 +105,13 @@ class MaintenanceCommands:
|
||||
f"Successfully added {len(matched_monitors)} monitors to maintenance"
|
||||
)
|
||||
print(f"API response: {result}")
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error: Failed to add monitors to maintenance: {e}")
|
||||
print(
|
||||
"This might be due to API compatibility issues or server configuration"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error adding maintenance: {e}")
|
||||
|
||||
def delete_maintenance(self, maintenance_id=None, delete_all=False):
|
||||
@@ -157,7 +127,8 @@ class MaintenanceCommands:
|
||||
print(f"Found {len(maintenances)} maintenances to delete:")
|
||||
for maintenance in maintenances:
|
||||
print(
|
||||
f" - {maintenance.get('title', 'N/A')} (ID: {maintenance.get('id', 'N/A')})"
|
||||
f" - {maintenance.get('title', 'N/A')} "
|
||||
f"(ID: {maintenance.get('id', 'N/A')})"
|
||||
)
|
||||
|
||||
# Delete all maintenances
|
||||
@@ -168,10 +139,11 @@ class MaintenanceCommands:
|
||||
maintenance.get("id")
|
||||
)
|
||||
print(
|
||||
f"Deleted maintenance '{maintenance.get('title', 'N/A')}' (ID: {maintenance.get('id')})"
|
||||
f"Deleted maintenance '{maintenance.get('title', 'N/A')}' "
|
||||
f"(ID: {maintenance.get('id')})"
|
||||
)
|
||||
deleted_count += 1
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(
|
||||
f"Failed to delete maintenance '{maintenance.get('title', 'N/A')}': {e}"
|
||||
)
|
||||
@@ -189,18 +161,19 @@ class MaintenanceCommands:
|
||||
|
||||
result = self.client.api.delete_maintenance(maintenance_id)
|
||||
print(
|
||||
f"Successfully deleted maintenance '{maintenance_title}' (ID: {maintenance_id})"
|
||||
f"Successfully deleted maintenance '{maintenance_title}' "
|
||||
f"(ID: {maintenance_id})"
|
||||
)
|
||||
print(f"API response: {result}")
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Failed to delete maintenance ID {maintenance_id}: {e}")
|
||||
else:
|
||||
print(
|
||||
"Error: Either --id or --all flag is required for delete operation"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error during maintenance deletion: {e}")
|
||||
|
||||
|
||||
@@ -209,6 +182,7 @@ def setup_maintenance_parser(subparsers):
|
||||
maintenance_parser = subparsers.add_parser(
|
||||
"maintenance", help="Maintenance operations"
|
||||
)
|
||||
setup_maintenance_parser.parser = maintenance_parser
|
||||
maintenance_subparsers = maintenance_parser.add_subparsers(
|
||||
dest="maintenance_action", help="Maintenance actions"
|
||||
)
|
||||
@@ -248,12 +222,14 @@ def setup_maintenance_parser(subparsers):
|
||||
add_maintenance_parser.add_argument(
|
||||
"--monitor",
|
||||
action="append",
|
||||
help="Monitor name pattern to add to maintenance (supports wildcards like *NextCloud*, can be repeated)",
|
||||
help="Monitor name pattern to add to maintenance "
|
||||
"(supports wildcards like *NextCloud*, can be repeated)",
|
||||
)
|
||||
add_maintenance_parser.add_argument(
|
||||
"--group",
|
||||
action="append",
|
||||
help="Group name pattern to add all group members to maintenance (supports wildcards, can be repeated)",
|
||||
help="Group name pattern to add all group members to maintenance "
|
||||
"(supports wildcards, can be repeated)",
|
||||
)
|
||||
|
||||
return maintenance_parser
|
||||
@@ -263,6 +239,9 @@ def handle_maintenance_command(args, client):
|
||||
"""Handle maintenance command execution"""
|
||||
maintenance_commands = MaintenanceCommands(client)
|
||||
|
||||
if not args.maintenance_action:
|
||||
setup_maintenance_parser.parser.print_help()
|
||||
return False
|
||||
if args.maintenance_action == "list":
|
||||
maintenance_commands.list_maintenances()
|
||||
elif args.maintenance_action == "add":
|
||||
|
@@ -1,9 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Monitor command implementations for Uptime Kuma CLI."""
|
||||
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
from ..client import KumaClient
|
||||
|
||||
|
||||
class MonitorCommands:
|
||||
"""Commands for managing monitors."""
|
||||
|
||||
def __init__(self, client: KumaClient):
|
||||
self.client = client
|
||||
|
||||
@@ -61,16 +65,104 @@ class MonitorCommands:
|
||||
parent_name = parent_monitor.get("name", f"Group {parent_id}")
|
||||
|
||||
print(
|
||||
f"{monitor_id:<5} {name:<25} {monitor_type:<12} {parent_name:<20} {url:<35} {active:<10}"
|
||||
f"{monitor_id:<5} {name:<25} {monitor_type:<12} "
|
||||
f"{parent_name:<20} {url:<35} {active:<10}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error listing monitors: {e}")
|
||||
|
||||
def pause_monitors(self, monitor_patterns=None, group_patterns=None):
|
||||
"""Pause monitors by patterns and/or groups"""
|
||||
try:
|
||||
matched_monitors = self.client.find_and_get_monitors(
|
||||
monitor_patterns, group_patterns
|
||||
)
|
||||
if not matched_monitors:
|
||||
return
|
||||
|
||||
print(f"Found {len(matched_monitors)} matching monitors to pause:")
|
||||
for monitor in matched_monitors:
|
||||
print(f" - {monitor['name']} (ID: {monitor['id']})")
|
||||
|
||||
# Pause each monitor
|
||||
paused_count = 0
|
||||
for monitor in matched_monitors:
|
||||
try:
|
||||
self.client.api.pause_monitor(monitor["id"])
|
||||
print(f"Paused monitor '{monitor['name']}' (ID: {monitor['id']})")
|
||||
paused_count += 1
|
||||
except UptimeKumaException as e:
|
||||
print(f"Failed to pause monitor '{monitor['name']}': {e}")
|
||||
|
||||
print(
|
||||
f"Successfully paused {paused_count} out of {len(matched_monitors)} monitors"
|
||||
)
|
||||
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error pausing monitors: {e}")
|
||||
|
||||
def resume_monitors(
|
||||
self, monitor_patterns=None, group_patterns=None, resume_all=False
|
||||
):
|
||||
"""Resume monitors by patterns and/or groups, or all paused monitors"""
|
||||
try:
|
||||
if resume_all:
|
||||
# Get all monitors and filter for inactive (paused) ones
|
||||
all_monitors = self.client.api.get_monitors()
|
||||
monitor_ids = [
|
||||
m.get("id") for m in all_monitors if not m.get("active", True)
|
||||
]
|
||||
if not monitor_ids:
|
||||
print("No paused monitors found to resume")
|
||||
return
|
||||
matched_monitors = [
|
||||
{
|
||||
"id": mid,
|
||||
"name": next(
|
||||
(
|
||||
m.get("name", f"Monitor {mid}")
|
||||
for m in all_monitors
|
||||
if m.get("id") == mid
|
||||
),
|
||||
f"Monitor {mid}",
|
||||
),
|
||||
}
|
||||
for mid in monitor_ids
|
||||
]
|
||||
print(f"Found {len(matched_monitors)} paused monitors to resume:")
|
||||
else:
|
||||
matched_monitors = self.client.find_and_get_monitors(
|
||||
monitor_patterns, group_patterns
|
||||
)
|
||||
if not matched_monitors:
|
||||
return
|
||||
print(f"Found {len(matched_monitors)} matching monitors to resume:")
|
||||
for monitor in matched_monitors:
|
||||
print(f" - {monitor['name']} (ID: {monitor['id']})")
|
||||
|
||||
# Resume each monitor
|
||||
resumed_count = 0
|
||||
for monitor in matched_monitors:
|
||||
try:
|
||||
self.client.api.resume_monitor(monitor["id"])
|
||||
print(f"Resumed monitor '{monitor['name']}' (ID: {monitor['id']})")
|
||||
resumed_count += 1
|
||||
except UptimeKumaException as e:
|
||||
print(f"Failed to resume monitor '{monitor['name']}': {e}")
|
||||
|
||||
print(
|
||||
f"Successfully resumed {resumed_count} out of {len(matched_monitors)} monitors"
|
||||
)
|
||||
|
||||
except UptimeKumaException as e:
|
||||
print(f"Error resuming monitors: {e}")
|
||||
|
||||
|
||||
def setup_monitor_parser(subparsers):
|
||||
"""Setup monitor command parser"""
|
||||
monitor_parser = subparsers.add_parser("monitor", help="Monitor operations")
|
||||
setup_monitor_parser.parser = monitor_parser
|
||||
monitor_subparsers = monitor_parser.add_subparsers(
|
||||
dest="monitor_action", help="Monitor actions"
|
||||
)
|
||||
@@ -90,6 +182,39 @@ def setup_monitor_parser(subparsers):
|
||||
help="Group name pattern to filter by (supports wildcards, can be repeated)",
|
||||
)
|
||||
|
||||
# Pause monitors command
|
||||
pause_monitors_parser = monitor_subparsers.add_parser(
|
||||
"pause", help="Pause monitors"
|
||||
)
|
||||
pause_monitors_parser.add_argument(
|
||||
"--monitor",
|
||||
action="append",
|
||||
help="Monitor name pattern to pause (supports wildcards, can be repeated)",
|
||||
)
|
||||
pause_monitors_parser.add_argument(
|
||||
"--group",
|
||||
action="append",
|
||||
help="Group name pattern to pause all group members (supports wildcards, can be repeated)",
|
||||
)
|
||||
|
||||
# Resume monitors command
|
||||
resume_monitors_parser = monitor_subparsers.add_parser(
|
||||
"resume", help="Resume monitors"
|
||||
)
|
||||
resume_monitors_parser.add_argument(
|
||||
"--monitor",
|
||||
action="append",
|
||||
help="Monitor name pattern to resume (supports wildcards, can be repeated)",
|
||||
)
|
||||
resume_monitors_parser.add_argument(
|
||||
"--group",
|
||||
action="append",
|
||||
help="Group name pattern to resume all group members (supports wildcards, can be repeated)",
|
||||
)
|
||||
resume_monitors_parser.add_argument(
|
||||
"--all", action="store_true", help="Resume all paused monitors"
|
||||
)
|
||||
|
||||
return monitor_parser
|
||||
|
||||
|
||||
@@ -97,10 +222,23 @@ def handle_monitor_command(args, client):
|
||||
"""Handle monitor command execution"""
|
||||
monitor_commands = MonitorCommands(client)
|
||||
|
||||
if not args.monitor_action:
|
||||
setup_monitor_parser.parser.print_help()
|
||||
return False
|
||||
if args.monitor_action == "list":
|
||||
monitor_commands.list_monitors(
|
||||
monitor_patterns=args.monitor, group_patterns=args.group
|
||||
)
|
||||
elif args.monitor_action == "pause":
|
||||
monitor_commands.pause_monitors(
|
||||
monitor_patterns=args.monitor, group_patterns=args.group
|
||||
)
|
||||
elif args.monitor_action == "resume":
|
||||
monitor_commands.resume_monitors(
|
||||
monitor_patterns=args.monitor,
|
||||
group_patterns=args.group,
|
||||
resume_all=args.all,
|
||||
)
|
||||
else:
|
||||
print("Unknown monitor action. Use --help for usage information.")
|
||||
return False
|
||||
|
16
src/kumacli/cmd/version.py
Normal file
16
src/kumacli/cmd/version.py
Normal file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Version command implementations for Uptime Kuma CLI."""
|
||||
|
||||
__version__ = "1.4.0"
|
||||
|
||||
|
||||
def setup_version_parser(subparsers):
|
||||
"""Setup version command parser"""
|
||||
version_parser = subparsers.add_parser("version", help="Show version information")
|
||||
return version_parser
|
||||
|
||||
|
||||
def handle_version_command(args, client): # pylint: disable=unused-argument
|
||||
"""Handle version command execution"""
|
||||
print(f"kumacli {__version__}")
|
||||
return True
|
@@ -1,15 +1,17 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Main CLI module for Uptime Kuma."""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
# Handle both direct execution and package import
|
||||
try:
|
||||
from .client import KumaClient
|
||||
from .cmd.monitor import setup_monitor_parser, handle_monitor_command
|
||||
from .cmd.maintenance import setup_maintenance_parser, handle_maintenance_command
|
||||
from .cmd.info import setup_info_parser, handle_info_command
|
||||
from .cmd.version import setup_version_parser, handle_version_command
|
||||
except ImportError:
|
||||
# Running directly, add parent directory to path
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
@@ -19,9 +21,12 @@ except ImportError:
|
||||
setup_maintenance_parser,
|
||||
handle_maintenance_command,
|
||||
)
|
||||
from kumacli.cmd.info import setup_info_parser, handle_info_command
|
||||
from kumacli.cmd.version import setup_version_parser, handle_version_command
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point for the CLI application."""
|
||||
parser = argparse.ArgumentParser(description="Uptime Kuma CLI Client")
|
||||
parser.add_argument(
|
||||
"--url", help="Uptime Kuma server URL (can also use KUMA_URL env var)"
|
||||
@@ -38,8 +43,10 @@ def main():
|
||||
subparsers = parser.add_subparsers(dest="resource", help="Resource to operate on")
|
||||
|
||||
# Setup command parsers
|
||||
monitor_parser = setup_monitor_parser(subparsers)
|
||||
maintenance_parser = setup_maintenance_parser(subparsers)
|
||||
setup_monitor_parser(subparsers)
|
||||
setup_maintenance_parser(subparsers)
|
||||
setup_info_parser(subparsers)
|
||||
setup_version_parser(subparsers)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -69,6 +76,10 @@ def main():
|
||||
success = handle_monitor_command(args, client)
|
||||
elif args.resource == "maintenance":
|
||||
success = handle_maintenance_command(args, client)
|
||||
elif args.resource == "info":
|
||||
success = handle_info_command(args, client)
|
||||
elif args.resource == "version":
|
||||
success = handle_version_command(args, client)
|
||||
else:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
100
tests/README.md
Normal file
100
tests/README.md
Normal file
@@ -0,0 +1,100 @@
|
||||
# KumaCLI Tests
|
||||
|
||||
This directory contains the test suite for kumacli.
|
||||
|
||||
## Running Tests
|
||||
|
||||
### Prerequisites
|
||||
|
||||
Install test dependencies:
|
||||
```bash
|
||||
pip install -e ".[test]"
|
||||
# or
|
||||
pip install pytest pytest-cov
|
||||
```
|
||||
|
||||
### Run All Tests
|
||||
|
||||
```bash
|
||||
# Using pytest directly
|
||||
python3 -m pytest
|
||||
|
||||
# Using the test runner script
|
||||
python3 run_tests.py
|
||||
|
||||
# From the project root
|
||||
python3 -m pytest tests/
|
||||
```
|
||||
|
||||
### Run Specific Tests
|
||||
|
||||
```bash
|
||||
# Test a specific file
|
||||
pytest tests/test_info.py
|
||||
|
||||
# Test a specific class
|
||||
pytest tests/test_monitor.py::TestMonitorCommands
|
||||
|
||||
# Test a specific method
|
||||
pytest tests/test_monitor.py::TestMonitorCommands::test_pause_monitors_by_pattern
|
||||
```
|
||||
|
||||
### Run Tests with Coverage
|
||||
|
||||
```bash
|
||||
pytest --cov=kumacli --cov-report=html
|
||||
python run_tests.py --cov
|
||||
```
|
||||
|
||||
### Test Options
|
||||
|
||||
```bash
|
||||
# Verbose output
|
||||
pytest -v
|
||||
|
||||
# Stop on first failure
|
||||
pytest -x
|
||||
|
||||
# Run tests in parallel (requires pytest-xdist)
|
||||
pytest -n auto
|
||||
```
|
||||
|
||||
## Test Structure
|
||||
|
||||
- `conftest.py` - Shared fixtures and test configuration
|
||||
- `test_info.py` - Tests for the info command
|
||||
- `test_monitor.py` - Tests for monitor commands (list, pause, resume)
|
||||
- `test_maintenance.py` - Tests for maintenance commands
|
||||
- `test_client.py` - Tests for the KumaClient class
|
||||
- `test_cli_integration.py` - Integration tests for CLI functionality
|
||||
|
||||
## Test Coverage
|
||||
|
||||
The tests cover:
|
||||
|
||||
- ✅ Command argument parsing
|
||||
- ✅ API method calls and responses
|
||||
- ✅ Error handling and edge cases
|
||||
- ✅ Help message functionality
|
||||
- ✅ Monitor pause/resume operations
|
||||
- ✅ Maintenance operations
|
||||
- ✅ Client utility functions
|
||||
- ✅ Integration between components
|
||||
|
||||
## Mock Strategy
|
||||
|
||||
Tests use unittest.mock to:
|
||||
- Mock the UptimeKumaApi calls
|
||||
- Simulate API responses and errors
|
||||
- Test command logic without requiring a live server
|
||||
- Verify correct API method calls with expected parameters
|
||||
|
||||
## Adding New Tests
|
||||
|
||||
When adding new functionality:
|
||||
|
||||
1. Add unit tests for the new commands/methods
|
||||
2. Add integration tests if the feature involves multiple components
|
||||
3. Test both success and error cases
|
||||
4. Mock external dependencies (API calls, file operations)
|
||||
5. Use descriptive test names that explain what is being tested
|
1
tests/__init__.py
Normal file
1
tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Test package for kumacli
|
78
tests/conftest.py
Normal file
78
tests/conftest.py
Normal file
@@ -0,0 +1,78 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the src directory to Python path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, MagicMock
|
||||
from kumacli.client import KumaClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_client():
|
||||
"""Create a mock KumaClient for testing"""
|
||||
client = Mock(spec=KumaClient)
|
||||
client.api = Mock()
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
def mock_monitors():
    """Sample monitor data for testing.

    Four representative entries: an active and a paused HTTP check
    (ids 1-2), a group (id 3), and a paused child of that group (id 4)
    for group-filter tests. The group entry deliberately has no "url" key.
    """

    def _monitor(mid, name, mtype="http", url=None, active=True, parent=None):
        # Build one monitor dict; omit "url" entirely when not given,
        # matching the shape the real API returns for group monitors.
        entry = {"id": mid, "name": name, "type": mtype}
        if url is not None:
            entry["url"] = url
        entry["active"] = active
        entry["parent"] = parent
        return entry

    return [
        _monitor(1, "Test Monitor 1", url="https://example.com"),
        _monitor(2, "Test Monitor 2", url="https://test.com", active=False),
        _monitor(3, "Group Monitor", mtype="group"),
        _monitor(4, "Child Monitor", url="https://child.com", active=False, parent=3),
    ]
|
||||
|
||||
|
||||
@pytest.fixture
def mock_maintenances():
    """Sample maintenance data for testing.

    Two 'single'-strategy maintenances: one active, one inactive.
    """

    def _maintenance(mid, title, description, active):
        return {
            "id": mid,
            "title": title,
            "description": description,
            "strategy": "single",
            "active": active,
        }

    return [
        _maintenance(1, "Test Maintenance", "Test maintenance description", True),
        _maintenance(
            2, "Inactive Maintenance", "Inactive maintenance description", False
        ),
    ]
|
273
tests/test_cli_integration.py
Normal file
273
tests/test_cli_integration.py
Normal file
@@ -0,0 +1,273 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
import argparse
|
||||
from io import StringIO
|
||||
import sys
|
||||
|
||||
from kumacli.cmd.monitor import setup_monitor_parser, handle_monitor_command
|
||||
from kumacli.cmd.maintenance import setup_maintenance_parser, handle_maintenance_command
|
||||
from kumacli.cmd.info import setup_info_parser, handle_info_command
|
||||
|
||||
|
||||
class TestCLIIntegration:
    """End-to-end checks wiring argparse setup to the command handlers."""

    def test_monitor_parser_setup(self):
        """setup_monitor_parser registers a subparser whose prog ends in 'monitor'."""
        root = argparse.ArgumentParser()
        sub = root.add_subparsers(dest="resource")

        created = setup_monitor_parser(sub)

        assert created is not None
        assert created.prog.endswith("monitor")

    def test_maintenance_parser_setup(self):
        """setup_maintenance_parser registers a 'maintenance' subparser."""
        root = argparse.ArgumentParser()
        sub = root.add_subparsers(dest="resource")

        created = setup_maintenance_parser(sub)

        assert created is not None
        assert created.prog.endswith("maintenance")

    def test_info_parser_setup(self):
        """setup_info_parser returns a parser object."""
        root = argparse.ArgumentParser()
        sub = root.add_subparsers(dest="resource")

        created = setup_info_parser(sub)

        assert created is not None

    def test_monitor_help_message(self, mock_client, capsys):
        """With no action given, the monitor handler prints help and returns False."""
        args = Mock()
        args.monitor_action = None

        # The handler looks up the parser stored on the setup function itself,
        # so simulate a prior setup_monitor_parser call by attaching one.
        help_parser = Mock()
        setup_monitor_parser.parser = help_parser

        outcome = handle_monitor_command(args, mock_client)

        assert outcome is False
        help_parser.print_help.assert_called_once()

    def test_maintenance_help_message(self, mock_client, capsys):
        """With no action given, the maintenance handler prints help and returns False."""
        args = Mock()
        args.maintenance_action = None

        help_parser = Mock()
        setup_maintenance_parser.parser = help_parser

        outcome = handle_maintenance_command(args, mock_client)

        assert outcome is False
        help_parser.print_help.assert_called_once()

    def test_monitor_command_with_full_args(self, mock_client):
        """Pause action looks up monitors by pattern/group and pauses each match."""
        args = Mock()
        args.monitor_action = "pause"
        args.monitor = ["test*"]
        args.group = ["web-services"]

        mock_client.find_and_get_monitors.return_value = [
            {"id": 1, "name": "test-monitor"},
            {"id": 2, "name": "web-service-monitor"},
        ]
        mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}

        outcome = handle_monitor_command(args, mock_client)

        assert outcome is True
        mock_client.find_and_get_monitors.assert_called_once_with(
            ["test*"], ["web-services"]
        )
        # Both matched (deduplicated) monitors get paused exactly once each.
        assert mock_client.api.pause_monitor.call_count == 2

    def test_resume_all_monitors_integration(self, mock_client, mock_monitors):
        """resume --all resumes only monitors that are currently paused."""
        args = Mock()
        args.monitor_action = "resume"
        args.monitor = None
        args.group = None
        args.all = True

        mock_client.api.get_monitors.return_value = mock_monitors
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        outcome = handle_monitor_command(args, mock_client)

        assert outcome is True
        mock_client.api.get_monitors.assert_called_once()
        # IDs 2 and 4 are the inactive entries in the mock_monitors fixture.
        assert mock_client.api.resume_monitor.call_count == 2
        mock_client.api.resume_monitor.assert_any_call(2)
        mock_client.api.resume_monitor.assert_any_call(4)
|
||||
|
||||
|
||||
class TestArgumentParsing:
    """Verify the CLI grammar produced by each setup_*_parser helper."""

    @staticmethod
    def _make_parser(setup_fn):
        """Build a root parser and register one resource subparser on it."""
        root = argparse.ArgumentParser()
        setup_fn(root.add_subparsers(dest="resource"))
        return root

    def test_monitor_list_arguments(self):
        """monitor list accepts repeated --monitor patterns."""
        root = self._make_parser(setup_monitor_parser)

        ns = root.parse_args(
            ["monitor", "list", "--monitor", "web*", "--monitor", "api*"]
        )
        assert ns.resource == "monitor"
        assert ns.monitor_action == "list"
        assert ns.monitor == ["web*", "api*"]

    def test_monitor_pause_arguments(self):
        """monitor pause accepts --group patterns."""
        root = self._make_parser(setup_monitor_parser)

        ns = root.parse_args(["monitor", "pause", "--group", "production"])
        assert ns.resource == "monitor"
        assert ns.monitor_action == "pause"
        assert ns.group == ["production"]

    def test_monitor_resume_all_arguments(self):
        """monitor resume supports the --all flag."""
        root = self._make_parser(setup_monitor_parser)

        ns = root.parse_args(["monitor", "resume", "--all"])
        assert ns.resource == "monitor"
        assert ns.monitor_action == "resume"
        assert ns.all is True

    def test_maintenance_add_arguments(self):
        """maintenance add accepts title/description/duration/monitor options."""
        root = self._make_parser(setup_maintenance_parser)

        ns = root.parse_args(
            [
                "maintenance",
                "add",
                "--title",
                "Server Update",
                "--description",
                "Updating server software",
                "--duration",
                "2h",
                "--monitor",
                "server*",
            ]
        )
        assert ns.resource == "maintenance"
        assert ns.maintenance_action == "add"
        assert ns.title == "Server Update"
        assert ns.description == "Updating server software"
        assert ns.duration == "2h"
        assert ns.monitor == ["server*"]

    def test_maintenance_delete_arguments(self):
        """maintenance delete accepts either --id N (parsed as int) or --all."""
        root = self._make_parser(setup_maintenance_parser)

        ns = root.parse_args(["maintenance", "delete", "--id", "123"])
        assert ns.resource == "maintenance"
        assert ns.maintenance_action == "delete"
        assert ns.id == 123

        ns = root.parse_args(["maintenance", "delete", "--all"])
        assert ns.resource == "maintenance"
        assert ns.maintenance_action == "delete"
        assert ns.all is True

    def test_info_arguments(self):
        """info takes no extra options."""
        root = self._make_parser(setup_info_parser)

        ns = root.parse_args(["info"])
        assert ns.resource == "info"
|
||||
|
||||
|
||||
class TestErrorHandling:
    """The command handlers should degrade gracefully instead of raising."""

    def test_monitor_command_resilience(self, mock_client, capsys):
        """Pause with zero matches still completes and reports the problem."""
        args = Mock()
        args.monitor_action = "pause"
        args.monitor = ["nonexistent*"]
        args.group = None

        # Simulate the client finding nothing and printing its own error line.
        def no_matches(*_args, **_kwargs):
            print("Error: No monitors found matching the specified patterns or groups")
            return []

        mock_client.find_and_get_monitors.side_effect = no_matches

        outcome = handle_monitor_command(args, mock_client)

        # The command is considered handled even when nothing matched.
        assert outcome is True
        captured = capsys.readouterr()
        assert (
            "Error: No monitors found matching the specified patterns or groups"
            in captured.out
        )

    def test_maintenance_command_resilience(self, mock_client, capsys):
        """An API failure during list is reported to stdout, not propagated."""
        args = Mock()
        args.maintenance_action = "list"

        from uptime_kuma_api import UptimeKumaException

        mock_client.api.get_maintenances.side_effect = UptimeKumaException(
            "Connection timeout"
        )

        outcome = handle_maintenance_command(args, mock_client)

        assert outcome is True
        captured = capsys.readouterr()
        assert "Error listing maintenances: Connection timeout" in captured.out
|
226
tests/test_client.py
Normal file
226
tests/test_client.py
Normal file
@@ -0,0 +1,226 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from kumacli.client import KumaClient
|
||||
|
||||
|
||||
class TestKumaClient:
    """Unit tests for KumaClient's parsing helpers and connection lifecycle."""

    def test_parse_duration_minutes(self):
        """Durations with an 'm' suffix are converted to seconds."""
        client = KumaClient("http://test.com")

        assert client.parse_duration("90m") == 5400  # 90 * 60
        assert client.parse_duration("1m") == 60
        assert client.parse_duration("120m") == 7200

    def test_parse_duration_hours(self):
        """Durations with an 'h' suffix are converted to seconds."""
        client = KumaClient("http://test.com")

        assert client.parse_duration("1h") == 3600  # 1 * 3600
        assert client.parse_duration("2h") == 7200
        assert client.parse_duration("24h") == 86400

    def test_parse_duration_seconds(self):
        """Durations with an 's' suffix pass through as plain seconds."""
        client = KumaClient("http://test.com")

        assert client.parse_duration("3600s") == 3600
        assert client.parse_duration("60s") == 60
        assert client.parse_duration("1s") == 1

    def test_parse_duration_default(self):
        """Missing or empty durations fall back to the 90-minute default."""
        client = KumaClient("http://test.com")

        assert client.parse_duration(None) == 5400  # Default 90 minutes
        assert client.parse_duration("") == 5400

    def test_parse_duration_invalid(self):
        """Unrecognized duration strings raise ValueError."""
        client = KumaClient("http://test.com")

        with pytest.raises(ValueError, match="Invalid duration format"):
            client.parse_duration("invalid")

        with pytest.raises(ValueError, match="Invalid duration format"):
            client.parse_duration("90x")

        # A bare number without a unit suffix is also rejected.
        with pytest.raises(ValueError, match="Invalid duration format"):
            client.parse_duration("90")

    def test_parse_start_time_none(self):
        """A None start time resolves to the current (naive UTC) time."""
        from datetime import timezone

        client = KumaClient("http://test.com")

        # datetime.utcnow() is deprecated since Python 3.12; this is the
        # documented replacement and yields the same naive UTC timestamp,
        # so the bracketing comparison is unchanged.
        before = datetime.now(timezone.utc).replace(tzinfo=None)
        result = client.parse_start_time(None)
        after = datetime.now(timezone.utc).replace(tzinfo=None)

        assert before <= result <= after

    def test_parse_start_time_iso_format(self):
        """ISO-8601 strings (trailing Z or explicit offset) parse to naive datetimes."""
        client = KumaClient("http://test.com")

        expected = datetime(2023, 12, 25, 10, 30, 0)

        # Trailing 'Z' (UTC designator).
        assert client.parse_start_time("2023-12-25T10:30:00Z") == expected
        # Explicit +00:00 offset.
        assert client.parse_start_time("2023-12-25T10:30:00+00:00") == expected

    def test_parse_start_time_common_formats(self):
        """Common human formats: full datetime, date+HH:MM, and bare date."""
        client = KumaClient("http://test.com")

        assert client.parse_start_time("2023-12-25 10:30:00") == datetime(
            2023, 12, 25, 10, 30, 0
        )
        assert client.parse_start_time("2023-12-25 10:30") == datetime(
            2023, 12, 25, 10, 30, 0
        )
        # A date without a time defaults to midnight.
        assert client.parse_start_time("2023-12-25") == datetime(2023, 12, 25, 0, 0, 0)

    def test_parse_start_time_invalid(self):
        """Garbage strings and out-of-range dates raise ValueError."""
        client = KumaClient("http://test.com")

        with pytest.raises(ValueError, match="Invalid start time format"):
            client.parse_start_time("invalid-date")

        with pytest.raises(ValueError, match="Invalid start time format"):
            client.parse_start_time("2023-13-45")

    def test_find_monitors_by_pattern_success(self):
        """Exact names and wildcard globs both match monitors."""
        client = KumaClient("http://test.com")
        client.api = Mock()
        client.api.get_monitors.return_value = [
            {"id": 1, "name": "Web Server"},
            {"id": 2, "name": "API Server"},
            {"id": 3, "name": "Database"},
            {"id": 4, "name": "Web Frontend"},
        ]

        # Exact match returns just the one monitor.
        exact = client.find_monitors_by_pattern(["Web Server"])
        assert len(exact) == 1
        assert exact[0]["name"] == "Web Server"

        # Wildcard pattern matches both "Web ..." monitors.
        globbed = client.find_monitors_by_pattern(["Web*"])
        assert len(globbed) == 2
        matched_names = [m["name"] for m in globbed]
        assert "Web Server" in matched_names
        assert "Web Frontend" in matched_names

    def test_find_monitors_by_pattern_case_insensitive(self):
        """Pattern matching ignores case."""
        client = KumaClient("http://test.com")
        client.api = Mock()
        client.api.get_monitors.return_value = [
            {"id": 1, "name": "Web Server"},
            {"id": 2, "name": "API Server"},
        ]

        result = client.find_monitors_by_pattern(["web*"])
        assert len(result) == 1
        assert result[0]["name"] == "Web Server"

    def test_find_monitors_by_pattern_no_matches(self):
        """Non-matching patterns return an empty list."""
        client = KumaClient("http://test.com")
        client.api = Mock()
        client.api.get_monitors.return_value = [{"id": 1, "name": "Web Server"}]

        result = client.find_monitors_by_pattern(["Database*"])
        assert len(result) == 0

    def test_find_monitors_by_pattern_duplicates(self):
        """A monitor matched by several patterns is returned only once."""
        client = KumaClient("http://test.com")
        client.api = Mock()
        client.api.get_monitors.return_value = [{"id": 1, "name": "Web Server"}]

        # The same monitor matches both patterns.
        result = client.find_monitors_by_pattern(["Web*", "*Server"])
        assert len(result) == 1
        assert result[0]["name"] == "Web Server"

    def test_find_monitors_by_pattern_api_error(self, capsys):
        """API failures are reported to stdout and yield an empty result."""
        from uptime_kuma_api import UptimeKumaException

        client = KumaClient("http://test.com")
        client.api = Mock()
        client.api.get_monitors.side_effect = UptimeKumaException("API Error")

        result = client.find_monitors_by_pattern(["Web*"])
        assert len(result) == 0

        captured = capsys.readouterr()
        assert "Error finding monitors: API Error" in captured.out

    @patch("kumacli.client.UptimeKumaApi")
    def test_connect_success(self, mock_api_class, capsys):
        """connect() builds the API client, logs in, and reports success."""
        mock_api = Mock()
        mock_api_class.return_value = mock_api
        mock_api.login.return_value = True

        client = KumaClient("http://test.com", "user", "pass")
        result = client.connect()

        assert result is True
        assert client.api is mock_api
        mock_api_class.assert_called_once_with("http://test.com")
        mock_api.login.assert_called_once_with("user", "pass")

        captured = capsys.readouterr()
        assert "Connected to http://test.com" in captured.out

    @patch("kumacli.client.UptimeKumaApi")
    def test_connect_failure(self, mock_api_class, capsys):
        """connect() returns False and reports when API construction fails."""
        from uptime_kuma_api import UptimeKumaException

        mock_api_class.side_effect = UptimeKumaException("Connection failed")

        client = KumaClient("http://test.com", "user", "pass")
        result = client.connect()

        assert result is False
        captured = capsys.readouterr()
        assert "Failed to connect: Connection failed" in captured.out

    def test_disconnect(self):
        """disconnect() delegates to the underlying API object."""
        client = KumaClient("http://test.com")
        client.api = Mock()

        client.disconnect()

        client.api.disconnect.assert_called_once()
|
96
tests/test_info.py
Normal file
96
tests/test_info.py
Normal file
@@ -0,0 +1,96 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from io import StringIO
|
||||
import sys
|
||||
|
||||
from kumacli.cmd.info import InfoCommands, handle_info_command
|
||||
|
||||
|
||||
class TestInfoCommands:
    """Tests for InfoCommands.get_info output and error reporting."""

    def test_get_info_success(self, mock_client, capsys):
        """Each key/value pair from the API response is printed."""
        mock_client.api.info.return_value = {
            "version": "1.23.0",
            "hostname": "kuma-server",
            "primaryBaseURL": "https://status.example.com",
        }

        InfoCommands(mock_client).get_info()

        mock_client.api.info.assert_called_once()
        out = capsys.readouterr().out
        assert "Server Information:" in out
        assert "version: 1.23.0" in out
        assert "hostname: kuma-server" in out
        assert "primaryBaseURL: https://status.example.com" in out

    def test_get_info_empty_response(self, mock_client, capsys):
        """A falsy API response produces the 'no info' message."""
        mock_client.api.info.return_value = None

        InfoCommands(mock_client).get_info()

        mock_client.api.info.assert_called_once()
        assert "No server info available" in capsys.readouterr().out

    def test_get_info_api_error(self, mock_client, capsys):
        """API exceptions are caught and reported to stdout."""
        from uptime_kuma_api import UptimeKumaException

        mock_client.api.info.side_effect = UptimeKumaException("Connection failed")

        InfoCommands(mock_client).get_info()

        mock_client.api.info.assert_called_once()
        assert "Error getting server info: Connection failed" in capsys.readouterr().out
|
||||
|
||||
|
||||
class TestInfoCommandHandler:
    """Tests for the handle_info_command dispatch function."""

    def test_handle_info_command(self, mock_client):
        """The handler fetches server info and reports success."""
        args = Mock()
        mock_client.api.info.return_value = {"version": "1.23.0"}

        outcome = handle_info_command(args, mock_client)

        assert outcome is True
        mock_client.api.info.assert_called_once()

    def test_handle_info_command_with_error(self, mock_client):
        """The handler still returns True when the API raises."""
        from uptime_kuma_api import UptimeKumaException

        args = Mock()
        mock_client.api.info.side_effect = UptimeKumaException("API Error")

        outcome = handle_info_command(args, mock_client)

        # Handler always returns True; the error is reported, not raised.
        assert outcome is True
        mock_client.api.info.assert_called_once()
|
185
tests/test_maintenance.py
Normal file
185
tests/test_maintenance.py
Normal file
@@ -0,0 +1,185 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from io import StringIO
|
||||
import sys
|
||||
|
||||
from kumacli.cmd.maintenance import MaintenanceCommands, handle_maintenance_command
|
||||
|
||||
|
||||
class TestMaintenanceCommands:
    """Tests for MaintenanceCommands list/delete behavior."""

    def test_list_maintenances_success(self, mock_client, mock_maintenances, capsys):
        """Listing prints each maintenance title with its active state."""
        mock_client.api.get_maintenances.return_value = mock_maintenances

        MaintenanceCommands(mock_client).list_maintenances()

        mock_client.api.get_maintenances.assert_called_once()
        out = capsys.readouterr().out
        assert "Test Maintenance" in out
        assert "Inactive Maintenance" in out
        assert "Active" in out
        assert "Inactive" in out

    def test_list_maintenances_empty(self, mock_client, capsys):
        """An empty maintenance list produces the 'none found' message."""
        mock_client.api.get_maintenances.return_value = []

        MaintenanceCommands(mock_client).list_maintenances()

        assert "No maintenances found" in capsys.readouterr().out

    def test_list_maintenances_api_error(self, mock_client, capsys):
        """API errors while listing are reported, not raised."""
        from uptime_kuma_api import UptimeKumaException

        mock_client.api.get_maintenances.side_effect = UptimeKumaException("API Error")

        MaintenanceCommands(mock_client).list_maintenances()

        assert "Error listing maintenances: API Error" in capsys.readouterr().out

    def test_delete_maintenance_by_id(self, mock_client, capsys):
        """Deleting by ID looks up the title, deletes, and confirms."""
        mock_client.api.get_maintenance.return_value = {
            "id": 1,
            "title": "Test Maintenance",
        }
        mock_client.api.delete_maintenance.return_value = {
            "msg": "Deleted Successfully"
        }

        MaintenanceCommands(mock_client).delete_maintenance(maintenance_id=1)

        mock_client.api.get_maintenance.assert_called_once_with(1)
        mock_client.api.delete_maintenance.assert_called_once_with(1)
        assert (
            "Successfully deleted maintenance 'Test Maintenance' (ID: 1)"
            in capsys.readouterr().out
        )

    def test_delete_all_maintenances(self, mock_client, mock_maintenances, capsys):
        """delete --all removes every maintenance and summarizes the count."""
        mock_client.api.get_maintenances.return_value = mock_maintenances
        mock_client.api.delete_maintenance.return_value = {
            "msg": "Deleted Successfully"
        }

        MaintenanceCommands(mock_client).delete_maintenance(delete_all=True)

        assert mock_client.api.delete_maintenance.call_count == 2
        out = capsys.readouterr().out
        assert "Found 2 maintenances to delete:" in out
        assert "Successfully deleted 2 out of 2 maintenances" in out

    def test_delete_maintenance_no_params(self, mock_client, capsys):
        """Delete without --id or --all prints a usage error."""
        MaintenanceCommands(mock_client).delete_maintenance()

        assert (
            "Error: Either --id or --all flag is required for delete operation"
            in capsys.readouterr().out
        )
|
||||
|
||||
|
||||
class TestMaintenanceCommandHandler:
    """Tests for handle_maintenance_command dispatch and error paths."""

    def test_handle_maintenance_command_no_action(self, mock_client, capsys):
        """With no action the handler declines (returns False)."""
        args = Mock()
        args.maintenance_action = None

        # NOTE(review): patching setup_maintenance_parser and assigning
        # `_parser` on the mock appears to have no observable effect here —
        # other tests use the `.parser` attribute on the real function.
        # Confirm against the handler implementation.
        with patch("kumacli.cmd.maintenance.setup_maintenance_parser") as mock_setup:
            mock_setup._parser = Mock()

            outcome = handle_maintenance_command(args, mock_client)

            assert outcome is False

    def test_handle_maintenance_command_list(self, mock_client, mock_maintenances):
        """'list' action fetches maintenances and returns True."""
        args = Mock()
        args.maintenance_action = "list"
        mock_client.api.get_maintenances.return_value = mock_maintenances

        outcome = handle_maintenance_command(args, mock_client)

        assert outcome is True
        mock_client.api.get_maintenances.assert_called_once()

    def test_handle_maintenance_command_delete(self, mock_client):
        """'delete' action with --id removes that maintenance."""
        args = Mock()
        args.maintenance_action = "delete"
        args.id = 1
        args.all = False

        mock_client.api.get_maintenance.return_value = {
            "id": 1,
            "title": "Test Maintenance",
        }
        mock_client.api.delete_maintenance.return_value = {
            "msg": "Deleted Successfully"
        }

        outcome = handle_maintenance_command(args, mock_client)

        assert outcome is True
        mock_client.api.delete_maintenance.assert_called_once_with(1)

    def test_handle_maintenance_command_unknown_action(self, mock_client, capsys):
        """Unknown actions are rejected with a hint to use --help."""
        args = Mock()
        args.maintenance_action = "unknown"

        outcome = handle_maintenance_command(args, mock_client)

        assert outcome is False
        assert (
            "Unknown maintenance action. Use --help for usage information."
            in capsys.readouterr().out
        )
|
275
tests/test_monitor.py
Normal file
275
tests/test_monitor.py
Normal file
@@ -0,0 +1,275 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from io import StringIO
|
||||
import sys
|
||||
|
||||
from kumacli.cmd.monitor import (
|
||||
MonitorCommands,
|
||||
handle_monitor_command,
|
||||
setup_monitor_parser,
|
||||
)
|
||||
|
||||
|
||||
class TestMonitorCommands:
|
||||
def test_pause_monitors_by_pattern(self, mock_client, mock_monitors, capsys):
|
||||
"""Test pausing monitors by pattern"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = [
|
||||
{"id": 1, "name": "Test Monitor 1"},
|
||||
{"id": 2, "name": "Test Monitor 2"},
|
||||
]
|
||||
mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.pause_monitors(monitor_patterns=["Test*"])
|
||||
|
||||
# Verify
|
||||
mock_client.find_and_get_monitors.assert_called_once_with(["Test*"], None)
|
||||
assert mock_client.api.pause_monitor.call_count == 2
|
||||
mock_client.api.pause_monitor.assert_any_call(1)
|
||||
mock_client.api.pause_monitor.assert_any_call(2)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Found 2 matching monitors to pause:" in captured.out
|
||||
assert "Paused monitor 'Test Monitor 1' (ID: 1)" in captured.out
|
||||
assert "Paused monitor 'Test Monitor 2' (ID: 2)" in captured.out
|
||||
assert "Successfully paused 2 out of 2 monitors" in captured.out
|
||||
|
||||
def test_pause_monitors_by_group(self, mock_client, mock_monitors, capsys):
|
||||
"""Test pausing monitors by group"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = [
|
||||
{"id": 4, "name": "Child Monitor"}
|
||||
]
|
||||
mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.pause_monitors(group_patterns=["Group*"])
|
||||
|
||||
# Verify
|
||||
mock_client.find_and_get_monitors.assert_called_once_with(None, ["Group*"])
|
||||
mock_client.api.pause_monitor.assert_called_once_with(4)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Found 1 matching monitors to pause:" in captured.out
|
||||
assert "Paused monitor 'Child Monitor' (ID: 4)" in captured.out
|
||||
|
||||
def test_pause_monitors_no_patterns(self, mock_client, capsys):
|
||||
"""Test pausing monitors without patterns"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = []
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.pause_monitors()
|
||||
|
||||
# Verify
|
||||
mock_client.find_and_get_monitors.assert_called_once_with(None, None)
|
||||
|
||||
def test_pause_monitors_no_matches(self, mock_client, capsys):
|
||||
"""Test pausing monitors with no matches"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = []
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.pause_monitors(monitor_patterns=["NonExistent*"])
|
||||
|
||||
# Verify
|
||||
mock_client.find_and_get_monitors.assert_called_once_with(
|
||||
["NonExistent*"], None
|
||||
)
|
||||
|
||||
def test_pause_monitors_api_error(self, mock_client, capsys):
|
||||
"""Test pausing monitors with API error"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = [
|
||||
{"id": 1, "name": "Test Monitor 1"}
|
||||
]
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
|
||||
mock_client.api.pause_monitor.side_effect = UptimeKumaException("API Error")
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.pause_monitors(monitor_patterns=["Test*"])
|
||||
|
||||
# Verify
|
||||
captured = capsys.readouterr()
|
||||
assert "Failed to pause monitor 'Test Monitor 1': API Error" in captured.out
|
||||
assert "Successfully paused 0 out of 1 monitors" in captured.out
|
||||
|
||||
def test_resume_monitors_by_pattern(self, mock_client, mock_monitors, capsys):
|
||||
"""Test resuming monitors by pattern"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = [
|
||||
{"id": 2, "name": "Test Monitor 2"}
|
||||
]
|
||||
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.resume_monitors(monitor_patterns=["Test*"])
|
||||
|
||||
# Verify
|
||||
mock_client.find_and_get_monitors.assert_called_once_with(["Test*"], None)
|
||||
mock_client.api.resume_monitor.assert_called_once_with(2)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Found 1 matching monitors to resume:" in captured.out
|
||||
assert "Resumed monitor 'Test Monitor 2' (ID: 2)" in captured.out
|
||||
|
||||
def test_resume_monitors_all_paused(self, mock_client, mock_monitors, capsys):
|
||||
"""Test resuming all paused monitors"""
|
||||
# Setup
|
||||
mock_client.api.get_monitors.return_value = mock_monitors
|
||||
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.resume_monitors(resume_all=True)
|
||||
|
||||
# Verify
|
||||
# Should resume monitors with active=False (monitors 2 and 4)
|
||||
assert mock_client.api.resume_monitor.call_count == 2
|
||||
mock_client.api.resume_monitor.assert_any_call(2)
|
||||
mock_client.api.resume_monitor.assert_any_call(4)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Found 2 paused monitors to resume:" in captured.out
|
||||
assert "Successfully resumed 2 out of 2 monitors" in captured.out
|
||||
|
||||
def test_resume_monitors_all_no_paused(self, mock_client, capsys):
|
||||
"""Test resuming all paused monitors when none are paused"""
|
||||
# Setup
|
||||
active_monitors = [{"id": 1, "name": "Active Monitor", "active": True}]
|
||||
mock_client.api.get_monitors.return_value = active_monitors
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.resume_monitors(resume_all=True)
|
||||
|
||||
# Verify
|
||||
captured = capsys.readouterr()
|
||||
assert "No paused monitors found to resume" in captured.out
|
||||
|
||||
def test_resume_monitors_no_args(self, mock_client, capsys):
|
||||
"""Test resuming monitors without any arguments"""
|
||||
# Setup
|
||||
mock_client.find_and_get_monitors.return_value = []
|
||||
|
||||
monitor_commands = MonitorCommands(mock_client)
|
||||
|
||||
# Execute
|
||||
monitor_commands.resume_monitors()
|
||||
|
||||
# Verify
|
||||
mock_client.find_and_get_monitors.assert_called_once_with(None, None)
|
||||
|
||||
|
||||
class TestMonitorCommandHandler:
    """Tests for handle_monitor_command dispatch of pause/resume actions."""

    def test_handle_monitor_command_no_action(self, mock_client, capsys):
        """Test monitor command handler with no action"""
        args = Mock()
        args.monitor_action = None

        # Patch the parser setup so the handler runs without real argparse wiring.
        with patch("kumacli.cmd.monitor.setup_monitor_parser") as mock_setup:
            # NOTE(review): this stashes a parser on the mocked setup function;
            # presumably the handler reads `setup_monitor_parser._parser` as a
            # function attribute — confirm against kumacli.cmd.monitor.
            mock_setup._parser = Mock()

            assert handle_monitor_command(args, mock_client) is False

    def test_handle_monitor_command_pause(self, mock_client):
        """Test monitor command handler for pause action"""
        args = Mock()
        args.monitor_action = "pause"
        args.monitor = ["test*"]
        args.group = None

        mock_client.find_and_get_monitors.return_value = [
            {"id": 1, "name": "test monitor"}
        ]
        mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}

        assert handle_monitor_command(args, mock_client) is True
        mock_client.api.pause_monitor.assert_called_once_with(1)

    def test_handle_monitor_command_resume(self, mock_client):
        """Test monitor command handler for resume action"""
        args = Mock()
        args.monitor_action = "resume"
        args.monitor = ["test*"]
        args.group = None
        args.all = False

        mock_client.find_and_get_monitors.return_value = [
            {"id": 1, "name": "test monitor"}
        ]
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        assert handle_monitor_command(args, mock_client) is True
        mock_client.api.resume_monitor.assert_called_once_with(1)

    def test_handle_monitor_command_resume_all(self, mock_client, mock_monitors):
        """Test monitor command handler for resume all action"""
        args = Mock()
        args.monitor_action = "resume"
        args.monitor = None
        args.group = None
        args.all = True

        mock_client.api.get_monitors.return_value = mock_monitors
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        assert handle_monitor_command(args, mock_client) is True
        # The fixture holds two paused monitors, hence exactly two resume calls.
        assert mock_client.api.resume_monitor.call_count == 2

    def test_handle_monitor_command_unknown_action(self, mock_client, capsys):
        """Test monitor command handler with unknown action"""
        args = Mock()
        args.monitor_action = "unknown"

        assert handle_monitor_command(args, mock_client) is False
        assert (
            "Unknown monitor action. Use --help for usage information."
            in capsys.readouterr().out
        )
|
31
tests/test_version.py
Normal file
31
tests/test_version.py
Normal file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock
|
||||
|
||||
from kumacli.cmd.version import handle_version_command, __version__
|
||||
|
||||
|
||||
class TestVersionCommand:
    """Tests for the version subcommand handler and the exported version string."""

    def test_handle_version_command(self, mock_client, capsys):
        """Test version command handler"""
        # The handler ignores its args object, so a bare Mock suffices.
        assert handle_version_command(Mock(), mock_client) is True
        assert f"kumacli {__version__}" in capsys.readouterr().out

    def test_version_is_defined(self):
        """Test that version is properly defined"""
        assert __version__ is not None
        assert isinstance(__version__, str)
        assert len(__version__) > 0
        # Expect a semantic-versioning shape: at least major.minor, all numeric.
        parts = __version__.split(".")
        assert len(parts) >= 2
        assert all(part.isdigit() for part in parts)
|
Reference in New Issue
Block a user