From 8e2316f845fef6ab5051afcab08f5e1afd78a720 Mon Sep 17 00:00:00 2001
From: henrygd
Date: Thu, 12 Mar 2026 15:53:40 -0400
Subject: [PATCH] refactor: simplify/improve status alert handling (#1519)

Also adds new functionality to restore any pending down alerts that were
lost because the hub restarted before they could be created.
---
 internal/alerts/alerts.go                  |  18 +-
 internal/alerts/alerts_quiet_hours_test.go |  14 +-
 internal/alerts/alerts_status.go           | 172 +++---
 internal/alerts/alerts_status_test.go      | 628 +++++++++++++++++++++
 internal/alerts/alerts_test.go             | 181 ------
 internal/alerts/alerts_test_helpers.go     |  34 +-
 internal/tests/hub.go                      |   2 +-
 7 files changed, 779 insertions(+), 270 deletions(-)
 create mode 100644 internal/alerts/alerts_status_test.go

diff --git a/internal/alerts/alerts.go b/internal/alerts/alerts.go
index ab427fc3..b5ed61c0 100644
--- a/internal/alerts/alerts.go
+++ b/internal/alerts/alerts.go
@@ -21,8 +21,7 @@ type hubLike interface {
 
 type AlertManager struct {
 	hub           hubLike
-	alertQueue    chan alertTask
-	stopChan      chan struct{}
+	stopOnce      sync.Once
 	pendingAlerts sync.Map
 }
 
@@ -98,12 +97,9 @@ var supportsTitle = map[string]struct{}{
 // NewAlertManager creates a new AlertManager instance.
 func NewAlertManager(app hubLike) *AlertManager {
 	am := &AlertManager{
-		hub:        app,
-		alertQueue: make(chan alertTask, 5),
-		stopChan:   make(chan struct{}),
+		hub: app,
 	}
 	am.bindEvents()
-	go am.startWorker()
 	return am
 }
 
@@ -112,6 +108,16 @@ func (am *AlertManager) bindEvents() {
 	am.hub.OnRecordAfterUpdateSuccess("alerts").BindFunc(updateHistoryOnAlertUpdate)
 	am.hub.OnRecordAfterDeleteSuccess("alerts").BindFunc(resolveHistoryOnAlertDelete)
 	am.hub.OnRecordAfterUpdateSuccess("smart_devices").BindFunc(am.handleSmartDeviceAlert)
+
+	am.hub.OnServe().BindFunc(func(e *core.ServeEvent) error {
+		if err := resolveStatusAlerts(e.App); err != nil {
+			e.App.Logger().Error("Failed to resolve stale status alerts", "err", err)
+		}
+		if err := am.restorePendingStatusAlerts(); err != nil {
+			e.App.Logger().Error("Failed to restore pending status alerts", "err", err)
+		}
+		return e.Next()
+	})
 }
 
 // IsNotificationSilenced checks if a notification should be silenced based on configured quiet hours
diff --git a/internal/alerts/alerts_quiet_hours_test.go b/internal/alerts/alerts_quiet_hours_test.go
index 78d2c667..94106d5e 100644
--- a/internal/alerts/alerts_quiet_hours_test.go
+++ b/internal/alerts/alerts_quiet_hours_test.go
@@ -49,7 +49,7 @@ func TestAlertSilencedOneTime(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Test that alert is silenced
 	silenced := am.IsNotificationSilenced(user.Id, system.Id)
@@ -106,7 +106,7 @@ func TestAlertSilencedDaily(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Get current hour and create a window that includes current time
 	now := time.Now().UTC()
@@ -170,7 +170,7 @@ func TestAlertSilencedDailyMidnightCrossing(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Create a window that crosses midnight: 22:00 - 02:00
 	startTime := time.Date(2000, 1, 1, 22, 0, 0, 0, time.UTC)
@@ -211,7 +211,7 @@ func TestAlertSilencedGlobal(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Create a global quiet hours window (no system specified)
 	now := time.Now().UTC()
@@ -250,7 +250,7 @@ func TestAlertSilencedSystemSpecific(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Create a system-specific quiet hours window for system1 only
 	now := time.Now().UTC()
@@ -296,7 +296,7 @@ func TestAlertSilencedMultiUser(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Create a quiet hours window for user1 only
 	now := time.Now().UTC()
@@ -417,7 +417,7 @@ func TestAlertSilencedNoWindows(t *testing.T) {
 
 	// Get alert manager
 	am := alerts.NewAlertManager(hub)
-	defer am.StopWorker()
+	defer am.Stop()
 
 	// Without any quiet hours windows, alert should NOT be silenced
 	silenced := am.IsNotificationSilenced(user.Id, system.Id)
diff --git a/internal/alerts/alerts_status.go b/internal/alerts/alerts_status.go
index 225a0c55..b43a2a9a 100644
--- a/internal/alerts/alerts_status.go
+++ b/internal/alerts/alerts_status.go
@@ -9,63 +9,25 @@ import (
 	"github.com/pocketbase/pocketbase/core"
 )
 
-type alertTask struct {
-	action      string // "schedule" or "cancel"
-	systemName  string
-	alertRecord *core.Record
-	delay       time.Duration
-}
-
 type alertInfo struct {
 	systemName  string
 	alertRecord *core.Record
 	expireTime  time.Time
+	timer       *time.Timer
 }
 
-// startWorker is a long-running goroutine that processes alert tasks
-// every x seconds. It must be running to process status alerts.
-func (am *AlertManager) startWorker() {
-	processPendingAlerts := time.Tick(15 * time.Second)
-
-	// check for status alerts that are not resolved when system comes up
-	// (can be removed if we figure out core bug in #1052)
-	checkStatusAlerts := time.Tick(561 * time.Second)
-
-	for {
-		select {
-		case <-am.stopChan:
-			return
-		case task := <-am.alertQueue:
-			switch task.action {
-			case "schedule":
-				am.pendingAlerts.Store(task.alertRecord.Id, &alertInfo{
-					systemName:  task.systemName,
-					alertRecord: task.alertRecord,
-					expireTime:  time.Now().Add(task.delay),
-				})
-			case "cancel":
-				am.pendingAlerts.Delete(task.alertRecord.Id)
+// Stop cancels all pending status alert timers.
+func (am *AlertManager) Stop() {
+	am.stopOnce.Do(func() {
+		am.pendingAlerts.Range(func(key, value any) bool {
+			info := value.(*alertInfo)
+			if info.timer != nil {
+				info.timer.Stop()
 			}
-		case <-checkStatusAlerts:
-			resolveStatusAlerts(am.hub)
-		case <-processPendingAlerts:
-			// Check for expired alerts every tick
-			now := time.Now()
-			for key, value := range am.pendingAlerts.Range {
-				info := value.(*alertInfo)
-				if now.After(info.expireTime) {
-					// Downtime delay has passed, process alert
-					am.sendStatusAlert("down", info.systemName, info.alertRecord)
-					am.pendingAlerts.Delete(key)
-				}
-			}
-		}
-	}
-}
-
-// StopWorker shuts down the AlertManager.worker goroutine
-func (am *AlertManager) StopWorker() {
-	close(am.stopChan)
+			am.pendingAlerts.Delete(key)
+			return true
+		})
+	})
 }
 
 // HandleStatusAlerts manages the logic when system status changes.
@@ -103,38 +65,43 @@ func (am *AlertManager) getSystemStatusAlerts(systemID string) ([]*core.Record,
 	return alertRecords, nil
 }
 
-// Schedules delayed "down" alerts for each alert record.
+// handleSystemDown manages the logic when a system status changes to "down". It schedules pending alerts for each alert record.
 func (am *AlertManager) handleSystemDown(systemName string, alertRecords []*core.Record) {
 	for _, alertRecord := range alertRecords {
-		// Continue if alert is already scheduled
-		if _, exists := am.pendingAlerts.Load(alertRecord.Id); exists {
-			continue
-		}
-		// Schedule by adding to queue
 		min := max(1, alertRecord.GetInt("min"))
-		am.alertQueue <- alertTask{
-			action:      "schedule",
-			systemName:  systemName,
-			alertRecord: alertRecord,
-			delay:       time.Duration(min) * time.Minute,
-		}
+		am.schedulePendingStatusAlert(systemName, alertRecord, time.Duration(min)*time.Minute)
 	}
 }
 
+// schedulePendingStatusAlert sets up a timer to send a "down" alert after the specified delay if the system is still down.
+// It returns true if the alert was scheduled, or false if an alert was already pending for the given alert record.
+func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecord *core.Record, delay time.Duration) bool {
+	alert := &alertInfo{
+		systemName:  systemName,
+		alertRecord: alertRecord,
+		expireTime:  time.Now().Add(delay),
+	}
+
+	storedAlert, loaded := am.pendingAlerts.LoadOrStore(alertRecord.Id, alert)
+	if loaded {
+		return false
+	}
+
+	stored := storedAlert.(*alertInfo)
+	stored.timer = time.AfterFunc(time.Until(stored.expireTime), func() {
+		am.processPendingAlert(alertRecord.Id)
+	})
+	return true
+}
+
 // handleSystemUp manages the logic when a system status changes to "up".
 // It cancels any pending alerts and sends "up" alerts.
 func (am *AlertManager) handleSystemUp(systemName string, alertRecords []*core.Record) {
 	for _, alertRecord := range alertRecords {
-		alertRecordID := alertRecord.Id
 		// If alert exists for record, delete and continue (down alert not sent)
-		if _, exists := am.pendingAlerts.Load(alertRecordID); exists {
-			am.alertQueue <- alertTask{
-				action:      "cancel",
-				alertRecord: alertRecord,
-			}
+		if am.cancelPendingAlert(alertRecord.Id) {
 			continue
 		}
-		// No alert scheduled for this record, send "up" alert only if "down" was triggered
 		if !alertRecord.GetBool("triggered") {
 			continue
 		}
@@ -144,6 +111,36 @@ func (am *AlertManager) handleSystemUp(systemName string, alertRecords []*core.R
 	}
 }
 
+// cancelPendingAlert stops the timer and removes the pending alert for the given alert ID. Returns true if a pending alert was found and cancelled.
+func (am *AlertManager) cancelPendingAlert(alertID string) bool {
+	value, loaded := am.pendingAlerts.LoadAndDelete(alertID)
+	if !loaded {
+		return false
+	}
+
+	info := value.(*alertInfo)
+	if info.timer != nil {
+		info.timer.Stop()
+	}
+	return true
+}
+
+// processPendingAlert sends a "down" alert if the pending alert has expired and the system is still down.
+func (am *AlertManager) processPendingAlert(alertID string) {
+	value, loaded := am.pendingAlerts.LoadAndDelete(alertID)
+	if !loaded {
+		return
+	}
+
+	info := value.(*alertInfo)
+	if info.alertRecord.GetBool("triggered") {
+		return
+	}
+	if err := am.sendStatusAlert("down", info.systemName, info.alertRecord); err != nil {
+		am.hub.Logger().Error("Failed to send alert", "err", err)
+	}
+}
+
 // sendStatusAlert sends a status alert ("up" or "down") to the users associated with the alert records.
 func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord *core.Record) error {
 	switch alertStatus {
@@ -177,8 +174,8 @@ func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, a
 	})
 }
 
-// resolveStatusAlerts resolves any status alerts that weren't resolved
-// when system came up (https://github.com/henrygd/beszel/issues/1052)
+// resolveStatusAlerts resolves any triggered status alerts that weren't resolved
+// when the system came up (https://github.com/henrygd/beszel/issues/1052).
 func resolveStatusAlerts(app core.App) error {
 	db := app.DB()
 	// Find all active status alerts where the system is actually up
@@ -208,3 +205,36 @@
 	}
 	return nil
 }
+
+// restorePendingStatusAlerts re-queues untriggered status alerts for systems that
+// are still down after a hub restart. This rebuilds the lost in-memory timer state.
+func (am *AlertManager) restorePendingStatusAlerts() error {
+	type pendingStatusAlert struct {
+		AlertID    string `db:"alert_id"`
+		SystemName string `db:"system_name"`
+	}
+
+	var pending []pendingStatusAlert
+	err := am.hub.DB().NewQuery(`
+		SELECT a.id AS alert_id, s.name AS system_name
+		FROM alerts a
+		JOIN systems s ON a.system = s.id
+		WHERE a.name = 'Status'
+			AND a.triggered = false
+			AND s.status = 'down'
+	`).All(&pending)
+	if err != nil {
+		return err
+	}
+
+	for _, item := range pending {
+		alertRecord, err := am.hub.FindRecordById("alerts", item.AlertID)
+		if err != nil {
+			return err
+		}
+		min := max(1, alertRecord.GetInt("min"))
+		am.schedulePendingStatusAlert(item.SystemName, alertRecord, time.Duration(min)*time.Minute)
+	}
+
+	return nil
+}
diff --git a/internal/alerts/alerts_status_test.go b/internal/alerts/alerts_status_test.go
new file mode 100644
index 00000000..19205e94
--- /dev/null
+++ b/internal/alerts/alerts_status_test.go
@@ -0,0 +1,628 @@
+//go:build testing
+
+package alerts_test
+
+import (
+	"testing"
+	"testing/synctest"
+	"time"
+
+	"github.com/henrygd/beszel/internal/alerts"
+	beszelTests "github.com/henrygd/beszel/internal/tests"
+	"github.com/pocketbase/dbx"
+	"github.com/pocketbase/pocketbase/core"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestStatusAlerts(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		hub, user := beszelTests.GetHubWithUser(t)
+		defer hub.Cleanup()
+
+		systems, err := beszelTests.CreateSystems(hub, 4, user.Id, "paused")
+		assert.NoError(t, err)
+
+		var alerts []*core.Record
+		for i, system := range systems {
+			alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
+				"name":   "Status",
+				"system": system.Id,
+				"user":   user.Id,
+				"min":    i + 1,
+			})
+			assert.NoError(t, err)
+			alerts = append(alerts, alert)
+		}
+
+		time.Sleep(10 * time.Millisecond)
+
+		for _, alert := range alerts {
+			assert.False(t, alert.GetBool("triggered"), "Alert should not be triggered immediately")
+		}
+		if hub.TestMailer.TotalSend() != 0 {
+			assert.Zero(t, hub.TestMailer.TotalSend(), "Expected 0 messages, got %d", hub.TestMailer.TotalSend())
+		}
+		for _, system := range systems {
+			assert.EqualValues(t, "paused", system.GetString("status"), "System should be paused")
+		}
+		for _, system := range systems {
+			system.Set("status", "up")
+			err = hub.SaveNoValidate(system)
+			assert.NoError(t, err)
+		}
+		time.Sleep(time.Second)
+		assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
+		for _, system := range systems {
+			system.Set("status", "down")
+			err = hub.SaveNoValidate(system)
+			assert.NoError(t, err)
+		}
+		// after 30 seconds, should have 4 alerts in the pendingAlerts map, no triggered alerts
+		time.Sleep(time.Second * 30)
+		assert.EqualValues(t, 4, hub.GetPendingAlertsCount(), "should have 4 alerts in the pendingAlerts map")
+		triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
+		assert.NoError(t, err)
+		assert.EqualValues(t, 0, triggeredCount, "should have 0 alerts triggered")
+		assert.EqualValues(t, 0, hub.TestMailer.TotalSend(), "should have 0 messages sent")
+		// after 1:30 total, should have 1 triggered alert and 3 pending alerts
+		time.Sleep(time.Second * 60)
+		assert.EqualValues(t, 3, hub.GetPendingAlertsCount(), "should have 3 alerts in the pendingAlerts map")
+		triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
+		assert.NoError(t, err)
+		assert.EqualValues(t, 1, triggeredCount, "should have 1 alert triggered")
+		assert.EqualValues(t, 1, hub.TestMailer.TotalSend(), "should have 1 message sent")
+		// after 2:30 total, should have 2 triggered alerts and 2 pending alerts
+		time.Sleep(time.Second * 60)
+		assert.EqualValues(t, 2, hub.GetPendingAlertsCount(), "should have 2 alerts in the pendingAlerts map")
+		triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
+		assert.NoError(t, err)
+		assert.EqualValues(t, 2, triggeredCount, "should have 2 alerts triggered")
+		assert.EqualValues(t, 2, hub.TestMailer.TotalSend(), "should have 2 messages sent")
+		// now we will bring the remaining systems back up
+		for _, system := range systems {
+			system.Set("status", "up")
+			err = hub.SaveNoValidate(system)
+			assert.NoError(t, err)
+		}
+		time.Sleep(time.Second)
+		// should have 0 alerts in the pendingAlerts map and 0 alerts triggered
+		assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
+		triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
+		assert.NoError(t, err)
+		assert.Zero(t, triggeredCount, "should have 0 alerts triggered")
+		// 4 messages sent, 2 down alerts and 2 up alerts for first 2 systems
+		assert.EqualValues(t, 4, hub.TestMailer.TotalSend(), "should have 4 messages sent")
+	})
+}
+func TestStatusAlertRecoveryBeforeDeadline(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	// Ensure user settings have an email
+	userSettings, _ := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+	userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+	hub.Save(userSettings)
+
+	// Initial email count
+	initialEmailCount := hub.TestMailer.TotalSend()
+
+	systemCollection, _ := hub.FindCollectionByNameOrId("systems")
+	system := core.NewRecord(systemCollection)
+	system.Set("name", "test-system")
+	system.Set("status", "up")
+	system.Set("host", "127.0.0.1")
+	system.Set("users", []string{user.Id})
+	hub.Save(system)
+
+	alertCollection, _ := hub.FindCollectionByNameOrId("alerts")
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", false)
+	alert.Set("min", 1)
+	hub.Save(alert)
+
+	am := hub.AlertManager
+
+	// 1. System goes down
+	am.HandleStatusAlerts("down", system)
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "Alert should be scheduled")
+
+	// 2. System goes up BEFORE delay expires
+	// Triggering HandleStatusAlerts("up") SHOULD NOT send an alert.
+	am.HandleStatusAlerts("up", system)
+
+	assert.Equal(t, 0, am.GetPendingAlertsCount(), "Alert should be canceled if system recovers before delay expires")
+
+	// Verify that NO email was sent.
+	assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "Recovery notification should not be sent if system never went down")
+
+}
+
+func TestStatusAlertNormalRecovery(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	// Ensure user settings have an email
+	userSettings, _ := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+	userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+	hub.Save(userSettings)
+
+	systemCollection, _ := hub.FindCollectionByNameOrId("systems")
+	system := core.NewRecord(systemCollection)
+	system.Set("name", "test-system")
+	system.Set("status", "up")
+	system.Set("host", "127.0.0.1")
+	system.Set("users", []string{user.Id})
+	hub.Save(system)
+
+	alertCollection, _ := hub.FindCollectionByNameOrId("alerts")
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", true) // System was confirmed DOWN
+	hub.Save(alert)
+
+	am := hub.AlertManager
+	initialEmailCount := hub.TestMailer.TotalSend()
+
+	// System goes up
+	am.HandleStatusAlerts("up", system)
+
+	// Verify that an email WAS sent (normal recovery).
+	assert.Equal(t, initialEmailCount+1, hub.TestMailer.TotalSend(), "Recovery notification should be sent if system was triggered as down")
+
+}
+
+func TestHandleStatusAlertsDoesNotSendRecoveryWhileDownIsOnlyPending(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+	require.NoError(t, err)
+	userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+	require.NoError(t, hub.Save(userSettings))
+
+	systemCollection, err := hub.FindCollectionByNameOrId("systems")
+	require.NoError(t, err)
+	system := core.NewRecord(systemCollection)
+	system.Set("name", "test-system")
+	system.Set("status", "up")
+	system.Set("host", "127.0.0.1")
+	system.Set("users", []string{user.Id})
+	require.NoError(t, hub.Save(system))
+
+	alertCollection, err := hub.FindCollectionByNameOrId("alerts")
+	require.NoError(t, err)
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", false)
+	alert.Set("min", 1)
+	require.NoError(t, hub.Save(alert))
+
+	initialEmailCount := hub.TestMailer.TotalSend()
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+
+	require.NoError(t, am.HandleStatusAlerts("down", system))
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "down transition should register a pending alert immediately")
+
+	require.NoError(t, am.HandleStatusAlerts("up", system))
+	assert.Zero(t, am.GetPendingAlertsCount(), "recovery should cancel the pending down alert")
+	assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "recovery notification should not be sent before a down alert triggers")
+
+	alertRecord, err := hub.FindRecordById("alerts", alert.Id)
+	require.NoError(t, err)
+	assert.False(t, alertRecord.GetBool("triggered"), "alert should remain untriggered when downtime never matured")
+}
+
+func TestStatusAlertTimerCancellationPreventsBoundaryDelivery(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		hub, user := beszelTests.GetHubWithUser(t)
+		defer hub.Cleanup()
+
+		userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+		require.NoError(t, err)
+		userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+		require.NoError(t, hub.Save(userSettings))
+
+		systemCollection, err := hub.FindCollectionByNameOrId("systems")
+		require.NoError(t, err)
+		system := core.NewRecord(systemCollection)
+		system.Set("name", "test-system")
+		system.Set("status", "up")
+		system.Set("host", "127.0.0.1")
+		system.Set("users", []string{user.Id})
+		require.NoError(t, hub.Save(system))
+
+		alertCollection, err := hub.FindCollectionByNameOrId("alerts")
+		require.NoError(t, err)
+		alert := core.NewRecord(alertCollection)
+		alert.Set("user", user.Id)
+		alert.Set("system", system.Id)
+		alert.Set("name", "Status")
+		alert.Set("triggered", false)
+		alert.Set("min", 1)
+		require.NoError(t, hub.Save(alert))
+
+		initialEmailCount := hub.TestMailer.TotalSend()
+		am := alerts.NewTestAlertManagerWithoutWorker(hub)
+
+		require.NoError(t, am.HandleStatusAlerts("down", system))
+		assert.Equal(t, 1, am.GetPendingAlertsCount(), "down transition should register a pending alert immediately")
+		require.True(t, am.ResetPendingAlertTimer(alert.Id, 25*time.Millisecond), "test should shorten the pending alert timer")
+
+		time.Sleep(10 * time.Millisecond)
+		require.NoError(t, am.HandleStatusAlerts("up", system))
+		assert.Zero(t, am.GetPendingAlertsCount(), "recovery should remove the pending alert before the timer callback runs")
+
+		time.Sleep(40 * time.Millisecond)
+		assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "timer callback should not deliver after recovery cancels the pending alert")
+
+		alertRecord, err := hub.FindRecordById("alerts", alert.Id)
+		require.NoError(t, err)
+		assert.False(t, alertRecord.GetBool("triggered"), "alert should remain untriggered when cancellation wins the timer race")
+
+		time.Sleep(time.Minute)
+		synctest.Wait()
+	})
+}
+
+func TestStatusAlertDownFiresAfterDelayExpires(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+	require.NoError(t, err)
+	userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+	require.NoError(t, hub.Save(userSettings))
+
+	systemCollection, err := hub.FindCollectionByNameOrId("systems")
+	require.NoError(t, err)
+	system := core.NewRecord(systemCollection)
+	system.Set("name", "test-system")
+	system.Set("status", "up")
+	system.Set("host", "127.0.0.1")
+	system.Set("users", []string{user.Id})
+	require.NoError(t, hub.Save(system))
+
+	alertCollection, err := hub.FindCollectionByNameOrId("alerts")
+	require.NoError(t, err)
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", false)
+	alert.Set("min", 1)
+	require.NoError(t, hub.Save(alert))
+
+	initialEmailCount := hub.TestMailer.TotalSend()
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+
+	require.NoError(t, am.HandleStatusAlerts("down", system))
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "alert should be pending after system goes down")
+
+	// Expire the pending alert and process it
+	am.ForceExpirePendingAlerts()
+	processed, err := am.ProcessPendingAlerts()
+	require.NoError(t, err)
+	assert.Len(t, processed, 1, "one alert should have been processed")
+	assert.Equal(t, 0, am.GetPendingAlertsCount(), "pending alert should be consumed after processing")
+
+	// Verify down email was sent
+	assert.Equal(t, initialEmailCount+1, hub.TestMailer.TotalSend(), "down notification should be sent after delay expires")
+
+	// Verify triggered flag is set in the DB
+	alertRecord, err := hub.FindRecordById("alerts", alert.Id)
+	require.NoError(t, err)
+	assert.True(t, alertRecord.GetBool("triggered"), "alert should be marked triggered after downtime matures")
+}
+
+func TestStatusAlertDuplicateDownCallIsIdempotent(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+	require.NoError(t, err)
+	userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+	require.NoError(t, hub.Save(userSettings))
+
+	systemCollection, err := hub.FindCollectionByNameOrId("systems")
+	require.NoError(t, err)
+	system := core.NewRecord(systemCollection)
+	system.Set("name", "test-system")
+	system.Set("status", "up")
+	system.Set("host", "127.0.0.1")
+	system.Set("users", []string{user.Id})
+	require.NoError(t, hub.Save(system))
+
+	alertCollection, err := hub.FindCollectionByNameOrId("alerts")
+	require.NoError(t, err)
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", false)
+	alert.Set("min", 5)
+	require.NoError(t, hub.Save(alert))
+
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+
+	require.NoError(t, am.HandleStatusAlerts("down", system))
+	require.NoError(t, am.HandleStatusAlerts("down", system))
+	require.NoError(t, am.HandleStatusAlerts("down", system))
+
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "repeated down calls should not schedule duplicate pending alerts")
+}
+
+func TestStatusAlertNoAlertRecord(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	systemCollection, err := hub.FindCollectionByNameOrId("systems")
+	require.NoError(t, err)
+	system := core.NewRecord(systemCollection)
+	system.Set("name", "test-system")
+	system.Set("status", "up")
+	system.Set("host", "127.0.0.1")
+	system.Set("users", []string{user.Id})
+	require.NoError(t, hub.Save(system))
+
+	// No Status alert record created for this system
+	initialEmailCount := hub.TestMailer.TotalSend()
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+
+	require.NoError(t, am.HandleStatusAlerts("down", system))
+	assert.Equal(t, 0, am.GetPendingAlertsCount(), "no pending alert when no alert record exists")
+
+	require.NoError(t, am.HandleStatusAlerts("up", system))
+	assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "no email when no alert record exists")
+}
+
+func TestRestorePendingStatusAlertsRequeuesDownSystemsAfterRestart(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
+	require.NoError(t, err)
+	userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
+	require.NoError(t, hub.Save(userSettings))
+
+	systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "down")
+	require.NoError(t, err)
+	system := systems[0]
+
+	alertCollection, err := hub.FindCollectionByNameOrId("alerts")
+	require.NoError(t, err)
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", false)
+	alert.Set("min", 1)
+	require.NoError(t, hub.Save(alert))
+
+	initialEmailCount := hub.TestMailer.TotalSend()
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+
+	require.NoError(t, am.RestorePendingStatusAlerts())
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "startup restore should requeue a pending down alert for a system still marked down")
+
+	am.ForceExpirePendingAlerts()
+	processed, err := am.ProcessPendingAlerts()
+	require.NoError(t, err)
+	assert.Len(t, processed, 1, "restored pending alert should be processable after the delay expires")
+	assert.Equal(t, initialEmailCount+1, hub.TestMailer.TotalSend(), "restored pending alert should send the down notification")
+
+	alertRecord, err := hub.FindRecordById("alerts", alert.Id)
+	require.NoError(t, err)
+	assert.True(t, alertRecord.GetBool("triggered"), "restored pending alert should mark the alert as triggered once delivered")
+}
+
+func TestRestorePendingStatusAlertsSkipsNonDownOrAlreadyTriggeredAlerts(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	systemsDown, err := beszelTests.CreateSystems(hub, 2, user.Id, "down")
+	require.NoError(t, err)
+	systemDownPending := systemsDown[0]
+	systemDownTriggered := systemsDown[1]
+
+	systemUp, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
+		"name":   "up-system",
+		"users":  []string{user.Id},
+		"host":   "127.0.0.2",
+		"status": "up",
+	})
+	require.NoError(t, err)
+
+	_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
+		"name":      "Status",
+		"system":    systemDownPending.Id,
+		"user":      user.Id,
+		"min":       1,
+		"triggered": false,
+	})
+	require.NoError(t, err)
+
+	_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
+		"name":      "Status",
+		"system":    systemUp.Id,
+		"user":      user.Id,
+		"min":       1,
+		"triggered": false,
+	})
+	require.NoError(t, err)
+
+	_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
+		"name":      "Status",
+		"system":    systemDownTriggered.Id,
+		"user":      user.Id,
+		"min":       1,
+		"triggered": true,
+	})
+	require.NoError(t, err)
+
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+	require.NoError(t, am.RestorePendingStatusAlerts())
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "only untriggered alerts for currently down systems should be restored")
+}
+
+func TestRestorePendingStatusAlertsIsIdempotent(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "down")
+	require.NoError(t, err)
+	system := systems[0]
+
+	_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
+		"name":      "Status",
+		"system":    system.Id,
+		"user":      user.Id,
+		"min":       1,
+		"triggered": false,
+	})
+	require.NoError(t, err)
+
+	am := alerts.NewTestAlertManagerWithoutWorker(hub)
+	require.NoError(t, am.RestorePendingStatusAlerts())
+	require.NoError(t, am.RestorePendingStatusAlerts())
+
+	assert.Equal(t, 1, am.GetPendingAlertsCount(), "restoring twice should not create duplicate pending alerts")
+	am.ForceExpirePendingAlerts()
+	processed, err := am.ProcessPendingAlerts()
+	require.NoError(t, err)
+	assert.Len(t, processed, 1, "restored alert should still be processable exactly once")
+	assert.Zero(t, am.GetPendingAlertsCount(), "processing the restored alert should empty the pending map")
+}
+
+func TestResolveStatusAlertsFixesStaleTriggered(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	// CreateSystems uses SaveNoValidate after initial save to bypass the
+	// onRecordCreate hook that forces status = "pending".
+	systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
+	require.NoError(t, err)
+	system := systems[0]
+
+	alertCollection, err := hub.FindCollectionByNameOrId("alerts")
+	require.NoError(t, err)
+	alert := core.NewRecord(alertCollection)
+	alert.Set("user", user.Id)
+	alert.Set("system", system.Id)
+	alert.Set("name", "Status")
+	alert.Set("triggered", true) // Stale: system is up but alert still says triggered
+	require.NoError(t, hub.Save(alert))
+
+	// resolveStatusAlerts should clear the stale triggered flag
+	require.NoError(t, alerts.ResolveStatusAlerts(hub))
+
+	alertRecord, err := hub.FindRecordById("alerts", alert.Id)
+	require.NoError(t, err)
+	assert.False(t, alertRecord.GetBool("triggered"), "stale triggered flag should be cleared when system is up")
+}
+func TestResolveStatusAlerts(t *testing.T) {
+	hub, user := beszelTests.GetHubWithUser(t)
+	defer hub.Cleanup()
+
+	// Create a systemUp
+	systemUp, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
+		"name":   "test-system",
+		"users":  []string{user.Id},
+		"host":   "127.0.0.1",
+		"status": "up",
+	})
+	assert.NoError(t, err)
+
+	systemDown, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
+		"name":   "test-system-2",
+		"users":  []string{user.Id},
+		"host":   "127.0.0.2",
+		"status": "up",
+	})
+	assert.NoError(t, err)
+
+	// Create a status alertUp for the system
+	alertUp, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
+		"name":   "Status",
+		"system": systemUp.Id,
+		"user":   user.Id,
+		"min":    1,
+	})
+	assert.NoError(t, err)
+
+	alertDown, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
+		"name":   "Status",
+		"system": systemDown.Id,
+		"user":   user.Id,
+		"min":    1,
+	})
+	assert.NoError(t, err)
+
+	// Verify alert is not triggered initially
+	assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered initially")
+
+	// Set the system to 'up' (this should not trigger the alert)
+	systemUp.Set("status", "up")
+	err = hub.SaveNoValidate(systemUp)
+	assert.NoError(t, err)
+
+	systemDown.Set("status", "down")
+	err = hub.SaveNoValidate(systemDown)
+	assert.NoError(t, err)
+
+	// Wait a moment for any processing
+	time.Sleep(10 * time.Millisecond)
+
+	// Verify alertUp is still not triggered after setting system to up
+	alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
+	assert.NoError(t, err)
+	assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered when system is up")
+
+	// Manually set both alerts triggered to true
+	alertUp.Set("triggered", true)
+	err = hub.SaveNoValidate(alertUp)
+	assert.NoError(t, err)
+	alertDown.Set("triggered", true)
+	err = hub.SaveNoValidate(alertDown)
+	assert.NoError(t, err)
+
+	// Verify we have exactly two alerts with triggered true
+	triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
+	assert.NoError(t, err)
+	assert.EqualValues(t, 2, triggeredCount, "Should have exactly two alerts with triggered true")
+
+	// Verify the specific alertUp is triggered
+	alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
+	assert.NoError(t, err)
+	assert.True(t, alertUp.GetBool("triggered"), "Alert should be triggered")
+
+	// Verify we have two unresolved alert history records
+	alertHistoryCount, err := hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
+	assert.NoError(t, err)
+	assert.EqualValues(t, 2, alertHistoryCount, "Should have exactly two unresolved alert history records")
+
+	err = alerts.ResolveStatusAlerts(hub)
+	assert.NoError(t, err)
+
+	// Verify alertUp is not triggered after resolving
+	alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
+	assert.NoError(t, err)
+	assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered after resolving")
+	// Verify alertDown is still triggered
+	alertDown, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertDown.Id})
+	assert.NoError(t, err)
+	assert.True(t, alertDown.GetBool("triggered"), "Alert should still be triggered after resolving")
+
+	// Verify we have one unresolved alert history record
+	alertHistoryCount, err = hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
+	assert.NoError(t, err)
+	assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record")
+
+}
diff --git a/internal/alerts/alerts_test.go b/internal/alerts/alerts_test.go
index 07412904..e5f656dd 100644
--- a/internal/alerts/alerts_test.go
+++ b/internal/alerts/alerts_test.go
@@ -12,7 +12,6 @@ import (
 	"testing/synctest"
 	"time"
 
-	"github.com/henrygd/beszel/internal/alerts"
 	beszelTests "github.com/henrygd/beszel/internal/tests"
 
 	"github.com/pocketbase/dbx"
@@ -369,87 +368,6 @@ func TestUserAlertsApi(t *testing.T) {
 	}
 }
 
