From e71ffd4d2adc046688bf9b913b06032b75344563 Mon Sep 17 00:00:00 2001 From: henrygd Date: Sun, 19 Apr 2026 21:44:21 -0400 Subject: [PATCH] updates --- agent/agent.go | 7 ++- agent/handlers.go | 12 ---- agent/probe.go | 8 +-- internal/common/common-ws.go | 2 - internal/entities/probe/probe.go | 19 +++--- internal/entities/system/system.go | 12 ++-- internal/hub/probes.go | 18 ++++++ internal/hub/systems/system.go | 39 +++++++++--- internal/hub/systems/system_probes.go | 62 +++++++++---------- internal/hub/systems/system_realtime.go | 28 +-------- internal/hub/systems/system_smart.go | 2 +- internal/hub/systems/system_systemd_test.go | 18 +++--- .../network-probes-columns.tsx | 15 ++++- .../network-probes-table.tsx | 6 +- .../network-probes-table/probe-dialog.tsx | 1 + 15 files changed, 136 insertions(+), 113 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 7a774378..888a4d79 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -48,7 +48,7 @@ type Agent struct { keys []gossh.PublicKey // SSH public keys smartManager *SmartManager // Manages SMART data systemdManager *systemdManager // Manages systemd services - probeManager *ProbeManager // Manages network probes + probeManager *ProbeManager // Manages network probes } // NewAgent creates a new agent with the given data directory for persisting data. @@ -182,6 +182,11 @@ func (a *Agent) gatherStats(options common.DataRequestOptions) *system.CombinedD } } + if a.probeManager != nil { + data.Probes = a.probeManager.GetResults() + slog.Debug("Probes", "data", data.Probes) + } + // skip updating systemd services if cache time is not the default 60sec interval if a.systemdManager != nil && cacheTimeMs == defaultDataCacheTimeMs { totalCount := uint16(a.systemdManager.getServiceStatsCount()) diff --git a/agent/handlers.go b/agent/handlers.go index f46aebdf..deab1afb 100644 --- a/agent/handlers.go +++ b/agent/handlers.go @@ -53,7 +53,6 @@ func NewHandlerRegistry() *HandlerRegistry { registry.Register(common.GetSmartData, &GetSmartDataHandler{}) registry.Register(common.GetSystemdInfo, &GetSystemdInfoHandler{}) registry.Register(common.SyncNetworkProbes, &SyncNetworkProbesHandler{}) - registry.Register(common.GetNetworkProbeResults, &GetNetworkProbeResultsHandler{}) return registry } @@ -222,14 +221,3 @@ func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error { slog.Info("network probes synced", "count", len(configs)) return hctx.SendResponse("ok", hctx.RequestID) } - -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// - -// GetNetworkProbeResultsHandler handles probe results request from hub -type GetNetworkProbeResultsHandler struct{} - -func (h *GetNetworkProbeResultsHandler) Handle(hctx *HandlerContext) error { - results := hctx.Agent.probeManager.GetResults() - return hctx.SendResponse(results, hctx.RequestID) -} diff --git a/agent/probe.go b/agent/probe.go index f6165444..1e28589b 100644 --- a/agent/probe.go +++ b/agent/probe.go @@ -120,10 +120,10 @@ func (pm *ProbeManager) GetResults() map[string]probe.Result { } results[key] = probe.Result{ - AvgMs: avg, - MinMs: math.Round(minMs*100) / 100, - MaxMs: math.Round(maxMs*100) / 100, - Loss: math.Round(float64(lossCount)/float64(count)*10000) / 100, + avg, // average latency in ms + math.Round(minMs*100) / 100, // min latency in ms + math.Round(maxMs*100) / 100, // max latency in ms + math.Round(float64(lossCount)/float64(count)*10000) / 100, // packet loss percentage } } diff --git a/internal/common/common-ws.go b/internal/common/common-ws.go index 89a3c823..68bd250f 100644 --- a/internal/common/common-ws.go +++ b/internal/common/common-ws.go @@ -24,8 +24,6 @@ const ( GetSystemdInfo // Sync network probe configuration to agent SyncNetworkProbes - // Request network probe results from agent - GetNetworkProbeResults // Add new actions here... ) diff --git a/internal/entities/probe/probe.go b/internal/entities/probe/probe.go index b040c8f0..d4f71aa8 100644 --- a/internal/entities/probe/probe.go +++ b/internal/entities/probe/probe.go @@ -1,6 +1,6 @@ package probe -import "fmt" +import "strconv" // Config defines a network probe task sent from hub to agent. type Config struct { @@ -11,18 +11,21 @@ type Config struct { } // Result holds aggregated probe results for a single target. -type Result struct { - AvgMs float64 `cbor:"0,keyasint" json:"avg"` - MinMs float64 `cbor:"1,keyasint" json:"min"` - MaxMs float64 `cbor:"2,keyasint" json:"max"` - Loss float64 `cbor:"3,keyasint" json:"loss"` // packet loss % -} +// +// 0: avg latency in ms +// +// 1: min latency in ms +// +// 2: max latency in ms +// +// 3: packet loss percentage (0-100) +type Result []float64 // Key returns the map key used for this probe config (e.g. "icmp:1.1.1.1", "tcp:host:443", "http:https://example.com"). func (c Config) Key() string { switch c.Protocol { case "tcp": - return c.Protocol + ":" + c.Target + ":" + fmt.Sprintf("%d", c.Port) + return c.Protocol + ":" + c.Target + ":" + strconv.FormatUint(uint64(c.Port), 10) default: return c.Protocol + ":" + c.Target } diff --git a/internal/entities/system/system.go b/internal/entities/system/system.go index 81da1fee..9aa90efc 100644 --- a/internal/entities/system/system.go +++ b/internal/entities/system/system.go @@ -7,6 +7,7 @@ import ( "time" "github.com/henrygd/beszel/internal/entities/container" + "github.com/henrygd/beszel/internal/entities/probe" "github.com/henrygd/beszel/internal/entities/systemd" ) @@ -174,9 +175,10 @@ type Details struct { // Final data structure to return to the hub type CombinedData struct { - Stats Stats `json:"stats" cbor:"0,keyasint"` - Info Info `json:"info" cbor:"1,keyasint"` - Containers []*container.Stats `json:"container" cbor:"2,keyasint"` - SystemdServices []*systemd.Service `json:"systemd,omitempty" cbor:"3,keyasint,omitempty"` - Details *Details `cbor:"4,keyasint,omitempty"` + Stats Stats `json:"stats" cbor:"0,keyasint"` + Info Info `json:"info" cbor:"1,keyasint"` + Containers []*container.Stats `json:"container" cbor:"2,keyasint"` + SystemdServices []*systemd.Service `json:"systemd,omitempty" cbor:"3,keyasint,omitempty"` + Details *Details `cbor:"4,keyasint,omitempty"` + Probes map[string]probe.Result `cbor:"5,keyasint,omitempty"` } diff --git a/internal/hub/probes.go b/internal/hub/probes.go index 849fe70d..a504c99a 100644 --- a/internal/hub/probes.go +++ b/internal/hub/probes.go @@ -1,10 +1,27 @@ package hub import ( + "github.com/henrygd/beszel/internal/entities/probe" + "github.com/henrygd/beszel/internal/hub/systems" "github.com/pocketbase/pocketbase/core" ) func bindNetworkProbesEvents(h *Hub) { + // on create, make sure the id is set to a stable hash + h.OnRecordCreate("network_probes").BindFunc(func(e *core.RecordEvent) error { + systemID := e.Record.GetString("system") + config := &probe.Config{ + Target: e.Record.GetString("target"), + Protocol: e.Record.GetString("protocol"), + Port: uint16(e.Record.GetInt("port")), + Interval: uint16(e.Record.GetInt("interval")), + } + key := config.Key() + id := systems.MakeStableHashId(systemID, key) + e.Record.Set("id", id) + return e.Next() + }) + // sync probe to agent on creation h.OnRecordAfterCreateSuccess("network_probes").BindFunc(func(e *core.RecordEvent) error { systemID := e.Record.GetString("system") @@ -17,6 +34,7 @@ func bindNetworkProbesEvents(h *Hub) { h.syncProbesToAgent(systemID) return e.Next() }) + // TODO: if enabled changes, sync to agent } // syncProbesToAgent fetches enabled probes for a system and sends them to the agent. diff --git a/internal/hub/systems/system.go b/internal/hub/systems/system.go index 7a7ed6ea..b47e07f0 100644 --- a/internal/hub/systems/system.go +++ b/internal/hub/systems/system.go @@ -18,6 +18,7 @@ import ( "github.com/henrygd/beszel/internal/hub/ws" "github.com/henrygd/beszel/internal/entities/container" + "github.com/henrygd/beszel/internal/entities/probe" "github.com/henrygd/beszel/internal/entities/smart" "github.com/henrygd/beszel/internal/entities/system" "github.com/henrygd/beszel/internal/entities/systemd" @@ -29,6 +30,7 @@ import ( "github.com/lxzan/gws" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" + "github.com/pocketbase/pocketbase/tools/types" "golang.org/x/crypto/ssh" ) @@ -167,11 +169,6 @@ func (sys *System) update() error { } } - // Fetch and save network probe results - if sys.hasEnabledProbes() { - go sys.fetchAndSaveProbeResults() - } - return err } @@ -243,6 +240,12 @@ func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error } } + if data.Probes != nil { + if err := updateNetworkProbesRecords(txApp, data.Probes, sys.Id); err != nil { + return err + } + } + // update system record (do this last because it triggers alerts and we need above records to be inserted first) systemRecord.Set("status", up) systemRecord.Set("info", data.Info) @@ -294,7 +297,7 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s for i, service := range data { suffix := fmt.Sprintf("%d", i) valueStrings = append(valueStrings, fmt.Sprintf("({:id%[1]s}, {:system}, {:name%[1]s}, {:state%[1]s}, {:sub%[1]s}, {:cpu%[1]s}, {:cpuPeak%[1]s}, {:memory%[1]s}, {:memPeak%[1]s}, {:updated})", suffix)) - params["id"+suffix] = makeStableHashId(systemId, service.Name) + params["id"+suffix] = MakeStableHashId(systemId, service.Name) params["name"+suffix] = service.Name params["state"+suffix] = service.State params["sub"+suffix] = service.Sub @@ -311,6 +314,28 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s return err } +func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, systemId string) error { + if len(data) == 0 { + return nil + } + collectionName := "network_probes" + for key := range data { + probe := data[key] + id := MakeStableHashId(systemId, key) + params := dbx.Params{ + // "system": systemId, + "latency": probe[0], + "loss": probe[3], + "updated": time.Now().UTC().Format(types.DefaultDateLayout), + } + _, err := app.DB().Update(collectionName, params, dbx.HashExp{"id": id}).Execute() + if err != nil { + app.Logger().Warn("Failed to update network probe record", "system", systemId, "probe", key, "err", err) + } + } + return nil +} + // createContainerRecords creates container records func createContainerRecords(app core.App, data []*container.Stats, systemId string) error { if len(data) == 0 { @@ -545,7 +570,7 @@ func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error) return result, err } -func makeStableHashId(strings ...string) string { +func MakeStableHashId(strings ...string) string { hash := fnv.New32a() for _, str := range strings { hash.Write([]byte(str)) diff --git a/internal/hub/systems/system_probes.go b/internal/hub/systems/system_probes.go index b5a6aa63..69dfe4d5 100644 --- a/internal/hub/systems/system_probes.go +++ b/internal/hub/systems/system_probes.go @@ -6,8 +6,6 @@ import ( "github.com/henrygd/beszel/internal/common" "github.com/henrygd/beszel/internal/entities/probe" - "github.com/pocketbase/dbx" - "github.com/pocketbase/pocketbase/core" ) // SyncNetworkProbes sends probe configurations to the agent. @@ -19,41 +17,41 @@ func (sys *System) SyncNetworkProbes(configs []probe.Config) error { } // FetchNetworkProbeResults fetches probe results from the agent. -func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - var results map[string]probe.Result - err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results) - return results, err -} +// func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) { +// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +// defer cancel() +// var results map[string]probe.Result +// err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results) +// return results, err +// } // hasEnabledProbes returns true if this system has any enabled network probes. -func (sys *System) hasEnabledProbes() bool { - count, err := sys.manager.hub.CountRecords("network_probes", - dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id})) - return err == nil && count > 0 -} +// func (sys *System) hasEnabledProbes() bool { +// count, err := sys.manager.hub.CountRecords("network_probes", +// dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id})) +// return err == nil && count > 0 +// } // fetchAndSaveProbeResults fetches probe results and saves them to the database. -func (sys *System) fetchAndSaveProbeResults() { - hub := sys.manager.hub +// func (sys *System) fetchAndSaveProbeResults() { +// hub := sys.manager.hub - results, err := sys.FetchNetworkProbeResults() - if err != nil || len(results) == 0 { - return - } +// results, err := sys.FetchNetworkProbeResults() +// if err != nil || len(results) == 0 { +// return +// } - collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats") - if err != nil { - return - } +// collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats") +// if err != nil { +// return +// } - record := core.NewRecord(collection) - record.Set("system", sys.Id) - record.Set("stats", results) - record.Set("type", "1m") +// record := core.NewRecord(collection) +// record.Set("system", sys.Id) +// record.Set("stats", results) +// record.Set("type", "1m") - if err := hub.SaveNoValidate(record); err != nil { - hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err) - } -} +// if err := hub.SaveNoValidate(record); err != nil { +// hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err) +// } +// } diff --git a/internal/hub/systems/system_realtime.go b/internal/hub/systems/system_realtime.go index 729feff1..6c27d0bd 100644 --- a/internal/hub/systems/system_realtime.go +++ b/internal/hub/systems/system_realtime.go @@ -7,21 +7,10 @@ import ( "time" "github.com/henrygd/beszel/internal/common" - "github.com/henrygd/beszel/internal/entities/container" - "github.com/henrygd/beszel/internal/entities/probe" - "github.com/henrygd/beszel/internal/entities/system" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/tools/subscriptions" ) -// realtimePayload wraps system data with optional network probe results for realtime broadcast. -type realtimePayload struct { - Stats system.Stats `json:"stats"` - Info system.Info `json:"info"` - Containers []*container.Stats `json:"container"` - Probes map[string]probe.Result `json:"probes,omitempty"` -} - type subscriptionInfo struct { subscription string connectedClients uint8 @@ -153,27 +142,16 @@ func (sm *SystemManager) startRealtimeWorker() { // fetchRealtimeDataAndNotify fetches realtime data for all active subscriptions and notifies the clients. func (sm *SystemManager) fetchRealtimeDataAndNotify() { for systemId, info := range activeSubscriptions { - sys, err := sm.GetSystem(systemId) + system, err := sm.GetSystem(systemId) if err != nil { continue } go func() { - data, err := sys.fetchDataFromAgent(common.DataRequestOptions{CacheTimeMs: 1000}) + data, err := system.fetchDataFromAgent(common.DataRequestOptions{CacheTimeMs: 1000}) if err != nil { return } - payload := realtimePayload{ - Stats: data.Stats, - Info: data.Info, - Containers: data.Containers, - } - // Fetch network probe results (lightweight in-memory read on agent) - if sys.hasEnabledProbes() { - if probes, err := sys.FetchNetworkProbeResults(); err == nil && len(probes) > 0 { - payload.Probes = probes - } - } - bytes, err := json.Marshal(payload) + bytes, err := json.Marshal(data) if err == nil { notify(sm.hub, info.subscription, bytes) } diff --git a/internal/hub/systems/system_smart.go b/internal/hub/systems/system_smart.go index c40f643d..d5a98151 100644 --- a/internal/hub/systems/system_smart.go +++ b/internal/hub/systems/system_smart.go @@ -84,7 +84,7 @@ func (sys *System) saveSmartDevices(smartData map[string]smart.SmartData) error func (sys *System) upsertSmartDeviceRecord(collection *core.Collection, deviceKey string, device smart.SmartData) error { hub := sys.manager.hub - recordID := makeStableHashId(sys.Id, deviceKey) + recordID := MakeStableHashId(sys.Id, deviceKey) record, err := hub.FindRecordById(collection, recordID) if err != nil { diff --git a/internal/hub/systems/system_systemd_test.go b/internal/hub/systems/system_systemd_test.go index c2d890d5..d5b103e0 100644 --- a/internal/hub/systems/system_systemd_test.go +++ b/internal/hub/systems/system_systemd_test.go @@ -14,9 +14,9 @@ func TestGetSystemdServiceId(t *testing.T) { serviceName := "nginx.service" // Call multiple times and ensure same result - id1 := makeStableHashId(systemId, serviceName) - id2 := makeStableHashId(systemId, serviceName) - id3 := makeStableHashId(systemId, serviceName) + id1 := MakeStableHashId(systemId, serviceName) + id2 := MakeStableHashId(systemId, serviceName) + id3 := MakeStableHashId(systemId, serviceName) assert.Equal(t, id1, id2) assert.Equal(t, id2, id3) @@ -29,10 +29,10 @@ func TestGetSystemdServiceId(t *testing.T) { serviceName1 := "nginx.service" serviceName2 := "apache.service" - id1 := makeStableHashId(systemId1, serviceName1) - id2 := makeStableHashId(systemId2, serviceName1) - id3 := makeStableHashId(systemId1, serviceName2) - id4 := makeStableHashId(systemId2, serviceName2) + id1 := MakeStableHashId(systemId1, serviceName1) + id2 := MakeStableHashId(systemId2, serviceName1) + id3 := MakeStableHashId(systemId1, serviceName2) + id4 := MakeStableHashId(systemId2, serviceName2) // All IDs should be different assert.NotEqual(t, id1, id2) @@ -56,14 +56,14 @@ func TestGetSystemdServiceId(t *testing.T) { } for _, tc := range testCases { - id := makeStableHashId(tc.systemId, tc.serviceName) + id := MakeStableHashId(tc.systemId, tc.serviceName) // FNV-32 produces 8 hex characters assert.Len(t, id, 8, "ID should be 8 characters for systemId='%s', serviceName='%s'", tc.systemId, tc.serviceName) } }) t.Run("hexadecimal output", func(t *testing.T) { - id := makeStableHashId("test-system", "test-service") + id := MakeStableHashId("test-system", "test-service") assert.NotEmpty(t, id) // Should only contain hexadecimal characters diff --git a/internal/site/src/components/network-probes-table/network-probes-columns.tsx b/internal/site/src/components/network-probes-table/network-probes-columns.tsx index c6fa5a1a..9c3e52d3 100644 --- a/internal/site/src/components/network-probes-table/network-probes-columns.tsx +++ b/internal/site/src/components/network-probes-table/network-probes-columns.tsx @@ -110,9 +110,16 @@ export function getProbeColumns(longestName = 0, longestTarget = 0): ColumnDef- } + let color = "bg-green-500" + if (val > 200) { + color = "bg-yellow-500" + } + if (!val || val > 2000) { + color = "bg-red-500" + } return ( - 100 ? "bg-yellow-500" : "bg-green-500")} /> + {decimalString(val, val < 100 ? 2 : 1).toLocaleString()} ms ) @@ -128,9 +135,13 @@ export function getProbeColumns(longestName = 0, longestTarget = 0): ColumnDef- } + let color = "bg-green-500" + if (val > 0) { + color = val > 20 ? "bg-red-500" : "bg-yellow-500" + } return ( - 0 ? "bg-yellow-500" : "bg-green-500")} /> + {val}% ) diff --git a/internal/site/src/components/network-probes-table/network-probes-table.tsx b/internal/site/src/components/network-probes-table/network-probes-table.tsx index e2dcd93c..e334c7f0 100644 --- a/internal/site/src/components/network-probes-table/network-probes-table.tsx +++ b/internal/site/src/components/network-probes-table/network-probes-table.tsx @@ -25,7 +25,7 @@ import { cn, getVisualStringWidth, useBrowserStorage } from "@/lib/utils" import type { NetworkProbeRecord } from "@/types" import { AddProbeDialog } from "./probe-dialog" -const NETWORK_PROBE_FIELDS = "id,name,system,target,protocol,port,interval,enabled,updated" +const NETWORK_PROBE_FIELDS = "id,name,system,target,protocol,port,interval,latency,loss,enabled,updated" export default function NetworkProbesTableNew({ systemId }: { systemId?: string }) { const loadTime = Date.now() @@ -174,10 +174,6 @@ export default function NetworkProbesTableNew({ systemId }: { systemId?: string const rows = table.getRowModel().rows const visibleColumns = table.getVisibleLeafColumns() - if (!data.length && !globalFilter) { - return null - } - return ( diff --git a/internal/site/src/components/network-probes-table/probe-dialog.tsx b/internal/site/src/components/network-probes-table/probe-dialog.tsx index 05ca5f94..6b357bc1 100644 --- a/internal/site/src/components/network-probes-table/probe-dialog.tsx +++ b/internal/site/src/components/network-probes-table/probe-dialog.tsx @@ -47,6 +47,7 @@ export function AddProbeDialog({ systemId }: { systemId: string }) { protocol, port: protocol === "tcp" ? Number(port) : 0, interval: Number(probeInterval), + enabled: true, }) resetForm() setOpen(false)