mirror of
https://github.com/henrygd/beszel.git
synced 2026-04-21 04:01:50 +02:00
feat(hub): add network probe API, sync, result collection, and aggregation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -134,6 +134,11 @@ func (h *Hub) registerApiRoutes(se *core.ServeEvent) error {
|
|||||||
// get container info
|
// get container info
|
||||||
apiAuth.GET("/containers/info", h.getContainerInfo)
|
apiAuth.GET("/containers/info", h.getContainerInfo)
|
||||||
}
|
}
|
||||||
|
// network probe routes
|
||||||
|
apiAuth.GET("/network-probes", h.listNetworkProbes)
|
||||||
|
apiAuth.POST("/network-probes", h.createNetworkProbe).BindFunc(excludeReadOnlyRole)
|
||||||
|
apiAuth.DELETE("/network-probes", h.deleteNetworkProbe).BindFunc(excludeReadOnlyRole)
|
||||||
|
apiAuth.GET("/network-probe-stats", h.getNetworkProbeStats)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
197
internal/hub/api_probes.go
Normal file
197
internal/hub/api_probes.go
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
package hub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/pocketbase/dbx"
|
||||||
|
"github.com/pocketbase/pocketbase/core"
|
||||||
|
)
|
||||||
|
|
||||||
|
// listNetworkProbes handles GET /api/beszel/network-probes
|
||||||
|
func (h *Hub) listNetworkProbes(e *core.RequestEvent) error {
|
||||||
|
systemID := e.Request.URL.Query().Get("system")
|
||||||
|
if systemID == "" {
|
||||||
|
return e.BadRequestError("system parameter required", nil)
|
||||||
|
}
|
||||||
|
system, err := h.sm.GetSystem(systemID)
|
||||||
|
if err != nil || !system.HasUser(e.App, e.Auth) {
|
||||||
|
return e.NotFoundError("", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
records, err := e.App.FindRecordsByFilter(
|
||||||
|
"network_probes",
|
||||||
|
"system = {:system}",
|
||||||
|
"-created",
|
||||||
|
0, 0,
|
||||||
|
dbx.Params{"system": systemID},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return e.InternalServerError("", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type probeRecord struct {
|
||||||
|
Id string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Target string `json:"target"`
|
||||||
|
Protocol string `json:"protocol"`
|
||||||
|
Port int `json:"port"`
|
||||||
|
Interval int `json:"interval"`
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]probeRecord, 0, len(records))
|
||||||
|
for _, r := range records {
|
||||||
|
result = append(result, probeRecord{
|
||||||
|
Id: r.Id,
|
||||||
|
Name: r.GetString("name"),
|
||||||
|
Target: r.GetString("target"),
|
||||||
|
Protocol: r.GetString("protocol"),
|
||||||
|
Port: r.GetInt("port"),
|
||||||
|
Interval: r.GetInt("interval"),
|
||||||
|
Enabled: r.GetBool("enabled"),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.JSON(http.StatusOK, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// createNetworkProbe handles POST /api/beszel/network-probes
|
||||||
|
func (h *Hub) createNetworkProbe(e *core.RequestEvent) error {
|
||||||
|
var req struct {
|
||||||
|
System string `json:"system"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Target string `json:"target"`
|
||||||
|
Protocol string `json:"protocol"`
|
||||||
|
Port int `json:"port"`
|
||||||
|
Interval int `json:"interval"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(e.Request.Body).Decode(&req); err != nil {
|
||||||
|
return e.BadRequestError("invalid request body", err)
|
||||||
|
}
|
||||||
|
if req.System == "" || req.Target == "" || req.Protocol == "" {
|
||||||
|
return e.BadRequestError("system, target, and protocol are required", nil)
|
||||||
|
}
|
||||||
|
if req.Protocol != "icmp" && req.Protocol != "tcp" && req.Protocol != "http" {
|
||||||
|
return e.BadRequestError("protocol must be icmp, tcp, or http", nil)
|
||||||
|
}
|
||||||
|
if req.Interval <= 0 {
|
||||||
|
req.Interval = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
system, err := h.sm.GetSystem(req.System)
|
||||||
|
if err != nil || !system.HasUser(e.App, e.Auth) {
|
||||||
|
return e.NotFoundError("", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
collection, err := e.App.FindCachedCollectionByNameOrId("network_probes")
|
||||||
|
if err != nil {
|
||||||
|
return e.InternalServerError("", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
record := core.NewRecord(collection)
|
||||||
|
record.Set("system", req.System)
|
||||||
|
record.Set("name", req.Name)
|
||||||
|
record.Set("target", req.Target)
|
||||||
|
record.Set("protocol", req.Protocol)
|
||||||
|
record.Set("port", req.Port)
|
||||||
|
record.Set("interval", req.Interval)
|
||||||
|
record.Set("enabled", true)
|
||||||
|
|
||||||
|
if err := e.App.Save(record); err != nil {
|
||||||
|
return e.InternalServerError("", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync probes to agent
|
||||||
|
h.syncProbesToAgent(req.System)
|
||||||
|
|
||||||
|
return e.JSON(http.StatusOK, map[string]string{"id": record.Id})
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteNetworkProbe handles DELETE /api/beszel/network-probes
|
||||||
|
func (h *Hub) deleteNetworkProbe(e *core.RequestEvent) error {
|
||||||
|
probeID := e.Request.URL.Query().Get("id")
|
||||||
|
if probeID == "" {
|
||||||
|
return e.BadRequestError("id parameter required", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
record, err := e.App.FindRecordById("network_probes", probeID)
|
||||||
|
if err != nil {
|
||||||
|
return e.NotFoundError("", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
systemID := record.GetString("system")
|
||||||
|
system, err := h.sm.GetSystem(systemID)
|
||||||
|
if err != nil || !system.HasUser(e.App, e.Auth) {
|
||||||
|
return e.NotFoundError("", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.App.Delete(record); err != nil {
|
||||||
|
return e.InternalServerError("", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync probes to agent
|
||||||
|
h.syncProbesToAgent(systemID)
|
||||||
|
|
||||||
|
return e.JSON(http.StatusOK, map[string]string{"status": "ok"})
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNetworkProbeStats handles GET /api/beszel/network-probe-stats
|
||||||
|
func (h *Hub) getNetworkProbeStats(e *core.RequestEvent) error {
|
||||||
|
systemID := e.Request.URL.Query().Get("system")
|
||||||
|
statsType := e.Request.URL.Query().Get("type")
|
||||||
|
if systemID == "" {
|
||||||
|
return e.BadRequestError("system parameter required", nil)
|
||||||
|
}
|
||||||
|
if statsType == "" {
|
||||||
|
statsType = "1m"
|
||||||
|
}
|
||||||
|
|
||||||
|
system, err := h.sm.GetSystem(systemID)
|
||||||
|
if err != nil || !system.HasUser(e.App, e.Auth) {
|
||||||
|
return e.NotFoundError("", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
records, err := e.App.FindRecordsByFilter(
|
||||||
|
"network_probe_stats",
|
||||||
|
"system = {:system} && type = {:type}",
|
||||||
|
"created",
|
||||||
|
0, 0,
|
||||||
|
dbx.Params{"system": systemID, "type": statsType},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return e.InternalServerError("", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsRecord struct {
|
||||||
|
Stats json.RawMessage `json:"stats"`
|
||||||
|
Created string `json:"created"`
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]statsRecord, 0, len(records))
|
||||||
|
for _, r := range records {
|
||||||
|
statsJSON, _ := json.Marshal(r.Get("stats"))
|
||||||
|
result = append(result, statsRecord{
|
||||||
|
Stats: statsJSON,
|
||||||
|
Created: r.GetDateTime("created").Time().UTC().Format("2006-01-02 15:04:05.000Z"),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.JSON(http.StatusOK, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncProbesToAgent fetches enabled probes for a system and sends them to the agent.
|
||||||
|
func (h *Hub) syncProbesToAgent(systemID string) {
|
||||||
|
system, err := h.sm.GetSystem(systemID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
configs := h.sm.GetProbeConfigsForSystem(systemID)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := system.SyncNetworkProbes(configs); err != nil {
|
||||||
|
h.Logger().Warn("failed to sync probes to agent", "system", systemID, "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
@@ -168,6 +168,9 @@ func (sys *System) update() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch and save network probe results
|
||||||
|
go sys.fetchAndSaveProbeResults()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/henrygd/beszel/internal/hub/ws"
|
"github.com/henrygd/beszel/internal/hub/ws"
|
||||||
|
|
||||||
|
"github.com/henrygd/beszel/internal/entities/probe"
|
||||||
"github.com/henrygd/beszel/internal/entities/system"
|
"github.com/henrygd/beszel/internal/entities/system"
|
||||||
"github.com/henrygd/beszel/internal/hub/expirymap"
|
"github.com/henrygd/beszel/internal/hub/expirymap"
|
||||||
|
|
||||||
@@ -15,6 +16,7 @@ import (
|
|||||||
"github.com/henrygd/beszel"
|
"github.com/henrygd/beszel"
|
||||||
|
|
||||||
"github.com/blang/semver"
|
"github.com/blang/semver"
|
||||||
|
"github.com/pocketbase/dbx"
|
||||||
"github.com/pocketbase/pocketbase/core"
|
"github.com/pocketbase/pocketbase/core"
|
||||||
"github.com/pocketbase/pocketbase/tools/store"
|
"github.com/pocketbase/pocketbase/tools/store"
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
@@ -317,6 +319,17 @@ func (sm *SystemManager) AddWebSocketSystem(systemId string, agentVersion semver
|
|||||||
if err := sm.AddRecord(systemRecord, system); err != nil {
|
if err := sm.AddRecord(systemRecord, system); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync network probes to the newly connected agent
|
||||||
|
go func() {
|
||||||
|
configs := sm.GetProbeConfigsForSystem(systemId)
|
||||||
|
if len(configs) > 0 {
|
||||||
|
if err := system.SyncNetworkProbes(configs); err != nil {
|
||||||
|
sm.hub.Logger().Warn("failed to sync probes on connect", "system", systemId, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -329,6 +342,31 @@ func (sm *SystemManager) resetFailedSmartFetchState(systemID string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetProbeConfigsForSystem returns all enabled probe configs for a system.
|
||||||
|
func (sm *SystemManager) GetProbeConfigsForSystem(systemID string) []probe.Config {
|
||||||
|
records, err := sm.hub.FindRecordsByFilter(
|
||||||
|
"network_probes",
|
||||||
|
"system = {:system} && enabled = true",
|
||||||
|
"",
|
||||||
|
0, 0,
|
||||||
|
dbx.Params{"system": systemID},
|
||||||
|
)
|
||||||
|
if err != nil || len(records) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
configs := make([]probe.Config, 0, len(records))
|
||||||
|
for _, r := range records {
|
||||||
|
configs = append(configs, probe.Config{
|
||||||
|
Target: r.GetString("target"),
|
||||||
|
Protocol: r.GetString("protocol"),
|
||||||
|
Port: uint16(r.GetInt("port")),
|
||||||
|
Interval: uint16(r.GetInt("interval")),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return configs
|
||||||
|
}
|
||||||
|
|
||||||
// createSSHClientConfig initializes the SSH client configuration for connecting to an agent's server
|
// createSSHClientConfig initializes the SSH client configuration for connecting to an agent's server
|
||||||
func (sm *SystemManager) createSSHClientConfig() error {
|
func (sm *SystemManager) createSSHClientConfig() error {
|
||||||
privateKey, err := sm.hub.GetSSHKey("")
|
privateKey, err := sm.hub.GetSSHKey("")
|
||||||
|
|||||||
59
internal/hub/systems/system_probes.go
Normal file
59
internal/hub/systems/system_probes.go
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
package systems
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/henrygd/beszel/internal/common"
|
||||||
|
"github.com/henrygd/beszel/internal/entities/probe"
|
||||||
|
"github.com/pocketbase/dbx"
|
||||||
|
"github.com/pocketbase/pocketbase/core"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SyncNetworkProbes sends probe configurations to the agent.
|
||||||
|
func (sys *System) SyncNetworkProbes(configs []probe.Config) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
var result string
|
||||||
|
return sys.request(ctx, common.SyncNetworkProbes, configs, &result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchNetworkProbeResults fetches probe results from the agent.
|
||||||
|
func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
var results map[string]probe.Result
|
||||||
|
err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results)
|
||||||
|
return results, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchAndSaveProbeResults fetches probe results and saves them to the database.
|
||||||
|
func (sys *System) fetchAndSaveProbeResults() {
|
||||||
|
hub := sys.manager.hub
|
||||||
|
|
||||||
|
// Check if this system has any probes
|
||||||
|
count, err := hub.CountRecords("network_probes",
|
||||||
|
dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id}))
|
||||||
|
if err != nil || count == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := sys.FetchNetworkProbeResults()
|
||||||
|
if err != nil || len(results) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats")
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
record := core.NewRecord(collection)
|
||||||
|
record.Set("system", sys.Id)
|
||||||
|
record.Set("stats", results)
|
||||||
|
record.Set("type", "1m")
|
||||||
|
|
||||||
|
if err := hub.SaveNoValidate(record); err != nil {
|
||||||
|
hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -82,7 +82,7 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
// wrap the operations in a transaction
|
// wrap the operations in a transaction
|
||||||
rm.app.RunInTransaction(func(txApp core.App) error {
|
rm.app.RunInTransaction(func(txApp core.App) error {
|
||||||
var err error
|
var err error
|
||||||
collections := [2]*core.Collection{}
|
collections := [3]*core.Collection{}
|
||||||
collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats")
|
collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -91,6 +91,10 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
collections[2], err = txApp.FindCachedCollectionByNameOrId("network_probe_stats")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
var systems RecordIds
|
var systems RecordIds
|
||||||
db := txApp.DB()
|
db := txApp.DB()
|
||||||
|
|
||||||
@@ -150,8 +154,9 @@ func (rm *RecordManager) CreateLongerRecords() {
|
|||||||
case "system_stats":
|
case "system_stats":
|
||||||
longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds))
|
longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds))
|
||||||
case "container_stats":
|
case "container_stats":
|
||||||
|
|
||||||
longerRecord.Set("stats", rm.AverageContainerStats(db, recordIds))
|
longerRecord.Set("stats", rm.AverageContainerStats(db, recordIds))
|
||||||
|
case "network_probe_stats":
|
||||||
|
longerRecord.Set("stats", rm.AverageProbeStats(db, recordIds))
|
||||||
}
|
}
|
||||||
if err := txApp.SaveNoValidate(longerRecord); err != nil {
|
if err := txApp.SaveNoValidate(longerRecord); err != nil {
|
||||||
log.Println("failed to save longer record", "err", err)
|
log.Println("failed to save longer record", "err", err)
|
||||||
@@ -504,6 +509,67 @@ func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AverageProbeStats averages probe stats across multiple records.
|
||||||
|
// For each probe key: avg of avgs, min of mins, max of maxes, avg of losses.
|
||||||
|
func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) map[string]map[string]float64 {
|
||||||
|
type probeValues struct {
|
||||||
|
avgSum float64
|
||||||
|
minVal float64
|
||||||
|
maxVal float64
|
||||||
|
lossSum float64
|
||||||
|
count float64
|
||||||
|
}
|
||||||
|
|
||||||
|
sums := make(map[string]*probeValues)
|
||||||
|
var rawStats map[string]map[string]float64
|
||||||
|
|
||||||
|
for _, record := range records {
|
||||||
|
statsRecord.Stats = statsRecord.Stats[:0]
|
||||||
|
rawStats = nil
|
||||||
|
|
||||||
|
queryParams["id"] = record.Id
|
||||||
|
db.NewQuery("SELECT stats FROM network_probe_stats WHERE id = {:id}").Bind(queryParams).One(&statsRecord)
|
||||||
|
if err := json.Unmarshal(statsRecord.Stats, &rawStats); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, vals := range rawStats {
|
||||||
|
s, ok := sums[key]
|
||||||
|
if !ok {
|
||||||
|
s = &probeValues{minVal: math.MaxFloat64}
|
||||||
|
sums[key] = s
|
||||||
|
}
|
||||||
|
s.avgSum += vals["avg"]
|
||||||
|
if vals["min"] < s.minVal {
|
||||||
|
s.minVal = vals["min"]
|
||||||
|
}
|
||||||
|
if vals["max"] > s.maxVal {
|
||||||
|
s.maxVal = vals["max"]
|
||||||
|
}
|
||||||
|
s.lossSum += vals["loss"]
|
||||||
|
s.count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make(map[string]map[string]float64, len(sums))
|
||||||
|
for key, s := range sums {
|
||||||
|
if s.count == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
minVal := s.minVal
|
||||||
|
if minVal == math.MaxFloat64 {
|
||||||
|
minVal = 0
|
||||||
|
}
|
||||||
|
result[key] = map[string]float64{
|
||||||
|
"avg": twoDecimals(s.avgSum / s.count),
|
||||||
|
"min": twoDecimals(minVal),
|
||||||
|
"max": twoDecimals(s.maxVal),
|
||||||
|
"loss": twoDecimals(s.lossSum / s.count),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// Delete old records
|
// Delete old records
|
||||||
func (rm *RecordManager) DeleteOldRecords() {
|
func (rm *RecordManager) DeleteOldRecords() {
|
||||||
rm.app.RunInTransaction(func(txApp core.App) error {
|
rm.app.RunInTransaction(func(txApp core.App) error {
|
||||||
@@ -553,7 +619,7 @@ func deleteOldAlertsHistory(app core.App, countToKeep, countBeforeDeletion int)
|
|||||||
// Deletes system_stats records older than what is displayed in the UI
|
// Deletes system_stats records older than what is displayed in the UI
|
||||||
func deleteOldSystemStats(app core.App) error {
|
func deleteOldSystemStats(app core.App) error {
|
||||||
// Collections to process
|
// Collections to process
|
||||||
collections := [2]string{"system_stats", "container_stats"}
|
collections := [3]string{"system_stats", "container_stats", "network_probe_stats"}
|
||||||
|
|
||||||
// Record types and their retention periods
|
// Record types and their retention periods
|
||||||
type RecordDeletionData struct {
|
type RecordDeletionData struct {
|
||||||
|
|||||||
Reference in New Issue
Block a user