Add parallelism

This commit is contained in:
Pim van Pelt
2025-07-07 01:08:42 +02:00
parent c6775736ac
commit 53c7bca43e
3 changed files with 87 additions and 17 deletions

View File

@ -7,6 +7,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
"github.com/spf13/cobra"
)
@ -21,6 +22,29 @@ const Version = "1.2.4"
// SSH helper functions are now in ssh.go
// processDevice handles backup processing for a single device
func processDevice(hostname string, deviceConfig Device, commands []string, excludePatterns []string, password, keyFile string, port int, outputDir string) bool {
// Create backup instance
backup := NewRouterBackup(hostname, deviceConfig.Address, deviceConfig.User, password, keyFile, port)
// Connect and backup
if err := backup.Connect(); err != nil {
fmt.Printf("%s: Failed to connect: %v\n", hostname, err)
return false
}
err := backup.BackupCommands(commands, excludePatterns, outputDir)
backup.Disconnect()
if err != nil {
fmt.Printf("%s: Backup failed: %v\n", hostname, err)
return false
} else {
fmt.Printf("%s: Backup completed\n", hostname)
return true
}
}
func main() {
var yamlFiles []string
var password string
@ -28,6 +52,7 @@ func main() {
var port int
var outputDir string
var hostFilter []string
var parallel int
var rootCmd = &cobra.Command{
Use: "ipng-router-backup",
@ -93,12 +118,41 @@ func main() {
}
}
successCount := 0
totalCount := len(devicesToProcess)
for hostname, deviceConfig := range devicesToProcess {
fmt.Printf("\n%s: Processing device (type: %s)\n", hostname, deviceConfig.Type)
// Create channels for work distribution and result collection
type DeviceWork struct {
hostname string
deviceConfig Device
commands []string
excludePatterns []string
}
type DeviceResult struct {
hostname string
success bool
}
workChan := make(chan DeviceWork, totalCount)
resultChan := make(chan DeviceResult, totalCount)
// Start worker pool
var wg sync.WaitGroup
for i := 0; i < parallel; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for work := range workChan {
fmt.Printf("%s: Processing device (type: %s)\n", work.hostname, work.deviceConfig.Type)
success := processDevice(work.hostname, work.deviceConfig, work.commands, work.excludePatterns, password, keyFile, port, outputDir)
resultChan <- DeviceResult{hostname: work.hostname, success: success}
}
}()
}
// Queue all work
for hostname, deviceConfig := range devicesToProcess {
user := deviceConfig.User
commands := deviceConfig.Commands
deviceType := deviceConfig.Type
@ -122,27 +176,30 @@ func main() {
continue
}
// Create backup instance
backup := NewRouterBackup(hostname, deviceConfig.Address, user, password, keyFile, port)
// Connect and backup
if err := backup.Connect(); err != nil {
fmt.Printf("%s: Failed to connect: %v\n", hostname, err)
continue
workChan <- DeviceWork{
hostname: hostname,
deviceConfig: deviceConfig,
commands: commands,
excludePatterns: excludePatterns,
}
}
close(workChan)
err = backup.BackupCommands(commands, excludePatterns, outputDir)
backup.Disconnect()
// Wait for all workers to finish
go func() {
wg.Wait()
close(resultChan)
}()
if err != nil {
fmt.Printf("%s: Backup failed: %v\n", hostname, err)
} else {
fmt.Printf("%s: Backup completed\n", hostname)
// Collect results
successCount := 0
for result := range resultChan {
if result.success {
successCount++
}
}
fmt.Printf("\nOverall summary: %d/%d devices processed successfully\n", successCount, totalCount)
fmt.Printf("Overall summary: %d/%d devices processed successfully\n", successCount, totalCount)
// Set exit code based on results
if successCount == 0 {
@ -160,6 +217,7 @@ func main() {
rootCmd.Flags().IntVar(&port, "port", 22, "SSH port")
rootCmd.Flags().StringVar(&outputDir, "output-dir", "/tmp", "Output directory for command output files")
rootCmd.Flags().StringSliceVar(&hostFilter, "host", []string{}, "Specific host(s) to process (can be repeated, processes all if not specified)")
rootCmd.Flags().IntVar(&parallel, "parallel", 10, "Maximum number of devices to process in parallel")
rootCmd.MarkFlagRequired("yaml")