diff --git a/agent/probe.go b/agent/probe.go index e16be83e..f978e588 100644 --- a/agent/probe.go +++ b/agent/probe.go @@ -13,6 +13,13 @@ import ( "github.com/henrygd/beszel/internal/entities/probe" ) +const ( + probeRawRetention = 2 * time.Minute + probeMinuteBucketTTL = time.Hour + probeMinuteBucketLen = int(probeMinuteBucketTTL/time.Minute) + 1 + probeHourWindow = time.Hour +) + // ProbeManager manages network probe tasks. type ProbeManager struct { mu sync.RWMutex @@ -20,16 +27,35 @@ type ProbeManager struct { httpClient *http.Client } +// probeTask owns retention buffers and cancellation for a single probe config. type probeTask struct { config probe.Config cancel chan struct{} mu sync.Mutex samples []probeSample + buckets [probeMinuteBucketLen]probeBucket } +// probeSample stores one probe attempt and its collection time. type probeSample struct { - latencyMs float64 // -1 means loss - timestamp time.Time + responseMs float64 // -1 means loss + timestamp time.Time +} + +// probeBucket stores one minute of aggregated probe data. +type probeBucket struct { + minute int64 + filled bool + stats probeAggregate +} + +// probeAggregate accumulates successful response stats and total sample counts. +type probeAggregate struct { + sumMs float64 + minMs float64 + maxMs float64 + totalCount int + successCount int } func newProbeManager() *ProbeManager { @@ -39,6 +65,74 @@ func newProbeManager() *ProbeManager { } } +// newProbeAggregate initializes an aggregate with an unset minimum value. +func newProbeAggregate() probeAggregate { + return probeAggregate{minMs: math.MaxFloat64} +} + +// addResponse folds a single probe sample into the aggregate. +func (agg *probeAggregate) addResponse(responseMs float64) { + agg.totalCount++ + if responseMs < 0 { + return + } + agg.successCount++ + agg.sumMs += responseMs + if responseMs < agg.minMs { + agg.minMs = responseMs + } + if responseMs > agg.maxMs { + agg.maxMs = responseMs + } +} + +// addAggregate merges another aggregate into this one. +func (agg *probeAggregate) addAggregate(other probeAggregate) { + if other.totalCount == 0 { + return + } + agg.totalCount += other.totalCount + agg.successCount += other.successCount + agg.sumMs += other.sumMs + if other.successCount == 0 { + return + } + if agg.minMs == math.MaxFloat64 || other.minMs < agg.minMs { + agg.minMs = other.minMs + } + if other.maxMs > agg.maxMs { + agg.maxMs = other.maxMs + } +} + +// hasData reports whether the aggregate contains any samples. +func (agg probeAggregate) hasData() bool { + return agg.totalCount > 0 +} + +// result converts the aggregate into the probe result slice format. +func (agg probeAggregate) result() probe.Result { + avg := agg.avgResponse() + minMs := 0.0 + if agg.successCount > 0 { + minMs = math.Round(agg.minMs*100) / 100 + } + return probe.Result{ + avg, + minMs, + math.Round(agg.maxMs*100) / 100, + math.Round(float64(agg.totalCount-agg.successCount)/float64(agg.totalCount)*10000) / 100, + } +} + +// avgResponse returns the rounded average of successful samples. +func (agg probeAggregate) avgResponse() float64 { + if agg.successCount == 0 { + return 0 + } + return math.Round(agg.sumMs/float64(agg.successCount)*100) / 100 +} + // SyncProbes replaces all probe tasks with the given configs. func (pm *ProbeManager) SyncProbes(configs []probe.Config) { pm.mu.Lock() @@ -79,52 +173,34 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result { defer pm.mu.RUnlock() results := make(map[string]probe.Result, len(pm.probes)) - cutoff := time.Now().Add(-time.Duration(durationMs) * time.Millisecond) + now := time.Now() + duration := time.Duration(durationMs) * time.Millisecond for key, task := range pm.probes { task.mu.Lock() - var sum, minMs, maxMs float64 - var count, lossCount int - minMs = math.MaxFloat64 - - for _, s := range task.samples { - if s.timestamp.Before(cutoff) { - continue - } - count++ - if s.latencyMs < 0 { - lossCount++ - continue - } - sum += s.latencyMs - if s.latencyMs < minMs { - minMs = s.latencyMs - } - if s.latencyMs > maxMs { - maxMs = s.latencyMs - } - } + agg := task.aggregateLocked(duration, now) + // The live request window still controls avg/loss, but the range fields are always 1h. + hourAgg := task.aggregateLocked(probeHourWindow, now) task.mu.Unlock() - if count == 0 { + if !agg.hasData() { continue } - successCount := count - lossCount - var avg float64 - if successCount > 0 { - avg = math.Round(sum/float64(successCount)*100) / 100 - } - if minMs == math.MaxFloat64 { - minMs = 0 - } - - results[key] = probe.Result{ - avg, // average latency in ms - math.Round(minMs*100) / 100, // min latency in ms - math.Round(maxMs*100) / 100, // max latency in ms - math.Round(float64(lossCount)/float64(count)*10000) / 100, // packet loss percentage + result := agg.result() + hourAvg := hourAgg.avgResponse() + if hourAgg.successCount > 0 { + result = probe.Result{ + result[0], + hourAvg, + math.Round(hourAgg.minMs*100) / 100, + math.Round(hourAgg.maxMs*100) / 100, + result[3], + } + } else { + result = probe.Result{result[0], hourAvg, 0, 0, result[3]} } + results[key] = result } return results @@ -161,32 +237,48 @@ func (pm *ProbeManager) runProbe(task *probeTask) { } } -func (pm *ProbeManager) executeProbe(task *probeTask) { - var latencyMs float64 - - switch task.config.Protocol { - case "icmp": - latencyMs = probeICMP(task.config.Target) - case "tcp": - latencyMs = probeTCP(task.config.Target, task.config.Port) - case "http": - latencyMs = probeHTTP(pm.httpClient, task.config.Target) - default: - slog.Warn("unknown probe protocol", "protocol", task.config.Protocol) - return +// aggregateLocked collects probe data for the requested time window. +func (task *probeTask) aggregateLocked(duration time.Duration, now time.Time) probeAggregate { + cutoff := now.Add(-duration) + // Keep short windows exact; longer windows read from minute buckets to avoid raw-sample retention. + if duration <= probeRawRetention { + return aggregateSamplesSince(task.samples, cutoff) } + return aggregateBucketsSince(task.buckets[:], cutoff, now) +} - sample := probeSample{ - latencyMs: latencyMs, - timestamp: time.Now(), +// aggregateSamplesSince aggregates raw samples newer than the cutoff. +func aggregateSamplesSince(samples []probeSample, cutoff time.Time) probeAggregate { + agg := newProbeAggregate() + for _, sample := range samples { + if sample.timestamp.Before(cutoff) { + continue + } + agg.addResponse(sample.responseMs) } + return agg +} - task.mu.Lock() - // Trim old samples beyond 120s to bound memory - cutoff := time.Now().Add(-120 * time.Second) +// aggregateBucketsSince aggregates minute buckets overlapping the requested window. +func aggregateBucketsSince(buckets []probeBucket, cutoff, now time.Time) probeAggregate { + agg := newProbeAggregate() + startMinute := cutoff.Unix() / 60 + endMinute := now.Unix() / 60 + for _, bucket := range buckets { + if !bucket.filled || bucket.minute < startMinute || bucket.minute > endMinute { + continue + } + agg.addAggregate(bucket.stats) + } + return agg +} + +// addSampleLocked stores a fresh sample in both raw and per-minute retention buffers. +func (task *probeTask) addSampleLocked(sample probeSample) { + cutoff := sample.timestamp.Add(-probeRawRetention) start := 0 for i := range task.samples { - if task.samples[i].timestamp.After(cutoff) { + if !task.samples[i].timestamp.Before(cutoff) { start = i break } @@ -199,10 +291,45 @@ func (pm *ProbeManager) executeProbe(task *probeTask) { task.samples = task.samples[:size] } task.samples = append(task.samples, sample) + + minute := sample.timestamp.Unix() / 60 + // Each slot stores one wall-clock minute, so the ring stays fixed-size at ~1h per probe. + bucket := &task.buckets[int(minute%int64(probeMinuteBucketLen))] + if !bucket.filled || bucket.minute != minute { + bucket.minute = minute + bucket.filled = true + bucket.stats = newProbeAggregate() + } + bucket.stats.addResponse(sample.responseMs) +} + +// executeProbe runs the configured probe and records the sample. +func (pm *ProbeManager) executeProbe(task *probeTask) { + var responseMs float64 + + switch task.config.Protocol { + case "icmp": + responseMs = probeICMP(task.config.Target) + case "tcp": + responseMs = probeTCP(task.config.Target, task.config.Port) + case "http": + responseMs = probeHTTP(pm.httpClient, task.config.Target) + default: + slog.Warn("unknown probe protocol", "protocol", task.config.Protocol) + return + } + + sample := probeSample{ + responseMs: responseMs, + timestamp: time.Now(), + } + + task.mu.Lock() + task.addSampleLocked(sample) task.mu.Unlock() } -// probeTCP measures pure TCP handshake latency (excluding DNS resolution). +// probeTCP measures pure TCP handshake response (excluding DNS resolution). // Returns -1 on failure. func probeTCP(target string, port uint16) float64 { // Resolve DNS first, outside the timing window @@ -222,7 +349,7 @@ func probeTCP(target string, port uint16) float64 { return float64(time.Since(start).Microseconds()) / 1000.0 } -// probeHTTP measures HTTP GET request latency. Returns -1 on failure. +// probeHTTP measures HTTP GET request response. Returns -1 on failure. func probeHTTP(client *http.Client, url string) float64 { start := time.Now() resp, err := client.Get(url) diff --git a/agent/probe_ping.go b/agent/probe_ping.go index 9011e67f..84fe8046 100644 --- a/agent/probe_ping.go +++ b/agent/probe_ping.go @@ -71,11 +71,11 @@ var ( } ) -// probeICMP sends an ICMP echo request and measures round-trip latency. +// probeICMP sends an ICMP echo request and measures round-trip response. // Supports both IPv4 and IPv6 targets. The ICMP method (raw socket, // unprivileged datagram, or exec fallback) is detected once per address // family and cached for subsequent probes. -// Returns latency in milliseconds, or -1 on failure. +// Returns response in milliseconds, or -1 on failure. func probeICMP(target string) float64 { family, ip := resolveICMPTarget(target) if family == nil { diff --git a/agent/probe_test.go b/agent/probe_test.go new file mode 100644 index 00000000..92d48f09 --- /dev/null +++ b/agent/probe_test.go @@ -0,0 +1,89 @@ +package agent + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProbeTaskAggregateLockedUsesRawSamplesForShortWindows(t *testing.T) { + now := time.Date(2026, time.April, 21, 12, 0, 0, 0, time.UTC) + task := &probeTask{} + + task.addSampleLocked(probeSample{responseMs: 10, timestamp: now.Add(-90 * time.Second)}) + task.addSampleLocked(probeSample{responseMs: 20, timestamp: now.Add(-30 * time.Second)}) + task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-10 * time.Second)}) + + agg := task.aggregateLocked(time.Minute, now) + require.True(t, agg.hasData()) + assert.Equal(t, 2, agg.totalCount) + assert.Equal(t, 1, agg.successCount) + assert.Equal(t, 20.0, agg.result()[0]) + assert.Equal(t, 20.0, agg.result()[1]) + assert.Equal(t, 20.0, agg.result()[2]) + assert.Equal(t, 50.0, agg.result()[3]) +} + +func TestProbeTaskAggregateLockedUsesMinuteBucketsForLongWindows(t *testing.T) { + now := time.Date(2026, time.April, 21, 12, 0, 30, 0, time.UTC) + task := &probeTask{} + + task.addSampleLocked(probeSample{responseMs: 10, timestamp: now.Add(-11 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: 20, timestamp: now.Add(-9 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: 40, timestamp: now.Add(-5 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-90 * time.Second)}) + task.addSampleLocked(probeSample{responseMs: 30, timestamp: now.Add(-30 * time.Second)}) + + agg := task.aggregateLocked(10*time.Minute, now) + require.True(t, agg.hasData()) + assert.Equal(t, 4, agg.totalCount) + assert.Equal(t, 3, agg.successCount) + assert.Equal(t, 30.0, agg.result()[0]) + assert.Equal(t, 20.0, agg.result()[1]) + assert.Equal(t, 40.0, agg.result()[2]) + assert.Equal(t, 25.0, agg.result()[3]) +} + +func TestProbeTaskAddSampleLockedTrimsRawSamplesButKeepsBucketHistory(t *testing.T) { + now := time.Date(2026, time.April, 21, 12, 0, 0, 0, time.UTC) + task := &probeTask{} + + task.addSampleLocked(probeSample{responseMs: 10, timestamp: now.Add(-10 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: 20, timestamp: now}) + + require.Len(t, task.samples, 1) + assert.Equal(t, 20.0, task.samples[0].responseMs) + + agg := task.aggregateLocked(10*time.Minute, now) + require.True(t, agg.hasData()) + assert.Equal(t, 2, agg.totalCount) + assert.Equal(t, 2, agg.successCount) + assert.Equal(t, 15.0, agg.result()[0]) + assert.Equal(t, 10.0, agg.result()[1]) + assert.Equal(t, 20.0, agg.result()[2]) + assert.Equal(t, 0.0, agg.result()[3]) +} + +func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) { + now := time.Now().UTC() + task := &probeTask{} + task.addSampleLocked(probeSample{responseMs: 10, timestamp: now.Add(-30 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: 20, timestamp: now.Add(-9 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: 40, timestamp: now.Add(-5 * time.Minute)}) + task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-90 * time.Second)}) + task.addSampleLocked(probeSample{responseMs: 30, timestamp: now.Add(-30 * time.Second)}) + + pm := &ProbeManager{probes: map[string]*probeTask{"icmp:example.com": task}} + + results := pm.GetResults(uint16(time.Minute / time.Millisecond)) + result, ok := results["icmp:example.com"] + require.True(t, ok) + require.Len(t, result, 5) + assert.Equal(t, 30.0, result[0]) + assert.Equal(t, 25.0, result[1]) + assert.Equal(t, 10.0, result[2]) + assert.Equal(t, 40.0, result[3]) + assert.Equal(t, 0.0, result[4]) +} diff --git a/internal/entities/probe/probe.go b/internal/entities/probe/probe.go index d4f71aa8..3a6845c4 100644 --- a/internal/entities/probe/probe.go +++ b/internal/entities/probe/probe.go @@ -12,13 +12,15 @@ type Config struct { // Result holds aggregated probe results for a single target. // -// 0: avg latency in ms +// 0: avg response in ms // -// 1: min latency in ms +// 1: average response over the last hour in ms // -// 2: max latency in ms +// 2: min response over the last hour in ms // -// 3: packet loss percentage (0-100) +// 3: max response over the last hour in ms +// +// 4: packet loss percentage (0-100) type Result []float64 // Key returns the map key used for this probe config (e.g. "icmp:1.1.1.1", "tcp:host:443", "http:https://example.com"). diff --git a/internal/hub/systems/system.go b/internal/hub/systems/system.go index 99274104..24e51f97 100644 --- a/internal/hub/systems/system.go +++ b/internal/hub/systems/system.go @@ -335,7 +335,7 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst if !realtimeActive { db = app.DB() nowString = time.Now().UTC().Format(types.DefaultDateLayout) - sql := fmt.Sprintf("UPDATE %s SET latency={:latency}, loss={:loss}, updated={:updated} WHERE id={:id}", collectionName) + sql := fmt.Sprintf("UPDATE %s SET resAvg={:resAvg}, resMin1h={:resMin1h}, resMax1h={:resMax1h}, resAvg1h={:resAvg1h}, loss={:loss}, updated={:updated} WHERE id={:id}", collectionName) updateQuery = db.NewQuery(sql) } @@ -349,12 +349,12 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst record.Set("type", "1m") err = app.SaveNoValidate(record) default: - if dataJson, e := json.Marshal(data); e == nil { + if dataJSON, marshalErr := json.Marshal(data); marshalErr == nil { sql := "INSERT INTO network_probe_stats (system, stats, type, created) VALUES ({:system}, {:stats}, {:type}, {:created})" insertQuery := db.NewQuery(sql) _, err = insertQuery.Bind(dbx.Params{ "system": systemId, - "stats": dataJson, + "stats": dataJSON, "type": "1m", "created": nowString, }).Execute() @@ -365,24 +365,29 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst } // update network_probes records - for key := range data { - probe := data[key] + for key, values := range data { id := MakeStableHashId(systemId, key) switch realtimeActive { case true: var record *core.Record record, err = app.FindRecordById(collectionName, id) if err == nil { - record.Set("latency", probe[0]) - record.Set("loss", probe[3]) + record.Set("resAvg", probeMetric(values, 0)) + record.Set("resAvg1h", probeMetric(values, 1)) + record.Set("resMin1h", probeMetric(values, 2)) + record.Set("resMax1h", probeMetric(values, 3)) + record.Set("loss", probeMetric(values, 4)) err = app.SaveNoValidate(record) } default: _, err = updateQuery.Bind(dbx.Params{ - "id": id, - "latency": probe[0], - "loss": probe[3], - "updated": nowString, + "id": id, + "resAvg": probeMetric(values, 0), + "resAvg1h": probeMetric(values, 1), + "resMin1h": probeMetric(values, 2), + "resMax1h": probeMetric(values, 3), + "loss": probeMetric(values, 4), + "updated": nowString, }).Execute() } if err != nil { @@ -393,6 +398,13 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst return nil } +func probeMetric(values probe.Result, index int) float64 { + if index < len(values) { + return values[index] + } + return 0 +} + // createContainerRecords creates container records func createContainerRecords(app core.App, data []*container.Stats, systemId string) error { if len(data) == 0 { diff --git a/internal/migrations/1776632983_updated_network_probes.go b/internal/migrations/1776632983_updated_network_probes.go deleted file mode 100644 index fc23f257..00000000 --- a/internal/migrations/1776632983_updated_network_probes.go +++ /dev/null @@ -1,62 +0,0 @@ -package migrations - -import ( - "github.com/pocketbase/pocketbase/core" - m "github.com/pocketbase/pocketbase/migrations" -) - -func init() { - m.Register(func(app core.App) error { - collection, err := app.FindCollectionByNameOrId("np_probes_001") - if err != nil { - return err - } - - // add field - if err := collection.Fields.AddMarshaledJSONAt(7, []byte(`{ - "hidden": false, - "id": "number926446584", - "max": null, - "min": null, - "name": "latency", - "onlyInt": false, - "presentable": false, - "required": false, - "system": false, - "type": "number" - }`)); err != nil { - return err - } - - // add field - if err := collection.Fields.AddMarshaledJSONAt(8, []byte(`{ - "hidden": false, - "id": "number3726709001", - "max": null, - "min": null, - "name": "loss", - "onlyInt": false, - "presentable": false, - "required": false, - "system": false, - "type": "number" - }`)); err != nil { - return err - } - - return app.Save(collection) - }, func(app core.App) error { - collection, err := app.FindCollectionByNameOrId("np_probes_001") - if err != nil { - return err - } - - // remove field - collection.Fields.RemoveById("number926446584") - - // remove field - collection.Fields.RemoveById("number3726709001") - - return app.Save(collection) - }) -} diff --git a/internal/migrations/1_add_network_probes.go b/internal/migrations/1_add_network_probes.go deleted file mode 100644 index a8f25377..00000000 --- a/internal/migrations/1_add_network_probes.go +++ /dev/null @@ -1,245 +0,0 @@ -package migrations - -import ( - "github.com/pocketbase/pocketbase/core" - m "github.com/pocketbase/pocketbase/migrations" -) - -func init() { - m.Register(func(app core.App) error { - jsonData := `[ - { - "id": "np_probes_001", - "listRule": null, - "viewRule": null, - "createRule": null, - "updateRule": null, - "deleteRule": null, - "name": "network_probes", - "type": "base", - "fields": [ - { - "autogeneratePattern": "[a-z0-9]{15}", - "hidden": false, - "id": "text3208210256", - "max": 15, - "min": 15, - "name": "id", - "pattern": "^[a-z0-9]+$", - "presentable": false, - "primaryKey": true, - "required": true, - "system": true, - "type": "text" - }, - { - "cascadeDelete": true, - "collectionId": "2hz5ncl8tizk5nx", - "hidden": false, - "id": "np_system", - "maxSelect": 1, - "minSelect": 0, - "name": "system", - "presentable": false, - "required": true, - "system": false, - "type": "relation" - }, - { - "hidden": false, - "id": "np_name", - "max": 200, - "min": 0, - "name": "name", - "pattern": "", - "presentable": false, - "primaryKey": false, - "required": false, - "system": false, - "type": "text" - }, - { - "hidden": false, - "id": "np_target", - "max": 500, - "min": 1, - "name": "target", - "pattern": "", - "presentable": false, - "primaryKey": false, - "required": true, - "system": false, - "type": "text" - }, - { - "hidden": false, - "id": "np_protocol", - "maxSelect": 1, - "name": "protocol", - "presentable": false, - "required": true, - "system": false, - "type": "select", - "values": ["icmp", "tcp", "http"] - }, - { - "hidden": false, - "id": "np_port", - "max": 65535, - "min": 0, - "name": "port", - "onlyInt": true, - "presentable": false, - "required": false, - "system": false, - "type": "number" - }, - { - "hidden": false, - "id": "np_interval", - "max": 3600, - "min": 1, - "name": "interval", - "onlyInt": true, - "presentable": false, - "required": true, - "system": false, - "type": "number" - }, - { - "hidden": false, - "id": "np_enabled", - "name": "enabled", - "presentable": false, - "required": false, - "system": false, - "type": "bool" - }, - { - "hidden": false, - "id": "autodate2990389176", - "name": "created", - "onCreate": true, - "onUpdate": false, - "presentable": false, - "system": false, - "type": "autodate" - }, - { - "hidden": false, - "id": "autodate3332085495", - "name": "updated", - "onCreate": true, - "onUpdate": true, - "presentable": false, - "system": false, - "type": "autodate" - } - ], - "indexes": [ - "CREATE INDEX ` + "`" + `idx_np_system_enabled` + "`" + ` ON ` + "`" + `network_probes` + "`" + ` (\n ` + "`" + `system` + "`" + `,\n ` + "`" + `enabled` + "`" + `\n)" - ], - "system": false - }, - { - "id": "np_stats_001", - "listRule": null, - "viewRule": null, - "createRule": null, - "updateRule": null, - "deleteRule": null, - "name": "network_probe_stats", - "type": "base", - "fields": [ - { - "autogeneratePattern": "[a-z0-9]{15}", - "hidden": false, - "id": "text3208210256", - "max": 15, - "min": 15, - "name": "id", - "pattern": "^[a-z0-9]+$", - "presentable": false, - "primaryKey": true, - "required": true, - "system": true, - "type": "text" - }, - { - "cascadeDelete": true, - "collectionId": "2hz5ncl8tizk5nx", - "hidden": false, - "id": "nps_system", - "maxSelect": 1, - "minSelect": 0, - "name": "system", - "presentable": false, - "required": true, - "system": false, - "type": "relation" - }, - { - "hidden": false, - "id": "nps_stats", - "maxSize": 2000000, - "name": "stats", - "presentable": false, - "required": true, - "system": false, - "type": "json" - }, - { - "hidden": false, - "id": "nps_type", - "maxSelect": 1, - "name": "type", - "presentable": false, - "required": true, - "system": false, - "type": "select", - "values": ["1m", "10m", "20m", "120m", "480m"] - }, - { - "hidden": false, - "id": "autodate2990389176", - "name": "created", - "onCreate": true, - "onUpdate": false, - "presentable": false, - "system": false, - "type": "autodate" - }, - { - "hidden": false, - "id": "autodate3332085495", - "name": "updated", - "onCreate": true, - "onUpdate": true, - "presentable": false, - "system": false, - "type": "autodate" - } - ], - "indexes": [ - "CREATE INDEX ` + "`" + `idx_nps_system_type_created` + "`" + ` ON ` + "`" + `network_probe_stats` + "`" + ` (\n ` + "`" + `system` + "`" + `,\n ` + "`" + `type` + "`" + `,\n ` + "`" + `created` + "`" + `\n)" - ], - "system": false - } -]` - - return app.ImportCollectionsByMarshaledJSON([]byte(jsonData), false) - }, func(app core.App) error { - // down: remove the network probe collections - if c, err := app.FindCollectionByNameOrId("network_probes"); err == nil { - if err := app.Delete(c); err != nil { - return err - } - } - if c, err := app.FindCollectionByNameOrId("network_probe_stats"); err == nil { - if err := app.Delete(c); err != nil { - return err - } - } - return nil - }) -} diff --git a/internal/records/probe_averaging_test.go b/internal/records/probe_averaging_test.go new file mode 100644 index 00000000..b711b239 --- /dev/null +++ b/internal/records/probe_averaging_test.go @@ -0,0 +1,58 @@ +//go:build testing + +package records_test + +import ( + "testing" + + "github.com/henrygd/beszel/internal/records" + "github.com/henrygd/beszel/internal/tests" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAverageProbeStats(t *testing.T) { + hub, err := tests.NewTestHub(t.TempDir()) + require.NoError(t, err) + defer hub.Cleanup() + + rm := records.NewRecordManager(hub) + user, err := tests.CreateUser(hub, "probe-avg@example.com", "testtesttest") + require.NoError(t, err) + system, err := tests.CreateRecord(hub, "systems", map[string]any{ + "name": "probe-avg-system", + "host": "localhost", + "port": "45876", + "status": "up", + "users": []string{user.Id}, + }) + require.NoError(t, err) + + recordA, err := tests.CreateRecord(hub, "network_probe_stats", map[string]any{ + "system": system.Id, + "type": "1m", + "stats": `{"icmp:1.1.1.1":[10,80,8,14,1]}`, + }) + require.NoError(t, err) + recordB, err := tests.CreateRecord(hub, "network_probe_stats", map[string]any{ + "system": system.Id, + "type": "1m", + "stats": `{"icmp:1.1.1.1":[40,100,9,50,5]}`, + }) + require.NoError(t, err) + + result := rm.AverageProbeStats(hub.DB(), records.RecordIds{ + {Id: recordA.Id}, + {Id: recordB.Id}, + }) + + stats, ok := result["icmp:1.1.1.1"] + require.True(t, ok) + require.Len(t, stats, 5) + assert.Equal(t, 25.0, stats[0]) + assert.Equal(t, 90.0, stats[1]) + assert.Equal(t, 8.0, stats[2]) + assert.Equal(t, 50.0, stats[3]) + assert.Equal(t, 3.0, stats[4]) +} diff --git a/internal/records/records.go b/internal/records/records.go index b9d9b9c1..961e50fc 100644 --- a/internal/records/records.go +++ b/internal/records/records.go @@ -507,11 +507,11 @@ func AverageContainerStatsSlice(records [][]container.Stats) []container.Stats { } // AverageProbeStats averages probe stats across multiple records. -// For each probe key: avg of avgs, min of mins, max of maxes, avg of losses. +// For each probe key: avg of average fields, min of mins, and max of maxes. func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) map[string]probe.Result { type probeValues struct { - sums probe.Result - count float64 + sums probe.Result + counts []int } query := db.NewQuery("SELECT stats FROM network_probe_stats WHERE id = {:id}") @@ -529,35 +529,52 @@ func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) ma for key, vals := range rawStats { s, ok := sums[key] if !ok { - s = &probeValues{sums: make(probe.Result, len(vals))} + s = &probeValues{sums: make(probe.Result, len(vals)), counts: make([]int, len(vals))} sums[key] = s } + if len(vals) > len(s.sums) { + expandedSums := make(probe.Result, len(vals)) + copy(expandedSums, s.sums) + s.sums = expandedSums + + expandedCounts := make([]int, len(vals)) + copy(expandedCounts, s.counts) + s.counts = expandedCounts + } for i := range vals { switch i { - case 1: // min fields - if s.count == 0 || vals[i] < s.sums[i] { + case 2: // min fields + if s.counts[i] == 0 || vals[i] < s.sums[i] { s.sums[i] = vals[i] } - case 2: // max fields - if vals[i] > s.sums[i] { + case 3: // max fields + if s.counts[i] == 0 || vals[i] > s.sums[i] { s.sums[i] = vals[i] } default: // average fields s.sums[i] += vals[i] } + s.counts[i]++ } - s.count++ } } // compute final averages result := make(map[string]probe.Result, len(sums)) for key, s := range sums { - if s.count == 0 { + if len(s.counts) == 0 { continue } - s.sums[0] = twoDecimals(s.sums[0] / s.count) // avg latency - s.sums[3] = twoDecimals(s.sums[3] / s.count) // packet loss + for i := range s.sums { + switch i { + case 2, 3: // min and max fields should not be averaged + continue + default: + if s.counts[i] > 0 { + s.sums[i] = twoDecimals(s.sums[i] / float64(s.counts[i])) + } + } + } result[key] = s.sums } return result diff --git a/internal/site/src/components/network-probes-table/network-probes-columns.tsx b/internal/site/src/components/network-probes-table/network-probes-columns.tsx index c9dbd739..3f6f3695 100644 --- a/internal/site/src/components/network-probes-table/network-probes-columns.tsx +++ b/internal/site/src/components/network-probes-table/network-probes-columns.tsx @@ -95,12 +95,12 @@ export function getProbeColumns(longestName = 0, longestTarget = 0): ColumnDef {getValue() as number}s, }, { - id: "latency", - accessorFn: (record) => record.latency, + id: "response", + accessorFn: (record) => record.response, invertSorting: true, - header: ({ column }) => , + header: ({ column }) => , cell: ({ row }) => { - const val = row.original.latency + const val = row.original.response if (!val) { return - } @@ -125,8 +125,8 @@ export function getProbeColumns(longestName = 0, longestTarget = 0): ColumnDef , cell: ({ row }) => { - const { loss, latency } = row.original - if (loss === undefined || (!latency && !loss)) { + const { loss, response } = row.original + if (loss === undefined || (!response && !loss)) { return - } let color = "bg-green-500" diff --git a/internal/site/src/components/network-probes-table/network-probes-table.tsx b/internal/site/src/components/network-probes-table/network-probes-table.tsx index 5dc4d0c5..f32ce38d 100644 --- a/internal/site/src/components/network-probes-table/network-probes-table.tsx +++ b/internal/site/src/components/network-probes-table/network-probes-table.tsx @@ -102,7 +102,7 @@ export default function NetworkProbesTableNew({ Network Probes
- ICMP/TCP/HTTP latency monitoring from agents + ICMP/TCP/HTTP response monitoring from agents
diff --git a/internal/site/src/components/network-probes-table/probe-dialog.tsx b/internal/site/src/components/network-probes-table/probe-dialog.tsx index dd765365..4faa6deb 100644 --- a/internal/site/src/components/network-probes-table/probe-dialog.tsx +++ b/internal/site/src/components/network-probes-table/probe-dialog.tsx @@ -78,7 +78,7 @@ export function AddProbeDialog({ systemId }: { systemId?: string }) { Add {{ foo: t`Network Probe` }} - Configure latency monitoring from this agent. + Configure response monitoring from this agent.
diff --git a/internal/site/src/components/routes/system/charts/probes-charts.tsx b/internal/site/src/components/routes/system/charts/probes-charts.tsx index a0cacf43..b21359df 100644 --- a/internal/site/src/components/routes/system/charts/probes-charts.tsx +++ b/internal/site/src/components/routes/system/charts/probes-charts.tsx @@ -44,7 +44,7 @@ function ProbeChart({ const filter = useStore($filter) const { dataPoints, visibleKeys } = useMemo(() => { - const sortedProbes = [...probes].sort((a, b) => b.latency - a.latency) + const sortedProbes = [...probes].sort((a, b) => b.response - a.response) const count = sortedProbes.length const points: DataPoint[] = [] const visibleKeys: string[] = [] @@ -103,7 +103,7 @@ function ProbeChart({ ) } -export function LatencyChart({ probeStats, grid, probes, chartData, empty }: ProbeChartProps) { +export function ResponseChart({ probeStats, grid, probes, chartData, empty }: ProbeChartProps) { const { t } = useLingui() return ( @@ -114,7 +114,7 @@ export function LatencyChart({ probeStats, grid, probes, chartData, empty }: Pro chartData={chartData} empty={empty} valueIndex={0} - title={t`Latency`} + title={t`Response`} description={t`Average round-trip time (ms)`} tickFormatter={(value) => `${toFixedFloat(value, value >= 10 ? 0 : 1)} ms`} contentFormatter={({ value }) => { diff --git a/internal/site/src/components/routes/system/lazy-tables.tsx b/internal/site/src/components/routes/system/lazy-tables.tsx index 1cf3923f..a2414fd5 100644 --- a/internal/site/src/components/routes/system/lazy-tables.tsx +++ b/internal/site/src/components/routes/system/lazy-tables.tsx @@ -1,7 +1,7 @@ import { lazy } from "react" import { useIntersectionObserver } from "@/lib/use-intersection-observer" import { cn } from "@/lib/utils" -import { LatencyChart, LossChart } from "./charts/probes-charts" +import { ResponseChart, LossChart } from "./charts/probes-charts" import type { SystemData } from "./use-system-data" import { $chartTime } from "@/lib/stores" import { useStore } from "@nanostores/react" @@ -63,7 +63,7 @@ function ProbesTable({ systemId, systemData }: { systemId: string; systemData: S {!!chartData && !!probes.length && (
-