264 lines
9.8 KiB
Python
264 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, patch
|
|
from io import StringIO
|
|
import sys
|
|
|
|
from kumacli.cmd.monitor import MonitorCommands, handle_monitor_command, setup_monitor_parser
|
|
|
|
|
|
class TestMonitorCommands:
    """Unit tests for ``MonitorCommands.pause_monitors`` / ``resume_monitors``.

    Each test drives the command object through the ``mock_client`` fixture
    (provided outside this section of the file) and then checks the mocked
    API calls and/or the text printed to stdout.
    """

    def test_pause_monitors_by_pattern(self, mock_client, mock_monitors, capsys):
        """Pausing by name pattern pauses every match and reports each one."""
        matched = [
            {"id": 1, "name": "Test Monitor 1"},
            {"id": 2, "name": "Test Monitor 2"},
        ]
        mock_client.api.get_monitors.return_value = mock_monitors
        mock_client.find_monitors_by_pattern.return_value = matched
        mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}

        MonitorCommands(mock_client).pause_monitors(monitor_patterns=["Test*"])

        # The pattern list is forwarded untouched and both matches are paused.
        mock_client.find_monitors_by_pattern.assert_called_once_with(["Test*"])
        assert mock_client.api.pause_monitor.call_count == 2
        for monitor_id in (1, 2):
            mock_client.api.pause_monitor.assert_any_call(monitor_id)

        out = capsys.readouterr().out
        for expected in (
            "Found 2 matching monitors to pause:",
            "Paused monitor 'Test Monitor 1' (ID: 1)",
            "Paused monitor 'Test Monitor 2' (ID: 2)",
            "Successfully paused 2 out of 2 monitors",
        ):
            assert expected in out

    def test_pause_monitors_by_group(self, mock_client, mock_monitors, capsys):
        """Pausing by group pattern pauses the monitors returned for the group."""
        group_members = [{"id": 4, "name": "Child Monitor"}]
        mock_client.get_monitors_in_groups.return_value = group_members
        mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}

        MonitorCommands(mock_client).pause_monitors(group_patterns=["Group*"])

        mock_client.get_monitors_in_groups.assert_called_once_with(["Group*"])
        mock_client.api.pause_monitor.assert_called_once_with(4)

        out = capsys.readouterr().out
        for expected in (
            "Found 1 matching monitors to pause:",
            "Paused monitor 'Child Monitor' (ID: 4)",
        ):
            assert expected in out

    def test_pause_monitors_no_patterns(self, mock_client, capsys):
        """Calling pause with no selector prints the usage error."""
        MonitorCommands(mock_client).pause_monitors()

        out = capsys.readouterr().out
        assert "Error: Either --monitor or --group flag is required." in out

    def test_pause_monitors_no_matches(self, mock_client, capsys):
        """A pattern that matches nothing prints the no-monitors-found error."""
        mock_client.find_monitors_by_pattern.return_value = []

        MonitorCommands(mock_client).pause_monitors(monitor_patterns=["NonExistent*"])

        out = capsys.readouterr().out
        assert "Error: No monitors found matching the specified patterns or groups" in out

    def test_pause_monitors_api_error(self, mock_client, capsys):
        """An exception from the pause API is reported and counted as a failure."""
        mock_client.find_monitors_by_pattern.return_value = [
            {"id": 1, "name": "Test Monitor 1"},
        ]
        # Every pause attempt raises, so nothing can succeed.
        mock_client.api.pause_monitor.side_effect = Exception("API Error")

        MonitorCommands(mock_client).pause_monitors(monitor_patterns=["Test*"])

        out = capsys.readouterr().out
        for expected in (
            "Failed to pause monitor 'Test Monitor 1': API Error",
            "Successfully paused 0 out of 1 monitors",
        ):
            assert expected in out

    def test_resume_monitors_by_pattern(self, mock_client, mock_monitors, capsys):
        """Resuming by name pattern resumes the single matched monitor."""
        mock_client.find_monitors_by_pattern.return_value = [
            {"id": 2, "name": "Test Monitor 2"},
        ]
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        MonitorCommands(mock_client).resume_monitors(monitor_patterns=["Test*"])

        mock_client.api.resume_monitor.assert_called_once_with(2)

        out = capsys.readouterr().out
        for expected in (
            "Found 1 matching monitors to resume:",
            "Resumed monitor 'Test Monitor 2' (ID: 2)",
        ):
            assert expected in out

    def test_resume_monitors_all_paused(self, mock_client, mock_monitors, capsys):
        """``resume_all=True`` resumes every paused monitor from the API listing."""
        mock_client.api.get_monitors.return_value = mock_monitors
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        MonitorCommands(mock_client).resume_monitors(resume_all=True)

        # Per the original author's note, the mock_monitors fixture (defined
        # elsewhere) is expected to hold two paused monitors (active=False),
        # with IDs 2 and 4.
        assert mock_client.api.resume_monitor.call_count == 2
        for monitor_id in (2, 4):
            mock_client.api.resume_monitor.assert_any_call(monitor_id)

        out = capsys.readouterr().out
        for expected in (
            "Found 2 paused monitors to resume:",
            "Successfully resumed 2 out of 2 monitors",
        ):
            assert expected in out

    def test_resume_monitors_all_no_paused(self, mock_client, capsys):
        """``resume_all=True`` with only active monitors reports nothing to do."""
        mock_client.api.get_monitors.return_value = [
            {"id": 1, "name": "Active Monitor", "active": True},
        ]

        MonitorCommands(mock_client).resume_monitors(resume_all=True)

        out = capsys.readouterr().out
        assert "No paused monitors found to resume" in out

    def test_resume_monitors_no_args(self, mock_client, capsys):
        """Calling resume with no selector prints the usage error."""
        MonitorCommands(mock_client).resume_monitors()

        out = capsys.readouterr().out
        assert "Error: Either --monitor, --group, or --all flag is required." in out
