package agent

import (
	"fmt"
	"log/slog"
	"math"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/henrygd/beszel/internal/entities/probe"
)

// ProbeManager owns the set of running network probe tasks and the HTTP
// client shared by HTTP probes.
type ProbeManager struct {
	// mu guards probes.
	mu sync.RWMutex
	// probes holds the running tasks, keyed by probe.Config.Key().
	probes map[string]*probeTask
	// httpClient is passed to probeHTTP for every HTTP probe.
	httpClient *http.Client
}

// probeTask is the state of one running probe: its configuration, the channel
// used to stop it, and the samples collected so far.
type probeTask struct {
	config probe.Config
	// cancel is closed to tell the task's goroutine to exit.
	cancel chan struct{}
	// mu guards samples.
	mu      sync.Mutex
	samples []probeSample
}

// probeSample is a single probe measurement.
type probeSample struct {
	// latencyMs is the measured latency in milliseconds; a negative value
	// (-1) records a lost probe.
	latencyMs float64
	// timestamp is when the sample was recorded.
	timestamp time.Time
}

// newProbeManager returns an empty ProbeManager whose HTTP client has a
// 10-second timeout.
func newProbeManager() *ProbeManager {
	client := &http.Client{Timeout: 10 * time.Second}
	return &ProbeManager{
		probes:     make(map[string]*probeTask),
		httpClient: client,
	}
}

// SyncProbes reconciles the running probe tasks with configs: tasks whose key
// is no longer present are stopped and removed, and a new task (with its own
// goroutine) is started for every key that is not already running.
//
// When several configs share a key, the last one in the slice is used.
//
// NOTE(review): a task whose key is unchanged keeps running with the config it
// was started with. Whether that can mask a changed setting depends on which
// fields probe.Config.Key() covers — confirm.
func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	// Index the desired configuration by key.
	desired := make(map[string]probe.Config, len(configs))
	for i := range configs {
		desired[configs[i].Key()] = configs[i]
	}

	// Stop and forget every running task that is no longer wanted.
	for key, running := range pm.probes {
		if _, keep := desired[key]; keep {
			continue
		}
		close(running.cancel)
		delete(pm.probes, key)
	}

	// Launch a task for each key that is not running yet.
	for key, cfg := range desired {
		if _, running := pm.probes[key]; !running {
			started := &probeTask{
				config:  cfg,
				cancel:  make(chan struct{}),
				samples: make([]probeSample, 0, 64),
			}
			pm.probes[key] = started
			go pm.runProbe(started)
		}
	}
}

// GetResults returns aggregated results for all probes over the last 60s window.
func (pm *ProbeManager) GetResults() map[string]probe.Result { pm.mu.RLock() defer pm.mu.RUnlock() results := make(map[string]probe.Result, len(pm.probes)) cutoff := time.Now().Add(-60 * time.Second) for key, task := range pm.probes { task.mu.Lock() var sum, minMs, maxMs float64 var count, lossCount int minMs = math.MaxFloat64 for _, s := range task.samples { if s.timestamp.Before(cutoff) { continue } count++ if s.latencyMs < 0 { lossCount++ continue } sum += s.latencyMs if s.latencyMs < minMs { minMs = s.latencyMs } if s.latencyMs > maxMs { maxMs = s.latencyMs } } task.mu.Unlock() if count == 0 { continue } successCount := count - lossCount var avg float64 if successCount > 0 { avg = math.Round(sum/float64(successCount)*100) / 100 } if minMs == math.MaxFloat64 { minMs = 0 } results[key] = probe.Result{ AvgMs: avg, MinMs: math.Round(minMs*100) / 100, MaxMs: math.Round(maxMs*100) / 100, Loss: math.Round(float64(lossCount)/float64(count)*10000) / 100, } } return results } // Stop stops all probe tasks. func (pm *ProbeManager) Stop() { pm.mu.Lock() defer pm.mu.Unlock() for key, task := range pm.probes { close(task.cancel) delete(pm.probes, key) } } // runProbe executes a single probe task in a loop. 
func (pm *ProbeManager) runProbe(task *probeTask) { interval := time.Duration(task.config.Interval) * time.Second if interval < time.Second { interval = 10 * time.Second } ticker := time.NewTicker(interval) defer ticker.Stop() // Run immediately on start pm.executeProbe(task) for { select { case <-task.cancel: return case <-ticker.C: pm.executeProbe(task) } } } func (pm *ProbeManager) executeProbe(task *probeTask) { var latencyMs float64 switch task.config.Protocol { case "icmp": latencyMs = probeICMP(task.config.Target) case "tcp": latencyMs = probeTCP(task.config.Target, task.config.Port) case "http": latencyMs = probeHTTP(pm.httpClient, task.config.Target) default: slog.Warn("unknown probe protocol", "protocol", task.config.Protocol) return } sample := probeSample{ latencyMs: latencyMs, timestamp: time.Now(), } task.mu.Lock() // Trim old samples beyond 120s to bound memory cutoff := time.Now().Add(-120 * time.Second) start := 0 for i, s := range task.samples { if s.timestamp.After(cutoff) { start = i break } if i == len(task.samples)-1 { start = len(task.samples) } } if start > 0 { task.samples = task.samples[start:] } task.samples = append(task.samples, sample) task.mu.Unlock() } // probeTCP measures TCP connection latency. Returns -1 on failure. func probeTCP(target string, port uint16) float64 { addr := net.JoinHostPort(target, fmt.Sprintf("%d", port)) start := time.Now() conn, err := net.DialTimeout("tcp", addr, 3*time.Second) if err != nil { return -1 } conn.Close() return float64(time.Since(start).Microseconds()) / 1000.0 } // probeHTTP measures HTTP GET request latency. Returns -1 on failure. func probeHTTP(client *http.Client, url string) float64 { start := time.Now() resp, err := client.Get(url) if err != nil { return -1 } resp.Body.Close() if resp.StatusCode >= 400 { return -1 } return float64(time.Since(start).Microseconds()) / 1000.0 }