-func TestStatusAlerts(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		hub, user := beszelTests.GetHubWithUser(t)
-		defer hub.Cleanup()
-
-		systems, err := beszelTests.CreateSystems(hub, 4, user.Id, "paused")
-		assert.NoError(t, err)
-
-		var alerts []*core.Record
-		for i, system := range systems {
-			alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
-				"name":   "Status",
-				"system": system.Id,
-				"user":   user.Id,
-				"min":    i + 1,
-			})
-			assert.NoError(t, err)
-			alerts = append(alerts, alert)
-		}
-
-		time.Sleep(10 * time.Millisecond)
-
-		for _, alert := range alerts {
-			assert.False(t, alert.GetBool("triggered"), "Alert should not be triggered immediately")
-		}
-		if hub.TestMailer.TotalSend() != 0 {
-			assert.Zero(t, hub.TestMailer.TotalSend(), "Expected 0 messages, got %d", hub.TestMailer.TotalSend())
-		}
-		for _, system := range systems {
-			assert.EqualValues(t, "paused", system.GetString("status"), "System should be paused")
-		}
-		for _, system := range systems {
-			system.Set("status", "up")
-			err = hub.SaveNoValidate(system)
-			assert.NoError(t, err)
-		}
-		time.Sleep(time.Second)
-		assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
-		for _, system := range systems {
-			system.Set("status", "down")
-			err = hub.SaveNoValidate(system)
-			assert.NoError(t, err)
-		}
-		// after 30 seconds, should have 4 alerts in the pendingAlerts map, no triggered alerts
-		time.Sleep(time.Second * 30)
-		assert.EqualValues(t, 4, hub.GetPendingAlertsCount(), "should have 4 alerts in the pendingAlerts map")
-		triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
-		assert.NoError(t, err)
-		assert.EqualValues(t, 0, triggeredCount, "should have 0 alert triggered")
-		assert.EqualValues(t, 0, hub.TestMailer.TotalSend(), "should have 0 messages sent")
-		// after 1:30 seconds, should have 1 triggered alert and 3 pending alerts
-		time.Sleep(time.Second * 60)
-		assert.EqualValues(t, 3, hub.GetPendingAlertsCount(), "should have 3 alerts in the pendingAlerts map")
-		triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
-		assert.NoError(t, err)
-		assert.EqualValues(t, 1, triggeredCount, "should have 1 alert triggered")
-		assert.EqualValues(t, 1, hub.TestMailer.TotalSend(), "should have 1 messages sent")
-		// after 2:30 seconds, should have 2 triggered alerts and 2 pending alerts
-		time.Sleep(time.Second * 60)
-		assert.EqualValues(t, 2, hub.GetPendingAlertsCount(), "should have 2 alerts in the pendingAlerts map")
-		triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
-		assert.NoError(t, err)
-		assert.EqualValues(t, 2, triggeredCount, "should have 2 alert triggered")
-		assert.EqualValues(t, 2, hub.TestMailer.TotalSend(), "should have 2 messages sent")
-		// now we will bring the remaning systems back up
-		for _, system := range systems {
-			system.Set("status", "up")
-			err = hub.SaveNoValidate(system)
-			assert.NoError(t, err)
-		}
-		time.Sleep(time.Second)
-		// should have 0 alerts in the pendingAlerts map and 0 alerts triggered
-		assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
-		triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
-		assert.NoError(t, err)
-		assert.Zero(t, triggeredCount, "should have 0 alert triggered")
-		// 4 messages sent, 2 down alerts and 2 up alerts for first 2 systems
-		assert.EqualValues(t, 4, hub.TestMailer.TotalSend(), "should have 4 messages sent")
-	})
-}
-
 func TestAlertsHistory(t *testing.T) {
 	synctest.Test(t, func(t *testing.T) {
 		hub, user := beszelTests.GetHubWithUser(t)
@@ -578,102 +496,3 @@
 		assert.EqualValues(t, 2, totalHistoryCount, "Should have 2 total alert history records")
 	})
 }
