Compare commits

...

19 Commits

Author SHA1 Message Date
Pim van Pelt
cf6d8712fb Simplify gitignore 2025-08-04 11:35:16 +02:00
Pim van Pelt
17c31ce6c9 Remove redundant pytest.ini 2025-08-04 11:33:37 +02:00
Pim van Pelt
d6ecb9e4c9 Simplify clean target 2025-08-04 11:27:02 +02:00
Pim van Pelt
65f732e497 Move run_tests.py directly to 'make test' 2025-08-04 11:24:46 +02:00
Pim van Pelt
4690d13cb4 Simplify clean 2025-08-03 14:51:43 +02:00
Pim van Pelt
41692b7dc1 Add kumacli version and link it into setup.py 2025-08-03 12:39:21 +02:00
Pim van Pelt
7eafb1e68e Release v1.3.0 2025-08-03 11:51:55 +02:00
Pim van Pelt
11aad79720 Restore behavior where 'kumacli monitor' shows the subcommands 2025-08-03 11:50:44 +02:00
Pim van Pelt
64e1ca124f Fix tests after refactor 2025-08-03 11:43:53 +02:00
Pim van Pelt
f1d10458c6 Use UptimeKumaException exceptions 2025-08-03 11:27:36 +02:00
Pim van Pelt
1e3999eee2 Refactor: add find_monitors_by_globs() and find_and_get_monitors() 2025-08-03 11:21:10 +02:00
Pim van Pelt
2ddcc00cda Fix some pylint issues 2025-08-03 11:10:24 +02:00
Pim van Pelt
c1d70cd9b6 Reformat with black 2025-08-03 11:03:38 +02:00
Pim van Pelt
81c7f9c9bd Cut release v1.2.0 2025-08-02 20:06:05 +02:00
Pim van Pelt
e2e65add2e Add tests 2025-08-02 20:03:32 +02:00
Pim van Pelt
5eb76736cc print help if there are no sub-commands given 2025-08-02 19:54:42 +02:00
Pim van Pelt
3000b0799b Add resume_monitor() API call. including an --all flag 2025-08-02 19:49:54 +02:00
Pim van Pelt
3b61b13eef Implement pause_monitor() 2025-08-02 19:46:09 +02:00
Pim van Pelt
e3c6dd5513 Add info command 2025-08-02 19:38:49 +02:00
20 changed files with 1674 additions and 84 deletions

6
.gitignore vendored
View File

@@ -1,8 +1,4 @@
bin
include
lib
lib64
pyvenv.cfg
__pycache__
dist/
kumacli.egg-info
.pytest_cache

View File

@@ -1,4 +1,4 @@
.PHONY: clean build install test help
.PHONY: clean build install test test-deps help
# Default target
help:
@@ -6,19 +6,15 @@ help:
@echo " clean - Remove build artifacts and cache files"
@echo " build - Build the wheel package"
@echo " install - Install the package in development mode"
@echo " test - Run tests (if available)"
@echo " test - Run the test suite"
@echo " test-deps - Install test dependencies"
@echo " help - Show this help message"
# Clean build artifacts
clean:
@echo "Cleaning build artifacts..."
rm -rf build/
rm -rf dist/
rm -rf src/kumacli.egg-info/
rm -rf src/kumacli/__pycache__/
rm -rf src/kumacli/cmd/__pycache__/
find . -name "*.pyc" -delete
find . -name "*.pyo" -delete
rm -rf build/ dist/ src/kumacli.egg-info/
find . -name "*.pyc" -o -name "*.pyo" -delete
find . -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true
@echo "Clean complete."
@@ -33,10 +29,15 @@ install:
@echo "Installing package in development mode..."
pip install -e .
# Test the package (placeholder for when tests are added)
# Install test dependencies
test-deps:
@echo "Installing test dependencies..."
pip install -e ".[test]"
# Test the package
test:
@echo "Running tests..."
@echo "No tests configured yet."
python3 -m pytest tests/ -v --tb=short
# Rebuild and reinstall (useful during development)
dev: clean build

View File

