This commit is contained in:
henrygd
2026-04-29 15:49:43 -04:00
parent b89314889d
commit d2eb3b259a
11 changed files with 103 additions and 110 deletions

View File

@@ -147,27 +147,27 @@ func (agg probeAggregate) hasData() bool {
return agg.totalCount > 0 return agg.totalCount > 0
} }
// result converts the aggregate into the probe result slice format. // result converts the aggregate into the probe result format.
func (agg probeAggregate) result() probe.Result { func (agg probeAggregate) result() probe.Result {
avg := agg.avgResponse() avg := agg.avgResponse()
minUs := 0.0 result := probe.Result{
if agg.successCount > 0 { AvgResponse: avg,
minUs = float64(agg.minUs) MinResponse: agg.minUs,
MaxResponse: agg.maxUs,
PacketLoss: agg.lossPercentage(),
} }
return probe.Result{ if agg.successCount == 0 {
avg, result.MinResponse, result.MaxResponse = 0, 0
minUs,
float64(agg.maxUs),
agg.lossPercentage(),
} }
return result
} }
// avgResponse returns the rounded average of successful samples. // avgResponse returns the rounded average of successful samples.
func (agg probeAggregate) avgResponse() float64 { func (agg probeAggregate) avgResponse() int64 {
if agg.successCount == 0 { if agg.successCount == 0 {
return 0 return 0
} }
return float64(agg.sumUs / agg.successCount) return agg.sumUs / agg.successCount
} }
@@ -398,33 +398,20 @@ func (task *probeTask) resultLocked(duration time.Duration, now time.Time) (prob
agg := task.aggregateLocked(duration, now) agg := task.aggregateLocked(duration, now)
hourAgg := task.aggregateLocked(time.Hour, now) hourAgg := task.aggregateLocked(time.Hour, now)
if !agg.hasData() { if !agg.hasData() {
return nil, false return probe.Result{}, false
} }
result := agg.result() result := agg.result()
res := result[0] result.AvgResponse1h = hourAgg.avgResponse()
res1h := hourAgg.avgResponse() result.MinResponse1h = hourAgg.minUs
resMin := result[1] result.MaxResponse1h = hourAgg.maxUs
resMin1h := float64(hourAgg.minUs) result.PacketLoss1h = hourAgg.lossPercentage()
resMax := result[2]
resMax1h := float64(hourAgg.maxUs)
loss := result[3]
loss1h := hourAgg.lossPercentage()
if hourAgg.successCount == 0 { if hourAgg.successCount == 0 {
resMin1h, resMax1h = 0, 0 result.MinResponse1h, result.MaxResponse1h = 0, 0
} }
return probe.Result{ return result, true
res,
res1h,
resMin,
resMin1h,
resMax,
resMax1h,
loss,
loss1h,
}, true
} }
// aggregateSamplesSince aggregates raw samples newer than the cutoff. // aggregateSamplesSince aggregates raw samples newer than the cutoff.

View File

