mirror of
https://github.com/henrygd/beszel.git
synced 2026-04-21 12:11:49 +02:00
updates
This commit is contained in:
@@ -78,7 +78,7 @@ func setCollectionAuthSettings(app core.App) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := applyCollectionRules(app, []string{"containers", "container_stats", "system_stats", "systemd_services"}, collectionRules{
|
||||
if err := applyCollectionRules(app, []string{"containers", "container_stats", "system_stats", "systemd_services", "network_probe_stats"}, collectionRules{
|
||||
list: &systemScopedReadRule,
|
||||
}); err != nil {
|
||||
return err
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"log/slog"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
@@ -318,21 +319,77 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
collectionName := "network_probes"
|
||||
nowString := time.Now().UTC().Format(types.DefaultDateLayout)
|
||||
|
||||
// If realtime updates are active, we save via PocketBase records to trigger realtime events.
|
||||
// Otherwise we can do a more efficient direct update via SQL
|
||||
realtimeActive := utils.RealtimeActiveForCollection(app, collectionName, func(filterQuery string) bool {
|
||||
slog.Info("Checking realtime subscription filter for network probes", "filterQuery", filterQuery)
|
||||
return !strings.Contains(filterQuery, "system") || strings.Contains(filterQuery, systemId)
|
||||
})
|
||||
|
||||
var db dbx.Builder
|
||||
var nowString string
|
||||
var updateQuery *dbx.Query
|
||||
if !realtimeActive {
|
||||
db = app.DB()
|
||||
nowString = time.Now().UTC().Format(types.DefaultDateLayout)
|
||||
sql := fmt.Sprintf("UPDATE %s SET latency={:latency}, loss={:loss}, updated={:updated} WHERE id={:id}", collectionName)
|
||||
updateQuery = db.NewQuery(sql)
|
||||
}
|
||||
|
||||
// insert network probe stats records
|
||||
switch realtimeActive {
|
||||
case true:
|
||||
collection, _ := app.FindCachedCollectionByNameOrId("network_probe_stats")
|
||||
record := core.NewRecord(collection)
|
||||
record.Set("system", systemId)
|
||||
record.Set("stats", data)
|
||||
record.Set("type", "1m")
|
||||
err = app.SaveNoValidate(record)
|
||||
default:
|
||||
if dataJson, e := json.Marshal(data); e == nil {
|
||||
sql := "INSERT INTO network_probe_stats (system, stats, type, created) VALUES ({:system}, {:stats}, {:type}, {:created})"
|
||||
insertQuery := db.NewQuery(sql)
|
||||
_, err = insertQuery.Bind(dbx.Params{
|
||||
"system": systemId,
|
||||
"stats": dataJson,
|
||||
"type": "1m",
|
||||
"created": nowString,
|
||||
}).Execute()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
app.Logger().Error("Failed to update probe stats", "system", systemId, "err", err)
|
||||
}
|
||||
|
||||
// update network_probes records
|
||||
for key := range data {
|
||||
probe := data[key]
|
||||
id := MakeStableHashId(systemId, key)
|
||||
params := dbx.Params{
|
||||
"latency": probe[0],
|
||||
"loss": probe[3],
|
||||
"updated": nowString,
|
||||
switch realtimeActive {
|
||||
case true:
|
||||
var record *core.Record
|
||||
record, err = app.FindRecordById(collectionName, id)
|
||||
if err == nil {
|
||||
record.Set("latency", probe[0])
|
||||
record.Set("loss", probe[3])
|
||||
err = app.SaveNoValidate(record)
|
||||
}
|
||||
default:
|
||||
_, err = updateQuery.Bind(dbx.Params{
|
||||
"id": id,
|
||||
"latency": probe[0],
|
||||
"loss": probe[3],
|
||||
"updated": nowString,
|
||||
}).Execute()
|
||||
}
|
||||
_, err := app.DB().Update(collectionName, params, dbx.HashExp{"id": id}).Execute()
|
||||
if err != nil {
|
||||
app.Logger().Warn("Failed to update probe", "system", systemId, "probe", key, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
// Package utils provides utility functions for the hub.
|
||||
package utils
|
||||
|
||||
import "os"
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
)
|
||||
|
||||
// GetEnv retrieves an environment variable with a "BESZEL_HUB_" prefix, or falls back to the unprefixed key.
|
||||
func GetEnv(key string) (value string, exists bool) {
|
||||
@@ -10,3 +14,26 @@ func GetEnv(key string) (value string, exists bool) {
|
||||
}
|
||||
return os.LookupEnv(key)
|
||||
}
|
||||
|
||||
// realtimeActiveForCollection checks if there are active WebSocket subscriptions for the given collection.
|
||||
func RealtimeActiveForCollection(app core.App, collectionName string, validateFn func(filterQuery string) bool) bool {
|
||||
broker := app.SubscriptionsBroker()
|
||||
if broker.TotalClients() == 0 {
|
||||
return false
|
||||
}
|
||||
for _, client := range broker.Clients() {
|
||||
subs := client.Subscriptions(collectionName)
|
||||
if len(subs) > 0 {
|
||||
if validateFn == nil {
|
||||
return true
|
||||
}
|
||||
for k := range subs {
|
||||
filter := subs[k].Query["filter"]
|
||||
if validateFn(filter) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user