|
|
|
|
|
|
class TestMonitorCommandHandler:
    """Unit tests for the ``handle_monitor_command`` dispatcher.

    Parsed CLI arguments are simulated with a ``Mock`` whose attributes
    (``monitor_action``, ``monitor``, ``group``, ``all``) are set per test;
    the client is the ``mock_client`` fixture provided outside this section.
    """

    def test_handle_monitor_command_no_action(self, mock_client, capsys):
        """With no sub-action selected the handler returns ``False``."""
        cli_args = Mock(monitor_action=None)

        # Replace the parser factory in the module under test and hang a
        # stand-in parser on its ``_parser`` attribute, so the handler never
        # touches a real parser.
        with patch('kumacli.cmd.monitor.setup_monitor_parser') as patched_setup:
            patched_setup._parser = Mock()
            outcome = handle_monitor_command(cli_args, mock_client)

        assert outcome is False

    def test_handle_monitor_command_pause(self, mock_client):
        """The ``pause`` action pauses the matched monitor and returns ``True``."""
        cli_args = Mock(monitor_action="pause", monitor=["test*"], group=None)
        mock_client.find_monitors_by_pattern.return_value = [
            {"id": 1, "name": "test monitor"},
        ]
        mock_client.api.pause_monitor.return_value = {"msg": "Paused Successfully."}

        outcome = handle_monitor_command(cli_args, mock_client)

        assert outcome is True
        mock_client.api.pause_monitor.assert_called_once_with(1)

    def test_handle_monitor_command_resume(self, mock_client):
        """The ``resume`` action resumes the matched monitor and returns ``True``."""
        cli_args = Mock(
            monitor_action="resume",
            monitor=["test*"],
            group=None,
            all=False,
        )
        mock_client.find_monitors_by_pattern.return_value = [
            {"id": 1, "name": "test monitor"},
        ]
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        outcome = handle_monitor_command(cli_args, mock_client)

        assert outcome is True
        mock_client.api.resume_monitor.assert_called_once_with(1)

    def test_handle_monitor_command_resume_all(self, mock_client, mock_monitors):
        """``resume`` with ``all=True`` resumes the paused monitors from the listing."""
        cli_args = Mock(
            monitor_action="resume",
            monitor=None,
            group=None,
            all=True,
        )
        mock_client.api.get_monitors.return_value = mock_monitors
        mock_client.api.resume_monitor.return_value = {"msg": "Resumed Successfully."}

        outcome = handle_monitor_command(cli_args, mock_client)

        assert outcome is True
        # Per the original author's note, the mock_monitors fixture (defined
        # elsewhere) is expected to hold two paused monitors, IDs 2 and 4.
        assert mock_client.api.resume_monitor.call_count == 2

    def test_handle_monitor_command_unknown_action(self, mock_client, capsys):
        """An unrecognised action returns ``False`` and prints a usage hint."""
        cli_args = Mock(monitor_action="unknown")

        outcome = handle_monitor_command(cli_args, mock_client)

        assert outcome is False
        out = capsys.readouterr().out
        assert "Unknown monitor action. Use --help for usage information." in out