@@ -29,6 +29,9 @@ kumacli --url http://localhost:3001 --username admin --password password monitor
### Monitor Commands
```bash
# Show available subcommands
kumacli monitor
# List all monitors
kumacli monitor list
@@ -40,11 +43,23 @@ kumacli monitor list --group "production*"
# Combine filters
kumacli monitor list --monitor "*api*" --group "web*"
# Pause monitors
kumacli monitor pause --monitor "*api*"
kumacli monitor pause --group "production*"
# Resume monitors
kumacli monitor resume --monitor "*api*"
kumacli monitor resume --group "production*"
kumacli monitor resume --all
```
### Maintenance Commands
```bash
# Show available subcommands
kumacli maintenance
# Create maintenance for specific monitors (90 minutes, starting now)
kumacli maintenance add --monitor "*nextcloud*"

View File

@@ -1,13 +1,27 @@
#!/usr/bin/env python3
import os
import re
from setuptools import setup, find_packages
# Read version directly from version.py file without importing
def get_version():
version_file = os.path.join(os.path.dirname(__file__), 'src', 'kumacli', 'cmd', 'version.py')
with open(version_file, 'r', encoding='utf-8') as f:
content = f.read()
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", content, re.M)
if version_match:
return version_match.group(1)
raise RuntimeError("Unable to find version string.")
__version__ = get_version()
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
setup(
name="kumacli",
version="1.1.0",
version=__version__,
author="Uptime Kuma CLI",
description="A command-line interface for Uptime Kuma",
long_description=long_description,
@@ -34,6 +48,16 @@ setup(
install_requires=[
"uptime-kuma-api>=1.0.0",
],
extras_require={
"dev": [
"pytest>=6.0",
"pytest-cov>=2.0",
],
"test": [
"pytest>=6.0",
"pytest-cov>=2.0",
],
},
entry_points={
"console_scripts": [
"kumacli=kumacli.kumacli:main",

View File

@@ -2,10 +2,10 @@
KumaCLI - A command-line interface for Uptime Kuma
"""
__version__ = "1.0.0"
from kumacli.kumacli import main
from kumacli.cmd.version import __version__
__author__ = "KumaCLI Team"
__email__ = "info@kumacli.com"
from kumacli.kumacli import main
__all__ = ["main"]

View File

@@ -1,12 +1,15 @@
#!/usr/bin/env python3
"""Uptime Kuma client wrapper for API operations."""
import fnmatch
import re
from datetime import datetime, timedelta
from uptime_kuma_api import UptimeKumaApi
from datetime import datetime
from uptime_kuma_api import UptimeKumaApi, UptimeKumaException
class KumaClient:
"""Client wrapper for Uptime Kuma API operations."""
def __init__(self, url, username=None, password=None):
self.url = url
self.username = username
@@ -29,9 +32,9 @@ class KumaClient:
if unit == "s":
return value
elif unit == "m":
if unit == "m":
return value * 60
elif unit == "h":
if unit == "h":
return value * 3600
raise ValueError(f"Invalid duration unit: {unit}")
@@ -71,10 +74,10 @@ class KumaClient:
try:
self.api = UptimeKumaApi(self.url)
if self.username and self.password:
result = self.api.login(self.username, self.password)
self.api.login(self.username, self.password)
print(f"Connected to {self.url}")
return True
except Exception as e:
except UptimeKumaException as e:
print(f"Failed to connect: {e}")
return False
@@ -108,7 +111,7 @@ class KumaClient:
return unique_monitors
except Exception as e:
except UptimeKumaException as e:
print(f"Error finding monitors: {e}")
return []
@@ -138,7 +141,7 @@ class KumaClient:
return unique_groups
except Exception as e:
except UptimeKumaException as e:
print(f"Error finding groups: {e}")
return []
@@ -165,6 +168,108 @@ class KumaClient:
return group_members
except Exception as e:
except UptimeKumaException as e:
print(f"Error getting group members: {e}")
return []
def find_monitors_by_globs(self, monitor_patterns=None, group_patterns=None):
"""Find monitor IDs by name patterns and/or group patterns.
Args:
monitor_patterns: List of monitor name patterns (supports wildcards)
group_patterns: List of group name patterns (supports wildcards)
Returns:
List of monitor IDs (integers) that match the criteria
"""
try:
# Check if we have either monitor patterns or group patterns
if not monitor_patterns and not group_patterns:
print(
"Error: Either monitor or group patterns required. "
"Specify at least one pattern."
)
return []
matched_monitors = []
# Find monitors by patterns if specified
if monitor_patterns:
pattern_monitors = self.find_monitors_by_pattern(monitor_patterns)
matched_monitors.extend(pattern_monitors)
# Find monitors by groups if specified
if group_patterns:
group_monitors = self.get_monitors_in_groups(group_patterns)
# Convert to same format as find_monitors_by_pattern
group_monitor_objs = [
{"id": m.get("id"), "name": m.get("name")} for m in group_monitors
]
matched_monitors.extend(group_monitor_objs)
# Remove duplicates while preserving order
seen = set()
unique_monitors = []
for monitor in matched_monitors:
if monitor["id"] not in seen:
seen.add(monitor["id"])
unique_monitors.append(monitor)
matched_monitors = unique_monitors
if not matched_monitors:
print(
"Error: No monitors found matching the specified patterns or groups"
)
return []
# Return list of monitor IDs
return [monitor["id"] for monitor in matched_monitors]
except UptimeKumaException as e:
print(f"Error finding monitors by globs: {e}")
return []
def get_monitor_details(self, monitor_ids):
"""Get monitor details for display purposes.
Args:
monitor_ids: List of monitor IDs
Returns:
List of dicts with 'id' and 'name' keys
"""
try:
all_monitors = self.api.get_monitors()
return [
{
"id": mid,
"name": next(
(
m.get("name", f"Monitor {mid}")
for m in all_monitors
if m.get("id") == mid
),
f"Monitor {mid}",
),
}
for mid in monitor_ids
]
except UptimeKumaException as e:
print(f"Error getting monitor details: {e}")
return []
def find_and_get_monitors(self, monitor_patterns=None, group_patterns=None):
"""Find monitors by patterns/groups and return detailed info.
Args:
monitor_patterns: List of monitor name patterns (supports wildcards)
group_patterns: List of group name patterns (supports wildcards)
Returns:
List of dicts with 'id' and 'name' keys, or empty list if none found
"""
monitor_ids = self.find_monitors_by_globs(monitor_patterns, group_patterns)
if not monitor_ids:
return []
return self.get_monitor_details(monitor_ids)

40
src/kumacli/cmd/info.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""Info command implementations for Uptime Kuma CLI."""
from uptime_kuma_api import UptimeKumaException
from ..client import KumaClient
class InfoCommands:
"""Commands for retrieving server information."""
def __init__(self, client: KumaClient):
self.client = client
def get_info(self):
"""Get server info"""
try:
info = self.client.api.info()
if not info:
print("No server info available")
return
print("Server Information:")
for key, value in info.items():
print(f" {key}: {value}")
except UptimeKumaException as e:
print(f"Error getting server info: {e}")
def setup_info_parser(subparsers):
"""Setup info command parser"""
info_parser = subparsers.add_parser("info", help="Get server information")
return info_parser
def handle_info_command(args, client): # pylint: disable=unused-argument
"""Handle info command execution"""
info_commands = InfoCommands(client)
info_commands.get_info()
return True

View File

@@ -1,10 +1,14 @@
#!/usr/bin/env python3
"""Maintenance command implementations for Uptime Kuma CLI."""
from datetime import datetime, timedelta
from uptime_kuma_api import UptimeKumaException
from ..client import KumaClient
class MaintenanceCommands:
"""Commands for managing maintenance windows."""
def __init__(self, client: KumaClient):
self.client = client
@@ -32,7 +36,7 @@ class MaintenanceCommands:
f"{maintenance_id:<5} {title:<30} {strategy:<15} {active:<10} {description:<50}"
)
except Exception as e:
except UptimeKumaException as e:
print(f"Error listing maintenances: {e}")
def add_maintenance(
@@ -46,45 +50,10 @@ class MaintenanceCommands:
):
"""Add a new maintenance"""
try:
# Check if we have either monitor patterns or group patterns
if not monitor_patterns and not group_patterns:
print(
"Error: Either --monitor or --group flag is required. Specify at least one pattern."
)
return
matched_monitors = []
# Find monitors by patterns if specified
if monitor_patterns:
pattern_monitors = self.client.find_monitors_by_pattern(
monitor_patterns
)
matched_monitors.extend(pattern_monitors)
# Find monitors by groups if specified
if group_patterns:
group_monitors = self.client.get_monitors_in_groups(group_patterns)
# Convert to the same format as find_monitors_by_pattern
group_monitor_objs = [
{"id": m.get("id"), "name": m.get("name")} for m in group_monitors
]
matched_monitors.extend(group_monitor_objs)
# Remove duplicates while preserving order
seen = set()
unique_monitors = []
for monitor in matched_monitors:
if monitor["id"] not in seen:
seen.add(monitor["id"])
unique_monitors.append(monitor)
matched_monitors = unique_monitors
matched_monitors = self.client.find_and_get_monitors(
monitor_patterns, group_patterns
)
if not matched_monitors:
print(
"Error: No monitors found matching the specified patterns or groups"
)
return
print(f"Found {len(matched_monitors)} matching monitors:")
@@ -102,7 +71,8 @@ class MaintenanceCommands:
end_dt = start_dt + timedelta(seconds=duration_seconds)
print(
f"Maintenance window: {start_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} - {end_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} ({duration})"
f"Maintenance window: {start_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} - "
f"{end_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} ({duration})"
)
# Create the maintenance with single strategy and date range
@@ -135,13 +105,13 @@ class MaintenanceCommands:
f"Successfully added {len(matched_monitors)} monitors to maintenance"
)
print(f"API response: {result}")
except Exception as e:
except UptimeKumaException as e:
print(f"Error: Failed to add monitors to maintenance: {e}")
print(
"This might be due to API compatibility issues or server configuration"
)
except Exception as e:
except UptimeKumaException as e:
print(f"Error adding maintenance: {e}")
def delete_maintenance(self, maintenance_id=None, delete_all=False):
@@ -157,7 +127,8 @@ class MaintenanceCommands:
print(f"Found {len(maintenances)} maintenances to delete:")
for maintenance in maintenances:
print(
f" - {maintenance.get('title', 'N/A')} (ID: {maintenance.get('id', 'N/A')})"
f" - {maintenance.get('title', 'N/A')} "
f"(ID: {maintenance.get('id', 'N/A')})"
)
# Delete all maintenances
@@ -168,10 +139,11 @@ class MaintenanceCommands:
maintenance.get("id")
)
print(
f"Deleted maintenance '{maintenance.get('title', 'N/A')}' (ID: {maintenance.get('id')})"
f"Deleted maintenance '{maintenance.get('title', 'N/A')}' "
f"(ID: {maintenance.get('id')})"
)
deleted_count += 1
except Exception as e:
except UptimeKumaException as e:
print(
f"Failed to delete maintenance '{maintenance.get('title', 'N/A')}': {e}"
)
@@ -189,18 +161,19 @@ class MaintenanceCommands:
result = self.client.api.delete_maintenance(maintenance_id)
print(
f"Successfully deleted maintenance '{maintenance_title}' (ID: {maintenance_id})"
f"Successfully deleted maintenance '{maintenance_title}' "
f"(ID: {maintenance_id})"
)
print(f"API response: {result}")
except Exception as e:
except UptimeKumaException as e:
print(f"Failed to delete maintenance ID {maintenance_id}: {e}")
else:
print(
"Error: Either --id or --all flag is required for delete operation"
)
except Exception as e:
except UptimeKumaException as e:
print(f"Error during maintenance deletion: {e}")
@@ -209,6 +182,7 @@ def setup_maintenance_parser(subparsers):
maintenance_parser = subparsers.add_parser(
"maintenance", help="Maintenance operations"
)
setup_maintenance_parser.parser = maintenance_parser
maintenance_subparsers = maintenance_parser.add_subparsers(
dest="maintenance_action", help="Maintenance actions"
)
@@ -248,12 +222,14 @@ def setup_maintenance_parser(subparsers):
add_maintenance_parser.add_argument(
"--monitor",
action="append",
help="Monitor name pattern to add to maintenance (supports wildcards like *NextCloud*, can be repeated)",
help="Monitor name pattern to add to maintenance "
"(supports wildcards like *NextCloud*, can be repeated)",
)
add_maintenance_parser.add_argument(
"--group",
action="append",
help="Group name pattern to add all group members to maintenance (supports wildcards, can be repeated)",
help="Group name pattern to add all group members to maintenance "
"(supports wildcards, can be repeated)",
)
return maintenance_parser
@@ -263,6 +239,9 @@ def handle_maintenance_command(args, client):
"""Handle maintenance command execution"""
maintenance_commands = MaintenanceCommands(client)
if not args.maintenance_action:
setup_maintenance_parser.parser.print_help()
return False
if args.maintenance_action == "list":
maintenance_commands.list_maintenances()
elif args.maintenance_action == "add":

View File

@@ -1,9 +1,13 @@
#!/usr/bin/env python3
"""Monitor command implementations for Uptime Kuma CLI."""
from uptime_kuma_api import UptimeKumaException
from ..client import KumaClient
class MonitorCommands:
"""Commands for managing monitors."""
def __init__(self, client: KumaClient):
self.client = client
@@ -61,16 +65,104 @@ class MonitorCommands:
parent_name = parent_monitor.get("name", f"Group {parent_id}")
print(
f"{monitor_id:<5} {name:<25} {monitor_type:<12} {parent_name:<20} {url:<35} {active:<10}"
f"{monitor_id:<5} {name:<25} {monitor_type:<12} "
f"{parent_name:<20} {url:<35} {active:<10}"
)
except Exception as e:
except UptimeKumaException as e:
print(f"Error listing monitors: {e}")
def pause_monitors(self, monitor_patterns=None, group_patterns=None):
"""Pause monitors by patterns and/or groups"""
try:
matched_monitors = self.client.find_and_get_monitors(
monitor_patterns, group_patterns
)
if not matched_monitors:
return
print(f"Found {len(matched_monitors)} matching monitors to pause:")
for monitor in matched_monitors:
print(f" - {monitor['name']} (ID: {monitor['id']})")
# Pause each monitor
paused_count = 0
for monitor in matched_monitors:
try:
self.client.api.pause_monitor(monitor["id"])
print(f"Paused monitor '{monitor['name']}' (ID: {monitor['id']})")
paused_count += 1
except UptimeKumaException as e:
print(f"Failed to pause monitor '{monitor['name']}': {e}")
print(
f"Successfully paused {paused_count} out of {len(matched_monitors)} monitors"
)
except UptimeKumaException as e:
print(f"Error pausing monitors: {e}")
def resume_monitors(
self, monitor_patterns=None, group_patterns=None, resume_all=False
):
"""Resume monitors by patterns and/or groups, or all paused monitors"""
try:
if resume_all:
# Get all monitors and filter for inactive (paused) ones
all_monitors = self.client.api.get_monitors()
monitor_ids = [
m.get("id") for m in all_monitors if not m.get("active", True)
]
if not monitor_ids:
print("No paused monitors found to resume")
return
matched_monitors = [
{
"id": mid,
"name": next(
(
m.get("name", f"Monitor {mid}")
for m in all_monitors
if m.get("id") == mid
),
f"Monitor {mid}",
),
}
for mid in monitor_ids
]
print(f"Found {len(matched_monitors)} paused monitors to resume:")
else:
matched_monitors = self.client.find_and_get_monitors(
monitor_patterns, group_patterns
)
if not matched_monitors:
return
print(f"Found {len(matched_monitors)} matching monitors to resume:")
for monitor in matched_monitors:
print(f" - {monitor['name']} (ID: {monitor['id']})")
# Resume each monitor
resumed_count = 0
for monitor in matched_monitors:
try:
self.client.api.resume_monitor(monitor["id"])
print(f"Resumed monitor '{monitor['name']}' (ID: {monitor['id']})")
resumed_count += 1
except UptimeKumaException as e:
print(f"Failed to resume monitor '{monitor['name']}': {e}")
print(
f"Successfully resumed {resumed_count} out of {len(matched_monitors)} monitors"
)
except UptimeKumaException as e:
print(f"Error resuming monitors: {e}")
def setup_monitor_parser(subparsers):
"""Setup monitor command parser"""
monitor_parser = subparsers.add_parser("monitor", help="Monitor operations")
setup_monitor_parser.parser = monitor_parser
monitor_subparsers = monitor_parser.add_subparsers(
dest="monitor_action", help="Monitor actions"
)
@@ -90,6 +182,39 @@ def setup_monitor_parser(subparsers):
help="Group name pattern to filter by (supports wildcards, can be repeated)",
)
# Pause monitors command
pause_monitors_parser = monitor_subparsers.add_parser(
"pause", help="Pause monitors"
)
pause_monitors_parser.add_argument(
"--monitor",
action="append",
help="Monitor name pattern to pause (supports wildcards, can be repeated)",
)
pause_monitors_parser.add_argument(
"--group",
action="append",
help="Group name pattern to pause all group members (supports wildcards, can be repeated)",
)
# Resume monitors command
resume_monitors_parser = monitor_subparsers.add_parser(
"resume", help="Resume monitors"
)
resume_monitors_parser.add_argument(
"--monitor",
action="append",
help="Monitor name pattern to resume (supports wildcards, can be repeated)",
)
resume_monitors_parser.add_argument(
"--group",
action="append",
help="Group name pattern to resume all group members (supports wildcards, can be repeated)",
)
resume_monitors_parser.add_argument(
"--all", action="store_true", help="Resume all paused monitors"
)
return monitor_parser
@@ -97,10 +222,23 @@ def handle_monitor_command(args, client):
"""Handle monitor command execution"""
monitor_commands = MonitorCommands(client)
if not args.monitor_action:
setup_monitor_parser.parser.print_help()
return False
if args.monitor_action == "list":
monitor_commands.list_monitors(
monitor_patterns=args.monitor, group_patterns=args.group
)
elif args.monitor_action == "pause":
monitor_commands.pause_monitors(
monitor_patterns=args.monitor, group_patterns=args.group
)
elif args.monitor_action == "resume":
monitor_commands.resume_monitors(
monitor_patterns=args.monitor,
group_patterns=args.group,
resume_all=args.all,
)
else:
print("Unknown monitor action. Use --help for usage information.")
return False

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
"""Version command implementations for Uptime Kuma CLI."""
__version__ = "1.4.0"
def setup_version_parser(subparsers):
"""Setup version command parser"""
version_parser = subparsers.add_parser("version", help="Show version information")
return version_parser
def handle_version_command(args, client): # pylint: disable=unused-argument
"""Handle version command execution"""
print(f"kumacli {__version__}")
return True

View File

@@ -1,15 +1,17 @@
#!/usr/bin/env python3
"""Main CLI module for Uptime Kuma."""
import argparse
import os
import sys
from datetime import datetime
# Handle both direct execution and package import
try:
from .client import KumaClient
from .cmd.monitor import setup_monitor_parser, handle_monitor_command
from .cmd.maintenance import setup_maintenance_parser, handle_maintenance_command
from .cmd.info import setup_info_parser, handle_info_command
from .cmd.version import setup_version_parser, handle_version_command
except ImportError:
# Running directly, add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -19,9 +21,12 @@ except ImportError:
setup_maintenance_parser,
handle_maintenance_command,
)
from kumacli.cmd.info import setup_info_parser, handle_info_command
from kumacli.cmd.version import setup_version_parser, handle_version_command
def main():
"""Main entry point for the CLI application."""
parser = argparse.ArgumentParser(description="Uptime Kuma CLI Client")
parser.add_argument(
"--url", help="Uptime Kuma server URL (can also use KUMA_URL env var)"
@@ -38,8 +43,10 @@ def main():
subparsers = parser.add_subparsers(dest="resource", help="Resource to operate on")
# Setup command parsers
monitor_parser = setup_monitor_parser(subparsers)
maintenance_parser = setup_maintenance_parser(subparsers)
setup_monitor_parser(subparsers)
setup_maintenance_parser(subparsers)
setup_info_parser(subparsers)
setup_version_parser(subparsers)
args = parser.parse_args()
@@ -69,6 +76,10 @@ def main():
success = handle_monitor_command(args, client)
elif args.resource == "maintenance":
success = handle_maintenance_command(args, client)
elif args.resource == "info":
success = handle_info_command(args, client)
elif args.resource == "version":
success = handle_version_command(args, client)
else:
parser.print_help()
sys.exit(1)

100
tests/README.md Normal file
View File

@@ -0,0 +1,100 @@
# KumaCLI Tests
This directory contains the test suite for kumacli.
## Running Tests
### Prerequisites
Install test dependencies:
```bash
pip install -e ".[test]"
# or
pip install pytest pytest-cov
```
### Run All Tests
```bash
# Using pytest directly
python3 -m pytest
# Using the test runner script
python3 run_tests.py
# From the project root
python3 -m pytest tests/
```
### Run Specific Tests
```bash
# Test a specific file
pytest tests/test_info.py
# Test a specific class
pytest tests/test_monitor.py::TestMonitorCommands
# Test a specific method
pytest tests/test_monitor.py::TestMonitorCommands::test_pause_monitors_by_pattern
```
### Run Tests with Coverage
```bash
pytest --cov=kumacli --cov-report=html
python run_tests.py --cov
```
### Test Options
```bash
# Verbose output
pytest -v
# Stop on first failure
pytest -x
# Run tests in parallel (requires pytest-xdist)
pytest -n auto
```
## Test Structure
- `conftest.py` - Shared fixtures and test configuration
- `test_info.py` - Tests for the info command
- `test_monitor.py` - Tests for monitor commands (list, pause, resume)
- `test_maintenance.py` - Tests for maintenance commands
- `test_client.py` - Tests for the KumaClient class
- `test_cli_integration.py` - Integration tests for CLI functionality
## Test Coverage
The tests cover:
- ✅ Command argument parsing
- ✅ API method calls and responses
- ✅ Error handling and edge cases
- ✅ Help message functionality
- ✅ Monitor pause/resume operations
- ✅ Maintenance operations
- ✅ Client utility functions
- ✅ Integration between components
## Mock Strategy
Tests use unittest.mock to:
- Mock the UptimeKumaApi calls
- Simulate API responses and errors
- Test command logic without requiring a live server
- Verify correct API method calls with expected parameters
## Adding New Tests
When adding new functionality:
1. Add unit tests for the new commands/methods
2. Add integration tests if the feature involves multiple components
3. Test both success and error cases
4. Mock external dependencies (API calls, file operations)
5. Use descriptive test names that explain what is being tested

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Test package for kumacli

78
tests/conftest.py Normal file
View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
import sys
import os
# Add the src directory to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
import pytest
from unittest.mock import Mock, MagicMock
from kumacli.client import KumaClient
@pytest.fixture
def mock_client():
"""Create a mock KumaClient for testing"""
client = Mock(spec=KumaClient)
client.api = Mock()
return client
@pytest.fixture
def mock_monitors():
"""Sample monitor data for testing"""
return [
{
"id": 1,
"name": "Test Monitor 1",
"type": "http",
"url": "https://example.com",
"active": True,
"parent": None,
},
{
"id": 2,
"name": "Test Monitor 2",
"type": "http",
"url": "https://test.com",
"active": False,
"parent": None,
},
{
"id": 3,
"name": "Group Monitor",
"type": "group",
"active": True,
"parent": None,
},
{
"id": 4,
"name": "Child Monitor",
"type": "http",
"url": "https://child.com",
"active": False,
"parent": 3,
},
]
@pytest.fixture
def mock_maintenances():
"""Sample maintenance data for testing"""
return [
{
"id": 1,
"title": "Test Maintenance",
"description": "Test maintenance description",
"strategy": "single",
"active": True,
},
{
"id": 2,
"title": "Inactive Maintenance",
"description": "Inactive maintenance description",
"strategy": "single",
"active": False,
},
]

View File

@@ -0,0 +1,273 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import Mock, patch, MagicMock
import argparse
from io import StringIO
import sys
from kumacli.cmd.monitor import setup_monitor_parser, handle_monitor_command
from kumacli.cmd.maintenance import setup_maintenance_parser, handle_maintenance_command
from kumacli.cmd.info import setup_info_parser, handle_info_command
class TestCLIIntegration:
def test_monitor_parser_setup(self):
"""Test monitor parser setup"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
monitor_parser = setup_monitor_parser(subparsers)
# Verify parser is created
assert monitor_parser is not None
assert monitor_parser.prog.endswith("monitor")
def test_maintenance_parser_setup(self):
"""Test maintenance parser setup"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
maintenance_parser = setup_maintenance_parser(subparsers)
# Verify parser is created
assert maintenance_parser is not None
assert maintenance_parser.prog.endswith("maintenance")
def test_info_parser_setup(self):
"""Test info parser setup"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
info_parser = setup_info_parser(subparsers)
# Verify parser is created
assert info_parser is not None
def test_monitor_help_message(self, mock_client, capsys):
"""Test monitor command shows help when no action specified"""
# Setup
mock_args = Mock()
mock_args.monitor_action = None
# Setup parser reference to simulate having called setup_monitor_parser
mock_parser = Mock()
setup_monitor_parser.parser = mock_parser
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is False
mock_parser.print_help.assert_called_once()
def test_maintenance_help_message(self, mock_client, capsys):
"""Test maintenance command shows help when no action specified"""
# Setup
mock_args = Mock()
mock_args.maintenance_action = None
# Setup parser reference to simulate having called setup_maintenance_parser
mock_parser = Mock()
setup_maintenance_parser.parser = mock_parser
# Execute
result = handle_maintenance_command(mock_args, mock_client)
# Verify
assert result is False
mock_parser.print_help.assert_called_once()
def test_monitor_command_with_full_args(self, mock_client):
"""Test monitor command with complete argument structure"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "pause"
mock_args.monitor = ["test*"]
mock_args.group = ["web-services"]
# Mock client methods
mock_client.find_and_get_monitors.return_value = [
{"id": 1, "name": "test-monitor"},
{"id": 2, "name": "web-service-monitor"},
]
mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.find_and_get_monitors.assert_called_once_with(
["test*"], ["web-services"]
)
# Should pause both monitors (deduplicated)
assert mock_client.api.pause_monitor.call_count == 2
def test_resume_all_monitors_integration(self, mock_client, mock_monitors):
"""Test resume all monitors integration"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "resume"
mock_args.monitor = None
mock_args.group = None
mock_args.all = True
mock_client.api.get_monitors.return_value = mock_monitors
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.api.get_monitors.assert_called_once()
# Should resume only paused monitors (ID 2 and 4 from mock_monitors)
assert mock_client.api.resume_monitor.call_count == 2
mock_client.api.resume_monitor.assert_any_call(2)
mock_client.api.resume_monitor.assert_any_call(4)
class TestArgumentParsing:
def test_monitor_list_arguments(self):
"""Test monitor list command argument parsing"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
setup_monitor_parser(subparsers)
# Test with monitor patterns
args = parser.parse_args(
["monitor", "list", "--monitor", "web*", "--monitor", "api*"]
)
assert args.resource == "monitor"
assert args.monitor_action == "list"
assert args.monitor == ["web*", "api*"]
def test_monitor_pause_arguments(self):
"""Test monitor pause command argument parsing"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
setup_monitor_parser(subparsers)
# Test with group patterns
args = parser.parse_args(["monitor", "pause", "--group", "production"])
assert args.resource == "monitor"
assert args.monitor_action == "pause"
assert args.group == ["production"]
def test_monitor_resume_all_arguments(self):
"""Test monitor resume all command argument parsing"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
setup_monitor_parser(subparsers)
# Test with --all flag
args = parser.parse_args(["monitor", "resume", "--all"])
assert args.resource == "monitor"
assert args.monitor_action == "resume"
assert args.all is True
def test_maintenance_add_arguments(self):
"""Test maintenance add command argument parsing"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
setup_maintenance_parser(subparsers)
# Test maintenance add
args = parser.parse_args(
[
"maintenance",
"add",
"--title",
"Server Update",
"--description",
"Updating server software",
"--duration",
"2h",
"--monitor",
"server*",
]
)
assert args.resource == "maintenance"
assert args.maintenance_action == "add"
assert args.title == "Server Update"
assert args.description == "Updating server software"
assert args.duration == "2h"
assert args.monitor == ["server*"]
def test_maintenance_delete_arguments(self):
"""Test maintenance delete command argument parsing"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
setup_maintenance_parser(subparsers)
# Test delete by ID
args = parser.parse_args(["maintenance", "delete", "--id", "123"])
assert args.resource == "maintenance"
assert args.maintenance_action == "delete"
assert args.id == 123
# Test delete all
args = parser.parse_args(["maintenance", "delete", "--all"])
assert args.resource == "maintenance"
assert args.maintenance_action == "delete"
assert args.all is True
def test_info_arguments(self):
"""Test info command argument parsing"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="resource")
setup_info_parser(subparsers)
# Test info command
args = parser.parse_args(["info"])
assert args.resource == "info"
class TestErrorHandling:
def test_monitor_command_resilience(self, mock_client, capsys):
"""Test monitor command handles various error conditions"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "pause"
mock_args.monitor = ["nonexistent*"]
mock_args.group = None
# Mock no matches found
def mock_find_and_get_monitors(*args, **kwargs):
print("Error: No monitors found matching the specified patterns or groups")
return []
mock_client.find_and_get_monitors.side_effect = mock_find_and_get_monitors
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify error handling
assert result is True # Command completes even with no matches
captured = capsys.readouterr()
assert (
"Error: No monitors found matching the specified patterns or groups"
in captured.out
)
def test_maintenance_command_resilience(self, mock_client, capsys):
"""Test maintenance command handles API errors"""
# Setup
mock_args = Mock()
mock_args.maintenance_action = "list"
# Mock API error
from uptime_kuma_api import UptimeKumaException
mock_client.api.get_maintenances.side_effect = UptimeKumaException(
"Connection timeout"
)
# Execute
result = handle_maintenance_command(mock_args, mock_client)
# Verify error handling
assert result is True # Command completes even with error
captured = capsys.readouterr()
assert "Error listing maintenances: Connection timeout" in captured.out

226
tests/test_client.py Normal file
View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import Mock, patch
from datetime import datetime, timedelta
from kumacli.client import KumaClient
class TestKumaClient:
def test_parse_duration_minutes(self):
"""Test parsing duration in minutes"""
client = KumaClient("http://test.com")
assert client.parse_duration("90m") == 5400 # 90 * 60
assert client.parse_duration("1m") == 60
assert client.parse_duration("120m") == 7200
def test_parse_duration_hours(self):
"""Test parsing duration in hours"""
client = KumaClient("http://test.com")
assert client.parse_duration("1h") == 3600 # 1 * 3600
assert client.parse_duration("2h") == 7200
assert client.parse_duration("24h") == 86400
def test_parse_duration_seconds(self):
"""Test parsing duration in seconds"""
client = KumaClient("http://test.com")
assert client.parse_duration("3600s") == 3600
assert client.parse_duration("60s") == 60
assert client.parse_duration("1s") == 1
def test_parse_duration_default(self):
"""Test parsing duration with default value"""
client = KumaClient("http://test.com")
assert client.parse_duration(None) == 5400 # Default 90 minutes
assert client.parse_duration("") == 5400
def test_parse_duration_invalid(self):
"""Test parsing invalid duration format"""
client = KumaClient("http://test.com")
with pytest.raises(ValueError, match="Invalid duration format"):
client.parse_duration("invalid")
with pytest.raises(ValueError, match="Invalid duration format"):
client.parse_duration("90x")
with pytest.raises(ValueError, match="Invalid duration format"):
client.parse_duration("90")
def test_parse_start_time_none(self):
"""Test parsing start time with None (current time)"""
client = KumaClient("http://test.com")
before = datetime.utcnow()
result = client.parse_start_time(None)
after = datetime.utcnow()
assert before <= result <= after
def test_parse_start_time_iso_format(self):
"""Test parsing ISO format start time"""
client = KumaClient("http://test.com")
# Test ISO format with Z
result = client.parse_start_time("2023-12-25T10:30:00Z")
expected = datetime(2023, 12, 25, 10, 30, 0)
assert result == expected
# Test ISO format with timezone
result = client.parse_start_time("2023-12-25T10:30:00+00:00")
assert result == expected
def test_parse_start_time_common_formats(self):
"""Test parsing common date/time formats"""
client = KumaClient("http://test.com")
# Full datetime
result = client.parse_start_time("2023-12-25 10:30:00")
expected = datetime(2023, 12, 25, 10, 30, 0)
assert result == expected
# Date and hour:minute
result = client.parse_start_time("2023-12-25 10:30")
expected = datetime(2023, 12, 25, 10, 30, 0)
assert result == expected
# Date only
result = client.parse_start_time("2023-12-25")
expected = datetime(2023, 12, 25, 0, 0, 0)
assert result == expected
def test_parse_start_time_invalid(self):
"""Test parsing invalid start time format"""
client = KumaClient("http://test.com")
with pytest.raises(ValueError, match="Invalid start time format"):
client.parse_start_time("invalid-date")
with pytest.raises(ValueError, match="Invalid start time format"):
client.parse_start_time("2023-13-45")
def test_find_monitors_by_pattern_success(self):
"""Test finding monitors by pattern successfully"""
client = KumaClient("http://test.com")
client.api = Mock()
mock_monitors = [
{"id": 1, "name": "Web Server"},
{"id": 2, "name": "API Server"},
{"id": 3, "name": "Database"},
{"id": 4, "name": "Web Frontend"},
]
client.api.get_monitors.return_value = mock_monitors
# Test exact match
result = client.find_monitors_by_pattern(["Web Server"])
assert len(result) == 1
assert result[0]["name"] == "Web Server"
# Test wildcard pattern
result = client.find_monitors_by_pattern(["Web*"])
assert len(result) == 2
names = [m["name"] for m in result]
assert "Web Server" in names
assert "Web Frontend" in names
def test_find_monitors_by_pattern_case_insensitive(self):
"""Test finding monitors by pattern is case insensitive"""
client = KumaClient("http://test.com")
client.api = Mock()
mock_monitors = [
{"id": 1, "name": "Web Server"},
{"id": 2, "name": "API Server"},
]
client.api.get_monitors.return_value = mock_monitors
# Test case insensitive matching
result = client.find_monitors_by_pattern(["web*"])
assert len(result) == 1
assert result[0]["name"] == "Web Server"
def test_find_monitors_by_pattern_no_matches(self):
"""Test finding monitors with no matches"""
client = KumaClient("http://test.com")
client.api = Mock()
mock_monitors = [{"id": 1, "name": "Web Server"}]
client.api.get_monitors.return_value = mock_monitors
result = client.find_monitors_by_pattern(["Database*"])
assert len(result) == 0
def test_find_monitors_by_pattern_duplicates(self):
"""Test finding monitors removes duplicates"""
client = KumaClient("http://test.com")
client.api = Mock()
mock_monitors = [{"id": 1, "name": "Web Server"}]
client.api.get_monitors.return_value = mock_monitors
# Same monitor should match both patterns
result = client.find_monitors_by_pattern(["Web*", "*Server"])
assert len(result) == 1
assert result[0]["name"] == "Web Server"
def test_find_monitors_by_pattern_api_error(self, capsys):
"""Test finding monitors handles API errors"""
from uptime_kuma_api import UptimeKumaException
client = KumaClient("http://test.com")
client.api = Mock()
client.api.get_monitors.side_effect = UptimeKumaException("API Error")
result = client.find_monitors_by_pattern(["Web*"])
assert len(result) == 0
captured = capsys.readouterr()
assert "Error finding monitors: API Error" in captured.out
@patch("kumacli.client.UptimeKumaApi")
def test_connect_success(self, mock_api_class, capsys):
"""Test successful connection"""
mock_api = Mock()
mock_api_class.return_value = mock_api
mock_api.login.return_value = True
client = KumaClient("http://test.com", "user", "pass")
result = client.connect()
assert result is True
assert client.api is mock_api
mock_api_class.assert_called_once_with("http://test.com")
mock_api.login.assert_called_once_with("user", "pass")
captured = capsys.readouterr()
assert "Connected to http://test.com" in captured.out
@patch("kumacli.client.UptimeKumaApi")
def test_connect_failure(self, mock_api_class, capsys):
"""Test connection failure"""
from uptime_kuma_api import UptimeKumaException
mock_api_class.side_effect = UptimeKumaException("Connection failed")
client = KumaClient("http://test.com", "user", "pass")
result = client.connect()
assert result is False
captured = capsys.readouterr()
assert "Failed to connect: Connection failed" in captured.out
def test_disconnect(self):
"""Test disconnection"""
client = KumaClient("http://test.com")
client.api = Mock()
client.disconnect()
client.api.disconnect.assert_called_once()

96
tests/test_info.py Normal file
View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import Mock, patch
from io import StringIO
import sys
from kumacli.cmd.info import InfoCommands, handle_info_command
class TestInfoCommands:
def test_get_info_success(self, mock_client, capsys):
"""Test successful info retrieval"""
# Setup
mock_info_data = {
"version": "1.23.0",
"hostname": "kuma-server",
"primaryBaseURL": "https://status.example.com",
}
mock_client.api.info.return_value = mock_info_data
info_commands = InfoCommands(mock_client)
# Execute
info_commands.get_info()
# Verify
mock_client.api.info.assert_called_once()
captured = capsys.readouterr()
assert "Server Information:" in captured.out
assert "version: 1.23.0" in captured.out
assert "hostname: kuma-server" in captured.out
assert "primaryBaseURL: https://status.example.com" in captured.out
def test_get_info_empty_response(self, mock_client, capsys):
"""Test info command with empty response"""
# Setup
mock_client.api.info.return_value = None
info_commands = InfoCommands(mock_client)
# Execute
info_commands.get_info()
# Verify
mock_client.api.info.assert_called_once()
captured = capsys.readouterr()
assert "No server info available" in captured.out
def test_get_info_api_error(self, mock_client, capsys):
"""Test info command with API error"""
from uptime_kuma_api import UptimeKumaException
# Setup
mock_client.api.info.side_effect = UptimeKumaException("Connection failed")
info_commands = InfoCommands(mock_client)
# Execute
info_commands.get_info()
# Verify
mock_client.api.info.assert_called_once()
captured = capsys.readouterr()
assert "Error getting server info: Connection failed" in captured.out
class TestInfoCommandHandler:
def test_handle_info_command(self, mock_client):
"""Test info command handler"""
# Setup
mock_args = Mock()
mock_info_data = {"version": "1.23.0"}
mock_client.api.info.return_value = mock_info_data
# Execute
result = handle_info_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.api.info.assert_called_once()
def test_handle_info_command_with_error(self, mock_client):
"""Test info command handler with error"""
from uptime_kuma_api import UptimeKumaException
# Setup
mock_args = Mock()
mock_client.api.info.side_effect = UptimeKumaException("API Error")
# Execute
result = handle_info_command(mock_args, mock_client)
# Verify
assert result is True # Handler always returns True
mock_client.api.info.assert_called_once()

185
tests/test_maintenance.py Normal file
View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import Mock, patch
from io import StringIO
import sys
from kumacli.cmd.maintenance import MaintenanceCommands, handle_maintenance_command
class TestMaintenanceCommands:
def test_list_maintenances_success(self, mock_client, mock_maintenances, capsys):
"""Test successful maintenance listing"""
# Setup
mock_client.api.get_maintenances.return_value = mock_maintenances
maintenance_commands = MaintenanceCommands(mock_client)
# Execute
maintenance_commands.list_maintenances()
# Verify
mock_client.api.get_maintenances.assert_called_once()
captured = capsys.readouterr()
assert "Test Maintenance" in captured.out
assert "Inactive Maintenance" in captured.out
assert "Active" in captured.out
assert "Inactive" in captured.out
def test_list_maintenances_empty(self, mock_client, capsys):
"""Test maintenance listing with no maintenances"""
# Setup
mock_client.api.get_maintenances.return_value = []
maintenance_commands = MaintenanceCommands(mock_client)
# Execute
maintenance_commands.list_maintenances()
# Verify
captured = capsys.readouterr()
assert "No maintenances found" in captured.out
def test_list_maintenances_api_error(self, mock_client, capsys):
"""Test maintenance listing with API error"""
from uptime_kuma_api import UptimeKumaException
# Setup
mock_client.api.get_maintenances.side_effect = UptimeKumaException("API Error")
maintenance_commands = MaintenanceCommands(mock_client)
# Execute
maintenance_commands.list_maintenances()
# Verify
captured = capsys.readouterr()
assert "Error listing maintenances: API Error" in captured.out
def test_delete_maintenance_by_id(self, mock_client, capsys):
"""Test deleting maintenance by ID"""
# Setup
mock_maintenance = {"id": 1, "title": "Test Maintenance"}
mock_client.api.get_maintenance.return_value = mock_maintenance
mock_client.api.delete_maintenance.return_value = {
"msg": "Deleted Successfully"
}
maintenance_commands = MaintenanceCommands(mock_client)
# Execute
maintenance_commands.delete_maintenance(maintenance_id=1)
# Verify
mock_client.api.get_maintenance.assert_called_once_with(1)
mock_client.api.delete_maintenance.assert_called_once_with(1)
captured = capsys.readouterr()
assert (
"Successfully deleted maintenance 'Test Maintenance' (ID: 1)"
in captured.out
)
def test_delete_all_maintenances(self, mock_client, mock_maintenances, capsys):
"""Test deleting all maintenances"""
# Setup
mock_client.api.get_maintenances.return_value = mock_maintenances
mock_client.api.delete_maintenance.return_value = {
"msg": "Deleted Successfully"
}
maintenance_commands = MaintenanceCommands(mock_client)
# Execute
maintenance_commands.delete_maintenance(delete_all=True)
# Verify
assert mock_client.api.delete_maintenance.call_count == 2
captured = capsys.readouterr()
assert "Found 2 maintenances to delete:" in captured.out
assert "Successfully deleted 2 out of 2 maintenances" in captured.out
def test_delete_maintenance_no_params(self, mock_client, capsys):
"""Test deleting maintenance without parameters"""
maintenance_commands = MaintenanceCommands(mock_client)
# Execute
maintenance_commands.delete_maintenance()
# Verify
captured = capsys.readouterr()
assert (
"Error: Either --id or --all flag is required for delete operation"
in captured.out
)
class TestMaintenanceCommandHandler:
def test_handle_maintenance_command_no_action(self, mock_client, capsys):
"""Test maintenance command handler with no action"""
# Setup
mock_args = Mock()
mock_args.maintenance_action = None
# Mock the parser setup
with patch("kumacli.cmd.maintenance.setup_maintenance_parser") as mock_setup:
mock_parser = Mock()
mock_setup._parser = mock_parser
# Execute
result = handle_maintenance_command(mock_args, mock_client)
# Verify
assert result is False
def test_handle_maintenance_command_list(self, mock_client, mock_maintenances):
"""Test maintenance command handler for list action"""
# Setup
mock_args = Mock()
mock_args.maintenance_action = "list"
mock_client.api.get_maintenances.return_value = mock_maintenances
# Execute
result = handle_maintenance_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.api.get_maintenances.assert_called_once()
def test_handle_maintenance_command_delete(self, mock_client):
"""Test maintenance command handler for delete action"""
# Setup
mock_args = Mock()
mock_args.maintenance_action = "delete"
mock_args.id = 1
mock_args.all = False
mock_maintenance = {"id": 1, "title": "Test Maintenance"}
mock_client.api.get_maintenance.return_value = mock_maintenance
mock_client.api.delete_maintenance.return_value = {
"msg": "Deleted Successfully"
}
# Execute
result = handle_maintenance_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.api.delete_maintenance.assert_called_once_with(1)
def test_handle_maintenance_command_unknown_action(self, mock_client, capsys):
"""Test maintenance command handler with unknown action"""
# Setup
mock_args = Mock()
mock_args.maintenance_action = "unknown"
# Execute
result = handle_maintenance_command(mock_args, mock_client)
# Verify
assert result is False
captured = capsys.readouterr()
assert (
"Unknown maintenance action. Use --help for usage information."
in captured.out
)

275
tests/test_monitor.py Normal file
View File

@@ -0,0 +1,275 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import Mock, patch
from io import StringIO
import sys
from kumacli.cmd.monitor import (
MonitorCommands,
handle_monitor_command,
setup_monitor_parser,
)
class TestMonitorCommands:
def test_pause_monitors_by_pattern(self, mock_client, mock_monitors, capsys):
"""Test pausing monitors by pattern"""
# Setup
mock_client.find_and_get_monitors.return_value = [
{"id": 1, "name": "Test Monitor 1"},
{"id": 2, "name": "Test Monitor 2"},
]
mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.pause_monitors(monitor_patterns=["Test*"])
# Verify
mock_client.find_and_get_monitors.assert_called_once_with(["Test*"], None)
assert mock_client.api.pause_monitor.call_count == 2
mock_client.api.pause_monitor.assert_any_call(1)
mock_client.api.pause_monitor.assert_any_call(2)
captured = capsys.readouterr()
assert "Found 2 matching monitors to pause:" in captured.out
assert "Paused monitor 'Test Monitor 1' (ID: 1)" in captured.out
assert "Paused monitor 'Test Monitor 2' (ID: 2)" in captured.out
assert "Successfully paused 2 out of 2 monitors" in captured.out
def test_pause_monitors_by_group(self, mock_client, mock_monitors, capsys):
"""Test pausing monitors by group"""
# Setup
mock_client.find_and_get_monitors.return_value = [
{"id": 4, "name": "Child Monitor"}
]
mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.pause_monitors(group_patterns=["Group*"])
# Verify
mock_client.find_and_get_monitors.assert_called_once_with(None, ["Group*"])
mock_client.api.pause_monitor.assert_called_once_with(4)
captured = capsys.readouterr()
assert "Found 1 matching monitors to pause:" in captured.out
assert "Paused monitor 'Child Monitor' (ID: 4)" in captured.out
def test_pause_monitors_no_patterns(self, mock_client, capsys):
"""Test pausing monitors without patterns"""
# Setup
mock_client.find_and_get_monitors.return_value = []
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.pause_monitors()
# Verify
mock_client.find_and_get_monitors.assert_called_once_with(None, None)
def test_pause_monitors_no_matches(self, mock_client, capsys):
"""Test pausing monitors with no matches"""
# Setup
mock_client.find_and_get_monitors.return_value = []
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.pause_monitors(monitor_patterns=["NonExistent*"])
# Verify
mock_client.find_and_get_monitors.assert_called_once_with(
["NonExistent*"], None
)
def test_pause_monitors_api_error(self, mock_client, capsys):
"""Test pausing monitors with API error"""
# Setup
mock_client.find_and_get_monitors.return_value = [
{"id": 1, "name": "Test Monitor 1"}
]
from uptime_kuma_api import UptimeKumaException
mock_client.api.pause_monitor.side_effect = UptimeKumaException("API Error")
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.pause_monitors(monitor_patterns=["Test*"])
# Verify
captured = capsys.readouterr()
assert "Failed to pause monitor 'Test Monitor 1': API Error" in captured.out
assert "Successfully paused 0 out of 1 monitors" in captured.out
def test_resume_monitors_by_pattern(self, mock_client, mock_monitors, capsys):
"""Test resuming monitors by pattern"""
# Setup
mock_client.find_and_get_monitors.return_value = [
{"id": 2, "name": "Test Monitor 2"}
]
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.resume_monitors(monitor_patterns=["Test*"])
# Verify
mock_client.find_and_get_monitors.assert_called_once_with(["Test*"], None)
mock_client.api.resume_monitor.assert_called_once_with(2)
captured = capsys.readouterr()
assert "Found 1 matching monitors to resume:" in captured.out
assert "Resumed monitor 'Test Monitor 2' (ID: 2)" in captured.out
def test_resume_monitors_all_paused(self, mock_client, mock_monitors, capsys):
"""Test resuming all paused monitors"""
# Setup
mock_client.api.get_monitors.return_value = mock_monitors
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.resume_monitors(resume_all=True)
# Verify
# Should resume monitors with active=False (monitors 2 and 4)
assert mock_client.api.resume_monitor.call_count == 2
mock_client.api.resume_monitor.assert_any_call(2)
mock_client.api.resume_monitor.assert_any_call(4)
captured = capsys.readouterr()
assert "Found 2 paused monitors to resume:" in captured.out
assert "Successfully resumed 2 out of 2 monitors" in captured.out
def test_resume_monitors_all_no_paused(self, mock_client, capsys):
"""Test resuming all paused monitors when none are paused"""
# Setup
active_monitors = [{"id": 1, "name": "Active Monitor", "active": True}]
mock_client.api.get_monitors.return_value = active_monitors
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.resume_monitors(resume_all=True)
# Verify
captured = capsys.readouterr()
assert "No paused monitors found to resume" in captured.out
def test_resume_monitors_no_args(self, mock_client, capsys):
"""Test resuming monitors without any arguments"""
# Setup
mock_client.find_and_get_monitors.return_value = []
monitor_commands = MonitorCommands(mock_client)
# Execute
monitor_commands.resume_monitors()
# Verify
mock_client.find_and_get_monitors.assert_called_once_with(None, None)
class TestMonitorCommandHandler:
def test_handle_monitor_command_no_action(self, mock_client, capsys):
"""Test monitor command handler with no action"""
# Setup
mock_args = Mock()
mock_args.monitor_action = None
# Mock the parser setup to avoid importing issues
with patch("kumacli.cmd.monitor.setup_monitor_parser") as mock_setup:
mock_parser = Mock()
mock_setup._parser = mock_parser
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is False
def test_handle_monitor_command_pause(self, mock_client):
"""Test monitor command handler for pause action"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "pause"
mock_args.monitor = ["test*"]
mock_args.group = None
mock_client.find_and_get_monitors.return_value = [
{"id": 1, "name": "test monitor"}
]
mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.api.pause_monitor.assert_called_once_with(1)
def test_handle_monitor_command_resume(self, mock_client):
"""Test monitor command handler for resume action"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "resume"
mock_args.monitor = ["test*"]
mock_args.group = None
mock_args.all = False
mock_client.find_and_get_monitors.return_value = [
{"id": 1, "name": "test monitor"}
]
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is True
mock_client.api.resume_monitor.assert_called_once_with(1)
def test_handle_monitor_command_resume_all(self, mock_client, mock_monitors):
"""Test monitor command handler for resume all action"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "resume"
mock_args.monitor = None
mock_args.group = None
mock_args.all = True
mock_client.api.get_monitors.return_value = mock_monitors
mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is True
# Should resume paused monitors (monitors 2 and 4)
assert mock_client.api.resume_monitor.call_count == 2
def test_handle_monitor_command_unknown_action(self, mock_client, capsys):
"""Test monitor command handler with unknown action"""
# Setup
mock_args = Mock()
mock_args.monitor_action = "unknown"
# Execute
result = handle_monitor_command(mock_args, mock_client)
# Verify
assert result is False
captured = capsys.readouterr()
assert (
"Unknown monitor action. Use --help for usage information." in captured.out
)

31
tests/test_version.py Normal file
View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import Mock
from kumacli.cmd.version import handle_version_command, __version__
class TestVersionCommand:
def test_handle_version_command(self, mock_client, capsys):
"""Test version command handler"""
# Setup
mock_args = Mock()
# Execute
result = handle_version_command(mock_args, mock_client)
# Verify
assert result is True
captured = capsys.readouterr()
assert f"kumacli {__version__}" in captured.out
def test_version_is_defined(self):
"""Test that version is properly defined"""
assert __version__ is not None
assert isinstance(__version__, str)
assert len(__version__) > 0
# Version should follow semantic versioning pattern (e.g., "1.4.0")
parts = __version__.split(".")
assert len(parts) >= 2 # At least major.minor
assert all(part.isdigit() for part in parts) # All parts should be numeric