Files
router-backup/router_backup.py
2025-07-05 23:29:59 +00:00

236 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
SSH Router Backup Script
Connects to routers via SSH and runs commands, saving output to local files.
"""
import paramiko
import sys
import os
from datetime import datetime
import argparse
import json
import yaml
class RouterBackup:
def __init__(self, hostname, username, password=None, key_file=None, port=22):
self.hostname = hostname
self.username = username
self.password = password
self.key_file = key_file
self.port = port
self.client = None
def connect(self):
"""Establish SSH connection to the router"""
try:
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.key_file:
self.client.connect(
hostname=self.hostname,
username=self.username,
key_filename=self.key_file,
port=self.port,
timeout=30
)
elif os.environ.get('SSH_AUTH_SOCK'):
# Use SSH agent if available
self.client.connect(
hostname=self.hostname,
username=self.username,
port=self.port,
timeout=30
)
else:
self.client.connect(
hostname=self.hostname,
username=self.username,
password=self.password,
port=self.port,
timeout=30
)
print(f"Successfully connected to {self.hostname}")
return True
except Exception as e:
print(f"Failed to connect to {self.hostname}: {e}")
return False
def run_command(self, command):
"""Execute a command on the router and return the output"""
if not self.client:
print("No active connection")
return None
try:
stdin, stdout, stderr = self.client.exec_command(command)
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
if error:
print(f"Error executing '{command}': {error}")
return None
return output
except Exception as e:
print(f"Failed to execute command '{command}': {e}")
return None
def backup_commands(self, commands, output_dir="backup"):
"""Run multiple commands and save outputs to files"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_info = {
"hostname": self.hostname,
"timestamp": timestamp,
"commands": {}
}
# Truncate output file at start
filename = f"{self.hostname}.txt"
filepath = os.path.join(output_dir, filename)
open(filepath, 'w').close()
for i, command in enumerate(commands):
print(f"Running command {i+1}/{len(commands)}: {command}")
output = self.run_command(command)
if output:
# Use hostname as filename
filename = f"{self.hostname}.txt"
filepath = os.path.join(output_dir, filename)
with open(filepath, 'a') as f:
f.write(f"## COMMAND: {command}\n")
f.write(output)
backup_info["commands"][command] = {
"filename": filename,
"success": True,
"output_length": len(output)
}
print(f"Output saved to {filepath}")
else:
backup_info["commands"][command] = {
"filename": None,
"success": False,
"output_length": 0
}
# Save backup info
info_file = os.path.join(output_dir, f"{self.hostname}_{timestamp}_info.json")
with open(info_file, 'w') as f:
json.dump(backup_info, f, indent=2)
return backup_info
def disconnect(self):
"""Close SSH connection"""
if self.client:
self.client.close()
print(f"Disconnected from {self.hostname}")
def main():
parser = argparse.ArgumentParser(description='SSH Router Backup Tool')
parser.add_argument('--config', required=True, help='YAML configuration file path')
parser.add_argument('--password', help='SSH password')
parser.add_argument('--key-file', help='SSH private key file path')
parser.add_argument('--port', type=int, default=22, help='SSH port (default: 22)')
parser.add_argument('--git-repo', default='/tmp', help='Git repository directory for command output files (default: /tmp)')
args = parser.parse_args()
# Load configuration
try:
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
print(f"Failed to load config file {args.config}: {e}")
sys.exit(1)
# Use SSH key by default, fall back to password
if not args.password and not args.key_file:
# Check if SSH agent is available first
if os.environ.get('SSH_AUTH_SOCK'):
print("Using SSH agent for authentication")
else:
# Try default SSH key locations
default_keys = [
os.path.expanduser("~/.ssh/id_rsa"),
os.path.expanduser("~/.ssh/id_ed25519"),
os.path.expanduser("~/.ssh/id_ecdsa")
]
for key_path in default_keys:
if os.path.exists(key_path):
args.key_file = key_path
print(f"Using SSH key: {key_path}")
break
# If no key found and no SSH agent, prompt for password
if not args.key_file:
import getpass
args.password = getpass.getpass("No SSH key found. Enter SSH password: ")
# Process each device in config
devices = config.get('devices', {})
if not devices:
print("No devices found in config file")
sys.exit(1)
types = config.get('types', {})
success_count = 0
total_count = len(devices)
for hostname, device_config in devices.items():
print(f"\nProcessing device: {hostname}")
user = device_config.get('user')
commands = device_config.get('commands', [])
device_type = device_config.get('type')
# If device has a type, get commands from types section
if device_type and device_type in types:
commands = types[device_type].get('commands', [])
if not user:
print(f"No user specified for {hostname}, skipping")
continue
if not commands:
print(f"No commands specified for {hostname}, skipping")
continue
# Create backup instance
backup = RouterBackup(
hostname=hostname,
username=user,
password=args.password,
key_file=args.key_file,
port=args.port
)
# Connect and backup
if backup.connect():
try:
backup_info = backup.backup_commands(commands, args.git_repo)
print(f"Backup completed for {hostname}")
print(f"Summary: {sum(1 for cmd in backup_info['commands'].values() if cmd['success'])}/{len(commands)} commands successful")
success_count += 1
finally:
backup.disconnect()
else:
print(f"Failed to connect to {hostname}")
print(f"\nOverall summary: {success_count}/{total_count} devices processed successfully")
if __name__ == "__main__":
main()