Compare commits

...

9 Commits

Author SHA1 Message Date
henrygd
b386ce5190 hub: add ExpiryMap.UpdateExpiration and sync SMART fetch intervals (#1800)
- Update smartFetchMap expiration when agent smart interval changes
- Prevent background SMART fetching before initial system details are
loaded
- Add buffer to SMART fetch timing check
- Get rid of unnecessary pointers in expirymap
2026-03-11 16:25:52 -04:00
henrygd
e527534016 ensure deprecated system fields are migrated to newer structures
also removes refs to legacy load avg fields (l1, l5, l15) that were
around for a very short period
2026-03-10 18:46:57 -04:00
Victor Eduardo
ec7ad632a9 fix: Use historical records to average disk usage for extra disk alerts (#1801)
- Introduced a new test file `alerts_disk_test.go` to validate the behavior of disk alerts using historical data for extra filesystems.
- Enhanced the `HandleSystemAlerts` function to correctly calculate disk usage for extra filesystems based on historical records.
- Updated the `SystemAlertStats` struct to include `ExtraFs` for tracking additional filesystem statistics.
2026-03-09 18:32:35 -04:00
VACInc
963fce5a33 agent: mark mdraid rebuild as warning, not failed (#1797) 2026-03-09 17:54:53 -04:00
Sven van Ginkel
d38c0da06d fix: bypass NIC auto-filter when interface is explicitly whitelisted via NICS (#1805)
Co-authored-by: henrygd <hank@henrygd.me>
2026-03-09 17:47:59 -04:00
henrygd
cae6ac4626 update go version to 1.26.1 2026-03-09 16:10:38 -04:00
henrygd
6b1ff264f2 gpu(amd): add workaround for misreported sysfs filesize (#1799) 2026-03-09 14:53:52 -04:00
henrygd
35d0e792ad refactor(expirymap): optimize performance and add StopCleaner method 2026-03-08 19:09:41 -04:00
henrygd
654cd06b19 respect SMART_INTERVAL across agent reconnects (#1800)
Move tracking of the last SMART data fetch from individual System
instances to the SystemManager using a TTL-based ExpiryMap.

This ensures that the SMART_INTERVAL is respected even if an
agent connection is dropped and re-established, preventing
redundant data collection on every reconnect.
2026-03-08 19:03:50 -04:00
21 changed files with 677 additions and 191 deletions

View File

@@ -33,8 +33,8 @@ func (gm *GPUManager) hasAmdSysfs() bool {
return false return false
} }
for _, vendorPath := range cards { for _, vendorPath := range cards {
vendor, err := os.ReadFile(vendorPath) vendor, err := utils.ReadStringFileLimited(vendorPath, 64)
if err == nil && strings.TrimSpace(string(vendor)) == "0x1002" { if err == nil && vendor == "0x1002" {
return true return true
} }
} }
@@ -88,12 +88,11 @@ func (gm *GPUManager) collectAmdStats() error {
// isAmdGpu checks whether a DRM card path belongs to AMD vendor ID 0x1002. // isAmdGpu checks whether a DRM card path belongs to AMD vendor ID 0x1002.
func isAmdGpu(cardPath string) bool { func isAmdGpu(cardPath string) bool {
vendorPath := filepath.Join(cardPath, "device/vendor") vendor, err := utils.ReadStringFileLimited(filepath.Join(cardPath, "device/vendor"), 64)
vendor, err := os.ReadFile(vendorPath)
if err != nil { if err != nil {
return false return false
} }
return strings.TrimSpace(string(vendor)) == "0x1002" return vendor == "0x1002"
} }
// updateAmdGpuData reads GPU metrics from sysfs and updates the GPU data map. // updateAmdGpuData reads GPU metrics from sysfs and updates the GPU data map.
@@ -155,11 +154,11 @@ func (gm *GPUManager) updateAmdGpuData(cardPath string) bool {
// readSysfsFloat reads and parses a numeric value from a sysfs file. // readSysfsFloat reads and parses a numeric value from a sysfs file.
func readSysfsFloat(path string) (float64, error) { func readSysfsFloat(path string) (float64, error) {
val, err := os.ReadFile(path) val, err := utils.ReadStringFileLimited(path, 64)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return strconv.ParseFloat(strings.TrimSpace(string(val)), 64) return strconv.ParseFloat(val, 64)
} }
// normalizeHexID normalizes hex IDs by trimming spaces, lowercasing, and dropping 0x. // normalizeHexID normalizes hex IDs by trimming spaces, lowercasing, and dropping 0x.
@@ -274,16 +273,16 @@ func cacheMissingAmdgpuName(deviceID, revisionID string) {
// Falls back to showing the raw device ID if not found in the lookup table. // Falls back to showing the raw device ID if not found in the lookup table.
func getAmdGpuName(devicePath string) string { func getAmdGpuName(devicePath string) string {
// Try product_name first (works for some enterprise GPUs) // Try product_name first (works for some enterprise GPUs)
if prod, err := os.ReadFile(filepath.Join(devicePath, "product_name")); err == nil { if prod, err := utils.ReadStringFileLimited(filepath.Join(devicePath, "product_name"), 128); err == nil {
return strings.TrimSpace(string(prod)) return prod
} }
// Read PCI device ID and look it up // Read PCI device ID and look it up
if deviceID, err := os.ReadFile(filepath.Join(devicePath, "device")); err == nil { if deviceID, err := utils.ReadStringFileLimited(filepath.Join(devicePath, "device"), 64); err == nil {
id := normalizeHexID(string(deviceID)) id := normalizeHexID(deviceID)
revision := "" revision := ""
if revBytes, revErr := os.ReadFile(filepath.Join(devicePath, "revision")); revErr == nil { if rev, revErr := utils.ReadStringFileLimited(filepath.Join(devicePath, "revision"), 64); revErr == nil {
revision = normalizeHexID(string(revBytes)) revision = normalizeHexID(rev)
} }
if name, found, done := getCachedAmdgpuName(id, revision); found { if name, found, done := getCachedAmdgpuName(id, revision); found {

View File

@@ -170,11 +170,18 @@ func mdraidSmartStatus(health mdraidHealth) string {
case "inactive", "faulty", "broken", "stopped": case "inactive", "faulty", "broken", "stopped":
return "FAILED" return "FAILED"
} }
// During rebuild/recovery, arrays are often temporarily degraded; report as
// warning instead of hard failure while synchronization is in progress.
syncAction := strings.ToLower(strings.TrimSpace(health.syncAction))
switch syncAction {
case "resync", "recover", "reshape":
return "WARNING"
}
if health.degraded > 0 { if health.degraded > 0 {
return "FAILED" return "FAILED"
} }
switch strings.ToLower(strings.TrimSpace(health.syncAction)) { switch syncAction {
case "resync", "recover", "reshape", "check", "repair": case "check", "repair":
return "WARNING" return "WARNING"
} }
switch state { switch state {

View File

@@ -85,6 +85,9 @@ func TestMdraidSmartStatus(t *testing.T) {
if got := mdraidSmartStatus(mdraidHealth{arrayState: "inactive"}); got != "FAILED" { if got := mdraidSmartStatus(mdraidHealth{arrayState: "inactive"}); got != "FAILED" {
t.Fatalf("mdraidSmartStatus(inactive) = %q, want FAILED", got) t.Fatalf("mdraidSmartStatus(inactive) = %q, want FAILED", got)
} }
if got := mdraidSmartStatus(mdraidHealth{arrayState: "active", degraded: 1, syncAction: "recover"}); got != "WARNING" {
t.Fatalf("mdraidSmartStatus(degraded+recover) = %q, want WARNING", got)
}
if got := mdraidSmartStatus(mdraidHealth{arrayState: "active", degraded: 1}); got != "FAILED" { if got := mdraidSmartStatus(mdraidHealth{arrayState: "active", degraded: 1}); got != "FAILED" {
t.Fatalf("mdraidSmartStatus(degraded) = %q, want FAILED", got) t.Fatalf("mdraidSmartStatus(degraded) = %q, want FAILED", got)
} }

View File

@@ -104,10 +104,7 @@ func (a *Agent) initializeNetIoStats() {
// get current network I/O stats and record valid interfaces // get current network I/O stats and record valid interfaces
if netIO, err := psutilNet.IOCounters(true); err == nil { if netIO, err := psutilNet.IOCounters(true); err == nil {
for _, v := range netIO { for _, v := range netIO {
if nicsEnvExists && !isValidNic(v.Name, nicCfg) { if skipNetworkInterface(v, nicCfg) {
continue
}
if a.skipNetworkInterface(v) {
continue continue
} }
slog.Info("Detected network interface", "name", v.Name, "sent", v.BytesSent, "recv", v.BytesRecv) slog.Info("Detected network interface", "name", v.Name, "sent", v.BytesSent, "recv", v.BytesRecv)
@@ -216,10 +213,8 @@ func (a *Agent) applyNetworkTotals(
totalBytesSent, totalBytesRecv uint64, totalBytesSent, totalBytesRecv uint64,
bytesSentPerSecond, bytesRecvPerSecond uint64, bytesSentPerSecond, bytesRecvPerSecond uint64,
) { ) {
networkSentPs := utils.BytesToMegabytes(float64(bytesSentPerSecond)) if bytesSentPerSecond > 10_000_000_000 || bytesRecvPerSecond > 10_000_000_000 {
networkRecvPs := utils.BytesToMegabytes(float64(bytesRecvPerSecond)) slog.Warn("Invalid net stats. Resetting.", "sent", bytesSentPerSecond, "recv", bytesRecvPerSecond)
if networkSentPs > 10_000 || networkRecvPs > 10_000 {
slog.Warn("Invalid net stats. Resetting.", "sent", networkSentPs, "recv", networkRecvPs)
for _, v := range netIO { for _, v := range netIO {
if _, exists := a.netInterfaces[v.Name]; !exists { if _, exists := a.netInterfaces[v.Name]; !exists {
continue continue
@@ -229,21 +224,29 @@ func (a *Agent) applyNetworkTotals(
a.initializeNetIoStats() a.initializeNetIoStats()
delete(a.netIoStats, cacheTimeMs) delete(a.netIoStats, cacheTimeMs)
delete(a.netInterfaceDeltaTrackers, cacheTimeMs) delete(a.netInterfaceDeltaTrackers, cacheTimeMs)
systemStats.NetworkSent = 0
systemStats.NetworkRecv = 0
systemStats.Bandwidth[0], systemStats.Bandwidth[1] = 0, 0 systemStats.Bandwidth[0], systemStats.Bandwidth[1] = 0, 0
return return
} }
systemStats.NetworkSent = networkSentPs
systemStats.NetworkRecv = networkRecvPs
systemStats.Bandwidth[0], systemStats.Bandwidth[1] = bytesSentPerSecond, bytesRecvPerSecond systemStats.Bandwidth[0], systemStats.Bandwidth[1] = bytesSentPerSecond, bytesRecvPerSecond
nis.BytesSent = totalBytesSent nis.BytesSent = totalBytesSent
nis.BytesRecv = totalBytesRecv nis.BytesRecv = totalBytesRecv
a.netIoStats[cacheTimeMs] = nis a.netIoStats[cacheTimeMs] = nis
} }
func (a *Agent) skipNetworkInterface(v psutilNet.IOCountersStat) bool { // skipNetworkInterface returns true if the network interface should be ignored.
func skipNetworkInterface(v psutilNet.IOCountersStat, nicCfg *NicConfig) bool {
if nicCfg != nil {
if !isValidNic(v.Name, nicCfg) {
return true
}
// In whitelist mode, we honor explicit inclusion without auto-filtering.
if !nicCfg.isBlacklist {
return false
}
// In blacklist mode, still apply the auto-filter below.
}
switch { switch {
case strings.HasPrefix(v.Name, "lo"), case strings.HasPrefix(v.Name, "lo"),
strings.HasPrefix(v.Name, "docker"), strings.HasPrefix(v.Name, "docker"),

View File

@@ -261,6 +261,39 @@ func TestNewNicConfig(t *testing.T) {
}) })
} }
} }
// TestSkipNetworkInterface verifies the interface auto-filter rules
// (loopback/virtual prefixes, zero-traffic NICs) together with explicit
// whitelist/blacklist overrides supplied via a NicConfig.
func TestSkipNetworkInterface(t *testing.T) {
	cases := []struct {
		name     string
		nic      psutilNet.IOCountersStat
		cfg      *NicConfig
		wantSkip bool
	}{
		{"loopback lo", psutilNet.IOCountersStat{Name: "lo", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"loopback lo0", psutilNet.IOCountersStat{Name: "lo0", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"docker prefix", psutilNet.IOCountersStat{Name: "docker0", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"br- prefix", psutilNet.IOCountersStat{Name: "br-lan", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"veth prefix", psutilNet.IOCountersStat{Name: "veth0abc", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"bond prefix", psutilNet.IOCountersStat{Name: "bond0", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"cali prefix", psutilNet.IOCountersStat{Name: "cali1234", BytesSent: 100, BytesRecv: 100}, nil, true},
		{"zero BytesRecv", psutilNet.IOCountersStat{Name: "eth0", BytesSent: 100, BytesRecv: 0}, nil, true},
		{"zero BytesSent", psutilNet.IOCountersStat{Name: "eth0", BytesSent: 0, BytesRecv: 100}, nil, true},
		{"both zero", psutilNet.IOCountersStat{Name: "eth0", BytesSent: 0, BytesRecv: 0}, nil, true},
		{"normal eth0", psutilNet.IOCountersStat{Name: "eth0", BytesSent: 100, BytesRecv: 200}, nil, false},
		{"normal wlan0", psutilNet.IOCountersStat{Name: "wlan0", BytesSent: 1, BytesRecv: 1}, nil, false},
		{"whitelist overrides skip (docker)", psutilNet.IOCountersStat{Name: "docker0", BytesSent: 100, BytesRecv: 100}, newNicConfig("docker0"), false},
		{"whitelist overrides skip (lo)", psutilNet.IOCountersStat{Name: "lo", BytesSent: 100, BytesRecv: 100}, newNicConfig("lo"), false},
		{"whitelist exclusion", psutilNet.IOCountersStat{Name: "eth1", BytesSent: 100, BytesRecv: 100}, newNicConfig("eth0"), true},
		{"blacklist skip lo", psutilNet.IOCountersStat{Name: "lo", BytesSent: 100, BytesRecv: 100}, newNicConfig("-eth0"), true},
		{"blacklist explicit eth0", psutilNet.IOCountersStat{Name: "eth0", BytesSent: 100, BytesRecv: 100}, newNicConfig("-eth0"), true},
		{"blacklist allow eth1", psutilNet.IOCountersStat{Name: "eth1", BytesSent: 100, BytesRecv: 100}, newNicConfig("-eth0"), false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.wantSkip, skipNetworkInterface(tc.nic, tc.cfg))
		})
	}
}
func TestEnsureNetworkInterfacesMap(t *testing.T) { func TestEnsureNetworkInterfacesMap(t *testing.T) {
var a Agent var a Agent
var stats system.Stats var stats system.Stats
@@ -383,8 +416,6 @@ func TestApplyNetworkTotals(t *testing.T) {
totalBytesSent uint64 totalBytesSent uint64
totalBytesRecv uint64 totalBytesRecv uint64
expectReset bool expectReset bool
expectedNetworkSent float64
expectedNetworkRecv float64
expectedBandwidthSent uint64 expectedBandwidthSent uint64
expectedBandwidthRecv uint64 expectedBandwidthRecv uint64
}{ }{
@@ -395,8 +426,6 @@ func TestApplyNetworkTotals(t *testing.T) {
totalBytesSent: 10000000, totalBytesSent: 10000000,
totalBytesRecv: 20000000, totalBytesRecv: 20000000,
expectReset: false, expectReset: false,
expectedNetworkSent: 0.95, // ~1 MB/s rounded to 2 decimals
expectedNetworkRecv: 1.91, // ~2 MB/s rounded to 2 decimals
expectedBandwidthSent: 1000000, expectedBandwidthSent: 1000000,
expectedBandwidthRecv: 2000000, expectedBandwidthRecv: 2000000,
}, },
@@ -424,18 +453,6 @@ func TestApplyNetworkTotals(t *testing.T) {
totalBytesRecv: 20000000, totalBytesRecv: 20000000,
expectReset: true, expectReset: true,
}, },
{
name: "Valid network stats - at threshold boundary",
bytesSentPerSecond: 10485750000, // ~9999.99 MB/s (rounds to 9999.99)
bytesRecvPerSecond: 10485750000, // ~9999.99 MB/s (rounds to 9999.99)
totalBytesSent: 10000000,
totalBytesRecv: 20000000,
expectReset: false,
expectedNetworkSent: 9999.99,
expectedNetworkRecv: 9999.99,
expectedBandwidthSent: 10485750000,
expectedBandwidthRecv: 10485750000,
},
{ {
name: "Zero values", name: "Zero values",
bytesSentPerSecond: 0, bytesSentPerSecond: 0,
@@ -443,8 +460,6 @@ func TestApplyNetworkTotals(t *testing.T) {
totalBytesSent: 0, totalBytesSent: 0,
totalBytesRecv: 0, totalBytesRecv: 0,
expectReset: false, expectReset: false,
expectedNetworkSent: 0.0,
expectedNetworkRecv: 0.0,
expectedBandwidthSent: 0, expectedBandwidthSent: 0,
expectedBandwidthRecv: 0, expectedBandwidthRecv: 0,
}, },
@@ -481,14 +496,10 @@ func TestApplyNetworkTotals(t *testing.T) {
// Should have reset network tracking state - maps cleared and stats zeroed // Should have reset network tracking state - maps cleared and stats zeroed
assert.NotContains(t, a.netIoStats, cacheTimeMs, "cache entry should be cleared after reset") assert.NotContains(t, a.netIoStats, cacheTimeMs, "cache entry should be cleared after reset")
assert.NotContains(t, a.netInterfaceDeltaTrackers, cacheTimeMs, "tracker should be cleared on reset") assert.NotContains(t, a.netInterfaceDeltaTrackers, cacheTimeMs, "tracker should be cleared on reset")
assert.Zero(t, systemStats.NetworkSent)
assert.Zero(t, systemStats.NetworkRecv)
assert.Zero(t, systemStats.Bandwidth[0]) assert.Zero(t, systemStats.Bandwidth[0])
assert.Zero(t, systemStats.Bandwidth[1]) assert.Zero(t, systemStats.Bandwidth[1])
} else { } else {
// Should have applied stats // Should have applied stats
assert.Equal(t, tt.expectedNetworkSent, systemStats.NetworkSent)
assert.Equal(t, tt.expectedNetworkRecv, systemStats.NetworkRecv)
assert.Equal(t, tt.expectedBandwidthSent, systemStats.Bandwidth[0]) assert.Equal(t, tt.expectedBandwidthSent, systemStats.Bandwidth[0])
assert.Equal(t, tt.expectedBandwidthRecv, systemStats.Bandwidth[1]) assert.Equal(t, tt.expectedBandwidthRecv, systemStats.Bandwidth[1])

View File

@@ -1,6 +1,7 @@
package utils package utils
import ( import (
"io"
"math" "math"
"os" "os"
"strconv" "strconv"
@@ -50,6 +51,23 @@ func ReadStringFileOK(path string) (string, bool) {
return strings.TrimSpace(string(b)), true return strings.TrimSpace(string(b)), true
} }
// ReadStringFileLimited reads a file into a string with a maximum size (in bytes) to avoid
// allocating large buffers and potential panics with pseudo-files when the size is misreported.
//
// The result is capped at maxSize bytes and trimmed of surrounding whitespace.
// Returns an error if the file cannot be opened or read.
func ReadStringFileLimited(path string, maxSize int) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()
	// io.ReadAll loops until EOF, unlike a single f.Read call, which is
	// allowed to return a short read (common with pipes and some
	// pseudo-files) and would silently truncate the content.
	// LimitReader caps the total bytes read at maxSize.
	b, err := io.ReadAll(io.LimitReader(f, int64(maxSize)))
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(b)), nil
}
// FileExists reports whether the given path exists. // FileExists reports whether the given path exists.
func FileExists(path string) bool { func FileExists(path string) bool {
_, err := os.Stat(path) _, err := os.Stat(path)

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/henrygd/beszel module github.com/henrygd/beszel
go 1.26.0 go 1.26.1
require ( require (
github.com/blang/semver v3.5.1+incompatible github.com/blang/semver v3.5.1+incompatible

View File

@@ -40,16 +40,22 @@ type UserNotificationSettings struct {
Webhooks []string `json:"webhooks"` Webhooks []string `json:"webhooks"`
} }
type SystemAlertFsStats struct {
DiskTotal float64 `json:"d"`
DiskUsed float64 `json:"du"`
}
// Values pulled from system_stats.stats that are relevant to alerts.
type SystemAlertStats struct { type SystemAlertStats struct {
Cpu float64 `json:"cpu"` Cpu float64 `json:"cpu"`
Mem float64 `json:"mp"` Mem float64 `json:"mp"`
Disk float64 `json:"dp"` Disk float64 `json:"dp"`
NetSent float64 `json:"ns"` Bandwidth [2]uint64 `json:"b"`
NetRecv float64 `json:"nr"`
GPU map[string]SystemAlertGPUData `json:"g"` GPU map[string]SystemAlertGPUData `json:"g"`
Temperatures map[string]float32 `json:"t"` Temperatures map[string]float32 `json:"t"`
LoadAvg [3]float64 `json:"la"` LoadAvg [3]float64 `json:"la"`
Battery [2]uint8 `json:"bat"` Battery [2]uint8 `json:"bat"`
ExtraFs map[string]SystemAlertFsStats `json:"efs"`
} }
type SystemAlertGPUData struct { type SystemAlertGPUData struct {
@@ -259,13 +265,14 @@ func (am *AlertManager) SendShoutrrrAlert(notificationUrl, title, message, link,
} }
// Add link // Add link
if scheme == "ntfy" { switch scheme {
case "ntfy":
queryParams.Add("Actions", fmt.Sprintf("view, %s, %s", linkText, link)) queryParams.Add("Actions", fmt.Sprintf("view, %s, %s", linkText, link))
} else if scheme == "lark" { case "lark":
queryParams.Add("link", link) queryParams.Add("link", link)
} else if scheme == "bark" { case "bark":
queryParams.Add("url", link) queryParams.Add("url", link)
} else { default:
message += "\n\n" + link message += "\n\n" + link
} }

View File

@@ -0,0 +1,155 @@
//go:build testing
package alerts_test
import (
"encoding/json"
"testing"
"time"
"github.com/henrygd/beszel/internal/entities/system"
beszelTests "github.com/henrygd/beszel/internal/tests"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/tools/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestDiskAlertExtraFsMultiMinute tests that multi-minute disk alerts correctly use
// historical per-minute values for extra (non-root) filesystems, not the current live snapshot.
//
// Flow: seed "1m" system_stats records with a high (92%) extra-FS usage, run
// HandleSystemAlerts, and expect the alert to trigger; then seed low (50%)
// records and expect it to resolve.
func TestDiskAlertExtraFsMultiMinute(t *testing.T) {
	hub, user := beszelTests.GetHubWithUser(t)
	defer hub.Cleanup()
	systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
	require.NoError(t, err)
	systemRecord := systems[0]
	// Disk alert: threshold 80%, min=2 (requires historical averaging)
	diskAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
		"name":   "Disk",
		"system": systemRecord.Id,
		"user":   user.Id,
		"value":  80, // threshold: 80%
		"min":    2,  // 2 minutes - requires historical averaging
	})
	require.NoError(t, err)
	assert.False(t, diskAlert.GetBool("triggered"), "Alert should not be triggered initially")
	am := hub.GetAlertManager()
	now := time.Now().UTC()
	extraFsHigh := map[string]*system.FsStats{
		"/mnt/data": {DiskTotal: 1000, DiskUsed: 920}, // 92% - above threshold
	}
	// Insert 4 historical records spread over 3 minutes (same pattern as battery tests).
	// The oldest record must predate (now - 2min) so the alert time window is valid.
	recordTimes := []time.Duration{
		-180 * time.Second, // 3 min ago - anchors oldest record before alert.time
		-90 * time.Second,
		-60 * time.Second,
		-30 * time.Second,
	}
	for _, offset := range recordTimes {
		stats := system.Stats{
			DiskPct: 30, // root disk at 30% - below threshold
			ExtraFs: extraFsHigh,
		}
		statsJSON, _ := json.Marshal(stats)
		recordTime := now.Add(offset)
		record, err := beszelTests.CreateRecord(hub, "system_stats", map[string]any{
			"system": systemRecord.Id,
			"type":   "1m",
			"stats":  string(statsJSON),
		})
		require.NoError(t, err)
		// Backdate the record; SetRaw + SaveNoValidate bypasses the
		// auto-managed "created" timestamp.
		record.SetRaw("created", recordTime.Format(types.DefaultDateLayout))
		err = hub.SaveNoValidate(record)
		require.NoError(t, err)
	}
	combinedDataHigh := &system.CombinedData{
		Stats: system.Stats{
			DiskPct: 30,
			ExtraFs: extraFsHigh,
		},
		Info: system.Info{
			DiskPct: 30,
		},
	}
	systemRecord.Set("updated", now)
	err = hub.SaveNoValidate(systemRecord)
	require.NoError(t, err)
	err = am.HandleSystemAlerts(systemRecord, combinedDataHigh)
	require.NoError(t, err)
	// NOTE(review): presumably alert state is updated asynchronously, hence
	// the short sleep before re-reading the record - confirm.
	time.Sleep(20 * time.Millisecond)
	diskAlert, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": diskAlert.Id})
	require.NoError(t, err)
	assert.True(t, diskAlert.GetBool("triggered"),
		"Alert SHOULD be triggered when extra disk average (92%%) exceeds threshold (80%%)")
	// --- Resolution: extra disk drops to 50%, alert should resolve ---
	extraFsLow := map[string]*system.FsStats{
		"/mnt/data": {DiskTotal: 1000, DiskUsed: 500}, // 50% - below threshold
	}
	newNow := now.Add(2 * time.Minute)
	// Same 4-record backdating pattern, anchored to the advanced clock.
	recordTimesLow := []time.Duration{
		-180 * time.Second,
		-90 * time.Second,
		-60 * time.Second,
		-30 * time.Second,
	}
	for _, offset := range recordTimesLow {
		stats := system.Stats{
			DiskPct: 30,
			ExtraFs: extraFsLow,
		}
		statsJSON, _ := json.Marshal(stats)
		recordTime := newNow.Add(offset)
		record, err := beszelTests.CreateRecord(hub, "system_stats", map[string]any{
			"system": systemRecord.Id,
			"type":   "1m",
			"stats":  string(statsJSON),
		})
		require.NoError(t, err)
		record.SetRaw("created", recordTime.Format(types.DefaultDateLayout))
		err = hub.SaveNoValidate(record)
		require.NoError(t, err)
	}
	combinedDataLow := &system.CombinedData{
		Stats: system.Stats{
			DiskPct: 30,
			ExtraFs: extraFsLow,
		},
		Info: system.Info{
			DiskPct: 30,
		},
	}
	systemRecord.Set("updated", newNow)
	err = hub.SaveNoValidate(systemRecord)
	require.NoError(t, err)
	err = am.HandleSystemAlerts(systemRecord, combinedDataLow)
	require.NoError(t, err)
	time.Sleep(20 * time.Millisecond)
	diskAlert, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": diskAlert.Id})
	require.NoError(t, err)
	assert.False(t, diskAlert.GetBool("triggered"),
		"Alert should be resolved when extra disk average (50%%) drops below threshold (80%%)")
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/pocketbase/dbx" "github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/types" "github.com/pocketbase/pocketbase/tools/types"
"github.com/spf13/cast"
) )
func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *system.CombinedData) error { func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *system.CombinedData) error {
@@ -92,7 +91,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
} }
} }
min := max(1, cast.ToUint8(alertRecord.Get("min"))) min := max(1, uint8(alertRecord.GetInt("min")))
alert := SystemAlertData{ alert := SystemAlertData{
systemRecord: systemRecord, systemRecord: systemRecord,
@@ -192,22 +191,24 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
case "Memory": case "Memory":
alert.val += stats.Mem alert.val += stats.Mem
case "Bandwidth": case "Bandwidth":
alert.val += stats.NetSent + stats.NetRecv alert.val += float64(stats.Bandwidth[0]+stats.Bandwidth[1]) / (1024 * 1024)
case "Disk": case "Disk":
if alert.mapSums == nil { if alert.mapSums == nil {
alert.mapSums = make(map[string]float32, len(data.Stats.ExtraFs)+1) alert.mapSums = make(map[string]float32, len(stats.ExtraFs)+1)
} }
// add root disk // add root disk
if _, ok := alert.mapSums["root"]; !ok { if _, ok := alert.mapSums["root"]; !ok {
alert.mapSums["root"] = 0.0 alert.mapSums["root"] = 0.0
} }
alert.mapSums["root"] += float32(stats.Disk) alert.mapSums["root"] += float32(stats.Disk)
// add extra disks // add extra disks from historical record
for key, fs := range data.Stats.ExtraFs { for key, fs := range stats.ExtraFs {
if _, ok := alert.mapSums[key]; !ok { if fs.DiskTotal > 0 {
alert.mapSums[key] = 0.0 if _, ok := alert.mapSums[key]; !ok {
alert.mapSums[key] = 0.0
}
alert.mapSums[key] += float32(fs.DiskUsed / fs.DiskTotal * 100)
} }
alert.mapSums[key] += float32(fs.DiskUsed / fs.DiskTotal * 100)
} }
case "Temperature": case "Temperature":
if alert.mapSums == nil { if alert.mapSums == nil {

View File

@@ -12,8 +12,9 @@ import (
type Stats struct { type Stats struct {
Cpu float64 `json:"cpu" cbor:"0,keyasint"` Cpu float64 `json:"cpu" cbor:"0,keyasint"`
MaxCpu float64 `json:"cpum,omitempty" cbor:"1,keyasint,omitempty"` MaxCpu float64 `json:"cpum,omitempty" cbor:"-"`
Mem float64 `json:"m" cbor:"2,keyasint"` Mem float64 `json:"m" cbor:"2,keyasint"`
MaxMem float64 `json:"mm,omitempty" cbor:"-"`
MemUsed float64 `json:"mu" cbor:"3,keyasint"` MemUsed float64 `json:"mu" cbor:"3,keyasint"`
MemPct float64 `json:"mp" cbor:"4,keyasint"` MemPct float64 `json:"mp" cbor:"4,keyasint"`
MemBuffCache float64 `json:"mb" cbor:"5,keyasint"` MemBuffCache float64 `json:"mb" cbor:"5,keyasint"`
@@ -23,26 +24,25 @@ type Stats struct {
DiskTotal float64 `json:"d" cbor:"9,keyasint"` DiskTotal float64 `json:"d" cbor:"9,keyasint"`
DiskUsed float64 `json:"du" cbor:"10,keyasint"` DiskUsed float64 `json:"du" cbor:"10,keyasint"`
DiskPct float64 `json:"dp" cbor:"11,keyasint"` DiskPct float64 `json:"dp" cbor:"11,keyasint"`
DiskReadPs float64 `json:"dr" cbor:"12,keyasint"` DiskReadPs float64 `json:"dr,omitzero" cbor:"12,keyasint,omitzero"`
DiskWritePs float64 `json:"dw" cbor:"13,keyasint"` DiskWritePs float64 `json:"dw,omitzero" cbor:"13,keyasint,omitzero"`
MaxDiskReadPs float64 `json:"drm,omitempty" cbor:"14,keyasint,omitempty"` MaxDiskReadPs float64 `json:"drm,omitempty" cbor:"-"`
MaxDiskWritePs float64 `json:"dwm,omitempty" cbor:"15,keyasint,omitempty"` MaxDiskWritePs float64 `json:"dwm,omitempty" cbor:"-"`
NetworkSent float64 `json:"ns,omitzero" cbor:"16,keyasint,omitzero"` NetworkSent float64 `json:"ns,omitzero" cbor:"16,keyasint,omitzero"`
NetworkRecv float64 `json:"nr,omitzero" cbor:"17,keyasint,omitzero"` NetworkRecv float64 `json:"nr,omitzero" cbor:"17,keyasint,omitzero"`
MaxNetworkSent float64 `json:"nsm,omitempty" cbor:"18,keyasint,omitempty"` MaxNetworkSent float64 `json:"nsm,omitempty" cbor:"-"`
MaxNetworkRecv float64 `json:"nrm,omitempty" cbor:"19,keyasint,omitempty"` MaxNetworkRecv float64 `json:"nrm,omitempty" cbor:"-"`
Temperatures map[string]float64 `json:"t,omitempty" cbor:"20,keyasint,omitempty"` Temperatures map[string]float64 `json:"t,omitempty" cbor:"20,keyasint,omitempty"`
ExtraFs map[string]*FsStats `json:"efs,omitempty" cbor:"21,keyasint,omitempty"` ExtraFs map[string]*FsStats `json:"efs,omitempty" cbor:"21,keyasint,omitempty"`
GPUData map[string]GPUData `json:"g,omitempty" cbor:"22,keyasint,omitempty"` GPUData map[string]GPUData `json:"g,omitempty" cbor:"22,keyasint,omitempty"`
LoadAvg1 float64 `json:"l1,omitempty" cbor:"23,keyasint,omitempty"` // LoadAvg1 float64 `json:"l1,omitempty" cbor:"23,keyasint,omitempty"`
LoadAvg5 float64 `json:"l5,omitempty" cbor:"24,keyasint,omitempty"` // LoadAvg5 float64 `json:"l5,omitempty" cbor:"24,keyasint,omitempty"`
LoadAvg15 float64 `json:"l15,omitempty" cbor:"25,keyasint,omitempty"` // LoadAvg15 float64 `json:"l15,omitempty" cbor:"25,keyasint,omitempty"`
Bandwidth [2]uint64 `json:"b,omitzero" cbor:"26,keyasint,omitzero"` // [sent bytes, recv bytes] Bandwidth [2]uint64 `json:"b,omitzero" cbor:"26,keyasint,omitzero"` // [sent bytes, recv bytes]
MaxBandwidth [2]uint64 `json:"bm,omitzero" cbor:"27,keyasint,omitzero"` // [sent bytes, recv bytes] MaxBandwidth [2]uint64 `json:"bm,omitzero" cbor:"-"` // [sent bytes, recv bytes]
// TODO: remove other load fields in future release in favor of load avg array // TODO: remove other load fields in future release in favor of load avg array
LoadAvg [3]float64 `json:"la,omitempty" cbor:"28,keyasint"` LoadAvg [3]float64 `json:"la,omitempty" cbor:"28,keyasint"`
Battery [2]uint8 `json:"bat,omitzero" cbor:"29,keyasint,omitzero"` // [percent, charge state, current] Battery [2]uint8 `json:"bat,omitzero" cbor:"29,keyasint,omitzero"` // [percent, charge state, current]
MaxMem float64 `json:"mm,omitempty" cbor:"30,keyasint,omitempty"`
NetworkInterfaces map[string][4]uint64 `json:"ni,omitempty" cbor:"31,keyasint,omitempty"` // [upload bytes, download bytes, total upload, total download] NetworkInterfaces map[string][4]uint64 `json:"ni,omitempty" cbor:"31,keyasint,omitempty"` // [upload bytes, download bytes, total upload, total download]
DiskIO [2]uint64 `json:"dio,omitzero" cbor:"32,keyasint,omitzero"` // [read bytes, write bytes] DiskIO [2]uint64 `json:"dio,omitzero" cbor:"32,keyasint,omitzero"` // [read bytes, write bytes]
MaxDiskIO [2]uint64 `json:"diom,omitzero" cbor:"-"` // [max read bytes, max write bytes] MaxDiskIO [2]uint64 `json:"diom,omitzero" cbor:"-"` // [max read bytes, max write bytes]
@@ -90,8 +90,8 @@ type FsStats struct {
TotalWrite uint64 `json:"-"` TotalWrite uint64 `json:"-"`
DiskReadPs float64 `json:"r" cbor:"2,keyasint"` DiskReadPs float64 `json:"r" cbor:"2,keyasint"`
DiskWritePs float64 `json:"w" cbor:"3,keyasint"` DiskWritePs float64 `json:"w" cbor:"3,keyasint"`
MaxDiskReadPS float64 `json:"rm,omitempty" cbor:"4,keyasint,omitempty"` MaxDiskReadPS float64 `json:"rm,omitempty" cbor:"-"`
MaxDiskWritePS float64 `json:"wm,omitempty" cbor:"5,keyasint,omitempty"` MaxDiskWritePS float64 `json:"wm,omitempty" cbor:"-"`
// TODO: remove DiskReadPs and DiskWritePs in future release in favor of DiskReadBytes and DiskWriteBytes // TODO: remove DiskReadPs and DiskWritePs in future release in favor of DiskReadBytes and DiskWriteBytes
DiskReadBytes uint64 `json:"rb" cbor:"6,keyasint,omitempty"` DiskReadBytes uint64 `json:"rb" cbor:"6,keyasint,omitempty"`
DiskWriteBytes uint64 `json:"wb" cbor:"7,keyasint,omitempty"` DiskWriteBytes uint64 `json:"wb" cbor:"7,keyasint,omitempty"`
@@ -129,23 +129,23 @@ type Info struct {
KernelVersion string `json:"k,omitempty" cbor:"1,keyasint,omitempty"` // deprecated - moved to Details struct KernelVersion string `json:"k,omitempty" cbor:"1,keyasint,omitempty"` // deprecated - moved to Details struct
Cores int `json:"c,omitzero" cbor:"2,keyasint,omitzero"` // deprecated - moved to Details struct Cores int `json:"c,omitzero" cbor:"2,keyasint,omitzero"` // deprecated - moved to Details struct
// Threads is needed in Info struct to calculate load average thresholds // Threads is needed in Info struct to calculate load average thresholds
Threads int `json:"t,omitempty" cbor:"3,keyasint,omitempty"` Threads int `json:"t,omitempty" cbor:"3,keyasint,omitempty"`
CpuModel string `json:"m,omitempty" cbor:"4,keyasint,omitempty"` // deprecated - moved to Details struct CpuModel string `json:"m,omitempty" cbor:"4,keyasint,omitempty"` // deprecated - moved to Details struct
Uptime uint64 `json:"u" cbor:"5,keyasint"` Uptime uint64 `json:"u" cbor:"5,keyasint"`
Cpu float64 `json:"cpu" cbor:"6,keyasint"` Cpu float64 `json:"cpu" cbor:"6,keyasint"`
MemPct float64 `json:"mp" cbor:"7,keyasint"` MemPct float64 `json:"mp" cbor:"7,keyasint"`
DiskPct float64 `json:"dp" cbor:"8,keyasint"` DiskPct float64 `json:"dp" cbor:"8,keyasint"`
Bandwidth float64 `json:"b" cbor:"9,keyasint"` Bandwidth float64 `json:"b,omitzero" cbor:"9,keyasint"` // deprecated in favor of BandwidthBytes
AgentVersion string `json:"v" cbor:"10,keyasint"` AgentVersion string `json:"v" cbor:"10,keyasint"`
Podman bool `json:"p,omitempty" cbor:"11,keyasint,omitempty"` // deprecated - moved to Details struct Podman bool `json:"p,omitempty" cbor:"11,keyasint,omitempty"` // deprecated - moved to Details struct
GpuPct float64 `json:"g,omitempty" cbor:"12,keyasint,omitempty"` GpuPct float64 `json:"g,omitempty" cbor:"12,keyasint,omitempty"`
DashboardTemp float64 `json:"dt,omitempty" cbor:"13,keyasint,omitempty"` DashboardTemp float64 `json:"dt,omitempty" cbor:"13,keyasint,omitempty"`
Os Os `json:"os,omitempty" cbor:"14,keyasint,omitempty"` // deprecated - moved to Details struct Os Os `json:"os,omitempty" cbor:"14,keyasint,omitempty"` // deprecated - moved to Details struct
LoadAvg1 float64 `json:"l1,omitempty" cbor:"15,keyasint,omitempty"` // deprecated - use `la` array instead // LoadAvg1 float64 `json:"l1,omitempty" cbor:"15,keyasint,omitempty"` // deprecated - use `la` array instead
LoadAvg5 float64 `json:"l5,omitempty" cbor:"16,keyasint,omitempty"` // deprecated - use `la` array instead // LoadAvg5 float64 `json:"l5,omitempty" cbor:"16,keyasint,omitempty"` // deprecated - use `la` array instead
LoadAvg15 float64 `json:"l15,omitempty" cbor:"17,keyasint,omitempty"` // deprecated - use `la` array instead // LoadAvg15 float64 `json:"l15,omitempty" cbor:"17,keyasint,omitempty"` // deprecated - use `la` array instead
BandwidthBytes uint64 `json:"bb" cbor:"18,keyasint"`
BandwidthBytes uint64 `json:"bb" cbor:"18,keyasint"`
LoadAvg [3]float64 `json:"la,omitempty" cbor:"19,keyasint"` LoadAvg [3]float64 `json:"la,omitempty" cbor:"19,keyasint"`
ConnectionType ConnectionType `json:"ct,omitempty" cbor:"20,keyasint,omitempty,omitzero"` ConnectionType ConnectionType `json:"ct,omitempty" cbor:"20,keyasint,omitempty,omitzero"`
ExtraFsPct map[string]float64 `json:"efs,omitempty" cbor:"21,keyasint,omitempty"` ExtraFsPct map[string]float64 `json:"efs,omitempty" cbor:"21,keyasint,omitempty"`

View File

@@ -1,35 +1,39 @@
// Package expirymap provides a thread-safe map with expiring entries.
// It supports TTL-based expiration with both lazy cleanup on access
// and periodic background cleanup.
package expirymap package expirymap
import ( import (
"reflect" "sync"
"time" "time"
"github.com/pocketbase/pocketbase/tools/store" "github.com/pocketbase/pocketbase/tools/store"
) )
type val[T any] struct { type val[T comparable] struct {
value T value T
expires time.Time expires time.Time
} }
type ExpiryMap[T any] struct { type ExpiryMap[T comparable] struct {
store *store.Store[string, *val[T]] store store.Store[string, val[T]]
cleanupInterval time.Duration stopChan chan struct{}
stopOnce sync.Once
} }
// New creates a new expiry map with custom cleanup interval // New creates a new expiry map with custom cleanup interval
func New[T any](cleanupInterval time.Duration) *ExpiryMap[T] { func New[T comparable](cleanupInterval time.Duration) *ExpiryMap[T] {
m := &ExpiryMap[T]{ m := &ExpiryMap[T]{
store: store.New(map[string]*val[T]{}), store: *store.New(map[string]val[T]{}),
cleanupInterval: cleanupInterval, stopChan: make(chan struct{}),
} }
m.startCleaner() go m.startCleaner(cleanupInterval)
return m return m
} }
// Set stores a value with the given TTL // Set stores a value with the given TTL
func (m *ExpiryMap[T]) Set(key string, value T, ttl time.Duration) { func (m *ExpiryMap[T]) Set(key string, value T, ttl time.Duration) {
m.store.Set(key, &val[T]{ m.store.Set(key, val[T]{
value: value, value: value,
expires: time.Now().Add(ttl), expires: time.Now().Add(ttl),
}) })
@@ -55,7 +59,7 @@ func (m *ExpiryMap[T]) GetOk(key string) (T, bool) {
// GetByValue retrieves a value by value // GetByValue retrieves a value by value
func (m *ExpiryMap[T]) GetByValue(val T) (key string, value T, ok bool) { func (m *ExpiryMap[T]) GetByValue(val T) (key string, value T, ok bool) {
for key, v := range m.store.GetAll() { for key, v := range m.store.GetAll() {
if reflect.DeepEqual(v.value, val) { if v.value == val {
// check if expired // check if expired
if v.expires.Before(time.Now()) { if v.expires.Before(time.Now()) {
m.store.Remove(key) m.store.Remove(key)
@@ -75,7 +79,7 @@ func (m *ExpiryMap[T]) Remove(key string) {
// RemovebyValue removes a value by value // RemovebyValue removes a value by value
func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) { func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) {
for key, val := range m.store.GetAll() { for key, val := range m.store.GetAll() {
if reflect.DeepEqual(val.value, value) { if val.value == value {
m.store.Remove(key) m.store.Remove(key)
return val.value, true return val.value, true
} }
@@ -84,13 +88,23 @@ func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) {
} }
// startCleaner runs the background cleanup process // startCleaner runs the background cleanup process
func (m *ExpiryMap[T]) startCleaner() { func (m *ExpiryMap[T]) startCleaner(interval time.Duration) {
go func() { tick := time.Tick(interval)
tick := time.Tick(m.cleanupInterval) for {
for range tick { select {
case <-tick:
m.cleanup() m.cleanup()
case <-m.stopChan:
return
} }
}() }
}
// StopCleaner stops the background cleanup process
func (m *ExpiryMap[T]) StopCleaner() {
m.stopOnce.Do(func() {
close(m.stopChan)
})
} }
// cleanup removes all expired entries // cleanup removes all expired entries
@@ -102,3 +116,12 @@ func (m *ExpiryMap[T]) cleanup() {
} }
} }
} }
// UpdateExpiration updates the expiration time of a key
func (m *ExpiryMap[T]) UpdateExpiration(key string, ttl time.Duration) {
value, ok := m.store.GetOk(key)
if ok {
value.expires = time.Now().Add(ttl)
m.store.Set(key, value)
}
}

View File

@@ -4,6 +4,7 @@ package expirymap
import ( import (
"testing" "testing"
"testing/synctest"
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -177,6 +178,33 @@ func TestExpiryMap_GenericTypes(t *testing.T) {
}) })
} }
func TestExpiryMap_UpdateExpiration(t *testing.T) {
em := New[string](time.Hour)
// Set a value with short TTL
em.Set("key1", "value1", time.Millisecond*50)
// Verify it exists
assert.True(t, em.Has("key1"))
// Update expiration to a longer TTL
em.UpdateExpiration("key1", time.Hour)
// Wait for the original TTL to pass
time.Sleep(time.Millisecond * 100)
// Should still exist because expiration was updated
assert.True(t, em.Has("key1"))
value, ok := em.GetOk("key1")
assert.True(t, ok)
assert.Equal(t, "value1", value)
// Try updating non-existent key (should not panic)
assert.NotPanics(t, func() {
em.UpdateExpiration("nonexistent", time.Hour)
})
}
func TestExpiryMap_ZeroValues(t *testing.T) { func TestExpiryMap_ZeroValues(t *testing.T) {
em := New[string](time.Hour) em := New[string](time.Hour)
@@ -473,3 +501,52 @@ func TestExpiryMap_ValueOperations_Integration(t *testing.T) {
assert.Equal(t, "unique", value) assert.Equal(t, "unique", value)
assert.Equal(t, "key2", key) assert.Equal(t, "key2", key)
} }
func TestExpiryMap_Cleaner(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
em := New[string](time.Second)
defer em.StopCleaner()
em.Set("test", "value", 500*time.Millisecond)
// Wait 600ms, value is expired but cleaner hasn't run yet (interval is 1s)
time.Sleep(600 * time.Millisecond)
synctest.Wait()
// Map should still hold the value in its internal store before lazy access or cleaner
assert.Equal(t, 1, len(em.store.GetAll()), "store should still have 1 item before cleaner runs")
// Wait another 500ms so cleaner (1s interval) runs
time.Sleep(500 * time.Millisecond)
synctest.Wait() // Wait for background goroutine to process the tick
assert.Equal(t, 0, len(em.store.GetAll()), "store should be empty after cleaner runs")
})
}
func TestExpiryMap_StopCleaner(t *testing.T) {
em := New[string](time.Hour)
// Initially, stopChan is open, reading would block
select {
case <-em.stopChan:
t.Fatal("stopChan should be open initially")
default:
// success
}
em.StopCleaner()
// After StopCleaner, stopChan is closed, reading returns immediately
select {
case <-em.stopChan:
// success
default:
t.Fatal("stopChan was not closed by StopCleaner")
}
// Calling StopCleaner again should NOT panic thanks to sync.Once
assert.NotPanics(t, func() {
em.StopCleaner()
})
}

View File

@@ -48,7 +48,6 @@ type System struct {
detailsFetched atomic.Bool // True if static system details have been fetched and saved detailsFetched atomic.Bool // True if static system details have been fetched and saved
smartFetching atomic.Bool // True if SMART devices are currently being fetched smartFetching atomic.Bool // True if SMART devices are currently being fetched
smartInterval time.Duration // Interval for periodic SMART data updates smartInterval time.Duration // Interval for periodic SMART data updates
lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch
} }
func (sm *SystemManager) NewSystem(systemId string) *System { func (sm *SystemManager) NewSystem(systemId string) *System {
@@ -134,19 +133,34 @@ func (sys *System) update() error {
return err return err
} }
// ensure deprecated fields from older agents are migrated to current fields
migrateDeprecatedFields(data, !sys.detailsFetched.Load())
// create system records // create system records
_, err = sys.createRecords(data) _, err = sys.createRecords(data)
// if details were included and fetched successfully, mark details as fetched and update smart interval if set by agent
if err == nil && data.Details != nil {
sys.detailsFetched.Store(true)
// update smart interval if it's set on the agent side
if data.Details.SmartInterval > 0 {
sys.smartInterval = data.Details.SmartInterval
// make sure we reset expiration of lastFetch to remain as long as the new smart interval
// to prevent premature expiration leading to new fetch if interval is different.
sys.manager.smartFetchMap.UpdateExpiration(sys.Id, sys.smartInterval+time.Minute)
}
}
// Fetch and save SMART devices when system first comes online or at intervals // Fetch and save SMART devices when system first comes online or at intervals
if backgroundSmartFetchEnabled() { if backgroundSmartFetchEnabled() && sys.detailsFetched.Load() {
if sys.smartInterval <= 0 { if sys.smartInterval <= 0 {
sys.smartInterval = time.Hour sys.smartInterval = time.Hour
} }
lastFetch := sys.lastSmartFetch.Load() lastFetch, _ := sys.manager.smartFetchMap.GetOk(sys.Id)
if time.Since(time.UnixMilli(lastFetch)) >= sys.smartInterval && sys.smartFetching.CompareAndSwap(false, true) { if time.Since(time.UnixMilli(lastFetch-1e4)) >= sys.smartInterval && sys.smartFetching.CompareAndSwap(false, true) {
go func() { go func() {
defer sys.smartFetching.Store(false) defer sys.smartFetching.Store(false)
sys.lastSmartFetch.Store(time.Now().UnixMilli()) sys.manager.smartFetchMap.Set(sys.Id, time.Now().UnixMilli(), sys.smartInterval+time.Minute)
_ = sys.FetchAndSaveSmartDevices() _ = sys.FetchAndSaveSmartDevices()
}() }()
} }
@@ -221,11 +235,6 @@ func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error
if err := createSystemDetailsRecord(txApp, data.Details, sys.Id); err != nil { if err := createSystemDetailsRecord(txApp, data.Details, sys.Id); err != nil {
return err return err
} }
sys.detailsFetched.Store(true)
// update smart interval if it's set on the agent side
if data.Details.SmartInterval > 0 {
sys.smartInterval = data.Details.SmartInterval
}
} }
// update system record (do this last because it triggers alerts and we need above records to be inserted first) // update system record (do this last because it triggers alerts and we need above records to be inserted first)
@@ -703,3 +712,50 @@ func getJitter() <-chan time.Time {
msDelay := (interval * minPercent / 100) + rand.Intn(interval*jitterRange/100) msDelay := (interval * minPercent / 100) + rand.Intn(interval*jitterRange/100)
return time.After(time.Duration(msDelay) * time.Millisecond) return time.After(time.Duration(msDelay) * time.Millisecond)
} }
// migrateDeprecatedFields moves values from deprecated fields to their new locations if the new
// fields are not already populated. Deprecated fields and refs may be removed at least 30 days
// and one minor version release after the release that includes the migration.
//
// This is run when processing incoming system data from agents, which may be on older versions.
func migrateDeprecatedFields(cd *system.CombinedData, createDetails bool) {
// migration added 0.19.0
if cd.Stats.Bandwidth[0] == 0 && cd.Stats.Bandwidth[1] == 0 {
cd.Stats.Bandwidth[0] = uint64(cd.Stats.NetworkSent * 1024 * 1024)
cd.Stats.Bandwidth[1] = uint64(cd.Stats.NetworkRecv * 1024 * 1024)
cd.Stats.NetworkSent, cd.Stats.NetworkRecv = 0, 0
}
// migration added 0.19.0
if cd.Info.BandwidthBytes == 0 {
cd.Info.BandwidthBytes = uint64(cd.Info.Bandwidth * 1024 * 1024)
cd.Info.Bandwidth = 0
}
// migration added 0.19.0
if cd.Stats.DiskIO[0] == 0 && cd.Stats.DiskIO[1] == 0 {
cd.Stats.DiskIO[0] = uint64(cd.Stats.DiskReadPs * 1024 * 1024)
cd.Stats.DiskIO[1] = uint64(cd.Stats.DiskWritePs * 1024 * 1024)
cd.Stats.DiskReadPs, cd.Stats.DiskWritePs = 0, 0
}
// migration added 0.19.0 - Move deprecated Info fields to Details struct
if cd.Details == nil && cd.Info.Hostname != "" {
if createDetails {
cd.Details = &system.Details{
Hostname: cd.Info.Hostname,
Kernel: cd.Info.KernelVersion,
Cores: cd.Info.Cores,
Threads: cd.Info.Threads,
CpuModel: cd.Info.CpuModel,
Podman: cd.Info.Podman,
Os: cd.Info.Os,
MemoryTotal: uint64(cd.Stats.Mem * 1024 * 1024 * 1024),
}
}
// zero the deprecated fields to prevent saving them in systems.info DB json payload
cd.Info.Hostname = ""
cd.Info.KernelVersion = ""
cd.Info.Cores = 0
cd.Info.CpuModel = ""
cd.Info.Podman = false
cd.Info.Os = 0
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/henrygd/beszel/internal/hub/ws" "github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/system" "github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/hub/expirymap"
"github.com/henrygd/beszel/internal/common" "github.com/henrygd/beszel/internal/common"
@@ -40,9 +41,10 @@ var errSystemExists = errors.New("system exists")
// SystemManager manages a collection of monitored systems and their connections. // SystemManager manages a collection of monitored systems and their connections.
// It handles system lifecycle, status updates, and maintains both SSH and WebSocket connections. // It handles system lifecycle, status updates, and maintains both SSH and WebSocket connections.
type SystemManager struct { type SystemManager struct {
hub hubLike // Hub interface for database and alert operations hub hubLike // Hub interface for database and alert operations
systems *store.Store[string, *System] // Thread-safe store of active systems systems *store.Store[string, *System] // Thread-safe store of active systems
sshConfig *ssh.ClientConfig // SSH client configuration for system connections sshConfig *ssh.ClientConfig // SSH client configuration for system connections
smartFetchMap *expirymap.ExpiryMap[int64] // Stores last SMART fetch time per system ID
} }
// hubLike defines the interface requirements for the hub dependency. // hubLike defines the interface requirements for the hub dependency.
@@ -58,8 +60,9 @@ type hubLike interface {
// The hub must implement the hubLike interface to provide database and alert functionality. // The hub must implement the hubLike interface to provide database and alert functionality.
func NewSystemManager(hub hubLike) *SystemManager { func NewSystemManager(hub hubLike) *SystemManager {
return &SystemManager{ return &SystemManager{
systems: store.New(map[string]*System{}), systems: store.New(map[string]*System{}),
hub: hub, hub: hub,
smartFetchMap: expirymap.New[int64](time.Hour),
} }
} }

View File

@@ -0,0 +1,159 @@
//go:build testing
package systems
import (
"testing"
"github.com/henrygd/beszel/internal/entities/system"
)
func TestCombinedData_MigrateDeprecatedFields(t *testing.T) {
t.Run("Migrate NetworkSent and NetworkRecv to Bandwidth", func(t *testing.T) {
cd := &system.CombinedData{
Stats: system.Stats{
NetworkSent: 1.5, // 1.5 MB
NetworkRecv: 2.5, // 2.5 MB
},
}
migrateDeprecatedFields(cd, true)
expectedSent := uint64(1.5 * 1024 * 1024)
expectedRecv := uint64(2.5 * 1024 * 1024)
if cd.Stats.Bandwidth[0] != expectedSent {
t.Errorf("expected Bandwidth[0] %d, got %d", expectedSent, cd.Stats.Bandwidth[0])
}
if cd.Stats.Bandwidth[1] != expectedRecv {
t.Errorf("expected Bandwidth[1] %d, got %d", expectedRecv, cd.Stats.Bandwidth[1])
}
if cd.Stats.NetworkSent != 0 || cd.Stats.NetworkRecv != 0 {
t.Errorf("expected NetworkSent and NetworkRecv to be reset, got %f, %f", cd.Stats.NetworkSent, cd.Stats.NetworkRecv)
}
})
t.Run("Migrate Info.Bandwidth to Info.BandwidthBytes", func(t *testing.T) {
cd := &system.CombinedData{
Info: system.Info{
Bandwidth: 10.0, // 10 MB
},
}
migrateDeprecatedFields(cd, true)
expected := uint64(10 * 1024 * 1024)
if cd.Info.BandwidthBytes != expected {
t.Errorf("expected BandwidthBytes %d, got %d", expected, cd.Info.BandwidthBytes)
}
if cd.Info.Bandwidth != 0 {
t.Errorf("expected Info.Bandwidth to be reset, got %f", cd.Info.Bandwidth)
}
})
t.Run("Migrate DiskReadPs and DiskWritePs to DiskIO", func(t *testing.T) {
cd := &system.CombinedData{
Stats: system.Stats{
DiskReadPs: 3.0, // 3 MB
DiskWritePs: 4.0, // 4 MB
},
}
migrateDeprecatedFields(cd, true)
expectedRead := uint64(3 * 1024 * 1024)
expectedWrite := uint64(4 * 1024 * 1024)
if cd.Stats.DiskIO[0] != expectedRead {
t.Errorf("expected DiskIO[0] %d, got %d", expectedRead, cd.Stats.DiskIO[0])
}
if cd.Stats.DiskIO[1] != expectedWrite {
t.Errorf("expected DiskIO[1] %d, got %d", expectedWrite, cd.Stats.DiskIO[1])
}
if cd.Stats.DiskReadPs != 0 || cd.Stats.DiskWritePs != 0 {
t.Errorf("expected DiskReadPs and DiskWritePs to be reset, got %f, %f", cd.Stats.DiskReadPs, cd.Stats.DiskWritePs)
}
})
t.Run("Migrate Info fields to Details struct", func(t *testing.T) {
cd := &system.CombinedData{
Stats: system.Stats{
Mem: 16.0, // 16 GB
},
Info: system.Info{
Hostname: "test-host",
KernelVersion: "6.8.0",
Cores: 8,
Threads: 16,
CpuModel: "Intel i7",
Podman: true,
Os: system.Linux,
},
}
migrateDeprecatedFields(cd, true)
if cd.Details == nil {
t.Fatal("expected Details struct to be created")
}
if cd.Details.Hostname != "test-host" {
t.Errorf("expected Hostname 'test-host', got '%s'", cd.Details.Hostname)
}
if cd.Details.Kernel != "6.8.0" {
t.Errorf("expected Kernel '6.8.0', got '%s'", cd.Details.Kernel)
}
if cd.Details.Cores != 8 {
t.Errorf("expected Cores 8, got %d", cd.Details.Cores)
}
if cd.Details.Threads != 16 {
t.Errorf("expected Threads 16, got %d", cd.Details.Threads)
}
if cd.Details.CpuModel != "Intel i7" {
t.Errorf("expected CpuModel 'Intel i7', got '%s'", cd.Details.CpuModel)
}
if cd.Details.Podman != true {
t.Errorf("expected Podman true, got %v", cd.Details.Podman)
}
if cd.Details.Os != system.Linux {
t.Errorf("expected Os Linux, got %d", cd.Details.Os)
}
expectedMem := uint64(16 * 1024 * 1024 * 1024)
if cd.Details.MemoryTotal != expectedMem {
t.Errorf("expected MemoryTotal %d, got %d", expectedMem, cd.Details.MemoryTotal)
}
if cd.Info.Hostname != "" || cd.Info.KernelVersion != "" || cd.Info.Cores != 0 || cd.Info.CpuModel != "" || cd.Info.Podman != false || cd.Info.Os != 0 {
t.Errorf("expected Info fields to be reset, got %+v", cd.Info)
}
})
t.Run("Do not migrate if Details already exists", func(t *testing.T) {
cd := &system.CombinedData{
Details: &system.Details{Hostname: "existing-host"},
Info: system.Info{
Hostname: "deprecated-host",
},
}
migrateDeprecatedFields(cd, true)
if cd.Details.Hostname != "existing-host" {
t.Errorf("expected Hostname 'existing-host', got '%s'", cd.Details.Hostname)
}
if cd.Info.Hostname != "deprecated-host" {
t.Errorf("expected Info.Hostname to remain 'deprecated-host', got '%s'", cd.Info.Hostname)
}
})
t.Run("Do not create details if migrateDetails is false", func(t *testing.T) {
cd := &system.CombinedData{
Info: system.Info{
Hostname: "deprecated-host",
},
}
migrateDeprecatedFields(cd, false)
if cd.Details != nil {
t.Fatal("expected Details struct to not be created")
}
if cd.Info.Hostname != "" {
t.Errorf("expected Info.Hostname to be reset, got '%s'", cd.Info.Hostname)
}
})
}

View File

@@ -113,4 +113,5 @@ func (sm *SystemManager) RemoveAllSystems() {
for _, system := range sm.systems.GetAll() { for _, system := range sm.systems.GetAll() {
sm.RemoveSystem(system.Id) sm.RemoveSystem(system.Id)
} }
sm.smartFetchMap.StopCleaner()
} }

View File

@@ -16,19 +16,16 @@ import { useYAxisWidth } from "./hooks"
export default memo(function LoadAverageChart({ chartData }: { chartData: ChartData }) { export default memo(function LoadAverageChart({ chartData }: { chartData: ChartData }) {
const { yAxisWidth, updateYAxisWidth } = useYAxisWidth() const { yAxisWidth, updateYAxisWidth } = useYAxisWidth()
const keys: { legacy: keyof SystemStats; color: string; label: string }[] = [ const keys: { color: string; label: string }[] = [
{ {
legacy: "l1",
color: "hsl(271, 81%, 60%)", // Purple color: "hsl(271, 81%, 60%)", // Purple
label: t({ message: `1 min`, comment: "Load average" }), label: t({ message: `1 min`, comment: "Load average" }),
}, },
{ {
legacy: "l5",
color: "hsl(217, 91%, 60%)", // Blue color: "hsl(217, 91%, 60%)", // Blue
label: t({ message: `5 min`, comment: "Load average" }), label: t({ message: `5 min`, comment: "Load average" }),
}, },
{ {
legacy: "l15",
color: "hsl(25, 95%, 53%)", // Orange color: "hsl(25, 95%, 53%)", // Orange
label: t({ message: `15 min`, comment: "Load average" }), label: t({ message: `15 min`, comment: "Load average" }),
}, },
@@ -66,27 +63,18 @@ export default memo(function LoadAverageChart({ chartData }: { chartData: ChartD
/> />
} }
/> />
{keys.map(({ legacy, color, label }, i) => { {keys.map(({ color, label }, i) => (
const dataKey = (value: { stats: SystemStats }) => { <Line
const { minor, patch } = chartData.agentVersion key={label}
if (minor <= 12 && patch < 1) { dataKey={(value: { stats: SystemStats }) => value.stats?.la?.[i]}
return value.stats?.[legacy] name={label}
} type="monotoneX"
return value.stats?.la?.[i] ?? value.stats?.[legacy] dot={false}
} strokeWidth={1.5}
return ( stroke={color}
<Line isAnimationActive={false}
key={label} />
dataKey={dataKey} ))}
name={label}
type="monotoneX"
dot={false}
strokeWidth={1.5}
stroke={color}
isAnimationActive={false}
/>
)
})}
<ChartLegend content={<ChartLegendContent />} /> <ChartLegend content={<ChartLegendContent />} />
</LineChart> </LineChart>
</ChartContainer> </ChartContainer>

View File

@@ -654,7 +654,7 @@ export default memo(function SystemDetail({ id }: { id: string }) {
)} )}
{/* Load Average chart */} {/* Load Average chart */}
{chartData.agentVersion?.minor >= 12 && ( {chartData.agentVersion?.minor > 12 && (
<ChartCard <ChartCard
empty={dataEmpty} empty={dataEmpty}
grid={grid} grid={grid}

View File

@@ -198,32 +198,19 @@ export function SystemsTableColumns(viewMode: "table" | "grid"): ColumnDef<Syste
}, },
{ {
id: "loadAverage", id: "loadAverage",
accessorFn: ({ info }) => { accessorFn: ({ info }) => info.la?.reduce((acc, curr) => acc + curr, 0),
const sum = info.la?.reduce((acc, curr) => acc + curr, 0)
// TODO: remove this in future release in favor of la array
if (!sum) {
return (info.l1 ?? 0) + (info.l5 ?? 0) + (info.l15 ?? 0) || undefined
}
return sum || undefined
},
name: () => t({ message: "Load Avg", comment: "Short label for load average" }), name: () => t({ message: "Load Avg", comment: "Short label for load average" }),
size: 0, size: 0,
Icon: HourglassIcon, Icon: HourglassIcon,
header: sortableHeader, header: sortableHeader,
cell(info: CellContext<SystemRecord, unknown>) { cell(info: CellContext<SystemRecord, unknown>) {
const { info: sysInfo, status } = info.row.original const { info: sysInfo, status } = info.row.original
const { major, minor } = parseSemVer(sysInfo.v)
const { colorWarn = 65, colorCrit = 90 } = useStore($userSettings, { keys: ["colorWarn", "colorCrit"] }) const { colorWarn = 65, colorCrit = 90 } = useStore($userSettings, { keys: ["colorWarn", "colorCrit"] })
// agent version const loadAverages = sysInfo.la || []
const { minor, patch } = parseSemVer(sysInfo.v)
let loadAverages = sysInfo.la
// use legacy load averages if agent version is less than 12.1.0
if (!loadAverages || (minor === 12 && patch < 1)) {
loadAverages = [sysInfo.l1 ?? 0, sysInfo.l5 ?? 0, sysInfo.l15 ?? 0]
}
const max = Math.max(...loadAverages) const max = Math.max(...loadAverages)
if (max === 0 && (status === SystemStatus.Paused || minor < 12)) { if (max === 0 && (status === SystemStatus.Paused || (major < 1 && minor < 13))) {
return null return null
} }
@@ -248,19 +235,20 @@ export function SystemsTableColumns(viewMode: "table" | "grid"): ColumnDef<Syste
}, },
}, },
{ {
accessorFn: ({ info }) => info.bb || (info.b || 0) * 1024 * 1024 || undefined, accessorFn: ({ info, status }) => (status !== SystemStatus.Up ? undefined : info.bb),
id: "net", id: "net",
name: () => t`Net`, name: () => t`Net`,
size: 0, size: 0,
Icon: EthernetIcon, Icon: EthernetIcon,
header: sortableHeader, header: sortableHeader,
sortUndefined: "last",
cell(info) { cell(info) {
const sys = info.row.original const val = info.getValue() as number | undefined
const userSettings = useStore($userSettings, { keys: ["unitNet"] }) if (val === undefined) {
if (sys.status === SystemStatus.Paused) {
return null return null
} }
const { value, unit } = formatBytes((info.getValue() || 0) as number, true, userSettings.unitNet, false) const userSettings = useStore($userSettings, { keys: ["unitNet"] })
const { value, unit } = formatBytes(val, true, userSettings.unitNet, false)
return ( return (
<span className="tabular-nums whitespace-nowrap"> <span className="tabular-nums whitespace-nowrap">
{decimalString(value, value >= 100 ? 1 : 2)} {unit} {decimalString(value, value >= 100 ? 1 : 2)} {unit}

View File

@@ -45,12 +45,6 @@ export interface SystemInfo {
c: number c: number
/** cpu model */ /** cpu model */
m: string m: string
/** load average 1 minute */
l1?: number
/** load average 5 minutes */
l5?: number
/** load average 15 minutes */
l15?: number
/** load average */ /** load average */
la?: [number, number, number] la?: [number, number, number]
/** operating system */ /** operating system */
@@ -94,13 +88,6 @@ export interface SystemStats {
cpub?: number[] cpub?: number[]
/** per-core cpu usage [CPU0..] (0-100 integers) */ /** per-core cpu usage [CPU0..] (0-100 integers) */
cpus?: number[] cpus?: number[]
// TODO: remove these in future release in favor of la
/** load average 1 minute */
l1?: number
/** load average 5 minutes */
l5?: number
/** load average 15 minutes */
l15?: number
/** load average */ /** load average */
la?: [number, number, number] la?: [number, number, number]
/** total memory (gb) */ /** total memory (gb) */