feat(agent): add ProbeManager with ICMP/TCP/HTTP probes and handlers

Implements the core probe execution engine (ProbeManager) that runs
network probes on configurable intervals, collects latency samples,
and aggregates results over a 60s sliding window. Adds two new
WebSocket handlers (SyncNetworkProbes, GetNetworkProbeResults) for
hub-agent communication and integrates probe lifecycle into the agent.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
xiaomiku01
2026-04-11 00:26:02 +08:00
parent a42d899e64
commit 865e6db90f
5 changed files with 306 additions and 0 deletions

View File

@@ -48,6 +48,7 @@ type Agent struct {
keys []gossh.PublicKey // SSH public keys keys []gossh.PublicKey // SSH public keys
smartManager *SmartManager // Manages SMART data smartManager *SmartManager // Manages SMART data
systemdManager *systemdManager // Manages systemd services systemdManager *systemdManager // Manages systemd services
probeManager *ProbeManager // Manages network probes
} }
// NewAgent creates a new agent with the given data directory for persisting data. // NewAgent creates a new agent with the given data directory for persisting data.
@@ -121,6 +122,9 @@ func NewAgent(dataDir ...string) (agent *Agent, err error) {
// initialize handler registry // initialize handler registry
agent.handlerRegistry = NewHandlerRegistry() agent.handlerRegistry = NewHandlerRegistry()
// initialize probe manager
agent.probeManager = newProbeManager()
// initialize disk info // initialize disk info
agent.initializeDiskInfo() agent.initializeDiskInfo()

View File

@@ -112,6 +112,7 @@ func (c *ConnectionManager) Start(serverOptions ServerOptions) error {
case <-sigCtx.Done(): case <-sigCtx.Done():
slog.Info("Shutting down", "cause", context.Cause(sigCtx)) slog.Info("Shutting down", "cause", context.Cause(sigCtx))
_ = c.agent.StopServer() _ = c.agent.StopServer()
c.agent.probeManager.Stop()
c.closeWebSocket() c.closeWebSocket()
return health.CleanUp() return health.CleanUp()
} }

View File

