diff --git a/internal/alerts/alerts.go b/internal/alerts/alerts.go index b5ed61c0..6e0c5788 100644 --- a/internal/alerts/alerts.go +++ b/internal/alerts/alerts.go @@ -23,6 +23,7 @@ type AlertManager struct { hub hubLike stopOnce sync.Once pendingAlerts sync.Map + alertsCache *AlertsCache } type AlertMessageData struct { @@ -63,7 +64,7 @@ type SystemAlertGPUData struct { type SystemAlertData struct { systemRecord *core.Record - alertRecord *core.Record + alertRecord CachedAlertData name string unit string val float64 @@ -97,7 +98,8 @@ var supportsTitle = map[string]struct{}{ // NewAlertManager creates a new AlertManager instance. func NewAlertManager(app hubLike) *AlertManager { am := &AlertManager{ - hub: app, + hub: app, + alertsCache: NewAlertsCache(app), } am.bindEvents() return am @@ -110,6 +112,9 @@ func (am *AlertManager) bindEvents() { am.hub.OnRecordAfterUpdateSuccess("smart_devices").BindFunc(am.handleSmartDeviceAlert) am.hub.OnServe().BindFunc(func(e *core.ServeEvent) error { + // Populate all alerts into cache on startup + _ = am.alertsCache.PopulateFromDB(true) + if err := resolveStatusAlerts(e.App); err != nil { e.App.Logger().Error("Failed to resolve stale status alerts", "err", err) } @@ -311,3 +316,13 @@ func (am *AlertManager) SendTestNotification(e *core.RequestEvent) error { } return e.JSON(200, map[string]bool{"err": false}) } + +// setAlertTriggered updates the "triggered" status of an alert record in the database +func (am *AlertManager) setAlertTriggered(alert CachedAlertData, triggered bool) error { + alertRecord, err := am.hub.FindRecordById("alerts", alert.Id) + if err != nil { + return err + } + alertRecord.Set("triggered", triggered) + return am.hub.Save(alertRecord) +} diff --git a/internal/alerts/alerts_status.go b/internal/alerts/alerts_status.go index b43a2a9a..224f62b4 100644 --- a/internal/alerts/alerts_status.go +++ b/internal/alerts/alerts_status.go @@ -5,13 +5,12 @@ import ( "strings" "time" - "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" ) type alertInfo struct { systemName string - alertRecord *core.Record + alertRecord CachedAlertData expireTime time.Time timer *time.Timer } @@ -36,10 +35,7 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, systemRecord *core. return nil } - alertRecords, err := am.getSystemStatusAlerts(systemRecord.Id) - if err != nil { - return err - } + alertRecords := am.alertsCache.GetAlertsByName(systemRecord.Id, "Status") if len(alertRecords) == 0 { return nil } @@ -53,29 +49,17 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, systemRecord *core. return nil } -// getSystemStatusAlerts retrieves all "Status" alert records for a given system ID. -func (am *AlertManager) getSystemStatusAlerts(systemID string) ([]*core.Record, error) { - alertRecords, err := am.hub.FindAllRecords("alerts", dbx.HashExp{ - "system": systemID, - "name": "Status", - }) - if err != nil { - return nil, err - } - return alertRecords, nil -} - // handleSystemDown manages the logic when a system status changes to "down". It schedules pending alerts for each alert record. -func (am *AlertManager) handleSystemDown(systemName string, alertRecords []*core.Record) { +func (am *AlertManager) handleSystemDown(systemName string, alertRecords []CachedAlertData) { for _, alertRecord := range alertRecords { - min := max(1, alertRecord.GetInt("min")) + min := max(1, int(alertRecord.Min)) am.schedulePendingStatusAlert(systemName, alertRecord, time.Duration(min)*time.Minute) } } // schedulePendingStatusAlert sets up a timer to send a "down" alert after the specified delay if the system is still down. // It returns true if the alert was scheduled, or false if an alert was already pending for the given alert record. -func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecord *core.Record, delay time.Duration) bool { +func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecord CachedAlertData, delay time.Duration) bool { alert := &alertInfo{ systemName: systemName, alertRecord: alertRecord, @@ -96,13 +80,13 @@ func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecor // handleSystemUp manages the logic when a system status changes to "up". // It cancels any pending alerts and sends "up" alerts. -func (am *AlertManager) handleSystemUp(systemName string, alertRecords []*core.Record) { +func (am *AlertManager) handleSystemUp(systemName string, alertRecords []CachedAlertData) { for _, alertRecord := range alertRecords { // If alert exists for record, delete and continue (down alert not sent) if am.cancelPendingAlert(alertRecord.Id) { continue } - if !alertRecord.GetBool("triggered") { + if !alertRecord.Triggered { continue } if err := am.sendStatusAlert("up", systemName, alertRecord); err != nil { @@ -133,23 +117,22 @@ func (am *AlertManager) processPendingAlert(alertID string) { } info := value.(*alertInfo) - if info.alertRecord.GetBool("triggered") { + alertRecord, ok := am.alertsCache.Refresh(info.alertRecord) + if !ok || alertRecord.Triggered { return } - if err := am.sendStatusAlert("down", info.systemName, info.alertRecord); err != nil { + if err := am.sendStatusAlert("down", info.systemName, alertRecord); err != nil { am.hub.Logger().Error("Failed to send alert", "err", err) } } // sendStatusAlert sends a status alert ("up" or "down") to the users associated with the alert records. -func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord *core.Record) error { - switch alertStatus { - case "up": - alertRecord.Set("triggered", false) - case "down": - alertRecord.Set("triggered", true) +func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord CachedAlertData) error { + // Update trigger state for alert record before sending alert + triggered := alertStatus == "down" + if err := am.setAlertTriggered(alertRecord, triggered); err != nil { + return err } - am.hub.Save(alertRecord) var emoji string if alertStatus == "up" { @@ -162,10 +145,10 @@ func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, a message := strings.TrimSuffix(title, emoji) // Get system ID for the link - systemID := alertRecord.GetString("system") + systemID := alertRecord.SystemID return am.SendAlert(AlertMessageData{ - UserID: alertRecord.GetString("user"), + UserID: alertRecord.UserID, SystemID: systemID, Title: title, Message: message, @@ -211,12 +194,13 @@ func resolveStatusAlerts(app core.App) error { func (am *AlertManager) restorePendingStatusAlerts() error { type pendingStatusAlert struct { AlertID string `db:"alert_id"` + SystemID string `db:"system_id"` SystemName string `db:"system_name"` } var pending []pendingStatusAlert err := am.hub.DB().NewQuery(` - SELECT a.id AS alert_id, s.name AS system_name + SELECT a.id AS alert_id, a.system AS system_id, s.name AS system_name FROM alerts a JOIN systems s ON a.system = s.id WHERE a.name = 'Status' @@ -227,12 +211,15 @@ func (am *AlertManager) restorePendingStatusAlerts() error { return err } + // Make sure cache is populated before trying to restore pending alerts + _ = am.alertsCache.PopulateFromDB(false) + for _, item := range pending { - alertRecord, err := am.hub.FindRecordById("alerts", item.AlertID) - if err != nil { - return err + alertRecord, ok := am.alertsCache.GetAlert(item.SystemID, item.AlertID) + if !ok { + continue } - min := max(1, alertRecord.GetInt("min")) + min := max(1, int(alertRecord.Min)) am.schedulePendingStatusAlert(item.SystemName, alertRecord, time.Duration(min)*time.Minute) } diff --git a/internal/alerts/alerts_status_test.go b/internal/alerts/alerts_status_test.go index 19205e94..27b6f91a 100644 --- a/internal/alerts/alerts_status_test.go +++ b/internal/alerts/alerts_status_test.go @@ -626,3 +626,130 @@ func TestResolveStatusAlerts(t *testing.T) { assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record") } + +func TestAlertsHistoryStatus(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hub, user := beszelTests.GetHubWithUser(t) + defer hub.Cleanup() + + // Create a system + systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up") + assert.NoError(t, err) + system := systems[0] + + // Create a status alertRecord for the system + alertRecord, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "Status", + "system": system.Id, + "user": user.Id, + "min": 1, + }) + assert.NoError(t, err) + + // Verify alert is not triggered initially + assert.False(t, alertRecord.GetBool("triggered"), "Alert should not be triggered initially") + + // Set the system to 'down' (this should trigger the alert) + system.Set("status", "down") + err = hub.Save(system) + assert.NoError(t, err) + + time.Sleep(time.Second * 30) + synctest.Wait() + + alertFresh, _ := hub.FindRecordById("alerts", alertRecord.Id) + assert.False(t, alertFresh.GetBool("triggered"), "Alert should not be triggered after 30 seconds") + + time.Sleep(time.Minute) + synctest.Wait() + + // Verify alert is triggered after setting system to down + alertFresh, err = hub.FindRecordById("alerts", alertRecord.Id) + assert.NoError(t, err) + assert.True(t, alertFresh.GetBool("triggered"), "Alert should be triggered after one minute") + + // Verify we have one unresolved alert history record + alertHistoryCount, err := hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""}) + assert.NoError(t, err) + assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record") + + // Set the system back to 'up' (this should resolve the alert) + system.Set("status", "up") + err = hub.Save(system) + assert.NoError(t, err) + + time.Sleep(time.Second) + synctest.Wait() + + // Verify alert is not triggered after setting system back to up + alertFresh, err = hub.FindRecordById("alerts", alertRecord.Id) + assert.NoError(t, err) + assert.False(t, alertFresh.GetBool("triggered"), "Alert should not be triggered after system recovers") + + // Verify the alert history record is resolved + alertHistoryCount, err = hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""}) + assert.NoError(t, err) + assert.EqualValues(t, 0, alertHistoryCount, "Should have no unresolved alert history records") + }) +} + +func TestStatusAlertClearedBeforeSend(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hub, user := beszelTests.GetHubWithUser(t) + defer hub.Cleanup() + + // Create a system + systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up") + assert.NoError(t, err) + system := systems[0] + + // Ensure user settings have an email + userSettings, _ := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id}) + userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`) + hub.Save(userSettings) + + // Initial email count + initialEmailCount := hub.TestMailer.TotalSend() + + // Create a status alertRecord for the system + alertRecord, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "Status", + "system": system.Id, + "user": user.Id, + "min": 1, + }) + assert.NoError(t, err) + + // Verify alert is not triggered initially + assert.False(t, alertRecord.GetBool("triggered"), "Alert should not be triggered initially") + + // Set the system to 'down' (this should trigger the alert) + system.Set("status", "down") + err = hub.Save(system) + assert.NoError(t, err) + + time.Sleep(time.Second * 30) + synctest.Wait() + + // Set system back up to clear the pending alert before it triggers + system.Set("status", "up") + err = hub.Save(system) + assert.NoError(t, err) + + time.Sleep(time.Minute) + synctest.Wait() + + // Verify that we have not sent any emails since the system recovered before the alert triggered + assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "No email should be sent if system recovers before alert triggers") + + // Verify alert is not triggered after setting system back to up + alertFresh, err := hub.FindRecordById("alerts", alertRecord.Id) + assert.NoError(t, err) + assert.False(t, alertFresh.GetBool("triggered"), "Alert should not be triggered after system recovers") + + // Verify that no alert history record was created since the alert never triggered + alertHistoryCount, err := hub.CountRecords("alerts_history") + assert.NoError(t, err) + assert.EqualValues(t, 0, alertHistoryCount, "Should have no unresolved alert history records since alert never triggered") + }) +} diff --git a/internal/alerts/alerts_system.go b/internal/alerts/alerts_system.go index a1a8ec5c..6961365d 100644 --- a/internal/alerts/alerts_system.go +++ b/internal/alerts/alerts_system.go @@ -14,11 +14,8 @@ import ( ) func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *system.CombinedData) error { - alertRecords, err := am.hub.FindAllRecords("alerts", - dbx.NewExp("system={:system} AND name!='Status'", dbx.Params{"system": systemRecord.Id}), - ) - if err != nil || len(alertRecords) == 0 { - // log.Println("no alerts found for system") + alertRecords := am.alertsCache.GetAlertsExcludingNames(systemRecord.Id, "Status") + if len(alertRecords) == 0 { return nil } @@ -27,7 +24,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst oldestTime := now for _, alertRecord := range alertRecords { - name := alertRecord.GetString("name") + name := alertRecord.Name var val float64 unit := "%" @@ -72,8 +69,8 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst val = float64(data.Stats.Battery[0]) } - triggered := alertRecord.GetBool("triggered") - threshold := alertRecord.GetFloat("value") + triggered := alertRecord.Triggered + threshold := alertRecord.Value // Battery alert has inverted logic: trigger when value is BELOW threshold lowAlert := isLowAlert(name) @@ -91,7 +88,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst } } - min := max(1, uint8(alertRecord.GetInt("min"))) + min := max(1, alertRecord.Min) alert := SystemAlertData{ systemRecord: systemRecord, @@ -128,7 +125,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst Created types.DateTime `db:"created"` }{} - err = am.hub.DB(). + err := am.hub.DB(). Select("stats", "created"). From("system_stats"). Where(dbx.NewExp( @@ -343,13 +340,12 @@ func (am *AlertManager) sendSystemAlert(alert SystemAlertData) { } body := fmt.Sprintf("%s averaged %.2f%s for the previous %v %s.", alert.descriptor, alert.val, alert.unit, alert.min, minutesLabel) - alert.alertRecord.Set("triggered", alert.triggered) - if err := am.hub.Save(alert.alertRecord); err != nil { + if err := am.setAlertTriggered(alert.alertRecord, alert.triggered); err != nil { // app.Logger().Error("failed to save alert record", "err", err) return } am.SendAlert(AlertMessageData{ - UserID: alert.alertRecord.GetString("user"), + UserID: alert.alertRecord.UserID, SystemID: alert.systemRecord.Id, Title: subject, Message: body, diff --git a/internal/alerts/alerts_system_cache.go b/internal/alerts/alerts_system_cache.go new file mode 100644 index 00000000..ee0e318b --- /dev/null +++ b/internal/alerts/alerts_system_cache.go @@ -0,0 +1,177 @@ +package alerts + +import ( + "github.com/pocketbase/dbx" + "github.com/pocketbase/pocketbase/core" + "github.com/pocketbase/pocketbase/tools/store" +) + +// CachedAlertData represents the relevant fields of an alert record for status checking and updates. +type CachedAlertData struct { + Id string + SystemID string + UserID string + Name string + Value float64 + Triggered bool + Min uint8 + // Created types.DateTime +} + +func (a *CachedAlertData) PopulateFromRecord(record *core.Record) { + a.Id = record.Id + a.SystemID = record.GetString("system") + a.UserID = record.GetString("user") + a.Name = record.GetString("name") + a.Value = record.GetFloat("value") + a.Triggered = record.GetBool("triggered") + a.Min = uint8(record.GetInt("min")) + // a.Created = record.GetDateTime("created") +} + +// AlertsCache provides an in-memory cache for system alerts. +type AlertsCache struct { + app core.App + store *store.Store[string, *store.Store[string, CachedAlertData]] + populated bool +} + +// NewAlertsCache creates a new instance of SystemAlertsCache. +func NewAlertsCache(app core.App) *AlertsCache { + c := AlertsCache{ + app: app, + store: store.New(map[string]*store.Store[string, CachedAlertData]{}), + } + return c.bindEvents() +} + +// bindEvents sets up event listeners to keep the cache in sync with database changes. +func (c *AlertsCache) bindEvents() *AlertsCache { + c.app.OnRecordAfterUpdateSuccess("alerts").BindFunc(func(e *core.RecordEvent) error { + // c.Delete(e.Record.Original()) // this would be needed if the system field on an existing alert was changed, however we don't currently allow that in the UI so we'll leave it commented out + c.Update(e.Record) + return e.Next() + }) + c.app.OnRecordAfterDeleteSuccess("alerts").BindFunc(func(e *core.RecordEvent) error { + c.Delete(e.Record) + return e.Next() + }) + c.app.OnRecordAfterCreateSuccess("alerts").BindFunc(func(e *core.RecordEvent) error { + c.Update(e.Record) + return e.Next() + }) + return c +} + +// PopulateFromDB clears current entries and loads all alerts from the database into the cache. +func (c *AlertsCache) PopulateFromDB(force bool) error { + if !force && c.populated { + return nil + } + records, err := c.app.FindAllRecords("alerts") + if err != nil { + return err + } + c.store.RemoveAll() + for _, record := range records { + c.Update(record) + } + c.populated = true + return nil +} + +// Update adds or updates an alert record in the cache. +func (c *AlertsCache) Update(record *core.Record) { + systemID := record.GetString("system") + if systemID == "" { + return + } + systemStore, ok := c.store.GetOk(systemID) + if !ok { + systemStore = store.New(map[string]CachedAlertData{}) + c.store.Set(systemID, systemStore) + } + var ca CachedAlertData + ca.PopulateFromRecord(record) + systemStore.Set(record.Id, ca) +} + +// Delete removes an alert record from the cache. +func (c *AlertsCache) Delete(record *core.Record) { + systemID := record.GetString("system") + if systemID == "" { + return + } + if systemStore, ok := c.store.GetOk(systemID); ok { + systemStore.Remove(record.Id) + } +} + +// GetSystemAlerts returns all alerts for the specified system, lazy-loading if necessary. +func (c *AlertsCache) GetSystemAlerts(systemID string) []CachedAlertData { + systemStore, ok := c.store.GetOk(systemID) + if !ok { + // Populate cache for this system + records, err := c.app.FindAllRecords("alerts", dbx.NewExp("system={:system}", dbx.Params{"system": systemID})) + if err != nil { + return nil + } + systemStore = store.New(map[string]CachedAlertData{}) + for _, record := range records { + var ca CachedAlertData + ca.PopulateFromRecord(record) + systemStore.Set(record.Id, ca) + } + c.store.Set(systemID, systemStore) + } + all := systemStore.GetAll() + records := make([]CachedAlertData, 0, len(all)) + for _, alert := range all { + records = append(records, alert) + } + return records +} + +// GetAlert returns a specific alert by its ID from the cache. +func (c *AlertsCache) GetAlert(systemID, alertID string) (CachedAlertData, bool) { + if systemStore, ok := c.store.GetOk(systemID); ok { + return systemStore.GetOk(alertID) + } + return CachedAlertData{}, false +} + +// GetAlertsByName returns all alerts of a specific type for the specified system. +func (c *AlertsCache) GetAlertsByName(systemID, alertName string) []CachedAlertData { + allAlerts := c.GetSystemAlerts(systemID) + var alertRecords []CachedAlertData + for _, record := range allAlerts { + if record.Name == alertName { + alertRecords = append(alertRecords, record) + } + } + return alertRecords +} + +// GetAlertsExcludingNames returns all alerts for the specified system excluding the given types. +func (c *AlertsCache) GetAlertsExcludingNames(systemID string, excludedNames ...string) []CachedAlertData { + excludeMap := make(map[string]struct{}) + for _, name := range excludedNames { + excludeMap[name] = struct{}{} + } + allAlerts := c.GetSystemAlerts(systemID) + var alertRecords []CachedAlertData + for _, record := range allAlerts { + if _, excluded := excludeMap[record.Name]; !excluded { + alertRecords = append(alertRecords, record) + } + } + return alertRecords +} + +// Refresh returns the latest cached copy for an alert snapshot if it still exists. +func (c *AlertsCache) Refresh(alert CachedAlertData) (CachedAlertData, bool) { + if alert.Id == "" { + return CachedAlertData{}, false + } + return c.GetAlert(alert.SystemID, alert.Id) +} diff --git a/internal/alerts/alerts_system_cache_test.go b/internal/alerts/alerts_system_cache_test.go new file mode 100644 index 00000000..13dc8588 --- /dev/null +++ b/internal/alerts/alerts_system_cache_test.go @@ -0,0 +1,215 @@ +//go:build testing + +package alerts_test + +import ( + "testing" + + "github.com/henrygd/beszel/internal/alerts" + beszelTests "github.com/henrygd/beszel/internal/tests" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSystemAlertsCachePopulateAndFilter(t *testing.T) { + hub, user := beszelTests.GetHubWithUser(t) + defer hub.Cleanup() + + systems, err := beszelTests.CreateSystems(hub, 2, user.Id, "up") + require.NoError(t, err) + system1 := systems[0] + system2 := systems[1] + + statusAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "Status", + "system": system1.Id, + "user": user.Id, + "min": 1, + }) + require.NoError(t, err) + + cpuAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "CPU", + "system": system1.Id, + "user": user.Id, + "value": 80, + "min": 1, + }) + require.NoError(t, err) + + memoryAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "Memory", + "system": system2.Id, + "user": user.Id, + "value": 90, + "min": 1, + }) + require.NoError(t, err) + + cache := alerts.NewAlertsCache(hub) + cache.PopulateFromDB(false) + + statusAlerts := cache.GetAlertsByName(system1.Id, "Status") + require.Len(t, statusAlerts, 1) + assert.Equal(t, statusAlert.Id, statusAlerts[0].Id) + + nonStatusAlerts := cache.GetAlertsExcludingNames(system1.Id, "Status") + require.Len(t, nonStatusAlerts, 1) + assert.Equal(t, cpuAlert.Id, nonStatusAlerts[0].Id) + + system2Alerts := cache.GetSystemAlerts(system2.Id) + require.Len(t, system2Alerts, 1) + assert.Equal(t, memoryAlert.Id, system2Alerts[0].Id) +} + +func TestSystemAlertsCacheLazyLoadUpdateAndDelete(t *testing.T) { + hub, user := beszelTests.GetHubWithUser(t) + defer hub.Cleanup() + + systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up") + require.NoError(t, err) + systemRecord := systems[0] + + statusAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "Status", + "system": systemRecord.Id, + "user": user.Id, + "min": 1, + }) + require.NoError(t, err) + + cache := alerts.NewAlertsCache(hub) + require.Len(t, cache.GetSystemAlerts(systemRecord.Id), 1, "first lookup should lazy-load alerts for the system") + + cpuAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "CPU", + "system": systemRecord.Id, + "user": user.Id, + "value": 80, + "min": 1, + }) + require.NoError(t, err) + + cache.Update(cpuAlert) + + nonStatusAlerts := cache.GetAlertsExcludingNames(systemRecord.Id, "Status") + require.Len(t, nonStatusAlerts, 1) + assert.Equal(t, cpuAlert.Id, nonStatusAlerts[0].Id) + + cache.Delete(statusAlert) + assert.Empty(t, cache.GetAlertsByName(systemRecord.Id, "Status"), "deleted alerts should be removed from the in-memory cache") +} + +func TestSystemAlertsCacheRefreshReturnsLatestCopy(t *testing.T) { + hub, user := beszelTests.GetHubWithUser(t) + defer hub.Cleanup() + + systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up") + require.NoError(t, err) + system := systems[0] + + alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "Status", + "system": system.Id, + "user": user.Id, + "min": 1, + "triggered": false, + }) + require.NoError(t, err) + + cache := alerts.NewAlertsCache(hub) + snapshot := cache.GetSystemAlerts(system.Id)[0] + assert.False(t, snapshot.Triggered) + + alert.Set("triggered", true) + require.NoError(t, hub.Save(alert)) + + refreshed, ok := cache.Refresh(snapshot) + require.True(t, ok) + assert.Equal(t, snapshot.Id, refreshed.Id) + assert.True(t, refreshed.Triggered, "refresh should return the updated cached value rather than the stale snapshot") + + require.NoError(t, hub.Delete(alert)) + _, ok = cache.Refresh(snapshot) + assert.False(t, ok, "refresh should report false when the cached alert no longer exists") +} + +func TestAlertManagerCacheLifecycle(t *testing.T) { + hub, user := beszelTests.GetHubWithUser(t) + defer hub.Cleanup() + + systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up") + require.NoError(t, err) + system := systems[0] + + // Create an alert + alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "CPU", + "system": system.Id, + "user": user.Id, + "value": 80, + "min": 1, + }) + require.NoError(t, err) + + am := hub.AlertManager + cache := am.GetSystemAlertsCache() + + // Verify it's in cache (it should be since CreateRecord triggers the event) + assert.Len(t, cache.GetSystemAlerts(system.Id), 1) + assert.Equal(t, alert.Id, cache.GetSystemAlerts(system.Id)[0].Id) + assert.EqualValues(t, 80, cache.GetSystemAlerts(system.Id)[0].Value) + + // Update the alert through PocketBase to trigger events + alert.Set("value", 85) + require.NoError(t, hub.Save(alert)) + + // Check if updated value is reflected (or at least that it's still there) + cachedAlerts := cache.GetSystemAlerts(system.Id) + assert.Len(t, cachedAlerts, 1) + assert.EqualValues(t, 85, cachedAlerts[0].Value) + + // Delete the alert through PocketBase to trigger events + require.NoError(t, hub.Delete(alert)) + + // Verify it's removed from cache + assert.Empty(t, cache.GetSystemAlerts(system.Id), "alert should be removed from cache after PocketBase delete") +} + +// func TestAlertManagerCacheMovesAlertToNewSystemOnUpdate(t *testing.T) { +// hub, user := beszelTests.GetHubWithUser(t) +// defer hub.Cleanup() + +// systems, err := beszelTests.CreateSystems(hub, 2, user.Id, "up") +// require.NoError(t, err) +// system1 := systems[0] +// system2 := systems[1] + +// alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{ +// "name": "CPU", +// "system": system1.Id, +// "user": user.Id, +// "value": 80, +// "min": 1, +// }) +// require.NoError(t, err) + +// am := hub.AlertManager +// cache := am.GetSystemAlertsCache() + +// // Initially in system1 cache +// assert.Len(t, cache.Get(system1.Id), 1) +// assert.Empty(t, cache.Get(system2.Id)) + +// // Move alert to system2 +// alert.Set("system", system2.Id) +// require.NoError(t, hub.Save(alert)) + +// // DEBUG: print if it is found +// // fmt.Printf("system1 alerts after update: %v\n", cache.Get(system1.Id)) + +// // Should be removed from system1 and present in system2 +// assert.Empty(t, cache.GetType(system1.Id, "CPU"), "updated alerts should be evicted from the previous system cache") +// require.Len(t, cache.Get(system2.Id), 1) +// assert.Equal(t, alert.Id, cache.Get(system2.Id)[0].Id) +// } diff --git a/internal/alerts/alerts_test.go b/internal/alerts/alerts_test.go index e5f656dd..82b056a6 100644 --- a/internal/alerts/alerts_test.go +++ b/internal/alerts/alerts_test.go @@ -14,6 +14,7 @@ import ( beszelTests "github.com/henrygd/beszel/internal/tests" + "github.com/henrygd/beszel/internal/alerts" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" pbTests "github.com/pocketbase/pocketbase/tests" @@ -496,3 +497,46 @@ func TestAlertsHistory(t *testing.T) { assert.EqualValues(t, 2, totalHistoryCount, "Should have 2 total alert history records") }) } + +func TestSetAlertTriggered(t *testing.T) { + hub, _ := beszelTests.NewTestHub(t.TempDir()) + defer hub.Cleanup() + + hub.StartHub() + + user, _ := beszelTests.CreateUser(hub, "test@example.com", "password") + system, _ := beszelTests.CreateRecord(hub, "systems", map[string]any{ + "name": "test-system", + "users": []string{user.Id}, + "host": "127.0.0.1", + }) + + alertRecord, _ := beszelTests.CreateRecord(hub, "alerts", map[string]any{ + "name": "CPU", + "system": system.Id, + "user": user.Id, + "value": 80, + "triggered": false, + }) + + am := alerts.NewAlertManager(hub) + + var alert alerts.CachedAlertData + alert.PopulateFromRecord(alertRecord) + + // Test triggering the alert + err := am.SetAlertTriggered(alert, true) + assert.NoError(t, err) + + updatedRecord, err := hub.FindRecordById("alerts", alert.Id) + assert.NoError(t, err) + assert.True(t, updatedRecord.GetBool("triggered")) + + // Test un-triggering the alert + err = am.SetAlertTriggered(alert, false) + assert.NoError(t, err) + + updatedRecord, err = hub.FindRecordById("alerts", alert.Id) + assert.NoError(t, err) + assert.False(t, updatedRecord.GetBool("triggered")) +} diff --git a/internal/alerts/alerts_test_helpers.go b/internal/alerts/alerts_test_helpers.go index f3f2a303..2fc319b0 100644 --- a/internal/alerts/alerts_test_helpers.go +++ b/internal/alerts/alerts_test_helpers.go @@ -11,10 +11,16 @@ import ( func NewTestAlertManagerWithoutWorker(app hubLike) *AlertManager { return &AlertManager{ - hub: app, + hub: app, + alertsCache: NewAlertsCache(app), } } +// GetSystemAlertsCache returns the internal system alerts cache. +func (am *AlertManager) GetSystemAlertsCache() *AlertsCache { + return am.alertsCache +} + func (am *AlertManager) GetAlertManager() *AlertManager { return am } @@ -33,10 +39,10 @@ func (am *AlertManager) GetPendingAlertsCount() int { } // ProcessPendingAlerts manually processes all expired alerts (for testing) -func (am *AlertManager) ProcessPendingAlerts() ([]*core.Record, error) { +func (am *AlertManager) ProcessPendingAlerts() ([]CachedAlertData, error) { now := time.Now() var lastErr error - var processedAlerts []*core.Record + var processedAlerts []CachedAlertData am.pendingAlerts.Range(func(key, value any) bool { info := value.(*alertInfo) if now.After(info.expireTime) { @@ -85,3 +91,7 @@ func ResolveStatusAlerts(app core.App) error { func (am *AlertManager) RestorePendingStatusAlerts() error { return am.restorePendingStatusAlerts() } + +func (am *AlertManager) SetAlertTriggered(alert CachedAlertData, triggered bool) error { + return am.setAlertTriggered(alert, triggered) +}