From f9feaf5343825707b1e951cd9de4803b0a67a5c8 Mon Sep 17 00:00:00 2001 From: xiaomiku01 Date: Sat, 11 Apr 2026 00:33:56 +0800 Subject: [PATCH] feat(hub): add network probe API, sync, result collection, and aggregation Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/hub/api.go | 5 + internal/hub/api_probes.go | 197 +++++++++++++++++++++++++ internal/hub/systems/system.go | 3 + internal/hub/systems/system_manager.go | 38 +++++ internal/hub/systems/system_probes.go | 59 ++++++++ internal/records/records.go | 72 ++++++++- 6 files changed, 371 insertions(+), 3 deletions(-) create mode 100644 internal/hub/api_probes.go create mode 100644 internal/hub/systems/system_probes.go diff --git a/internal/hub/api.go b/internal/hub/api.go index d2d009c0..f7a3d0a3 100644 --- a/internal/hub/api.go +++ b/internal/hub/api.go @@ -134,6 +134,11 @@ func (h *Hub) registerApiRoutes(se *core.ServeEvent) error { // get container info apiAuth.GET("/containers/info", h.getContainerInfo) } + // network probe routes + apiAuth.GET("/network-probes", h.listNetworkProbes) + apiAuth.POST("/network-probes", h.createNetworkProbe).BindFunc(excludeReadOnlyRole) + apiAuth.DELETE("/network-probes", h.deleteNetworkProbe).BindFunc(excludeReadOnlyRole) + apiAuth.GET("/network-probe-stats", h.getNetworkProbeStats) return nil } diff --git a/internal/hub/api_probes.go b/internal/hub/api_probes.go new file mode 100644 index 00000000..615daa41 --- /dev/null +++ b/internal/hub/api_probes.go @@ -0,0 +1,197 @@ +package hub + +import ( + "encoding/json" + "net/http" + + "github.com/pocketbase/dbx" + "github.com/pocketbase/pocketbase/core" +) + +// listNetworkProbes handles GET /api/beszel/network-probes +func (h *Hub) listNetworkProbes(e *core.RequestEvent) error { + systemID := e.Request.URL.Query().Get("system") + if systemID == "" { + return e.BadRequestError("system parameter required", nil) + } + system, err := h.sm.GetSystem(systemID) + if err != nil || !system.HasUser(e.App, e.Auth) { + return e.NotFoundError("", nil) + } + + records, err := e.App.FindRecordsByFilter( + "network_probes", + "system = {:system}", + "-created", + 0, 0, + dbx.Params{"system": systemID}, + ) + if err != nil { + return e.InternalServerError("", err) + } + + type probeRecord struct { + Id string `json:"id"` + Name string `json:"name"` + Target string `json:"target"` + Protocol string `json:"protocol"` + Port int `json:"port"` + Interval int `json:"interval"` + Enabled bool `json:"enabled"` + } + + result := make([]probeRecord, 0, len(records)) + for _, r := range records { + result = append(result, probeRecord{ + Id: r.Id, + Name: r.GetString("name"), + Target: r.GetString("target"), + Protocol: r.GetString("protocol"), + Port: r.GetInt("port"), + Interval: r.GetInt("interval"), + Enabled: r.GetBool("enabled"), + }) + } + + return e.JSON(http.StatusOK, result) +} + +// createNetworkProbe handles POST /api/beszel/network-probes +func (h *Hub) createNetworkProbe(e *core.RequestEvent) error { + var req struct { + System string `json:"system"` + Name string `json:"name"` + Target string `json:"target"` + Protocol string `json:"protocol"` + Port int `json:"port"` + Interval int `json:"interval"` + } + if err := json.NewDecoder(e.Request.Body).Decode(&req); err != nil { + return e.BadRequestError("invalid request body", err) + } + if req.System == "" || req.Target == "" || req.Protocol == "" { + return e.BadRequestError("system, target, and protocol are required", nil) + } + if req.Protocol != "icmp" && req.Protocol != "tcp" && req.Protocol != "http" { + return e.BadRequestError("protocol must be icmp, tcp, or http", nil) + } + if req.Interval <= 0 { + req.Interval = 10 + } + + system, err := h.sm.GetSystem(req.System) + if err != nil || !system.HasUser(e.App, e.Auth) { + return e.NotFoundError("", nil) + } + + collection, err := e.App.FindCachedCollectionByNameOrId("network_probes") + if err != nil { + return e.InternalServerError("", err) + } + + record := core.NewRecord(collection) + record.Set("system", req.System) + record.Set("name", req.Name) + record.Set("target", req.Target) + record.Set("protocol", req.Protocol) + record.Set("port", req.Port) + record.Set("interval", req.Interval) + record.Set("enabled", true) + + if err := e.App.Save(record); err != nil { + return e.InternalServerError("", err) + } + + // Sync probes to agent + h.syncProbesToAgent(req.System) + + return e.JSON(http.StatusOK, map[string]string{"id": record.Id}) +} + +// deleteNetworkProbe handles DELETE /api/beszel/network-probes +func (h *Hub) deleteNetworkProbe(e *core.RequestEvent) error { + probeID := e.Request.URL.Query().Get("id") + if probeID == "" { + return e.BadRequestError("id parameter required", nil) + } + + record, err := e.App.FindRecordById("network_probes", probeID) + if err != nil { + return e.NotFoundError("", nil) + } + + systemID := record.GetString("system") + system, err := h.sm.GetSystem(systemID) + if err != nil || !system.HasUser(e.App, e.Auth) { + return e.NotFoundError("", nil) + } + + if err := e.App.Delete(record); err != nil { + return e.InternalServerError("", err) + } + + // Sync probes to agent + h.syncProbesToAgent(systemID) + + return e.JSON(http.StatusOK, map[string]string{"status": "ok"}) +} + +// getNetworkProbeStats handles GET /api/beszel/network-probe-stats +func (h *Hub) getNetworkProbeStats(e *core.RequestEvent) error { + systemID := e.Request.URL.Query().Get("system") + statsType := e.Request.URL.Query().Get("type") + if systemID == "" { + return e.BadRequestError("system parameter required", nil) + } + if statsType == "" { + statsType = "1m" + } + + system, err := h.sm.GetSystem(systemID) + if err != nil || !system.HasUser(e.App, e.Auth) { + return e.NotFoundError("", nil) + } + + records, err := e.App.FindRecordsByFilter( + "network_probe_stats", + "system = {:system} && type = {:type}", + "created", + 0, 0, + dbx.Params{"system": systemID, "type": statsType}, + ) + if err != nil { + return e.InternalServerError("", err) + } + + type statsRecord struct { + Stats json.RawMessage `json:"stats"` + Created string `json:"created"` + } + + result := make([]statsRecord, 0, len(records)) + for _, r := range records { + statsJSON, _ := json.Marshal(r.Get("stats")) + result = append(result, statsRecord{ + Stats: statsJSON, + Created: r.GetDateTime("created").Time().UTC().Format("2006-01-02 15:04:05.000Z"), + }) + } + + return e.JSON(http.StatusOK, result) +} + +// syncProbesToAgent fetches enabled probes for a system and sends them to the agent. +func (h *Hub) syncProbesToAgent(systemID string) { + system, err := h.sm.GetSystem(systemID) + if err != nil { + return + } + + configs := h.sm.GetProbeConfigsForSystem(systemID) + + go func() { + if err := system.SyncNetworkProbes(configs); err != nil { + h.Logger().Warn("failed to sync probes to agent", "system", systemID, "err", err) + } + }() +} diff --git a/internal/hub/systems/system.go b/internal/hub/systems/system.go index 947ef257..df54e4cf 100644 --- a/internal/hub/systems/system.go +++ b/internal/hub/systems/system.go @@ -168,6 +168,9 @@ func (sys *System) update() error { } } + // Fetch and save network probe results + go sys.fetchAndSaveProbeResults() + return err } diff --git a/internal/hub/systems/system_manager.go b/internal/hub/systems/system_manager.go index 32112f2c..40d78b0e 100644 --- a/internal/hub/systems/system_manager.go +++ b/internal/hub/systems/system_manager.go @@ -7,6 +7,7 @@ import ( "github.com/henrygd/beszel/internal/hub/ws" + "github.com/henrygd/beszel/internal/entities/probe" "github.com/henrygd/beszel/internal/entities/system" "github.com/henrygd/beszel/internal/hub/expirymap" @@ -15,6 +16,7 @@ import ( "github.com/henrygd/beszel" "github.com/blang/semver" + "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/tools/store" "golang.org/x/crypto/ssh" @@ -317,6 +319,17 @@ func (sm *SystemManager) AddWebSocketSystem(systemId string, agentVersion semver if err := sm.AddRecord(systemRecord, system); err != nil { return err } + + // Sync network probes to the newly connected agent + go func() { + configs := sm.GetProbeConfigsForSystem(systemId) + if len(configs) > 0 { + if err := system.SyncNetworkProbes(configs); err != nil { + sm.hub.Logger().Warn("failed to sync probes on connect", "system", systemId, "err", err) + } + } + }() + return nil } @@ -329,6 +342,31 @@ func (sm *SystemManager) resetFailedSmartFetchState(systemID string) { } } +// GetProbeConfigsForSystem returns all enabled probe configs for a system. +func (sm *SystemManager) GetProbeConfigsForSystem(systemID string) []probe.Config { + records, err := sm.hub.FindRecordsByFilter( + "network_probes", + "system = {:system} && enabled = true", + "", + 0, 0, + dbx.Params{"system": systemID}, + ) + if err != nil || len(records) == 0 { + return nil + } + + configs := make([]probe.Config, 0, len(records)) + for _, r := range records { + configs = append(configs, probe.Config{ + Target: r.GetString("target"), + Protocol: r.GetString("protocol"), + Port: uint16(r.GetInt("port")), + Interval: uint16(r.GetInt("interval")), + }) + } + return configs +} + // createSSHClientConfig initializes the SSH client configuration for connecting to an agent's server func (sm *SystemManager) createSSHClientConfig() error { privateKey, err := sm.hub.GetSSHKey("") diff --git a/internal/hub/systems/system_probes.go b/internal/hub/systems/system_probes.go new file mode 100644 index 00000000..4a71a458 --- /dev/null +++ b/internal/hub/systems/system_probes.go @@ -0,0 +1,59 @@ +package systems + +import ( + "context" + "time" + + "github.com/henrygd/beszel/internal/common" + "github.com/henrygd/beszel/internal/entities/probe" + "github.com/pocketbase/dbx" + "github.com/pocketbase/pocketbase/core" +) + +// SyncNetworkProbes sends probe configurations to the agent. +func (sys *System) SyncNetworkProbes(configs []probe.Config) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var result string + return sys.request(ctx, common.SyncNetworkProbes, configs, &result) +} + +// FetchNetworkProbeResults fetches probe results from the agent. +func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var results map[string]probe.Result + err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results) + return results, err +} + +// fetchAndSaveProbeResults fetches probe results and saves them to the database. +func (sys *System) fetchAndSaveProbeResults() { + hub := sys.manager.hub + + // Check if this system has any probes + count, err := hub.CountRecords("network_probes", + dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id})) + if err != nil || count == 0 { + return + } + + results, err := sys.FetchNetworkProbeResults() + if err != nil || len(results) == 0 { + return + } + + collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats") + if err != nil { + return + } + + record := core.NewRecord(collection) + record.Set("system", sys.Id) + record.Set("stats", results) + record.Set("type", "1m") + + if err := hub.SaveNoValidate(record); err != nil { + hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err) + } +} diff --git a/internal/records/records.go b/internal/records/records.go index 74e8e519..60f267b5 100644 --- a/internal/records/records.go +++ b/internal/records/records.go @@ -82,7 +82,7 @@ func (rm *RecordManager) CreateLongerRecords() { // wrap the operations in a transaction rm.app.RunInTransaction(func(txApp core.App) error { var err error - collections := [2]*core.Collection{} + collections := [3]*core.Collection{} collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats") if err != nil { return err @@ -91,6 +91,10 @@ func (rm *RecordManager) CreateLongerRecords() { if err != nil { return err } + collections[2], err = txApp.FindCachedCollectionByNameOrId("network_probe_stats") + if err != nil { + return err + } var systems RecordIds db := txApp.DB() @@ -150,8 +154,9 @@ func (rm *RecordManager) CreateLongerRecords() { case "system_stats": longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds)) case "container_stats": - longerRecord.Set("stats", rm.AverageContainerStats(db, recordIds)) + case "network_probe_stats": + longerRecord.Set("stats", rm.AverageProbeStats(db, recordIds)) } if err := txApp.SaveNoValidate(longerRecord); err != nil { log.Println("failed to save longer record", "err", err) @@ -504,6 +509,67 @@ func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds return result } +// AverageProbeStats averages probe stats across multiple records. +// For each probe key: avg of avgs, min of mins, max of maxes, avg of losses. +func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) map[string]map[string]float64 { + type probeValues struct { + avgSum float64 + minVal float64 + maxVal float64 + lossSum float64 + count float64 + } + + sums := make(map[string]*probeValues) + var rawStats map[string]map[string]float64 + + for _, record := range records { + statsRecord.Stats = statsRecord.Stats[:0] + rawStats = nil + + queryParams["id"] = record.Id + db.NewQuery("SELECT stats FROM network_probe_stats WHERE id = {:id}").Bind(queryParams).One(&statsRecord) + if err := json.Unmarshal(statsRecord.Stats, &rawStats); err != nil { + continue + } + + for key, vals := range rawStats { + s, ok := sums[key] + if !ok { + s = &probeValues{minVal: math.MaxFloat64} + sums[key] = s + } + s.avgSum += vals["avg"] + if vals["min"] < s.minVal { + s.minVal = vals["min"] + } + if vals["max"] > s.maxVal { + s.maxVal = vals["max"] + } + s.lossSum += vals["loss"] + s.count++ + } + } + + result := make(map[string]map[string]float64, len(sums)) + for key, s := range sums { + if s.count == 0 { + continue + } + minVal := s.minVal + if minVal == math.MaxFloat64 { + minVal = 0 + } + result[key] = map[string]float64{ + "avg": twoDecimals(s.avgSum / s.count), + "min": twoDecimals(minVal), + "max": twoDecimals(s.maxVal), + "loss": twoDecimals(s.lossSum / s.count), + } + } + return result +} + // Delete old records func (rm *RecordManager) DeleteOldRecords() { rm.app.RunInTransaction(func(txApp core.App) error { @@ -553,7 +619,7 @@ func deleteOldAlertsHistory(app core.App, countToKeep, countBeforeDeletion int) // Deletes system_stats records older than what is displayed in the UI func deleteOldSystemStats(app core.App) error { // Collections to process - collections := [2]string{"system_stats", "container_stats"} + collections := [3]string{"system_stats", "container_stats", "network_probe_stats"} // Record types and their retention periods type RecordDeletionData struct {