This commit is contained in:
henrygd
2026-04-19 21:44:21 -04:00
parent ea19ef6334
commit e71ffd4d2a
15 changed files with 136 additions and 113 deletions

View File

@@ -18,6 +18,7 @@ import (
"github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/smart"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/entities/systemd"
@@ -29,6 +30,7 @@ import (
"github.com/lxzan/gws"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/types"
"golang.org/x/crypto/ssh"
)
@@ -167,11 +169,6 @@ func (sys *System) update() error {
}
}
// Fetch and save network probe results
if sys.hasEnabledProbes() {
go sys.fetchAndSaveProbeResults()
}
return err
}
@@ -243,6 +240,12 @@ func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error
}
}
if data.Probes != nil {
if err := updateNetworkProbesRecords(txApp, data.Probes, sys.Id); err != nil {
return err
}
}
// update system record (do this last because it triggers alerts and we need above records to be inserted first)
systemRecord.Set("status", up)
systemRecord.Set("info", data.Info)
@@ -294,7 +297,7 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s
for i, service := range data {
suffix := fmt.Sprintf("%d", i)
valueStrings = append(valueStrings, fmt.Sprintf("({:id%[1]s}, {:system}, {:name%[1]s}, {:state%[1]s}, {:sub%[1]s}, {:cpu%[1]s}, {:cpuPeak%[1]s}, {:memory%[1]s}, {:memPeak%[1]s}, {:updated})", suffix))
params["id"+suffix] = makeStableHashId(systemId, service.Name)
params["id"+suffix] = MakeStableHashId(systemId, service.Name)
params["name"+suffix] = service.Name
params["state"+suffix] = service.State
params["sub"+suffix] = service.Sub
@@ -311,6 +314,28 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s
return err
}
func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, systemId string) error {
if len(data) == 0 {
return nil
}
collectionName := "network_probes"
for key := range data {
probe := data[key]
id := MakeStableHashId(systemId, key)
params := dbx.Params{
// "system": systemId,
"latency": probe[0],
"loss": probe[3],
"updated": time.Now().UTC().Format(types.DefaultDateLayout),
}
_, err := app.DB().Update(collectionName, params, dbx.HashExp{"id": id}).Execute()
if err != nil {
app.Logger().Warn("Failed to update network probe record", "system", systemId, "probe", key, "err", err)
}
}
return nil
}
// createContainerRecords creates container records
func createContainerRecords(app core.App, data []*container.Stats, systemId string) error {
if len(data) == 0 {
@@ -545,7 +570,7 @@ func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error)
return result, err
}
func makeStableHashId(strings ...string) string {
func MakeStableHashId(strings ...string) string {
hash := fnv.New32a()
for _, str := range strings {
hash.Write([]byte(str))

View File

@@ -6,8 +6,6 @@ import (
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
)
// SyncNetworkProbes sends probe configurations to the agent.
@@ -19,41 +17,41 @@ func (sys *System) SyncNetworkProbes(configs []probe.Config) error {
}
// FetchNetworkProbeResults fetches probe results from the agent.
func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var results map[string]probe.Result
err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results)
return results, err
}
// func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) {
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// defer cancel()
// var results map[string]probe.Result
// err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results)
// return results, err
// }
// hasEnabledProbes returns true if this system has any enabled network probes.
func (sys *System) hasEnabledProbes() bool {
count, err := sys.manager.hub.CountRecords("network_probes",
dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id}))
return err == nil && count > 0
}
// func (sys *System) hasEnabledProbes() bool {
// count, err := sys.manager.hub.CountRecords("network_probes",
// dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id}))
// return err == nil && count > 0
// }
// fetchAndSaveProbeResults fetches probe results and saves them to the database.
func (sys *System) fetchAndSaveProbeResults() {
hub := sys.manager.hub
// func (sys *System) fetchAndSaveProbeResults() {
// hub := sys.manager.hub
results, err := sys.FetchNetworkProbeResults()
if err != nil || len(results) == 0 {
return
}
// results, err := sys.FetchNetworkProbeResults()
// if err != nil || len(results) == 0 {
// return
// }
collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats")
if err != nil {
return
}
// collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats")
// if err != nil {
// return
// }
record := core.NewRecord(collection)
record.Set("system", sys.Id)
record.Set("stats", results)
record.Set("type", "1m")
// record := core.NewRecord(collection)
// record.Set("system", sys.Id)
// record.Set("stats", results)
// record.Set("type", "1m")
if err := hub.SaveNoValidate(record); err != nil {
hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err)
}
}
// if err := hub.SaveNoValidate(record); err != nil {
// hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err)
// }
// }