-func TestResolveStatusAlerts(t *testing.T) {
-	hub, user := beszelTests.GetHubWithUser(t)
-	defer hub.Cleanup()
-
-	// Create a systemUp
-	systemUp, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
-		"name":   "test-system",
-		"users":  []string{user.Id},
-		"host":   "127.0.0.1",
-		"status": "up",
-	})
-	assert.NoError(t, err)
-
-	systemDown, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
-		"name":   "test-system-2",
-		"users":  []string{user.Id},
-		"host":   "127.0.0.2",
-		"status": "up",
-	})
-	assert.NoError(t, err)
-
-	// Create a status alertUp for the system
-	alertUp, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
-		"name":   "Status",
-		"system": systemUp.Id,
-		"user":   user.Id,
-		"min":    1,
-	})
-	assert.NoError(t, err)
-
-	alertDown, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
-		"name":   "Status",
-		"system": systemDown.Id,
-		"user":   user.Id,
-		"min":    1,
-	})
-	assert.NoError(t, err)
-
-	// Verify alert is not triggered initially
-	assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered initially")
-
-	// Set the system to 'up' (this should not trigger the alert)
-	systemUp.Set("status", "up")
-	err = hub.SaveNoValidate(systemUp)
-	assert.NoError(t, err)
-
-	systemDown.Set("status", "down")
-	err = hub.SaveNoValidate(systemDown)
-	assert.NoError(t, err)
-
-	// Wait a moment for any processing
-	time.Sleep(10 * time.Millisecond)
-
-	// Verify alertUp is still not triggered after setting system to up
-	alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
-	assert.NoError(t, err)
-	assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered when system is up")
-
-	// Manually set both alerts triggered to true
-	alertUp.Set("triggered", true)
-	err = hub.SaveNoValidate(alertUp)
-	assert.NoError(t, err)
-	alertDown.Set("triggered", true)
-	err = hub.SaveNoValidate(alertDown)
-	assert.NoError(t, err)
-
-	// Verify we have exactly one alert with triggered true
-	triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
-	assert.NoError(t, err)
-	assert.EqualValues(t, 2, triggeredCount, "Should have exactly two alerts with triggered true")
-
-	// Verify the specific alertUp is triggered
-	alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
-	assert.NoError(t, err)
-	assert.True(t, alertUp.GetBool("triggered"), "Alert should be triggered")
-
-	// Verify we have two unresolved alert history records
-	alertHistoryCount, err := hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
-	assert.NoError(t, err)
-	assert.EqualValues(t, 2, alertHistoryCount, "Should have exactly two unresolved alert history records")
-
-	err = alerts.ResolveStatusAlerts(hub)
-	assert.NoError(t, err)
-
-	// Verify alertUp is not triggered after resolving
-	alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
-	assert.NoError(t, err)
-	assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered after resolving")
-	// Verify alertDown is still triggered
-	alertDown, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertDown.Id})
-	assert.NoError(t, err)
-	assert.True(t, alertDown.GetBool("triggered"), "Alert should still be triggered after resolving")
-
-	// Verify we have one unresolved alert history record
-	alertHistoryCount, err = hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
-	assert.NoError(t, err)
-	assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record")
-
-}
diff --git a/internal/alerts/alerts_test_helpers.go b/internal/alerts/alerts_test_helpers.go
index 8894e3cb..f3f2a303 100644
--- a/internal/alerts/alerts_test_helpers.go
+++ b/internal/alerts/alerts_test_helpers.go
@@ -9,6 +9,12 @@ import (
 	"github.com/pocketbase/pocketbase/core"
 )
 