@@ -7,6 +7,7 @@ import (
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel/internal/common" "github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/smart" "github.com/henrygd/beszel/internal/entities/smart"
"log/slog" "log/slog"
@@ -51,6 +52,8 @@ func NewHandlerRegistry() *HandlerRegistry {
registry.Register(common.GetContainerInfo, &GetContainerInfoHandler{}) registry.Register(common.GetContainerInfo, &GetContainerInfoHandler{})
registry.Register(common.GetSmartData, &GetSmartDataHandler{}) registry.Register(common.GetSmartData, &GetSmartDataHandler{})
registry.Register(common.GetSystemdInfo, &GetSystemdInfoHandler{}) registry.Register(common.GetSystemdInfo, &GetSystemdInfoHandler{})
registry.Register(common.SyncNetworkProbes, &SyncNetworkProbesHandler{})
registry.Register(common.GetNetworkProbeResults, &GetNetworkProbeResultsHandler{})
return registry return registry
} }
@@ -203,3 +206,30 @@ func (h *GetSystemdInfoHandler) Handle(hctx *HandlerContext) error {
return hctx.SendResponse(details, hctx.RequestID) return hctx.SendResponse(details, hctx.RequestID)
} }
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// SyncNetworkProbesHandler handles probe configuration sync from hub
type SyncNetworkProbesHandler struct{}
func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error {
var configs []probe.Config
if err := cbor.Unmarshal(hctx.Request.Data, &configs); err != nil {
return err
}
hctx.Agent.probeManager.SyncProbes(configs)
slog.Info("network probes synced", "count", len(configs))
return hctx.SendResponse("ok", hctx.RequestID)
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// GetNetworkProbeResultsHandler handles probe results request from hub
type GetNetworkProbeResultsHandler struct{}
func (h *GetNetworkProbeResultsHandler) Handle(hctx *HandlerContext) error {
results := hctx.Agent.probeManager.GetResults()
return hctx.SendResponse(results, hctx.RequestID)
}

227
agent/probe.go Normal file
View File

@@ -0,0 +1,227 @@
package agent
import (
"fmt"
"math"
"net"
"net/http"
"sync"
"time"
"github.com/henrygd/beszel/internal/entities/probe"
"log/slog"
)
// ProbeManager manages network probe tasks.
type ProbeManager struct {
mu sync.RWMutex
probes map[string]*probeTask // key = probe.Config.Key()
}
type probeTask struct {
config probe.Config
cancel chan struct{}
mu sync.Mutex
samples []probeSample
}
type probeSample struct {
latencyMs float64 // -1 means loss
timestamp time.Time
}
func newProbeManager() *ProbeManager {
return &ProbeManager{
probes: make(map[string]*probeTask),
}
}
// SyncProbes replaces all probe tasks with the given configs.
func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
pm.mu.Lock()
defer pm.mu.Unlock()
// Build set of new keys
newKeys := make(map[string]probe.Config, len(configs))
for _, cfg := range configs {
newKeys[cfg.Key()] = cfg
}
// Stop removed probes
for key, task := range pm.probes {
if _, exists := newKeys[key]; !exists {
close(task.cancel)
delete(pm.probes, key)
}
}
// Start new probes (skip existing ones with same key)
for key, cfg := range newKeys {
if _, exists := pm.probes[key]; exists {
continue
}
task := &probeTask{
config: cfg,
cancel: make(chan struct{}),
samples: make([]probeSample, 0, 64),
}
pm.probes[key] = task
go pm.runProbe(task)
}
}
// GetResults returns aggregated results for all probes over the last 60s window.
func (pm *ProbeManager) GetResults() map[string]probe.Result {
pm.mu.RLock()
defer pm.mu.RUnlock()
results := make(map[string]probe.Result, len(pm.probes))
cutoff := time.Now().Add(-60 * time.Second)
for key, task := range pm.probes {
task.mu.Lock()
var sum, minMs, maxMs float64
var count, lossCount int
minMs = math.MaxFloat64
for _, s := range task.samples {
if s.timestamp.Before(cutoff) {
continue
}
count++
if s.latencyMs < 0 {
lossCount++
continue
}
sum += s.latencyMs
if s.latencyMs < minMs {
minMs = s.latencyMs
}
if s.latencyMs > maxMs {
maxMs = s.latencyMs
}
}
task.mu.Unlock()
if count == 0 {
continue
}
successCount := count - lossCount
var avg float64
if successCount > 0 {
avg = math.Round(sum/float64(successCount)*100) / 100
}
if minMs == math.MaxFloat64 {
minMs = 0
}
results[key] = probe.Result{
AvgMs: avg,
MinMs: math.Round(minMs*100) / 100,
MaxMs: math.Round(maxMs*100) / 100,
Loss: math.Round(float64(lossCount)/float64(count)*10000) / 100,
}
}
return results
}
// Stop stops all probe tasks.
func (pm *ProbeManager) Stop() {
pm.mu.Lock()
defer pm.mu.Unlock()
for key, task := range pm.probes {
close(task.cancel)
delete(pm.probes, key)
}
}
// runProbe executes a single probe task in a loop.
func (pm *ProbeManager) runProbe(task *probeTask) {
interval := time.Duration(task.config.Interval) * time.Second
if interval < time.Second {
interval = 10 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
// Run immediately on start
pm.executeProbe(task)
for {
select {
case <-task.cancel:
return
case <-ticker.C:
pm.executeProbe(task)
}
}
}
func (pm *ProbeManager) executeProbe(task *probeTask) {
var latencyMs float64
switch task.config.Protocol {
case "icmp":
latencyMs = probeICMP(task.config.Target)
case "tcp":
latencyMs = probeTCP(task.config.Target, task.config.Port)
case "http":
latencyMs = probeHTTP(task.config.Target)
default:
slog.Warn("unknown probe protocol", "protocol", task.config.Protocol)
return
}
sample := probeSample{
latencyMs: latencyMs,
timestamp: time.Now(),
}
task.mu.Lock()
// Trim old samples beyond 120s to bound memory
cutoff := time.Now().Add(-120 * time.Second)
start := 0
for i, s := range task.samples {
if s.timestamp.After(cutoff) {
start = i
break
}
if i == len(task.samples)-1 {
start = len(task.samples)
}
}
if start > 0 {
task.samples = task.samples[start:]
}
task.samples = append(task.samples, sample)
task.mu.Unlock()
}
// probeTCP measures TCP connection latency. Returns -1 on failure.
func probeTCP(target string, port uint16) float64 {
addr := net.JoinHostPort(target, fmt.Sprintf("%d", port))
start := time.Now()
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
if err != nil {
return -1
}
conn.Close()
return float64(time.Since(start).Microseconds()) / 1000.0
}
// probeHTTP measures HTTP GET request latency. Returns -1 on failure.
func probeHTTP(url string) float64 {
client := &http.Client{Timeout: 10 * time.Second}
start := time.Now()
resp, err := client.Get(url)
if err != nil {
return -1
}
resp.Body.Close()
if resp.StatusCode >= 400 {
return -1
}
return float64(time.Since(start).Microseconds()) / 1000.0
}

44
agent/probe_ping.go Normal file
View File

@@ -0,0 +1,44 @@
package agent
import (
"os/exec"
"regexp"
"runtime"
"strconv"
"time"
)
var pingTimeRegex = regexp.MustCompile(`time[=<]([\d.]+)\s*ms`)
// probeICMP executes system ping command and parses latency. Returns -1 on failure.
func probeICMP(target string) float64 {
var cmd *exec.Cmd
switch runtime.GOOS {
case "windows":
cmd = exec.Command("ping", "-n", "1", "-w", "3000", target)
default: // linux, darwin, freebsd
cmd = exec.Command("ping", "-c", "1", "-W", "3", target)
}
start := time.Now()
output, err := cmd.Output()
if err != nil {
// If ping fails but we got output, still try to parse
if len(output) == 0 {
return -1
}
}
matches := pingTimeRegex.FindSubmatch(output)
if len(matches) >= 2 {
if ms, err := strconv.ParseFloat(string(matches[1]), 64); err == nil {
return ms
}
}
// Fallback: use wall clock time if ping succeeded but parsing failed
if err == nil {
return float64(time.Since(start).Microseconds()) / 1000.0
}
return -1
}