This commit is contained in:
henrygd
2026-04-26 13:37:33 -04:00
parent 89ac8dc585
commit 0378023b6f
5 changed files with 94 additions and 16 deletions

View File

@@ -217,7 +217,7 @@ func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error {
if err := cbor.Unmarshal(hctx.Request.Data, &req); err != nil {
return err
}
resp, err := hctx.Agent.probeManager.ApplySync(req)
resp, err := hctx.Agent.probeManager.HandleSyncRequest(req)
if err != nil {
return err
}

View File

@@ -86,6 +86,19 @@ func newProbeTask(config probe.Config) *probeTask {
}
}
func newProbeTaskFromExisting(config probe.Config, existing *probeTask) *probeTask {
task := newProbeTask(config)
if existing == nil {
return task
}
existing.mu.Lock()
defer existing.mu.Unlock()
task.samples = append(task.samples, existing.samples...)
task.buckets = existing.buckets
return task
}
// newProbeAggregate initializes an aggregate with an unset minimum value.
func newProbeAggregate() probeAggregate {
return probeAggregate{minMs: math.MaxFloat64}
@@ -193,14 +206,14 @@ func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
if exists {
close(task.cancel)
}
task = newProbeTask(cfg)
task = newProbeTaskFromExisting(cfg, task)
pm.probes[key] = task
go pm.runProbe(task, true)
}
}
// ApplySync applies a full or incremental probe sync request.
func (pm *ProbeManager) ApplySync(req probe.SyncRequest) (probe.SyncResponse, error) {
// HandleSyncRequest applies a full or incremental probe sync request.
func (pm *ProbeManager) HandleSyncRequest(req probe.SyncRequest) (probe.SyncResponse, error) {
switch req.Action {
case probe.SyncActionReplace:
pm.SyncProbes(req.Configs)
@@ -216,7 +229,7 @@ func (pm *ProbeManager) ApplySync(req probe.SyncRequest) (probe.SyncResponse, er
return probe.SyncResponse{Result: *result}, nil
case probe.SyncActionDelete:
if req.Config.ID == "" {
return probe.SyncResponse{}, errors.New("missing probe ID for delete action")
return probe.SyncResponse{}, errors.New("missing probe ID for delete")
}
pm.DeleteProbe(req.Config.ID)
return probe.SyncResponse{}, nil
@@ -244,7 +257,7 @@ func (pm *ProbeManager) UpsertProbe(config probe.Config, runNow bool) (*probe.Re
if exists {
close(task.cancel)
}
task = newProbeTask(config)
task = newProbeTaskFromExisting(config, task)
pm.probes[config.ID] = task
startTask = true
pm.mu.Unlock()
@@ -309,7 +322,7 @@ func (pm *ProbeManager) Stop() {
}
// runProbe executes a single probe task in a loop.
func (pm *ProbeManager) runProbe(task *probeTask, runImmediately bool) {
func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) {
interval := time.Duration(task.config.Interval) * time.Second
if interval < time.Second {
interval = 10 * time.Second
@@ -317,7 +330,7 @@ func (pm *ProbeManager) runProbe(task *probeTask, runImmediately bool) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
if runImmediately {
if runNow {
pm.executeProbe(task)
}

View File

@@ -197,7 +197,7 @@ func TestProbeManagerApplySyncUpsertRunsImmediatelyAndReturnsResult(t *testing.T
httpClient: server.Client(),
}
resp, err := pm.ApplySync(probe.SyncRequest{
resp, err := pm.HandleSyncRequest(probe.SyncRequest{
Action: probe.SyncActionUpsert,
Config: probe.Config{ID: "probe-1", Target: server.URL, Protocol: "http", Interval: 10},
RunNow: true,
@@ -216,6 +216,48 @@ func TestProbeManagerApplySyncUpsertRunsImmediatelyAndReturnsResult(t *testing.T
require.Len(t, task.samples, 1)
}
func TestProbeManagerUpsertProbeKeepsHistoryWhenOnlyIntervalChanges(t *testing.T) {
originalCfg := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 10}
updatedCfg := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 30}
now := time.Now().UTC()
existingTask := &probeTask{config: originalCfg, cancel: make(chan struct{})}
existingTask.addSampleLocked(probeSample{responseMs: 12, timestamp: now.Add(-50 * time.Minute)})
existingTask.addSampleLocked(probeSample{responseMs: 24, timestamp: now.Add(-30 * time.Second)})
pm := &ProbeManager{
probes: map[string]*probeTask{originalCfg.ID: existingTask},
}
result, err := pm.UpsertProbe(updatedCfg, false)
defer pm.Stop()
require.NoError(t, err)
assert.Nil(t, result)
updatedTask := pm.probes[updatedCfg.ID]
require.NotNil(t, updatedTask)
assert.NotSame(t, existingTask, updatedTask)
assert.Equal(t, updatedCfg, updatedTask.config)
updatedTask.mu.Lock()
defer updatedTask.mu.Unlock()
require.Len(t, updatedTask.samples, 1)
assert.Equal(t, 24.0, updatedTask.samples[0].responseMs)
agg := updatedTask.aggregateLocked(time.Hour, now)
require.True(t, agg.hasData())
assert.Equal(t, 2, agg.totalCount)
assert.Equal(t, 2, agg.successCount)
assert.Equal(t, 18.0, agg.avgResponse())
select {
case <-existingTask.cancel:
default:
t.Fatal("expected original probe task to be cancelled")
}
}
func TestProbeManagerApplySyncDeleteRemovesTask(t *testing.T) {
config := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 10}
task := &probeTask{config: config, cancel: make(chan struct{})}
@@ -223,7 +265,7 @@ func TestProbeManagerApplySyncDeleteRemovesTask(t *testing.T) {
probes: map[string]*probeTask{config.ID: task},
}
_, err := pm.ApplySync(probe.SyncRequest{
_, err := pm.HandleSyncRequest(probe.SyncRequest{
Action: probe.SyncActionDelete,
Config: probe.Config{ID: config.ID},
})

View File

@@ -1,6 +1,7 @@
package hub
import (
"strconv"
"time"
"github.com/henrygd/beszel/internal/entities/probe"
@@ -11,7 +12,7 @@ import (
// generateProbeID creates a stable hash ID for a probe based on its configuration and the system it belongs to.
func generateProbeID(systemId string, config probe.Config) string {
return systems.MakeStableHashId(systemId, config.Target, config.Protocol)
return systems.MakeStableHashId(systemId, config.Target, config.Protocol, strconv.FormatUint(uint64(config.Port), 10))
}
// bindNetworkProbesEvents keeps probe records and agent probe state in sync.

View File

@@ -23,7 +23,7 @@ func TestGenerateProbeID(t *testing.T) {
Port: 80,
Interval: 60,
},
expected: "a20a5827",
expected: "de7b3647",
},
{
name: "HTTP probe on example.com with different system ID",
@@ -34,7 +34,7 @@ func TestGenerateProbeID(t *testing.T) {
Port: 80,
Interval: 60,
},
expected: "ab602ae7",
expected: "be9e2707",
},
{
name: "Same probe, different interval",
@@ -45,7 +45,7 @@ func TestGenerateProbeID(t *testing.T) {
Port: 80,
Interval: 120,
},
expected: "ab602ae7",
expected: "be9e2707",
},
{
name: "ICMP probe on 1.1.1.1",
@@ -56,7 +56,7 @@ func TestGenerateProbeID(t *testing.T) {
Port: 0,
Interval: 10,
},
expected: "6d13a4a4",
expected: "49ec14fc",
}, {
name: "ICMP probe on 1.1.1.1 with different system ID",
systemID: "sys4567",
@@ -66,7 +66,29 @@ func TestGenerateProbeID(t *testing.T) {
Port: 0,
Interval: 10,
},
expected: "ddd6c81",
expected: "84921aa3",
},
{
name: "TCP probe on example.com with port 443",
systemID: "sys789",
config: probe.Config{
Protocol: "tcp",
Target: "example.com",
Port: 443,
Interval: 30,
},
expected: "677b991",
},
{
name: "TCP probe on example.com with port 8443",
systemID: "sys789",
config: probe.Config{
Protocol: "tcp",
Target: "example.com",
Port: 8443,
Interval: 30,
},
expected: "84167969",
},
}