From 0378023b6f4a672d89d8b8c3868b1eb980679828 Mon Sep 17 00:00:00 2001 From: henrygd Date: Sun, 26 Apr 2026 13:37:33 -0400 Subject: [PATCH] update --- agent/handlers.go | 2 +- agent/probe.go | 27 ++++++++++++++++------ agent/probe_test.go | 46 +++++++++++++++++++++++++++++++++++-- internal/hub/probes.go | 3 ++- internal/hub/probes_test.go | 32 ++++++++++++++++++++++---- 5 files changed, 94 insertions(+), 16 deletions(-) diff --git a/agent/handlers.go b/agent/handlers.go index f55cb151..5fd689f4 100644 --- a/agent/handlers.go +++ b/agent/handlers.go @@ -217,7 +217,7 @@ func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error { if err := cbor.Unmarshal(hctx.Request.Data, &req); err != nil { return err } - resp, err := hctx.Agent.probeManager.ApplySync(req) + resp, err := hctx.Agent.probeManager.HandleSyncRequest(req) if err != nil { return err } diff --git a/agent/probe.go b/agent/probe.go index 61638f64..4f069d82 100644 --- a/agent/probe.go +++ b/agent/probe.go @@ -86,6 +86,19 @@ func newProbeTask(config probe.Config) *probeTask { } } +func newProbeTaskFromExisting(config probe.Config, existing *probeTask) *probeTask { + task := newProbeTask(config) + if existing == nil { + return task + } + + existing.mu.Lock() + defer existing.mu.Unlock() + task.samples = append(task.samples, existing.samples...) + task.buckets = existing.buckets + return task +} + // newProbeAggregate initializes an aggregate with an unset minimum value. func newProbeAggregate() probeAggregate { return probeAggregate{minMs: math.MaxFloat64} @@ -193,14 +206,14 @@ func (pm *ProbeManager) SyncProbes(configs []probe.Config) { if exists { close(task.cancel) } - task = newProbeTask(cfg) + task = newProbeTaskFromExisting(cfg, task) pm.probes[key] = task go pm.runProbe(task, true) } } -// ApplySync applies a full or incremental probe sync request. -func (pm *ProbeManager) ApplySync(req probe.SyncRequest) (probe.SyncResponse, error) { +// HandleSyncRequest applies a full or incremental probe sync request. +func (pm *ProbeManager) HandleSyncRequest(req probe.SyncRequest) (probe.SyncResponse, error) { switch req.Action { case probe.SyncActionReplace: pm.SyncProbes(req.Configs) @@ -216,7 +229,7 @@ func (pm *ProbeManager) ApplySync(req probe.SyncRequest) (probe.SyncResponse, er return probe.SyncResponse{Result: *result}, nil case probe.SyncActionDelete: if req.Config.ID == "" { - return probe.SyncResponse{}, errors.New("missing probe ID for delete action") + return probe.SyncResponse{}, errors.New("missing probe ID for delete") } pm.DeleteProbe(req.Config.ID) return probe.SyncResponse{}, nil @@ -244,7 +257,7 @@ func (pm *ProbeManager) UpsertProbe(config probe.Config, runNow bool) (*probe.Re if exists { close(task.cancel) } - task = newProbeTask(config) + task = newProbeTaskFromExisting(config, task) pm.probes[config.ID] = task startTask = true pm.mu.Unlock() @@ -309,7 +322,7 @@ func (pm *ProbeManager) Stop() { } // runProbe executes a single probe task in a loop. -func (pm *ProbeManager) runProbe(task *probeTask, runImmediately bool) { +func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) { interval := time.Duration(task.config.Interval) * time.Second if interval < time.Second { interval = 10 * time.Second @@ -317,7 +330,7 @@ func (pm *ProbeManager) runProbe(task *probeTask, runImmediately bool) { ticker := time.NewTicker(interval) defer ticker.Stop() - if runImmediately { + if runNow { pm.executeProbe(task) } diff --git a/agent/probe_test.go b/agent/probe_test.go index b10b164d..abfb6638 100644 --- a/agent/probe_test.go +++ b/agent/probe_test.go @@ -197,7 +197,7 @@ func TestProbeManagerApplySyncUpsertRunsImmediatelyAndReturnsResult(t *testing.T httpClient: server.Client(), } - resp, err := pm.ApplySync(probe.SyncRequest{ + resp, err := pm.HandleSyncRequest(probe.SyncRequest{ Action: probe.SyncActionUpsert, Config: probe.Config{ID: "probe-1", Target: server.URL, Protocol: "http", Interval: 10}, RunNow: true, @@ -216,6 +216,48 @@ func TestProbeManagerApplySyncUpsertRunsImmediatelyAndReturnsResult(t *testing.T require.Len(t, task.samples, 1) } +func TestProbeManagerUpsertProbeKeepsHistoryWhenOnlyIntervalChanges(t *testing.T) { + originalCfg := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 10} + updatedCfg := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 30} + now := time.Now().UTC() + + existingTask := &probeTask{config: originalCfg, cancel: make(chan struct{})} + existingTask.addSampleLocked(probeSample{responseMs: 12, timestamp: now.Add(-50 * time.Minute)}) + existingTask.addSampleLocked(probeSample{responseMs: 24, timestamp: now.Add(-30 * time.Second)}) + + pm := &ProbeManager{ + probes: map[string]*probeTask{originalCfg.ID: existingTask}, + } + + result, err := pm.UpsertProbe(updatedCfg, false) + defer pm.Stop() + + require.NoError(t, err) + assert.Nil(t, result) + + updatedTask := pm.probes[updatedCfg.ID] + require.NotNil(t, updatedTask) + assert.NotSame(t, existingTask, updatedTask) + assert.Equal(t, updatedCfg, updatedTask.config) + + updatedTask.mu.Lock() + defer updatedTask.mu.Unlock() + require.Len(t, updatedTask.samples, 1) + assert.Equal(t, 24.0, updatedTask.samples[0].responseMs) + + agg := updatedTask.aggregateLocked(time.Hour, now) + require.True(t, agg.hasData()) + assert.Equal(t, 2, agg.totalCount) + assert.Equal(t, 2, agg.successCount) + assert.Equal(t, 18.0, agg.avgResponse()) + + select { + case <-existingTask.cancel: + default: + t.Fatal("expected original probe task to be cancelled") + } +} + func TestProbeManagerApplySyncDeleteRemovesTask(t *testing.T) { config := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 10} task := &probeTask{config: config, cancel: make(chan struct{})} @@ -223,7 +265,7 @@ func TestProbeManagerApplySyncDeleteRemovesTask(t *testing.T) { probes: map[string]*probeTask{config.ID: task}, } - _, err := pm.ApplySync(probe.SyncRequest{ + _, err := pm.HandleSyncRequest(probe.SyncRequest{ Action: probe.SyncActionDelete, Config: probe.Config{ID: config.ID}, }) diff --git a/internal/hub/probes.go b/internal/hub/probes.go index a7d2e0d4..bb136e3e 100644 --- a/internal/hub/probes.go +++ b/internal/hub/probes.go @@ -1,6 +1,7 @@ package hub import ( + "strconv" "time" "github.com/henrygd/beszel/internal/entities/probe" @@ -11,7 +12,7 @@ import ( // generateProbeID creates a stable hash ID for a probe based on its configuration and the system it belongs to. func generateProbeID(systemId string, config probe.Config) string { - return systems.MakeStableHashId(systemId, config.Target, config.Protocol) + return systems.MakeStableHashId(systemId, config.Target, config.Protocol, strconv.FormatUint(uint64(config.Port), 10)) } // bindNetworkProbesEvents keeps probe records and agent probe state in sync. diff --git a/internal/hub/probes_test.go b/internal/hub/probes_test.go index f71511e9..ebb55847 100644 --- a/internal/hub/probes_test.go +++ b/internal/hub/probes_test.go @@ -23,7 +23,7 @@ func TestGenerateProbeID(t *testing.T) { Port: 80, Interval: 60, }, - expected: "a20a5827", + expected: "de7b3647", }, { name: "HTTP probe on example.com with different system ID", @@ -34,7 +34,7 @@ func TestGenerateProbeID(t *testing.T) { Port: 80, Interval: 60, }, - expected: "ab602ae7", + expected: "be9e2707", }, { name: "Same probe, different interval", @@ -45,7 +45,7 @@ func TestGenerateProbeID(t *testing.T) { Port: 80, Interval: 120, }, - expected: "ab602ae7", + expected: "be9e2707", }, { name: "ICMP probe on 1.1.1.1", @@ -56,7 +56,7 @@ func TestGenerateProbeID(t *testing.T) { Port: 0, Interval: 10, }, - expected: "6d13a4a4", + expected: "49ec14fc", }, { name: "ICMP probe on 1.1.1.1 with different system ID", systemID: "sys4567", @@ -66,7 +66,29 @@ func TestGenerateProbeID(t *testing.T) { Port: 0, Interval: 10, }, - expected: "ddd6c81", + expected: "84921aa3", + }, + { + name: "TCP probe on example.com with port 443", + systemID: "sys789", + config: probe.Config{ + Protocol: "tcp", + Target: "example.com", + Port: 443, + Interval: 30, + }, + expected: "677b991", + }, + { + name: "TCP probe on example.com with port 8443", + systemID: "sys789", + config: probe.Config{ + Protocol: "tcp", + Target: "example.com", + Port: 8443, + Interval: 30, + }, + expected: "84167969", }, }