@@ -24,10 +24,11 @@ func TestProbeTaskAggregateLockedUsesRawSamplesForShortWindows(t *testing.T) {
require.True(t, agg.hasData()) require.True(t, agg.hasData())
assert.Equal(t, int64(2), agg.totalCount) assert.Equal(t, int64(2), agg.totalCount)
assert.Equal(t, int64(1), agg.successCount) assert.Equal(t, int64(1), agg.successCount)
assert.Equal(t, 20.0, agg.result()[0]) result := agg.result()
assert.Equal(t, 20.0, agg.result()[1]) assert.Equal(t, int64(20), result.AvgResponse)
assert.Equal(t, 20.0, agg.result()[2]) assert.Equal(t, int64(20), result.MinResponse)
assert.Equal(t, 50.0, agg.result()[3]) assert.Equal(t, int64(20), result.MaxResponse)
assert.Equal(t, 50.0, result.PacketLoss)
} }
func TestProbeTaskAggregateLockedUsesMinuteBucketsForLongWindows(t *testing.T) { func TestProbeTaskAggregateLockedUsesMinuteBucketsForLongWindows(t *testing.T) {
@@ -44,10 +45,11 @@ func TestProbeTaskAggregateLockedUsesMinuteBucketsForLongWindows(t *testing.T) {
require.True(t, agg.hasData()) require.True(t, agg.hasData())
assert.Equal(t, int64(4), agg.totalCount) assert.Equal(t, int64(4), agg.totalCount)
assert.Equal(t, int64(3), agg.successCount) assert.Equal(t, int64(3), agg.successCount)
assert.Equal(t, 30.0, agg.result()[0]) result := agg.result()
assert.Equal(t, 20.0, agg.result()[1]) assert.Equal(t, int64(30), result.AvgResponse)
assert.Equal(t, 40.0, agg.result()[2]) assert.Equal(t, int64(20), result.MinResponse)
assert.Equal(t, 25.0, agg.result()[3]) assert.Equal(t, int64(40), result.MaxResponse)
assert.Equal(t, 25.0, result.PacketLoss)
} }
func TestProbeTaskAddSampleLockedTrimsRawSamplesButKeepsBucketHistory(t *testing.T) { func TestProbeTaskAddSampleLockedTrimsRawSamplesButKeepsBucketHistory(t *testing.T) {
@@ -64,10 +66,11 @@ func TestProbeTaskAddSampleLockedTrimsRawSamplesButKeepsBucketHistory(t *testing
require.True(t, agg.hasData()) require.True(t, agg.hasData())
assert.Equal(t, int64(2), agg.totalCount) assert.Equal(t, int64(2), agg.totalCount)
assert.Equal(t, int64(2), agg.successCount) assert.Equal(t, int64(2), agg.successCount)
assert.Equal(t, 15.0, agg.result()[0]) result := agg.result()
assert.Equal(t, 10.0, agg.result()[1]) assert.Equal(t, int64(15), result.AvgResponse)
assert.Equal(t, 20.0, agg.result()[2]) assert.Equal(t, int64(10), result.MinResponse)
assert.Equal(t, 0.0, agg.result()[3]) assert.Equal(t, int64(20), result.MaxResponse)
assert.Equal(t, 0.0, result.PacketLoss)
} }
func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) { func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) {
@@ -84,13 +87,14 @@ func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) {
results := pm.GetResults(uint16(time.Minute / time.Millisecond)) results := pm.GetResults(uint16(time.Minute / time.Millisecond))
result, ok := results["probe-1"] result, ok := results["probe-1"]
require.True(t, ok) require.True(t, ok)
require.Len(t, result, 6) assert.Equal(t, int64(30), result.AvgResponse)
assert.Equal(t, 30.0, result[0]) assert.Equal(t, int64(25), result.AvgResponse1h)
assert.Equal(t, 25.0, result[1]) assert.Equal(t, int64(30), result.MinResponse)
assert.Equal(t, 10.0, result[2]) assert.Equal(t, int64(10), result.MinResponse1h)
assert.Equal(t, 40.0, result[3]) assert.Equal(t, int64(30), result.MaxResponse)
assert.Equal(t, 50.0, result[4]) assert.Equal(t, int64(40), result.MaxResponse1h)
assert.Equal(t, 20.0, result[5]) assert.Equal(t, 50.0, result.PacketLoss)
assert.Equal(t, 20.0, result.PacketLoss1h)
} }
func TestProbeManagerGetResultsIncludesLossOnlyHourData(t *testing.T) { func TestProbeManagerGetResultsIncludesLossOnlyHourData(t *testing.T) {
@@ -104,13 +108,14 @@ func TestProbeManagerGetResultsIncludesLossOnlyHourData(t *testing.T) {
results := pm.GetResults(uint16(time.Minute / time.Millisecond)) results := pm.GetResults(uint16(time.Minute / time.Millisecond))
result, ok := results["probe-1"] result, ok := results["probe-1"]
require.True(t, ok) require.True(t, ok)
require.Len(t, result, 6) assert.Equal(t, int64(0), result.AvgResponse)
assert.Equal(t, 0.0, result[0]) assert.Equal(t, int64(0), result.AvgResponse1h)
assert.Equal(t, 0.0, result[1]) assert.Equal(t, int64(0), result.MinResponse)
assert.Equal(t, 0.0, result[2]) assert.Equal(t, int64(0), result.MinResponse1h)
assert.Equal(t, 0.0, result[3]) assert.Equal(t, int64(0), result.MaxResponse)
assert.Equal(t, 100.0, result[4]) assert.Equal(t, int64(0), result.MaxResponse1h)
assert.Equal(t, 100.0, result[5]) assert.Equal(t, 100.0, result.PacketLoss)
assert.Equal(t, 100.0, result.PacketLoss1h)
} }
func TestProbeConfigResultKeyUsesSyncedID(t *testing.T) { func TestProbeConfigResultKeyUsesSyncedID(t *testing.T) {
@@ -207,10 +212,9 @@ func TestProbeManagerApplySyncUpsertRunsImmediatelyAndReturnsResult(t *testing.T
defer pm.Stop() defer pm.Stop()
require.NoError(t, err) require.NoError(t, err)
require.Len(t, resp.Result, 6) assert.GreaterOrEqual(t, resp.Result.AvgResponse, int64(0))
assert.GreaterOrEqual(t, resp.Result[0], 0.0) assert.Equal(t, 0.0, resp.Result.PacketLoss)
assert.Equal(t, 0.0, resp.Result[4]) assert.Equal(t, 0.0, resp.Result.PacketLoss1h)
assert.Equal(t, 0.0, resp.Result[5])
task := pm.probes["probe-1"] task := pm.probes["probe-1"]
require.NotNil(t, task) require.NotNil(t, task)
@@ -252,7 +256,7 @@ func TestProbeManagerUpsertProbeKeepsHistoryWhenOnlyIntervalChanges(t *testing.T
require.True(t, agg.hasData()) require.True(t, agg.hasData())
assert.Equal(t, int64(2), agg.totalCount) assert.Equal(t, int64(2), agg.totalCount)
assert.Equal(t, int64(2), agg.successCount) assert.Equal(t, int64(2), agg.successCount)
assert.Equal(t, 18.0, agg.avgResponse()) assert.Equal(t, int64(18), agg.avgResponse())
select { select {
case <-existingTask.cancel: case <-existingTask.cancel:

View File

@@ -51,14 +51,15 @@ type SyncResponse struct {
// 6: packet loss percentage (0-100) // 6: packet loss percentage (0-100)
// //
// 7: 1h packet loss percentage (0-100) // 7: 1h packet loss percentage (0-100)
type Result []float64 type Result struct {
AvgResponse int64 `cbor:"0,keyasint,omitempty"`
// Get returns the value at the specified index or 0 if the index is out of range. AvgResponse1h int64 `cbor:"1,keyasint,omitempty"`
func (r Result) Get(index int) float64 { MinResponse int64 `cbor:"2,keyasint,omitempty"`
if index < len(r) { MinResponse1h int64 `cbor:"3,keyasint,omitempty"`
return r[index] MaxResponse int64 `cbor:"4,keyasint,omitempty"`
} MaxResponse1h int64 `cbor:"5,keyasint,omitempty"`
return 0 PacketLoss float64 `cbor:"6,keyasint,omitempty"`
PacketLoss1h float64 `cbor:"7,keyasint,omitempty"`
} }
// Stats holds only 1m values for a single target, which are used for charts. // Stats holds only 1m values for a single target, which are used for charts.
@@ -74,9 +75,9 @@ type Stats []float64
func (s Stats) FromResult(result Result) Stats { func (s Stats) FromResult(result Result) Stats {
return Stats{ return Stats{
result.Get(0), // avg response float64(result.AvgResponse),
result.Get(2), // min response float64(result.MinResponse),
result.Get(4), // max response float64(result.MaxResponse),
result.Get(6), // packet loss result.PacketLoss,
} }
} }

View File

@@ -96,14 +96,12 @@ func probeConfigFromRecord(record *core.Record) *probe.Config {
// setProbeResultFields stores the latest probe result values on the record. // setProbeResultFields stores the latest probe result values on the record.
func setProbeResultFields(record *core.Record, result probe.Result) { func setProbeResultFields(record *core.Record, result probe.Result) {
now := time.Now().UTC() nowString := time.Now().UTC().Format(types.DefaultDateLayout)
nowString := now.Format(types.DefaultDateLayout) record.Set("res", result.AvgResponse)
record.Set("res", result.Get(0)) record.Set("resAvg1h", result.AvgResponse1h)
record.Set("resAvg1h", result.Get(1)) record.Set("resMin1h", result.MinResponse1h)
record.Set("resMin1h", result.Get(3)) record.Set("resMax1h", result.MaxResponse1h)
record.Set("resMax1h", result.Get(5)) record.Set("loss1h", result.PacketLoss1h)
// record.Set("loss", result.Get(4))
record.Set("loss1h", result.Get(7))
record.Set("updated", nowString) record.Set("updated", nowString)
} }

View File

@@ -320,7 +320,7 @@ func updateNetworkProbesRecords(app core.App, probeResults map[string]probe.Resu
return nil return nil
} }
var err error var err error
probeCollectionName := "network_probes" const probeCollectionName = "network_probes"
// If realtime updates are active, we save via PocketBase records to trigger realtime events. // If realtime updates are active, we save via PocketBase records to trigger realtime events.
// Otherwise we can do a more efficient direct update via SQL // Otherwise we can do a more efficient direct update via SQL
@@ -345,14 +345,14 @@ func updateNetworkProbesRecords(app core.App, probeResults map[string]probe.Resu
} }
// update network_probes records // update network_probes records
for id, values := range probeResults { for id, result := range probeResults {
probeData := map[string]any{ probeData := map[string]any{
"id": id, "id": id,
"res": values.Get(0), "res": result.AvgResponse,
"resAvg1h": values.Get(1), "resAvg1h": result.AvgResponse1h,
"resMin1h": values.Get(3), "resMin1h": result.MinResponse1h,
"resMax1h": values.Get(5), "resMax1h": result.MaxResponse1h,
"loss1h": values.Get(7), "loss1h": result.PacketLoss1h,
"updated": nowString, "updated": nowString,
} }
switch realtimeActive { switch realtimeActive {
@@ -372,12 +372,12 @@ func updateNetworkProbesRecords(app core.App, probeResults map[string]probe.Resu
} }
// handle stats collection as well // handle stats collection as well
statsCollectionName := "network_probe_stats" const statsCollectionName = "network_probe_stats"
// we don't need the hour values for the stats collection // we don't need the hour values for the stats collection
stats := make(map[string]probe.Stats, len(probeResults)) stats := make(map[string]probe.Stats, len(probeResults))
for key, values := range probeResults { for key, result := range probeResults {
stats[key] = probe.Stats{}.FromResult(values) stats[key] = probe.Stats{}.FromResult(result)
} }
statsRecordData := map[string]any{ statsRecordData := map[string]any{

View File

@@ -24,7 +24,7 @@ func (sys *System) UpsertNetworkProbe(config probe.Config, runNow bool) (*probe.
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(resp.Result) == 0 { if resp.Result == (probe.Result{}) {
return nil, nil return nil, nil
} }
result := resp.Result result := resp.Result

View File

@@ -32,13 +32,13 @@ func TestAverageProbeStats(t *testing.T) {
recordA, err := tests.CreateRecord(hub, "network_probe_stats", map[string]any{ recordA, err := tests.CreateRecord(hub, "network_probe_stats", map[string]any{
"system": system.Id, "system": system.Id,
"type": "1m", "type": "1m",
"stats": `{"icmp:1.1.1.1":[10,80,8,14,1]}`, "stats": `{"icmp:1.1.1.1":[10,5,20,1.5]}`,
}) })
require.NoError(t, err) require.NoError(t, err)
recordB, err := tests.CreateRecord(hub, "network_probe_stats", map[string]any{ recordB, err := tests.CreateRecord(hub, "network_probe_stats", map[string]any{
"system": system.Id, "system": system.Id,
"type": "1m", "type": "1m",
"stats": `{"icmp:1.1.1.1":[40,100,9,50,5]}`, "stats": `{"icmp:1.1.1.1":[22.5,10,60,0]}`,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -49,10 +49,9 @@ func TestAverageProbeStats(t *testing.T) {
stats, ok := result["icmp:1.1.1.1"] stats, ok := result["icmp:1.1.1.1"]
require.True(t, ok) require.True(t, ok)
require.Len(t, stats, 5) require.Len(t, stats, 4)
assert.Equal(t, 25.0, stats[0]) assert.InDelta(t, 16.25, stats[0], 0.001) // avg of avg
assert.Equal(t, 90.0, stats[1]) assert.InDelta(t, 5, stats[1], 0.001) // min of mins
assert.Equal(t, 8.0, stats[2]) assert.InDelta(t, 60, stats[2], 0.001) // max of maxes
assert.Equal(t, 50.0, stats[3]) assert.InDelta(t, 0.75, stats[3], 0.001) // avg of packet loss
assert.Equal(t, 3.0, stats[4])
} }

View File

@@ -532,9 +532,9 @@ func AverageContainerStatsSlice(records [][]container.Stats) []container.Stats {
// AverageProbeStats averages probe stats across multiple records. // AverageProbeStats averages probe stats across multiple records.
// For each probe key: avg of average fields, min of mins, and max of maxes. // For each probe key: avg of average fields, min of mins, and max of maxes.
func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) map[string]probe.Result { func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) map[string]probe.Stats {
type probeValues struct { type probeValues struct {
sums probe.Result sums probe.Stats
counts []int counts []int
} }
@@ -546,18 +546,18 @@ func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) ma
for _, rec := range records { for _, rec := range records {
row.Stats = row.Stats[:0] row.Stats = row.Stats[:0]
query.Bind(dbx.Params{"id": rec.Id}).One(&row) query.Bind(dbx.Params{"id": rec.Id}).One(&row)
var rawStats map[string]probe.Result var rawStats map[string]probe.Stats
if err := json.Unmarshal(row.Stats, &rawStats); err != nil { if err := json.Unmarshal(row.Stats, &rawStats); err != nil {
continue continue
} }
for key, vals := range rawStats { for key, vals := range rawStats {
s, ok := sums[key] s, ok := sums[key]
if !ok { if !ok {
s = &probeValues{sums: make(probe.Result, len(vals)), counts: make([]int, len(vals))} s = &probeValues{sums: make(probe.Stats, len(vals)), counts: make([]int, len(vals))}
sums[key] = s sums[key] = s
} }
if len(vals) > len(s.sums) { if len(vals) > len(s.sums) {
expandedSums := make(probe.Result, len(vals)) expandedSums := make(probe.Stats, len(vals))
copy(expandedSums, s.sums) copy(expandedSums, s.sums)
s.sums = expandedSums s.sums = expandedSums
@@ -584,7 +584,7 @@ func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) ma
} }
// compute final averages // compute final averages
result := make(map[string]probe.Result, len(sums)) result := make(map[string]probe.Stats, len(sums))
for key, s := range sums { for key, s := range sums {
if len(s.counts) == 0 { if len(s.counts) == 0 {
continue continue

View File

@@ -37,7 +37,7 @@ import { $allSystemsById, $chartTime, $direction } from "@/lib/stores"
import { cn, isVisuallyLonger, useBrowserStorage } from "@/lib/utils" import { cn, isVisuallyLonger, useBrowserStorage } from "@/lib/utils"
import type { NetworkProbeRecord } from "@/types" import type { NetworkProbeRecord } from "@/types"
import { AddProbeDialog, EditProbeDialog } from "./probe-dialog" import { AddProbeDialog, EditProbeDialog } from "./probe-dialog"
import { XIcon } from "lucide-react" import { ArrowLeftRightIcon, EthernetPortIcon, GlobeIcon, ServerIcon, XIcon } from "lucide-react"
import { Sheet, SheetContent, SheetDescription, SheetHeader, SheetTitle } from "@/components/ui/sheet" import { Sheet, SheetContent, SheetDescription, SheetHeader, SheetTitle } from "@/components/ui/sheet"
import ChartTimeSelect from "@/components/charts/chart-time-select" import ChartTimeSelect from "@/components/charts/chart-time-select"
import { LossChart, AvgMinMaxResponseChart } from "@/components/routes/system/charts/probes-charts" import { LossChart, AvgMinMaxResponseChart } from "@/components/routes/system/charts/probes-charts"
@@ -501,16 +501,20 @@ function NetworkProbeSheetContent({
<SheetHeader className="mb-0 border-b p-0 pb-4"> <SheetHeader className="mb-0 border-b p-0 pb-4">
<SheetTitle>{probeLabel}</SheetTitle> <SheetTitle>{probeLabel}</SheetTitle>
<SheetDescription className="flex flex-wrap items-center gap-x-2 gap-y-1"> <SheetDescription className="flex flex-wrap items-center gap-x-2 gap-y-1">
<ServerIcon className="size-3.5 text-muted-foreground" />
<Link className="hover:underline" href={getPagePath($router, "system", { id: system?.id ?? "" })}> <Link className="hover:underline" href={getPagePath($router, "system", { id: system?.id ?? "" })}>
{system?.name ?? ""} {system?.name ?? ""}
</Link> </Link>
<Separator orientation="vertical" className="h-2.5 bg-muted-foreground opacity-70" /> <Separator orientation="vertical" className="h-2.5 bg-muted-foreground opacity-70" />
<ArrowLeftRightIcon className="size-3.5 text-muted-foreground" />
{probe.protocol.toUpperCase()} {probe.protocol.toUpperCase()}
<Separator orientation="vertical" className="h-2.5 bg-muted-foreground opacity-70" /> <Separator orientation="vertical" className="h-2.5 bg-muted-foreground opacity-70" />
<GlobeIcon className="size-3.5 text-muted-foreground" />
{probe.target} {probe.target}
{probe.port > 0 && ( {probe.port > 0 && (
<> <>
<Separator orientation="vertical" className="h-2.5 bg-muted-foreground opacity-70" /> <Separator orientation="vertical" className="h-2.5 bg-muted-foreground opacity-70" />
<EthernetPortIcon className="size-3.5 text-muted-foreground" />
<span>{probe.port}</span> <span>{probe.port}</span>
</> </>
)} )}

View File

@@ -40,7 +40,7 @@ type NormalizedProbeValues = Omit<ProbeValues, "system" | "interval"> & {
type BulkProbeLineSource = Pick<NetworkProbeRecord, "target" | "protocol" | "port" | "interval" | "name"> type BulkProbeLineSource = Pick<NetworkProbeRecord, "target" | "protocol" | "port" | "interval" | "name">
const defaultInterval = 20 const defaultInterval = 30
const ProbeProtocolSchema = v.picklist(["icmp", "tcp", "http"]) const ProbeProtocolSchema = v.picklist(["icmp", "tcp", "http"])

View File

@@ -81,7 +81,7 @@ function ProbeChart({
return probeStats.filter((record) => visibleKeys.some((id) => record.stats?.[id] != null)) return probeStats.filter((record) => visibleKeys.some((id) => record.stats?.[id] != null))
}, [probeStats, visibleKeys]) }, [probeStats, visibleKeys])
const legend = dataPoints.length < 10 && dataPoints.length > 1 const legend = dataPoints.length < 10 && showFilter
return ( return (
<ChartCard <ChartCard