+func NewTestAlertManagerWithoutWorker(app hubLike) *AlertManager {
+	return &AlertManager{
+		hub: app,
+	}
+}
+
 func (am *AlertManager) GetAlertManager() *AlertManager {
 	return am
 }
@@ -34,12 +40,11 @@ func (am *AlertManager) ProcessPendingAlerts() ([]*core.Record, error) {
 	am.pendingAlerts.Range(func(key, value any) bool {
 		info := value.(*alertInfo)
 		if now.After(info.expireTime) {
-			// Downtime delay has passed, process alert
-			if err := am.sendStatusAlert("down", info.systemName, info.alertRecord); err != nil {
-				lastErr = err
+			if info.timer != nil {
+				info.timer.Stop()
 			}
+			am.processPendingAlert(key.(string))
 			processedAlerts = append(processedAlerts, info.alertRecord)
-			am.pendingAlerts.Delete(key)
 		}
 		return true
 	})
@@ -56,6 +61,27 @@ func (am *AlertManager) ForceExpirePendingAlerts() {
 	})
 }
 
+func (am *AlertManager) ResetPendingAlertTimer(alertID string, delay time.Duration) bool {
+	value, loaded := am.pendingAlerts.Load(alertID)
+	if !loaded {
+		return false
+	}
+
+	info := value.(*alertInfo)
+	if info.timer != nil {
+		info.timer.Stop()
+	}
+	info.expireTime = time.Now().Add(delay)
+	info.timer = time.AfterFunc(delay, func() {
+		am.processPendingAlert(alertID)
+	})
+	return true
+}
+
 func ResolveStatusAlerts(app core.App) error {
 	return resolveStatusAlerts(app)
 }
+
+func (am *AlertManager) RestorePendingStatusAlerts() error {
+	return am.restorePendingStatusAlerts()
+}
diff --git a/internal/tests/hub.go b/internal/tests/hub.go
index 598d774d..db035103 100644
--- a/internal/tests/hub.go
+++ b/internal/tests/hub.go
@@ -98,7 +98,7 @@ func ClearCollection(t testing.TB, app core.App, collectionName string) error {
 }
 
 func (h *TestHub) Cleanup() {
-	h.GetAlertManager().StopWorker()
+	h.GetAlertManager().Stop()
 	h.GetSystemManager().RemoveAllSystems()
 	h.TestApp.Cleanup()
 }