diff --git a/agent/probe.go b/agent/probe.go index b5285947..e39c854c 100644 --- a/agent/probe.go +++ b/agent/probe.go @@ -199,7 +199,6 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result { for key, task := range pm.probes { task.mu.Lock() agg := task.aggregateLocked(duration, now) - // The live request window still controls avg/loss, but the range fields are always 1h. hourAgg := task.aggregateLocked(time.Hour, now) task.mu.Unlock() diff --git a/agent/probe_test.go b/agent/probe_test.go index 865a4b24..92a2a94e 100644 --- a/agent/probe_test.go +++ b/agent/probe_test.go @@ -1,9 +1,13 @@ package agent import ( + "net" + "net/http" + "net/http/httptest" "testing" "time" + "github.com/henrygd/beszel/internal/entities/probe" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -87,3 +91,107 @@ func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) { assert.Equal(t, 40.0, result[3]) assert.Equal(t, 20.0, result[4]) } + +func TestProbeManagerGetResultsIncludesLossOnlyHourData(t *testing.T) { + now := time.Now().UTC() + task := &probeTask{} + task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-30 * time.Second)}) + task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-10 * time.Second)}) + + pm := &ProbeManager{probes: map[string]*probeTask{"icmp:example.com": task}} + + results := pm.GetResults(uint16(time.Minute / time.Millisecond)) + result, ok := results["icmp:example.com"] + require.True(t, ok) + require.Len(t, result, 5) + assert.Equal(t, 0.0, result[0]) + assert.Equal(t, 0.0, result[1]) + assert.Equal(t, 0.0, result[2]) + assert.Equal(t, 0.0, result[3]) + assert.Equal(t, 100.0, result[4]) +} + +func TestProbeManagerSyncProbesStopsRemovedTasksButKeepsExisting(t *testing.T) { + keepCfg := probe.Config{Target: "https://example.com", Protocol: "http", Interval: 10} + removeCfg := probe.Config{Target: "1.1.1.1", Protocol: "icmp", Interval: 10} + + keptTask := &probeTask{config: keepCfg, cancel: make(chan struct{})} + removedTask := &probeTask{config: removeCfg, cancel: make(chan struct{})} + pm := &ProbeManager{ + probes: map[string]*probeTask{ + keepCfg.Key(): keptTask, + removeCfg.Key(): removedTask, + }, + } + + pm.SyncProbes([]probe.Config{keepCfg}) + + assert.Same(t, keptTask, pm.probes[keepCfg.Key()]) + _, exists := pm.probes[removeCfg.Key()] + assert.False(t, exists) + + select { + case <-removedTask.cancel: + default: + t.Fatal("expected removed probe task to be cancelled") + } + + select { + case <-keptTask.cancel: + t.Fatal("expected existing probe task to remain active") + default: + } +} + +func TestProbeHTTP(t *testing.T) { + t.Run("success", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + responseMs := probeHTTP(server.Client(), server.URL) + assert.GreaterOrEqual(t, responseMs, 0.0) + }) + + t.Run("server error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "boom", http.StatusInternalServerError) + })) + defer server.Close() + + assert.Equal(t, -1.0, probeHTTP(server.Client(), server.URL)) + }) +} + +func TestProbeTCP(t *testing.T) { + t.Run("success", func(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listener.Close() + + accepted := make(chan struct{}) + go func() { + defer close(accepted) + conn, err := listener.Accept() + if err == nil { + _ = conn.Close() + } + }() + + port := uint16(listener.Addr().(*net.TCPAddr).Port) + responseMs := probeTCP("127.0.0.1", port) + assert.GreaterOrEqual(t, responseMs, 0.0) + <-accepted + }) + + t.Run("connection failure", func(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + port := uint16(listener.Addr().(*net.TCPAddr).Port) + require.NoError(t, listener.Close()) + + assert.Equal(t, -1.0, probeTCP("127.0.0.1", port)) + }) +}