View File

@@ -7,21 +7,10 @@ import (
"time"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/subscriptions"
)
// realtimePayload wraps system data with optional network probe results for realtime broadcast.
type realtimePayload struct {
Stats system.Stats `json:"stats"`
Info system.Info `json:"info"`
Containers []*container.Stats `json:"container"`
Probes map[string]probe.Result `json:"probes,omitempty"`
}
type subscriptionInfo struct {
subscription string
connectedClients uint8
@@ -153,27 +142,16 @@ func (sm *SystemManager) startRealtimeWorker() {
// fetchRealtimeDataAndNotify fetches realtime data for all active subscriptions and notifies the clients.
func (sm *SystemManager) fetchRealtimeDataAndNotify() {
for systemId, info := range activeSubscriptions {
sys, err := sm.GetSystem(systemId)
system, err := sm.GetSystem(systemId)
if err != nil {
continue
}
go func() {
data, err := sys.fetchDataFromAgent(common.DataRequestOptions{CacheTimeMs: 1000})
data, err := system.fetchDataFromAgent(common.DataRequestOptions{CacheTimeMs: 1000})
if err != nil {
return
}
payload := realtimePayload{
Stats: data.Stats,
Info: data.Info,
Containers: data.Containers,
}
// Fetch network probe results (lightweight in-memory read on agent)
if sys.hasEnabledProbes() {
if probes, err := sys.FetchNetworkProbeResults(); err == nil && len(probes) > 0 {
payload.Probes = probes
}
}
bytes, err := json.Marshal(payload)
bytes, err := json.Marshal(data)
if err == nil {
notify(sm.hub, info.subscription, bytes)
}

View File

@@ -84,7 +84,7 @@ func (sys *System) saveSmartDevices(smartData map[string]smart.SmartData) error
func (sys *System) upsertSmartDeviceRecord(collection *core.Collection, deviceKey string, device smart.SmartData) error {
hub := sys.manager.hub
recordID := makeStableHashId(sys.Id, deviceKey)
recordID := MakeStableHashId(sys.Id, deviceKey)
record, err := hub.FindRecordById(collection, recordID)
if err != nil {

View File

@@ -14,9 +14,9 @@ func TestGetSystemdServiceId(t *testing.T) {
serviceName := "nginx.service"
// Call multiple times and ensure same result
id1 := makeStableHashId(systemId, serviceName)
id2 := makeStableHashId(systemId, serviceName)
id3 := makeStableHashId(systemId, serviceName)
id1 := MakeStableHashId(systemId, serviceName)
id2 := MakeStableHashId(systemId, serviceName)
id3 := MakeStableHashId(systemId, serviceName)
assert.Equal(t, id1, id2)
assert.Equal(t, id2, id3)
@@ -29,10 +29,10 @@ func TestGetSystemdServiceId(t *testing.T) {
serviceName1 := "nginx.service"
serviceName2 := "apache.service"
id1 := makeStableHashId(systemId1, serviceName1)
id2 := makeStableHashId(systemId2, serviceName1)
id3 := makeStableHashId(systemId1, serviceName2)
id4 := makeStableHashId(systemId2, serviceName2)
id1 := MakeStableHashId(systemId1, serviceName1)
id2 := MakeStableHashId(systemId2, serviceName1)
id3 := MakeStableHashId(systemId1, serviceName2)
id4 := MakeStableHashId(systemId2, serviceName2)
// All IDs should be different
assert.NotEqual(t, id1, id2)
@@ -56,14 +56,14 @@ func TestGetSystemdServiceId(t *testing.T) {
}
for _, tc := range testCases {
id := makeStableHashId(tc.systemId, tc.serviceName)
id := MakeStableHashId(tc.systemId, tc.serviceName)
// FNV-32 produces 8 hex characters
assert.Len(t, id, 8, "ID should be 8 characters for systemId='%s', serviceName='%s'", tc.systemId, tc.serviceName)
}
})
t.Run("hexadecimal output", func(t *testing.T) {
id := makeStableHashId("test-system", "test-service")
id := MakeStableHashId("test-system", "test-service")
assert.NotEmpty(t, id)
// Should only contain hexadecimal characters