From 865e6db90fd03b1a69fb27501b5fb29ded5d4314 Mon Sep 17 00:00:00 2001 From: xiaomiku01 Date: Sat, 11 Apr 2026 00:26:02 +0800 Subject: [PATCH] feat(agent): add ProbeManager with ICMP/TCP/HTTP probes and handlers Implements the core probe execution engine (ProbeManager) that runs network probes on configurable intervals, collects latency samples, and aggregates results over a 60s sliding window. Adds two new WebSocket handlers (SyncNetworkProbes, GetNetworkProbeResults) for hub-agent communication and integrates probe lifecycle into the agent. Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/agent.go | 4 + agent/connection_manager.go | 1 + agent/handlers.go | 30 +++++ agent/probe.go | 227 ++++++++++++++++++++++++++++++++++++ agent/probe_ping.go | 44 +++++++ 5 files changed, 306 insertions(+) create mode 100644 agent/probe.go create mode 100644 agent/probe_ping.go diff --git a/agent/agent.go b/agent/agent.go index e181aac6..7a774378 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -48,6 +48,7 @@ type Agent struct { keys []gossh.PublicKey // SSH public keys smartManager *SmartManager // Manages SMART data systemdManager *systemdManager // Manages systemd services + probeManager *ProbeManager // Manages network probes } // NewAgent creates a new agent with the given data directory for persisting data. 
@@ -121,6 +122,9 @@ func NewAgent(dataDir ...string) (agent *Agent, err error) { // initialize handler registry agent.handlerRegistry = NewHandlerRegistry() + // initialize probe manager + agent.probeManager = newProbeManager() + // initialize disk info agent.initializeDiskInfo() diff --git a/agent/connection_manager.go b/agent/connection_manager.go index fc91a9b1..54b35d2a 100644 --- a/agent/connection_manager.go +++ b/agent/connection_manager.go @@ -112,6 +112,7 @@ func (c *ConnectionManager) Start(serverOptions ServerOptions) error { case <-sigCtx.Done(): slog.Info("Shutting down", "cause", context.Cause(sigCtx)) _ = c.agent.StopServer() + c.agent.probeManager.Stop() c.closeWebSocket() return health.CleanUp() } diff --git a/agent/handlers.go b/agent/handlers.go index 2db07d31..f46aebdf 100644 --- a/agent/handlers.go +++ b/agent/handlers.go @@ -7,6 +7,7 @@ import ( "github.com/fxamacker/cbor/v2" "github.com/henrygd/beszel/internal/common" + "github.com/henrygd/beszel/internal/entities/probe" "github.com/henrygd/beszel/internal/entities/smart" "log/slog" @@ -51,6 +52,8 @@ func NewHandlerRegistry() *HandlerRegistry { registry.Register(common.GetContainerInfo, &GetContainerInfoHandler{}) registry.Register(common.GetSmartData, &GetSmartDataHandler{}) registry.Register(common.GetSystemdInfo, &GetSystemdInfoHandler{}) + registry.Register(common.SyncNetworkProbes, &SyncNetworkProbesHandler{}) + registry.Register(common.GetNetworkProbeResults, &GetNetworkProbeResultsHandler{}) return registry } @@ -203,3 +206,30 @@ func (h *GetSystemdInfoHandler) Handle(hctx *HandlerContext) error { return hctx.SendResponse(details, hctx.RequestID) } + +//////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////// + +// SyncNetworkProbesHandler handles probe configuration sync from hub +type SyncNetworkProbesHandler struct{} + +func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error 
{ + var configs []probe.Config + if err := cbor.Unmarshal(hctx.Request.Data, &configs); err != nil { + return err + } + hctx.Agent.probeManager.SyncProbes(configs) + slog.Info("network probes synced", "count", len(configs)) + return hctx.SendResponse("ok", hctx.RequestID) +} + +//////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////// + +// GetNetworkProbeResultsHandler handles probe results request from hub +type GetNetworkProbeResultsHandler struct{} + +func (h *GetNetworkProbeResultsHandler) Handle(hctx *HandlerContext) error { + results := hctx.Agent.probeManager.GetResults() + return hctx.SendResponse(results, hctx.RequestID) +} diff --git a/agent/probe.go b/agent/probe.go new file mode 100644 index 00000000..b50735ea --- /dev/null +++ b/agent/probe.go @@ -0,0 +1,227 @@ +package agent + +import ( + "fmt" + "math" + "net" + "net/http" + "sync" + "time" + + "github.com/henrygd/beszel/internal/entities/probe" + "log/slog" +) + +// ProbeManager manages network probe tasks. +type ProbeManager struct { + mu sync.RWMutex + probes map[string]*probeTask // key = probe.Config.Key() +} + +type probeTask struct { + config probe.Config + cancel chan struct{} + mu sync.Mutex + samples []probeSample +} + +type probeSample struct { + latencyMs float64 // -1 means loss + timestamp time.Time +} + +func newProbeManager() *ProbeManager { + return &ProbeManager{ + probes: make(map[string]*probeTask), + } +} + +// SyncProbes replaces all probe tasks with the given configs. 
+func (pm *ProbeManager) SyncProbes(configs []probe.Config) { + pm.mu.Lock() + defer pm.mu.Unlock() + + // Build set of new keys + newKeys := make(map[string]probe.Config, len(configs)) + for _, cfg := range configs { + newKeys[cfg.Key()] = cfg + } + + // Stop removed probes + for key, task := range pm.probes { + if _, exists := newKeys[key]; !exists { + close(task.cancel) + delete(pm.probes, key) + } + } + + // Start new probes (skip existing ones with same key) + for key, cfg := range newKeys { + if _, exists := pm.probes[key]; exists { + continue + } + task := &probeTask{ + config: cfg, + cancel: make(chan struct{}), + samples: make([]probeSample, 0, 64), + } + pm.probes[key] = task + go pm.runProbe(task) + } +} + +// GetResults returns aggregated results for all probes over the last 60s window. +func (pm *ProbeManager) GetResults() map[string]probe.Result { + pm.mu.RLock() + defer pm.mu.RUnlock() + + results := make(map[string]probe.Result, len(pm.probes)) + cutoff := time.Now().Add(-60 * time.Second) + + for key, task := range pm.probes { + task.mu.Lock() + var sum, minMs, maxMs float64 + var count, lossCount int + minMs = math.MaxFloat64 + + for _, s := range task.samples { + if s.timestamp.Before(cutoff) { + continue + } + count++ + if s.latencyMs < 0 { + lossCount++ + continue + } + sum += s.latencyMs + if s.latencyMs < minMs { + minMs = s.latencyMs + } + if s.latencyMs > maxMs { + maxMs = s.latencyMs + } + } + task.mu.Unlock() + + if count == 0 { + continue + } + + successCount := count - lossCount + var avg float64 + if successCount > 0 { + avg = math.Round(sum/float64(successCount)*100) / 100 + } + if minMs == math.MaxFloat64 { + minMs = 0 + } + + results[key] = probe.Result{ + AvgMs: avg, + MinMs: math.Round(minMs*100) / 100, + MaxMs: math.Round(maxMs*100) / 100, + Loss: math.Round(float64(lossCount)/float64(count)*10000) / 100, + } + } + + return results +} + +// Stop stops all probe tasks. 
+func (pm *ProbeManager) Stop() { + pm.mu.Lock() + defer pm.mu.Unlock() + for key, task := range pm.probes { + close(task.cancel) + delete(pm.probes, key) + } +} + +// runProbe executes a single probe task in a loop. +func (pm *ProbeManager) runProbe(task *probeTask) { + interval := time.Duration(task.config.Interval) * time.Second + if interval < time.Second { + interval = 10 * time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + + // Run immediately on start + pm.executeProbe(task) + + for { + select { + case <-task.cancel: + return + case <-ticker.C: + pm.executeProbe(task) + } + } +} + +func (pm *ProbeManager) executeProbe(task *probeTask) { + var latencyMs float64 + + switch task.config.Protocol { + case "icmp": + latencyMs = probeICMP(task.config.Target) + case "tcp": + latencyMs = probeTCP(task.config.Target, task.config.Port) + case "http": + latencyMs = probeHTTP(task.config.Target) + default: + slog.Warn("unknown probe protocol", "protocol", task.config.Protocol) + return + } + + sample := probeSample{ + latencyMs: latencyMs, + timestamp: time.Now(), + } + + task.mu.Lock() + // Trim old samples beyond 120s to bound memory + cutoff := time.Now().Add(-120 * time.Second) + start := 0 + for i, s := range task.samples { + if s.timestamp.After(cutoff) { + start = i + break + } + if i == len(task.samples)-1 { + start = len(task.samples) + } + } + if start > 0 { + task.samples = task.samples[start:] + } + task.samples = append(task.samples, sample) + task.mu.Unlock() +} + +// probeTCP measures TCP connection latency. Returns -1 on failure. +func probeTCP(target string, port uint16) float64 { + addr := net.JoinHostPort(target, fmt.Sprintf("%d", port)) + start := time.Now() + conn, err := net.DialTimeout("tcp", addr, 3*time.Second) + if err != nil { + return -1 + } + conn.Close() + return float64(time.Since(start).Microseconds()) / 1000.0 +} + +// probeHTTP measures HTTP GET request latency. Returns -1 on failure. 
// probeHTTP measures HTTP GET request latency. Returns -1 on failure
// or on any 4xx/5xx status. The timing covers the full request up to
// receipt of the response headers; the body is not read.
func probeHTTP(url string) float64 {
	client := &http.Client{Timeout: 10 * time.Second}
	start := time.Now()
	resp, err := client.Get(url)
	if err != nil {
		return -1
	}
	resp.Body.Close()
	if resp.StatusCode >= 400 {
		return -1
	}
	return float64(time.Since(start).Microseconds()) / 1000.0
}