From 5fc774666f00208ce9675914df50b1c3207cde0b Mon Sep 17 00:00:00 2001 From: henrygd Date: Wed, 22 Apr 2026 21:40:52 -0400 Subject: [PATCH] updates --- agent/probe.go | 9 ++- agent/probe_test.go | 39 ++++++--- internal/entities/probe/probe.go | 22 ++---- internal/hub/probes.go | 12 ++- internal/hub/probes_test.go | 79 +++++++++++++++++++ internal/hub/systems/system.go | 11 ++- internal/hub/systems/system_manager.go | 27 ++----- .../network-probes-columns.tsx | 15 ++-- .../network-probes-table.tsx | 2 +- .../routes/system/charts/probes-charts.tsx | 17 ++-- internal/site/src/lib/use-network-probes.ts | 10 +-- internal/site/src/types.d.ts | 2 +- 12 files changed, 160 insertions(+), 85 deletions(-) create mode 100644 internal/hub/probes_test.go diff --git a/agent/probe.go b/agent/probe.go index e39c854c..d7142701 100644 --- a/agent/probe.go +++ b/agent/probe.go @@ -161,7 +161,10 @@ func (pm *ProbeManager) SyncProbes(configs []probe.Config) { // Build set of new keys newKeys := make(map[string]probe.Config, len(configs)) for _, cfg := range configs { - newKeys[cfg.Key()] = cfg + if cfg.ID == "" { + continue + } + newKeys[cfg.ID] = cfg } // Stop removed probes @@ -196,7 +199,7 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result { now := time.Now() duration := time.Duration(durationMs) * time.Millisecond - for key, task := range pm.probes { + for _, task := range pm.probes { task.mu.Lock() agg := task.aggregateLocked(duration, now) hourAgg := task.aggregateLocked(time.Hour, now) @@ -220,7 +223,7 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result { } else { result = probe.Result{result[0], hourAvg, 0, 0, hourLoss} } - results[key] = result + results[task.config.ID] = result } return results diff --git a/agent/probe_test.go b/agent/probe_test.go index 92a2a94e..93256fea 100644 --- a/agent/probe_test.go +++ b/agent/probe_test.go @@ -72,7 +72,7 @@ func TestProbeTaskAddSampleLockedTrimsRawSamplesButKeepsBucketHistory(t *testing func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) { now := time.Now().UTC() - task := &probeTask{} + task := &probeTask{config: probe.Config{ID: "probe-1"}} task.addSampleLocked(probeSample{responseMs: 10, timestamp: now.Add(-30 * time.Minute)}) task.addSampleLocked(probeSample{responseMs: 20, timestamp: now.Add(-9 * time.Minute)}) task.addSampleLocked(probeSample{responseMs: 40, timestamp: now.Add(-5 * time.Minute)}) @@ -82,7 +82,7 @@ func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) { pm := &ProbeManager{probes: map[string]*probeTask{"icmp:example.com": task}} results := pm.GetResults(uint16(time.Minute / time.Millisecond)) - result, ok := results["icmp:example.com"] + result, ok := results["probe-1"] require.True(t, ok) require.Len(t, result, 5) assert.Equal(t, 30.0, result[0]) @@ -94,14 +94,14 @@ func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) { func TestProbeManagerGetResultsIncludesLossOnlyHourData(t *testing.T) { now := time.Now().UTC() - task := &probeTask{} + task := &probeTask{config: probe.Config{ID: "probe-1"}} task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-30 * time.Second)}) task.addSampleLocked(probeSample{responseMs: -1, timestamp: now.Add(-10 * time.Second)}) pm := &ProbeManager{probes: map[string]*probeTask{"icmp:example.com": task}} results := pm.GetResults(uint16(time.Minute / time.Millisecond)) - result, ok := results["icmp:example.com"] + result, ok := results["probe-1"] require.True(t, ok) require.Len(t, result, 5) assert.Equal(t, 0.0, result[0]) @@ -111,23 +111,42 @@ func TestProbeManagerGetResultsIncludesLossOnlyHourData(t *testing.T) { assert.Equal(t, 100.0, result[4]) } +func TestProbeConfigResultKeyUsesSyncedID(t *testing.T) { + cfg := probe.Config{ID: "probe-1", Target: "1.1.1.1", Protocol: "icmp", Interval: 10} + assert.Equal(t, "probe-1", cfg.ID) +} + +func TestProbeManagerSyncProbesSkipsConfigsWithoutStableID(t *testing.T) { + validCfg := probe.Config{ID: "probe-1", Target: "https://example.com", Protocol: "http", Interval: 10} + invalidCfg := probe.Config{Target: "1.1.1.1", Protocol: "icmp", Interval: 10} + + pm := newProbeManager() + pm.SyncProbes([]probe.Config{validCfg, invalidCfg}) + defer pm.Stop() + + _, validExists := pm.probes[validCfg.ID] + _, invalidExists := pm.probes[invalidCfg.ID] + assert.True(t, validExists) + assert.False(t, invalidExists) +} + func TestProbeManagerSyncProbesStopsRemovedTasksButKeepsExisting(t *testing.T) { - keepCfg := probe.Config{Target: "https://example.com", Protocol: "http", Interval: 10} - removeCfg := probe.Config{Target: "1.1.1.1", Protocol: "icmp", Interval: 10} + keepCfg := probe.Config{ID: "probe-1", Target: "https://example.com", Protocol: "http", Interval: 10} + removeCfg := probe.Config{ID: "probe-2", Target: "1.1.1.1", Protocol: "icmp", Interval: 10} keptTask := &probeTask{config: keepCfg, cancel: make(chan struct{})} removedTask := &probeTask{config: removeCfg, cancel: make(chan struct{})} pm := &ProbeManager{ probes: map[string]*probeTask{ - keepCfg.Key(): keptTask, - removeCfg.Key(): removedTask, + keepCfg.ID: keptTask, + removeCfg.ID: removedTask, }, } pm.SyncProbes([]probe.Config{keepCfg}) - assert.Same(t, keptTask, pm.probes[keepCfg.Key()]) - _, exists := pm.probes[removeCfg.Key()] + assert.Same(t, keptTask, pm.probes[keepCfg.ID]) + _, exists := pm.probes[removeCfg.ID] assert.False(t, exists) select { diff --git a/internal/entities/probe/probe.go b/internal/entities/probe/probe.go index b7dd3c46..13ca64dd 100644 --- a/internal/entities/probe/probe.go +++ b/internal/entities/probe/probe.go @@ -1,13 +1,13 @@ package probe -import "strconv" - // Config defines a network probe task sent from hub to agent. type Config struct { - Target string `cbor:"0,keyasint" json:"target"` - Protocol string `cbor:"1,keyasint" json:"protocol"` // "icmp", "tcp", or "http" - Port uint16 `cbor:"2,keyasint,omitempty" json:"port,omitempty"` - Interval uint16 `cbor:"3,keyasint" json:"interval"` // seconds + // ID is the stable network_probes record ID generated by the hub. + ID string `cbor:"0,keyasint"` + Target string `cbor:"1,keyasint"` + Protocol string `cbor:"2,keyasint"` // "icmp", "tcp", or "http" + Port uint16 `cbor:"3,keyasint,omitempty"` + Interval uint16 `cbor:"4,keyasint"` // seconds } // Result holds aggregated probe results for a single target. @@ -22,13 +22,3 @@ type Config struct { // // 4: packet loss percentage over the last hour (0-100) type Result []float64 - -// Key returns the map key used for this probe config (e.g. "icmp:1.1.1.1", "tcp:host:443", "http:https://example.com"). -func (c Config) Key() string { - switch c.Protocol { - case "tcp": - return c.Protocol + ":" + c.Target + ":" + strconv.FormatUint(uint64(c.Port), 10) - default: - return c.Protocol + ":" + c.Target - } -} diff --git a/internal/hub/probes.go b/internal/hub/probes.go index a504c99a..9ca3c78d 100644 --- a/internal/hub/probes.go +++ b/internal/hub/probes.go @@ -1,11 +1,20 @@ package hub import ( + "strconv" + "github.com/henrygd/beszel/internal/entities/probe" "github.com/henrygd/beszel/internal/hub/systems" "github.com/pocketbase/pocketbase/core" ) +// generateProbeID creates a stable hash ID for a probe based on its configuration and the system it belongs to. +func generateProbeID(systemId string, config probe.Config) string { + intervalStr := strconv.FormatUint(uint64(config.Interval), 10) + portStr := strconv.FormatUint(uint64(config.Port), 10) + return systems.MakeStableHashId(systemId, config.Protocol, config.Target, portStr, intervalStr) +} + func bindNetworkProbesEvents(h *Hub) { // on create, make sure the id is set to a stable hash h.OnRecordCreate("network_probes").BindFunc(func(e *core.RecordEvent) error { @@ -16,8 +25,7 @@ func bindNetworkProbesEvents(h *Hub) { Port: uint16(e.Record.GetInt("port")), Interval: uint16(e.Record.GetInt("interval")), } - key := config.Key() - id := systems.MakeStableHashId(systemID, key) + id := generateProbeID(systemID, *config) e.Record.Set("id", id) return e.Next() }) diff --git a/internal/hub/probes_test.go b/internal/hub/probes_test.go new file mode 100644 index 00000000..c67bf4f5 --- /dev/null +++ b/internal/hub/probes_test.go @@ -0,0 +1,79 @@ +package hub + +import ( + "testing" + + "github.com/henrygd/beszel/internal/entities/probe" + "github.com/stretchr/testify/assert" +) + +func TestGenerateProbeID(t *testing.T) { + tests := []struct { + name string + systemID string + config probe.Config + expected string + }{ + { + name: "HTTP probe on example.com", + systemID: "sys123", + config: probe.Config{ + Protocol: "http", + Target: "example.com", + Port: 80, + Interval: 60, + }, + expected: "d5f27931", + }, + { + name: "HTTP probe on example.com with different system ID", + systemID: "sys1234", + config: probe.Config{ + Protocol: "http", + Target: "example.com", + Port: 80, + Interval: 60, + }, + expected: "6f8b17f1", + }, + { + name: "Same probe, different interval", + systemID: "sys1234", + config: probe.Config{ + Protocol: "http", + Target: "example.com", + Port: 80, + Interval: 120, + }, + expected: "6d4baf8", + }, + { + name: "ICMP probe on 1.1.1.1", + systemID: "sys456", + config: probe.Config{ + Protocol: "icmp", + Target: "1.1.1.1", + Port: 0, + Interval: 10, + }, + expected: "80b5836b", + }, { + name: "ICMP probe on 1.1.1.1 with different system ID", + systemID: "sys4567", + config: probe.Config{ + Protocol: "icmp", + Target: "1.1.1.1", + Port: 0, + Interval: 10, + }, + expected: "a6652680", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := generateProbeID(tt.systemID, tt.config) + assert.Equal(t, tt.expected, got, "generateProbeID() = %v, want %v", got, tt.expected) + }) + } +} diff --git a/internal/hub/systems/system.go b/internal/hub/systems/system.go index 6c6e1496..f9f4ecd6 100644 --- a/internal/hub/systems/system.go +++ b/internal/hub/systems/system.go @@ -335,7 +335,7 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst if !realtimeActive { db = app.DB() nowString = time.Now().UTC().Format(types.DefaultDateLayout) - sql := fmt.Sprintf("UPDATE %s SET resAvg={:resAvg}, resMin1h={:resMin1h}, resMax1h={:resMax1h}, resAvg1h={:resAvg1h}, loss1h={:loss1h}, updated={:updated} WHERE id={:id}", collectionName) + sql := fmt.Sprintf("UPDATE %s SET resAvg={:res}, resMin1h={:resMin1h}, resMax1h={:resMax1h}, resAvg1h={:resAvg1h}, loss1h={:loss1h}, updated={:updated} WHERE id={:id}", collectionName) updateQuery = db.NewQuery(sql) } @@ -365,14 +365,13 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst } // update network_probes records - for key, values := range data { - id := MakeStableHashId(systemId, key) + for id, values := range data { switch realtimeActive { case true: var record *core.Record record, err = app.FindRecordById(collectionName, id) if err == nil { - record.Set("resAvg", probeMetric(values, 0)) + record.Set("res", probeMetric(values, 0)) record.Set("resAvg1h", probeMetric(values, 1)) record.Set("resMin1h", probeMetric(values, 2)) record.Set("resMax1h", probeMetric(values, 3)) @@ -382,7 +381,7 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst default: _, err = updateQuery.Bind(dbx.Params{ "id": id, - "resAvg": probeMetric(values, 0), + "res": probeMetric(values, 0), "resAvg1h": probeMetric(values, 1), "resMin1h": probeMetric(values, 2), "resMax1h": probeMetric(values, 3), @@ -391,7 +390,7 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst }).Execute() } if err != nil { - app.Logger().Warn("Failed to update probe", "system", systemId, "probe", key, "err", err) + app.Logger().Warn("Failed to update probe", "system", systemId, "probe", id, "err", err) } } diff --git a/internal/hub/systems/system_manager.go b/internal/hub/systems/system_manager.go index 40d78b0e..1ffcfb86 100644 --- a/internal/hub/systems/system_manager.go +++ b/internal/hub/systems/system_manager.go @@ -325,7 +325,7 @@ func (sm *SystemManager) AddWebSocketSystem(systemId string, agentVersion semver configs := sm.GetProbeConfigsForSystem(systemId) if len(configs) > 0 { if err := system.SyncNetworkProbes(configs); err != nil { - sm.hub.Logger().Warn("failed to sync probes on connect", "system", systemId, "err", err) + sm.hub.Logger().Warn("failed to sync probes to agent", "system", systemId, "err", err) } } }() @@ -344,26 +344,11 @@ func (sm *SystemManager) resetFailedSmartFetchState(systemID string) { // GetProbeConfigsForSystem returns all enabled probe configs for a system. func (sm *SystemManager) GetProbeConfigsForSystem(systemID string) []probe.Config { - records, err := sm.hub.FindRecordsByFilter( - "network_probes", - "system = {:system} && enabled = true", - "", - 0, 0, - dbx.Params{"system": systemID}, - ) - if err != nil || len(records) == 0 { - return nil - } - - configs := make([]probe.Config, 0, len(records)) - for _, r := range records { - configs = append(configs, probe.Config{ - Target: r.GetString("target"), - Protocol: r.GetString("protocol"), - Port: uint16(r.GetInt("port")), - Interval: uint16(r.GetInt("interval")), - }) - } + var configs []probe.Config + _ = sm.hub.DB(). + NewQuery("SELECT id, target, protocol, port, interval FROM network_probes WHERE system = {:system} AND enabled = true"). + Bind(dbx.Params{"system": systemID}). + All(&configs) return configs } diff --git a/internal/site/src/components/network-probes-table/network-probes-columns.tsx b/internal/site/src/components/network-probes-table/network-probes-columns.tsx index 0c036d33..69149f9d 100644 --- a/internal/site/src/components/network-probes-table/network-probes-columns.tsx +++ b/internal/site/src/components/network-probes-table/network-probes-columns.tsx @@ -124,22 +124,22 @@ export function getProbeColumns(longestName = 0, longestTarget = 0): ColumnDef record.loss, + accessorFn: (record) => record.loss1h, invertSorting: true, - header: ({ column }) => , + header: ({ column }) => , cell: ({ row }) => { - const { loss, res } = row.original - if (loss === undefined || (!res && !loss)) { + const { loss1h, res } = row.original + if (loss1h === undefined || (!res && !loss1h)) { return - } let color = "bg-green-500" - if (loss) { - color = loss > 20 ? "bg-red-500" : "bg-yellow-500" + if (loss1h) { + color = loss1h > 20 ? "bg-red-500" : "bg-yellow-500" } return ( - {loss}% + {loss1h}% ) }, @@ -232,7 +232,6 @@ function HeaderButton({ > {Icon && } {name} - {/* */} ) } diff --git a/internal/site/src/components/network-probes-table/network-probes-table.tsx b/internal/site/src/components/network-probes-table/network-probes-table.tsx index f32ce38d..ef0a8574 100644 --- a/internal/site/src/components/network-probes-table/network-probes-table.tsx +++ b/internal/site/src/components/network-probes-table/network-probes-table.tsx @@ -102,7 +102,7 @@ export default function NetworkProbesTableNew({ Network Probes
- ICMP/TCP/HTTP response monitoring from agents + Response time monitoring from agents.
diff --git a/internal/site/src/components/routes/system/charts/probes-charts.tsx b/internal/site/src/components/routes/system/charts/probes-charts.tsx index c8c25103..3741edbf 100644 --- a/internal/site/src/components/routes/system/charts/probes-charts.tsx +++ b/internal/site/src/components/routes/system/charts/probes-charts.tsx @@ -7,7 +7,6 @@ import type { ChartData, NetworkProbeRecord, NetworkProbeStatsRecord } from "@/t import { useMemo } from "react" import { atom } from "nanostores" import { useStore } from "@nanostores/react" -import { probeKey } from "@/lib/use-network-probes" const $filter = atom("") @@ -47,7 +46,7 @@ function ProbeChart({ const sortedProbes = [...probes].sort((a, b) => b.resAvg1h - a.resAvg1h) const count = sortedProbes.length const points: DataPoint[] = [] - const visibleKeys: string[] = [] + const visibleIDs: string[] = [] const filterTerms = filter ? filter .toLowerCase() @@ -56,25 +55,25 @@ function ProbeChart({ : [] for (let i = 0; i < count; i++) { const p = sortedProbes[i] - const key = probeKey(p) - const filtered = filterTerms.length > 0 && !filterTerms.some((term) => key.toLowerCase().includes(term)) + const label = p.name || p.target + const filtered = filterTerms.length > 0 && !filterTerms.some((term) => label.toLowerCase().includes(term)) if (filtered) { continue } - visibleKeys.push(key) + visibleIDs.push(p.id) points.push({ order: i, - label: p.name || p.target, - dataKey: (record: NetworkProbeStatsRecord) => record.stats?.[key]?.[valueIndex] ?? "-", + label, + dataKey: (record: NetworkProbeStatsRecord) => record.stats?.[p.id]?.[valueIndex] ?? "-", color: count <= 5 ? i + 1 : `hsl(${(i * 360) / count}, var(--chart-saturation), var(--chart-lightness))`, }) } - return { dataPoints: points, visibleKeys } + return { dataPoints: points, visibleKeys: visibleIDs } }, [probes, filter, valueIndex]) const filteredProbeStats = useMemo(() => { if (!visibleKeys.length) return probeStats - return probeStats.filter((record) => visibleKeys.some((key) => record.stats?.[key] != null)) + return probeStats.filter((record) => visibleKeys.some((id) => record.stats?.[id] != null)) }, [probeStats, visibleKeys]) const legend = dataPoints.length < 10 diff --git a/internal/site/src/lib/use-network-probes.ts b/internal/site/src/lib/use-network-probes.ts index 8ac6c59d..b3acdd26 100644 --- a/internal/site/src/lib/use-network-probes.ts +++ b/internal/site/src/lib/use-network-probes.ts @@ -32,7 +32,7 @@ function appendCacheValue( } const NETWORK_PROBE_FIELDS = - "id,name,system,target,protocol,port,interval,res,resMin1h,resMax1h,resAvg1h,loss,enabled,updated" + "id,name,system,target,protocol,port,interval,res,resMin1h,resMax1h,resAvg1h,loss1h,enabled,updated" interface UseNetworkProbesProps { systemId?: string @@ -245,16 +245,10 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) { } } -export function probeKey(p: NetworkProbeRecord) { - if (p.protocol === "tcp") return `${p.protocol}:${p.target}:${p.port}` - return `${p.protocol}:${p.target}` -} - function probesToStats(probes: NetworkProbeRecord[]): NetworkProbeStatsRecord["stats"] { const stats: NetworkProbeStatsRecord["stats"] = {} for (const probe of probes) { - const key = probeKey(probe) - stats[key] = [probe.res, probe.resAvg1h, probe.resMin1h, probe.resMax1h, probe.loss] + stats[probe.id] = [probe.res, probe.resAvg1h, probe.resMin1h, probe.resMax1h, probe.loss1h] } return stats } diff --git a/internal/site/src/types.d.ts b/internal/site/src/types.d.ts index f0f487c8..a09f03a6 100644 --- a/internal/site/src/types.d.ts +++ b/internal/site/src/types.d.ts @@ -556,7 +556,7 @@ export interface NetworkProbeRecord { resMin1h: number resMax1h: number resAvg1h: number - loss: number + loss1h: number interval: number enabled: boolean updated: string