mirror of
https://github.com/henrygd/beszel.git
synced 2026-03-21 21:26:16 +01:00
Compare commits
2 Commits
temp-down-
...
8e2316f845
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8e2316f845 | ||
|
|
0d3dfcb207 |
@@ -21,8 +21,7 @@ type hubLike interface {
|
|||||||
|
|
||||||
type AlertManager struct {
|
type AlertManager struct {
|
||||||
hub hubLike
|
hub hubLike
|
||||||
alertQueue chan alertTask
|
stopOnce sync.Once
|
||||||
stopChan chan struct{}
|
|
||||||
pendingAlerts sync.Map
|
pendingAlerts sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,12 +97,9 @@ var supportsTitle = map[string]struct{}{
|
|||||||
// NewAlertManager creates a new AlertManager instance.
|
// NewAlertManager creates a new AlertManager instance.
|
||||||
func NewAlertManager(app hubLike) *AlertManager {
|
func NewAlertManager(app hubLike) *AlertManager {
|
||||||
am := &AlertManager{
|
am := &AlertManager{
|
||||||
hub: app,
|
hub: app,
|
||||||
alertQueue: make(chan alertTask, 5),
|
|
||||||
stopChan: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
am.bindEvents()
|
am.bindEvents()
|
||||||
go am.startWorker()
|
|
||||||
return am
|
return am
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +108,16 @@ func (am *AlertManager) bindEvents() {
|
|||||||
am.hub.OnRecordAfterUpdateSuccess("alerts").BindFunc(updateHistoryOnAlertUpdate)
|
am.hub.OnRecordAfterUpdateSuccess("alerts").BindFunc(updateHistoryOnAlertUpdate)
|
||||||
am.hub.OnRecordAfterDeleteSuccess("alerts").BindFunc(resolveHistoryOnAlertDelete)
|
am.hub.OnRecordAfterDeleteSuccess("alerts").BindFunc(resolveHistoryOnAlertDelete)
|
||||||
am.hub.OnRecordAfterUpdateSuccess("smart_devices").BindFunc(am.handleSmartDeviceAlert)
|
am.hub.OnRecordAfterUpdateSuccess("smart_devices").BindFunc(am.handleSmartDeviceAlert)
|
||||||
|
|
||||||
|
am.hub.OnServe().BindFunc(func(e *core.ServeEvent) error {
|
||||||
|
if err := resolveStatusAlerts(e.App); err != nil {
|
||||||
|
e.App.Logger().Error("Failed to resolve stale status alerts", "err", err)
|
||||||
|
}
|
||||||
|
if err := am.restorePendingStatusAlerts(); err != nil {
|
||||||
|
e.App.Logger().Error("Failed to restore pending status alerts", "err", err)
|
||||||
|
}
|
||||||
|
return e.Next()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsNotificationSilenced checks if a notification should be silenced based on configured quiet hours
|
// IsNotificationSilenced checks if a notification should be silenced based on configured quiet hours
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ func TestAlertSilencedOneTime(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Test that alert is silenced
|
// Test that alert is silenced
|
||||||
silenced := am.IsNotificationSilenced(user.Id, system.Id)
|
silenced := am.IsNotificationSilenced(user.Id, system.Id)
|
||||||
@@ -106,7 +106,7 @@ func TestAlertSilencedDaily(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Get current hour and create a window that includes current time
|
// Get current hour and create a window that includes current time
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
@@ -170,7 +170,7 @@ func TestAlertSilencedDailyMidnightCrossing(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Create a window that crosses midnight: 22:00 - 02:00
|
// Create a window that crosses midnight: 22:00 - 02:00
|
||||||
startTime := time.Date(2000, 1, 1, 22, 0, 0, 0, time.UTC)
|
startTime := time.Date(2000, 1, 1, 22, 0, 0, 0, time.UTC)
|
||||||
@@ -211,7 +211,7 @@ func TestAlertSilencedGlobal(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Create a global quiet hours window (no system specified)
|
// Create a global quiet hours window (no system specified)
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
@@ -250,7 +250,7 @@ func TestAlertSilencedSystemSpecific(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Create a system-specific quiet hours window for system1 only
|
// Create a system-specific quiet hours window for system1 only
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
@@ -296,7 +296,7 @@ func TestAlertSilencedMultiUser(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Create a quiet hours window for user1 only
|
// Create a quiet hours window for user1 only
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
@@ -417,7 +417,7 @@ func TestAlertSilencedNoWindows(t *testing.T) {
|
|||||||
|
|
||||||
// Get alert manager
|
// Get alert manager
|
||||||
am := alerts.NewAlertManager(hub)
|
am := alerts.NewAlertManager(hub)
|
||||||
defer am.StopWorker()
|
defer am.Stop()
|
||||||
|
|
||||||
// Without any quiet hours windows, alert should NOT be silenced
|
// Without any quiet hours windows, alert should NOT be silenced
|
||||||
silenced := am.IsNotificationSilenced(user.Id, system.Id)
|
silenced := am.IsNotificationSilenced(user.Id, system.Id)
|
||||||
|
|||||||
@@ -9,63 +9,25 @@ import (
|
|||||||
"github.com/pocketbase/pocketbase/core"
|
"github.com/pocketbase/pocketbase/core"
|
||||||
)
|
)
|
||||||
|
|
||||||
type alertTask struct {
|
|
||||||
action string // "schedule" or "cancel"
|
|
||||||
systemName string
|
|
||||||
alertRecord *core.Record
|
|
||||||
delay time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type alertInfo struct {
|
type alertInfo struct {
|
||||||
systemName string
|
systemName string
|
||||||
alertRecord *core.Record
|
alertRecord *core.Record
|
||||||
expireTime time.Time
|
expireTime time.Time
|
||||||
|
timer *time.Timer
|
||||||
}
|
}
|
||||||
|
|
||||||
// startWorker is a long-running goroutine that processes alert tasks
|
// Stop cancels all pending status alert timers.
|
||||||
// every x seconds. It must be running to process status alerts.
|
func (am *AlertManager) Stop() {
|
||||||
func (am *AlertManager) startWorker() {
|
am.stopOnce.Do(func() {
|
||||||
processPendingAlerts := time.Tick(15 * time.Second)
|
am.pendingAlerts.Range(func(key, value any) bool {
|
||||||
|
info := value.(*alertInfo)
|
||||||
// check for status alerts that are not resolved when system comes up
|
if info.timer != nil {
|
||||||
// (can be removed if we figure out core bug in #1052)
|
info.timer.Stop()
|
||||||
checkStatusAlerts := time.Tick(561 * time.Second)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-am.stopChan:
|
|
||||||
return
|
|
||||||
case task := <-am.alertQueue:
|
|
||||||
switch task.action {
|
|
||||||
case "schedule":
|
|
||||||
am.pendingAlerts.Store(task.alertRecord.Id, &alertInfo{
|
|
||||||
systemName: task.systemName,
|
|
||||||
alertRecord: task.alertRecord,
|
|
||||||
expireTime: time.Now().Add(task.delay),
|
|
||||||
})
|
|
||||||
case "cancel":
|
|
||||||
am.pendingAlerts.Delete(task.alertRecord.Id)
|
|
||||||
}
|
}
|
||||||
case <-checkStatusAlerts:
|
am.pendingAlerts.Delete(key)
|
||||||
resolveStatusAlerts(am.hub)
|
return true
|
||||||
case <-processPendingAlerts:
|
})
|
||||||
// Check for expired alerts every tick
|
})
|
||||||
now := time.Now()
|
|
||||||
for key, value := range am.pendingAlerts.Range {
|
|
||||||
info := value.(*alertInfo)
|
|
||||||
if now.After(info.expireTime) {
|
|
||||||
// Downtime delay has passed, process alert
|
|
||||||
am.sendStatusAlert("down", info.systemName, info.alertRecord)
|
|
||||||
am.pendingAlerts.Delete(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// StopWorker shuts down the AlertManager.worker goroutine
|
|
||||||
func (am *AlertManager) StopWorker() {
|
|
||||||
close(am.stopChan)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleStatusAlerts manages the logic when system status changes.
|
// HandleStatusAlerts manages the logic when system status changes.
|
||||||
@@ -103,44 +65,82 @@ func (am *AlertManager) getSystemStatusAlerts(systemID string) ([]*core.Record,
|
|||||||
return alertRecords, nil
|
return alertRecords, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedules delayed "down" alerts for each alert record.
|
// handleSystemDown manages the logic when a system status changes to "down". It schedules pending alerts for each alert record.
|
||||||
func (am *AlertManager) handleSystemDown(systemName string, alertRecords []*core.Record) {
|
func (am *AlertManager) handleSystemDown(systemName string, alertRecords []*core.Record) {
|
||||||
for _, alertRecord := range alertRecords {
|
for _, alertRecord := range alertRecords {
|
||||||
// Continue if alert is already scheduled
|
|
||||||
if _, exists := am.pendingAlerts.Load(alertRecord.Id); exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Schedule by adding to queue
|
|
||||||
min := max(1, alertRecord.GetInt("min"))
|
min := max(1, alertRecord.GetInt("min"))
|
||||||
am.alertQueue <- alertTask{
|
am.schedulePendingStatusAlert(systemName, alertRecord, time.Duration(min)*time.Minute)
|
||||||
action: "schedule",
|
|
||||||
systemName: systemName,
|
|
||||||
alertRecord: alertRecord,
|
|
||||||
delay: time.Duration(min) * time.Minute,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// schedulePendingStatusAlert sets up a timer to send a "down" alert after the specified delay if the system is still down.
|
||||||
|
// It returns true if the alert was scheduled, or false if an alert was already pending for the given alert record.
|
||||||
|
func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecord *core.Record, delay time.Duration) bool {
|
||||||
|
alert := &alertInfo{
|
||||||
|
systemName: systemName,
|
||||||
|
alertRecord: alertRecord,
|
||||||
|
expireTime: time.Now().Add(delay),
|
||||||
|
}
|
||||||
|
|
||||||
|
storedAlert, loaded := am.pendingAlerts.LoadOrStore(alertRecord.Id, alert)
|
||||||
|
if loaded {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
stored := storedAlert.(*alertInfo)
|
||||||
|
stored.timer = time.AfterFunc(time.Until(stored.expireTime), func() {
|
||||||
|
am.processPendingAlert(alertRecord.Id)
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// handleSystemUp manages the logic when a system status changes to "up".
|
// handleSystemUp manages the logic when a system status changes to "up".
|
||||||
// It cancels any pending alerts and sends "up" alerts.
|
// It cancels any pending alerts and sends "up" alerts.
|
||||||
func (am *AlertManager) handleSystemUp(systemName string, alertRecords []*core.Record) {
|
func (am *AlertManager) handleSystemUp(systemName string, alertRecords []*core.Record) {
|
||||||
for _, alertRecord := range alertRecords {
|
for _, alertRecord := range alertRecords {
|
||||||
alertRecordID := alertRecord.Id
|
|
||||||
// If alert exists for record, delete and continue (down alert not sent)
|
// If alert exists for record, delete and continue (down alert not sent)
|
||||||
if _, exists := am.pendingAlerts.Load(alertRecordID); exists {
|
if am.cancelPendingAlert(alertRecord.Id) {
|
||||||
am.alertQueue <- alertTask{
|
continue
|
||||||
action: "cancel",
|
}
|
||||||
alertRecord: alertRecord,
|
if !alertRecord.GetBool("triggered") {
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// No alert scheduled for this record, send "up" alert
|
|
||||||
if err := am.sendStatusAlert("up", systemName, alertRecord); err != nil {
|
if err := am.sendStatusAlert("up", systemName, alertRecord); err != nil {
|
||||||
am.hub.Logger().Error("Failed to send alert", "err", err)
|
am.hub.Logger().Error("Failed to send alert", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancelPendingAlert stops the timer and removes the pending alert for the given alert ID. Returns true if a pending alert was found and cancelled.
|
||||||
|
func (am *AlertManager) cancelPendingAlert(alertID string) bool {
|
||||||
|
value, loaded := am.pendingAlerts.LoadAndDelete(alertID)
|
||||||
|
if !loaded {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
info := value.(*alertInfo)
|
||||||
|
if info.timer != nil {
|
||||||
|
info.timer.Stop()
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// processPendingAlert sends a "down" alert if the pending alert has expired and the system is still down.
|
||||||
|
func (am *AlertManager) processPendingAlert(alertID string) {
|
||||||
|
value, loaded := am.pendingAlerts.LoadAndDelete(alertID)
|
||||||
|
if !loaded {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
info := value.(*alertInfo)
|
||||||
|
if info.alertRecord.GetBool("triggered") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := am.sendStatusAlert("down", info.systemName, info.alertRecord); err != nil {
|
||||||
|
am.hub.Logger().Error("Failed to send alert", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// sendStatusAlert sends a status alert ("up" or "down") to the users associated with the alert records.
|
// sendStatusAlert sends a status alert ("up" or "down") to the users associated with the alert records.
|
||||||
func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord *core.Record) error {
|
func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord *core.Record) error {
|
||||||
switch alertStatus {
|
switch alertStatus {
|
||||||
@@ -174,8 +174,8 @@ func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, a
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolveStatusAlerts resolves any status alerts that weren't resolved
|
// resolveStatusAlerts resolves any triggered status alerts that weren't resolved
|
||||||
// when system came up (https://github.com/henrygd/beszel/issues/1052)
|
// when system came up (https://github.com/henrygd/beszel/issues/1052).
|
||||||
func resolveStatusAlerts(app core.App) error {
|
func resolveStatusAlerts(app core.App) error {
|
||||||
db := app.DB()
|
db := app.DB()
|
||||||
// Find all active status alerts where the system is actually up
|
// Find all active status alerts where the system is actually up
|
||||||
@@ -205,3 +205,36 @@ func resolveStatusAlerts(app core.App) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// restorePendingStatusAlerts re-queues untriggered status alerts for systems that
|
||||||
|
// are still down after a hub restart. This rebuilds the lost in-memory timer state.
|
||||||
|
func (am *AlertManager) restorePendingStatusAlerts() error {
|
||||||
|
type pendingStatusAlert struct {
|
||||||
|
AlertID string `db:"alert_id"`
|
||||||
|
SystemName string `db:"system_name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var pending []pendingStatusAlert
|
||||||
|
err := am.hub.DB().NewQuery(`
|
||||||
|
SELECT a.id AS alert_id, s.name AS system_name
|
||||||
|
FROM alerts a
|
||||||
|
JOIN systems s ON a.system = s.id
|
||||||
|
WHERE a.name = 'Status'
|
||||||
|
AND a.triggered = false
|
||||||
|
AND s.status = 'down'
|
||||||
|
`).All(&pending)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range pending {
|
||||||
|
alertRecord, err := am.hub.FindRecordById("alerts", item.AlertID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
min := max(1, alertRecord.GetInt("min"))
|
||||||
|
am.schedulePendingStatusAlert(item.SystemName, alertRecord, time.Duration(min)*time.Minute)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
628
internal/alerts/alerts_status_test.go
Normal file
628
internal/alerts/alerts_status_test.go
Normal file
@@ -0,0 +1,628 @@
|
|||||||
|
//go:build testing
|
||||||
|
|
||||||
|
package alerts_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"testing/synctest"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/henrygd/beszel/internal/alerts"
|
||||||
|
beszelTests "github.com/henrygd/beszel/internal/tests"
|
||||||
|
"github.com/pocketbase/dbx"
|
||||||
|
"github.com/pocketbase/pocketbase/core"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStatusAlerts(t *testing.T) {
|
||||||
|
synctest.Test(t, func(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
systems, err := beszelTests.CreateSystems(hub, 4, user.Id, "paused")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
var alerts []*core.Record
|
||||||
|
for i, system := range systems {
|
||||||
|
alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": system.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": i + 1,
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
alerts = append(alerts, alert)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
for _, alert := range alerts {
|
||||||
|
assert.False(t, alert.GetBool("triggered"), "Alert should not be triggered immediately")
|
||||||
|
}
|
||||||
|
if hub.TestMailer.TotalSend() != 0 {
|
||||||
|
assert.Zero(t, hub.TestMailer.TotalSend(), "Expected 0 messages, got %d", hub.TestMailer.TotalSend())
|
||||||
|
}
|
||||||
|
for _, system := range systems {
|
||||||
|
assert.EqualValues(t, "paused", system.GetString("status"), "System should be paused")
|
||||||
|
}
|
||||||
|
for _, system := range systems {
|
||||||
|
system.Set("status", "up")
|
||||||
|
err = hub.SaveNoValidate(system)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
|
||||||
|
for _, system := range systems {
|
||||||
|
system.Set("status", "down")
|
||||||
|
err = hub.SaveNoValidate(system)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
// after 30 seconds, should have 4 alerts in the pendingAlerts map, no triggered alerts
|
||||||
|
time.Sleep(time.Second * 30)
|
||||||
|
assert.EqualValues(t, 4, hub.GetPendingAlertsCount(), "should have 4 alerts in the pendingAlerts map")
|
||||||
|
triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 0, triggeredCount, "should have 0 alert triggered")
|
||||||
|
assert.EqualValues(t, 0, hub.TestMailer.TotalSend(), "should have 0 messages sent")
|
||||||
|
// after 1:30 seconds, should have 1 triggered alert and 3 pending alerts
|
||||||
|
time.Sleep(time.Second * 60)
|
||||||
|
assert.EqualValues(t, 3, hub.GetPendingAlertsCount(), "should have 3 alerts in the pendingAlerts map")
|
||||||
|
triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 1, triggeredCount, "should have 1 alert triggered")
|
||||||
|
assert.EqualValues(t, 1, hub.TestMailer.TotalSend(), "should have 1 messages sent")
|
||||||
|
// after 2:30 seconds, should have 2 triggered alerts and 2 pending alerts
|
||||||
|
time.Sleep(time.Second * 60)
|
||||||
|
assert.EqualValues(t, 2, hub.GetPendingAlertsCount(), "should have 2 alerts in the pendingAlerts map")
|
||||||
|
triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 2, triggeredCount, "should have 2 alert triggered")
|
||||||
|
assert.EqualValues(t, 2, hub.TestMailer.TotalSend(), "should have 2 messages sent")
|
||||||
|
// now we will bring the remaning systems back up
|
||||||
|
for _, system := range systems {
|
||||||
|
system.Set("status", "up")
|
||||||
|
err = hub.SaveNoValidate(system)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
// should have 0 alerts in the pendingAlerts map and 0 alerts triggered
|
||||||
|
assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
|
||||||
|
triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Zero(t, triggeredCount, "should have 0 alert triggered")
|
||||||
|
// 4 messages sent, 2 down alerts and 2 up alerts for first 2 systems
|
||||||
|
assert.EqualValues(t, 4, hub.TestMailer.TotalSend(), "should have 4 messages sent")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
func TestStatusAlertRecoveryBeforeDeadline(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
// Ensure user settings have an email
|
||||||
|
userSettings, _ := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
hub.Save(userSettings)
|
||||||
|
|
||||||
|
// Initial email count
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
|
||||||
|
systemCollection, _ := hub.FindCollectionByNameOrId("systems")
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
hub.Save(system)
|
||||||
|
|
||||||
|
alertCollection, _ := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", false)
|
||||||
|
alert.Set("min", 1)
|
||||||
|
hub.Save(alert)
|
||||||
|
|
||||||
|
am := hub.AlertManager
|
||||||
|
|
||||||
|
// 1. System goes down
|
||||||
|
am.HandleStatusAlerts("down", system)
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "Alert should be scheduled")
|
||||||
|
|
||||||
|
// 2. System goes up BEFORE delay expires
|
||||||
|
// Triggering HandleStatusAlerts("up") SHOULD NOT send an alert.
|
||||||
|
am.HandleStatusAlerts("up", system)
|
||||||
|
|
||||||
|
assert.Equal(t, 0, am.GetPendingAlertsCount(), "Alert should be canceled if system recovers before delay expires")
|
||||||
|
|
||||||
|
// Verify that NO email was sent.
|
||||||
|
assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "Recovery notification should not be sent if system never went down")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatusAlertNormalRecovery(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
// Ensure user settings have an email
|
||||||
|
userSettings, _ := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
hub.Save(userSettings)
|
||||||
|
|
||||||
|
systemCollection, _ := hub.FindCollectionByNameOrId("systems")
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
hub.Save(system)
|
||||||
|
|
||||||
|
alertCollection, _ := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", true) // System was confirmed DOWN
|
||||||
|
hub.Save(alert)
|
||||||
|
|
||||||
|
am := hub.AlertManager
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
|
||||||
|
// System goes up
|
||||||
|
am.HandleStatusAlerts("up", system)
|
||||||
|
|
||||||
|
// Verify that an email WAS sent (normal recovery).
|
||||||
|
assert.Equal(t, initialEmailCount+1, hub.TestMailer.TotalSend(), "Recovery notification should be sent if system was triggered as down")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleStatusAlertsDoesNotSendRecoveryWhileDownIsOnlyPending(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
require.NoError(t, err)
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
require.NoError(t, hub.Save(userSettings))
|
||||||
|
|
||||||
|
systemCollection, err := hub.FindCollectionByNameOrId("systems")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
require.NoError(t, hub.Save(system))
|
||||||
|
|
||||||
|
alertCollection, err := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
require.NoError(t, err)
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", false)
|
||||||
|
alert.Set("min", 1)
|
||||||
|
require.NoError(t, hub.Save(alert))
|
||||||
|
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "down transition should register a pending alert immediately")
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("up", system))
|
||||||
|
assert.Zero(t, am.GetPendingAlertsCount(), "recovery should cancel the pending down alert")
|
||||||
|
assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "recovery notification should not be sent before a down alert triggers")
|
||||||
|
|
||||||
|
alertRecord, err := hub.FindRecordById("alerts", alert.Id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, alertRecord.GetBool("triggered"), "alert should remain untriggered when downtime never matured")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatusAlertTimerCancellationPreventsBoundaryDelivery(t *testing.T) {
|
||||||
|
synctest.Test(t, func(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
require.NoError(t, err)
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
require.NoError(t, hub.Save(userSettings))
|
||||||
|
|
||||||
|
systemCollection, err := hub.FindCollectionByNameOrId("systems")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
require.NoError(t, hub.Save(system))
|
||||||
|
|
||||||
|
alertCollection, err := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
require.NoError(t, err)
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", false)
|
||||||
|
alert.Set("min", 1)
|
||||||
|
require.NoError(t, hub.Save(alert))
|
||||||
|
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "down transition should register a pending alert immediately")
|
||||||
|
require.True(t, am.ResetPendingAlertTimer(alert.Id, 25*time.Millisecond), "test should shorten the pending alert timer")
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("up", system))
|
||||||
|
assert.Zero(t, am.GetPendingAlertsCount(), "recovery should remove the pending alert before the timer callback runs")
|
||||||
|
|
||||||
|
time.Sleep(40 * time.Millisecond)
|
||||||
|
assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "timer callback should not deliver after recovery cancels the pending alert")
|
||||||
|
|
||||||
|
alertRecord, err := hub.FindRecordById("alerts", alert.Id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, alertRecord.GetBool("triggered"), "alert should remain untriggered when cancellation wins the timer race")
|
||||||
|
|
||||||
|
time.Sleep(time.Minute)
|
||||||
|
synctest.Wait()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatusAlertDownFiresAfterDelayExpires(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
require.NoError(t, err)
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
require.NoError(t, hub.Save(userSettings))
|
||||||
|
|
||||||
|
systemCollection, err := hub.FindCollectionByNameOrId("systems")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
require.NoError(t, hub.Save(system))
|
||||||
|
|
||||||
|
alertCollection, err := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
require.NoError(t, err)
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", false)
|
||||||
|
alert.Set("min", 1)
|
||||||
|
require.NoError(t, hub.Save(alert))
|
||||||
|
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "alert should be pending after system goes down")
|
||||||
|
|
||||||
|
// Expire the pending alert and process it
|
||||||
|
am.ForceExpirePendingAlerts()
|
||||||
|
processed, err := am.ProcessPendingAlerts()
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, processed, 1, "one alert should have been processed")
|
||||||
|
assert.Equal(t, 0, am.GetPendingAlertsCount(), "pending alert should be consumed after processing")
|
||||||
|
|
||||||
|
// Verify down email was sent
|
||||||
|
assert.Equal(t, initialEmailCount+1, hub.TestMailer.TotalSend(), "down notification should be sent after delay expires")
|
||||||
|
|
||||||
|
// Verify triggered flag is set in the DB
|
||||||
|
alertRecord, err := hub.FindRecordById("alerts", alert.Id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, alertRecord.GetBool("triggered"), "alert should be marked triggered after downtime matures")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatusAlertDuplicateDownCallIsIdempotent(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
require.NoError(t, err)
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
require.NoError(t, hub.Save(userSettings))
|
||||||
|
|
||||||
|
systemCollection, err := hub.FindCollectionByNameOrId("systems")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
require.NoError(t, hub.Save(system))
|
||||||
|
|
||||||
|
alertCollection, err := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
require.NoError(t, err)
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", false)
|
||||||
|
alert.Set("min", 5)
|
||||||
|
require.NoError(t, hub.Save(alert))
|
||||||
|
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "repeated down calls should not schedule duplicate pending alerts")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatusAlertNoAlertRecord(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
systemCollection, err := hub.FindCollectionByNameOrId("systems")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := core.NewRecord(systemCollection)
|
||||||
|
system.Set("name", "test-system")
|
||||||
|
system.Set("status", "up")
|
||||||
|
system.Set("host", "127.0.0.1")
|
||||||
|
system.Set("users", []string{user.Id})
|
||||||
|
require.NoError(t, hub.Save(system))
|
||||||
|
|
||||||
|
// No Status alert record created for this system
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("down", system))
|
||||||
|
assert.Equal(t, 0, am.GetPendingAlertsCount(), "no pending alert when no alert record exists")
|
||||||
|
|
||||||
|
require.NoError(t, am.HandleStatusAlerts("up", system))
|
||||||
|
assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "no email when no alert record exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRestorePendingStatusAlertsRequeuesDownSystemsAfterRestart(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
userSettings, err := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
|
||||||
|
require.NoError(t, err)
|
||||||
|
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
|
||||||
|
require.NoError(t, hub.Save(userSettings))
|
||||||
|
|
||||||
|
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "down")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := systems[0]
|
||||||
|
|
||||||
|
alertCollection, err := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
require.NoError(t, err)
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", false)
|
||||||
|
alert.Set("min", 1)
|
||||||
|
require.NoError(t, hub.Save(alert))
|
||||||
|
|
||||||
|
initialEmailCount := hub.TestMailer.TotalSend()
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
|
||||||
|
require.NoError(t, am.RestorePendingStatusAlerts())
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "startup restore should requeue a pending down alert for a system still marked down")
|
||||||
|
|
||||||
|
am.ForceExpirePendingAlerts()
|
||||||
|
processed, err := am.ProcessPendingAlerts()
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, processed, 1, "restored pending alert should be processable after the delay expires")
|
||||||
|
assert.Equal(t, initialEmailCount+1, hub.TestMailer.TotalSend(), "restored pending alert should send the down notification")
|
||||||
|
|
||||||
|
alertRecord, err := hub.FindRecordById("alerts", alert.Id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, alertRecord.GetBool("triggered"), "restored pending alert should mark the alert as triggered once delivered")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRestorePendingStatusAlertsSkipsNonDownOrAlreadyTriggeredAlerts(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
systemsDown, err := beszelTests.CreateSystems(hub, 2, user.Id, "down")
|
||||||
|
require.NoError(t, err)
|
||||||
|
systemDownPending := systemsDown[0]
|
||||||
|
systemDownTriggered := systemsDown[1]
|
||||||
|
|
||||||
|
systemUp, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
|
||||||
|
"name": "up-system",
|
||||||
|
"users": []string{user.Id},
|
||||||
|
"host": "127.0.0.2",
|
||||||
|
"status": "up",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": systemDownPending.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": 1,
|
||||||
|
"triggered": false,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": systemUp.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": 1,
|
||||||
|
"triggered": false,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": systemDownTriggered.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": 1,
|
||||||
|
"triggered": true,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
require.NoError(t, am.RestorePendingStatusAlerts())
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "only untriggered alerts for currently down systems should be restored")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRestorePendingStatusAlertsIsIdempotent(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "down")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := systems[0]
|
||||||
|
|
||||||
|
_, err = beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": system.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": 1,
|
||||||
|
"triggered": false,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
am := alerts.NewTestAlertManagerWithoutWorker(hub)
|
||||||
|
require.NoError(t, am.RestorePendingStatusAlerts())
|
||||||
|
require.NoError(t, am.RestorePendingStatusAlerts())
|
||||||
|
|
||||||
|
assert.Equal(t, 1, am.GetPendingAlertsCount(), "restoring twice should not create duplicate pending alerts")
|
||||||
|
am.ForceExpirePendingAlerts()
|
||||||
|
processed, err := am.ProcessPendingAlerts()
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, processed, 1, "restored alert should still be processable exactly once")
|
||||||
|
assert.Zero(t, am.GetPendingAlertsCount(), "processing the restored alert should empty the pending map")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveStatusAlertsFixesStaleTriggered(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
// CreateSystems uses SaveNoValidate after initial save to bypass the
|
||||||
|
// onRecordCreate hook that forces status = "pending".
|
||||||
|
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
|
||||||
|
require.NoError(t, err)
|
||||||
|
system := systems[0]
|
||||||
|
|
||||||
|
alertCollection, err := hub.FindCollectionByNameOrId("alerts")
|
||||||
|
require.NoError(t, err)
|
||||||
|
alert := core.NewRecord(alertCollection)
|
||||||
|
alert.Set("user", user.Id)
|
||||||
|
alert.Set("system", system.Id)
|
||||||
|
alert.Set("name", "Status")
|
||||||
|
alert.Set("triggered", true) // Stale: system is up but alert still says triggered
|
||||||
|
require.NoError(t, hub.Save(alert))
|
||||||
|
|
||||||
|
// resolveStatusAlerts should clear the stale triggered flag
|
||||||
|
require.NoError(t, alerts.ResolveStatusAlerts(hub))
|
||||||
|
|
||||||
|
alertRecord, err := hub.FindRecordById("alerts", alert.Id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, alertRecord.GetBool("triggered"), "stale triggered flag should be cleared when system is up")
|
||||||
|
}
|
||||||
|
func TestResolveStatusAlerts(t *testing.T) {
|
||||||
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
|
defer hub.Cleanup()
|
||||||
|
|
||||||
|
// Create a systemUp
|
||||||
|
systemUp, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
|
||||||
|
"name": "test-system",
|
||||||
|
"users": []string{user.Id},
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"status": "up",
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
systemDown, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
|
||||||
|
"name": "test-system-2",
|
||||||
|
"users": []string{user.Id},
|
||||||
|
"host": "127.0.0.2",
|
||||||
|
"status": "up",
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Create a status alertUp for the system
|
||||||
|
alertUp, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": systemUp.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": 1,
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
alertDown, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
||||||
|
"name": "Status",
|
||||||
|
"system": systemDown.Id,
|
||||||
|
"user": user.Id,
|
||||||
|
"min": 1,
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify alert is not triggered initially
|
||||||
|
assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered initially")
|
||||||
|
|
||||||
|
// Set the system to 'up' (this should not trigger the alert)
|
||||||
|
systemUp.Set("status", "up")
|
||||||
|
err = hub.SaveNoValidate(systemUp)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
systemDown.Set("status", "down")
|
||||||
|
err = hub.SaveNoValidate(systemDown)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Wait a moment for any processing
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// Verify alertUp is still not triggered after setting system to up
|
||||||
|
alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered when system is up")
|
||||||
|
|
||||||
|
// Manually set both alerts triggered to true
|
||||||
|
alertUp.Set("triggered", true)
|
||||||
|
err = hub.SaveNoValidate(alertUp)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
alertDown.Set("triggered", true)
|
||||||
|
err = hub.SaveNoValidate(alertDown)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify we have exactly one alert with triggered true
|
||||||
|
triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 2, triggeredCount, "Should have exactly two alerts with triggered true")
|
||||||
|
|
||||||
|
// Verify the specific alertUp is triggered
|
||||||
|
alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, alertUp.GetBool("triggered"), "Alert should be triggered")
|
||||||
|
|
||||||
|
// Verify we have two unresolved alert history records
|
||||||
|
alertHistoryCount, err := hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 2, alertHistoryCount, "Should have exactly two unresolved alert history records")
|
||||||
|
|
||||||
|
err = alerts.ResolveStatusAlerts(hub)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify alertUp is not triggered after resolving
|
||||||
|
alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered after resolving")
|
||||||
|
// Verify alertDown is still triggered
|
||||||
|
alertDown, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertDown.Id})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, alertDown.GetBool("triggered"), "Alert should still be triggered after resolving")
|
||||||
|
|
||||||
|
// Verify we have one unresolved alert history record
|
||||||
|
alertHistoryCount, err = hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record")
|
||||||
|
|
||||||
|
}
|
||||||
@@ -12,7 +12,6 @@ import (
|
|||||||
"testing/synctest"
|
"testing/synctest"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/henrygd/beszel/internal/alerts"
|
|
||||||
beszelTests "github.com/henrygd/beszel/internal/tests"
|
beszelTests "github.com/henrygd/beszel/internal/tests"
|
||||||
|
|
||||||
"github.com/pocketbase/dbx"
|
"github.com/pocketbase/dbx"
|
||||||
@@ -369,87 +368,6 @@ func TestUserAlertsApi(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStatusAlerts(t *testing.T) {
|
|
||||||
synctest.Test(t, func(t *testing.T) {
|
|
||||||
hub, user := beszelTests.GetHubWithUser(t)
|
|
||||||
defer hub.Cleanup()
|
|
||||||
|
|
||||||
systems, err := beszelTests.CreateSystems(hub, 4, user.Id, "paused")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
var alerts []*core.Record
|
|
||||||
for i, system := range systems {
|
|
||||||
alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
|
||||||
"name": "Status",
|
|
||||||
"system": system.Id,
|
|
||||||
"user": user.Id,
|
|
||||||
"min": i + 1,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
alerts = append(alerts, alert)
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
for _, alert := range alerts {
|
|
||||||
assert.False(t, alert.GetBool("triggered"), "Alert should not be triggered immediately")
|
|
||||||
}
|
|
||||||
if hub.TestMailer.TotalSend() != 0 {
|
|
||||||
assert.Zero(t, hub.TestMailer.TotalSend(), "Expected 0 messages, got %d", hub.TestMailer.TotalSend())
|
|
||||||
}
|
|
||||||
for _, system := range systems {
|
|
||||||
assert.EqualValues(t, "paused", system.GetString("status"), "System should be paused")
|
|
||||||
}
|
|
||||||
for _, system := range systems {
|
|
||||||
system.Set("status", "up")
|
|
||||||
err = hub.SaveNoValidate(system)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
|
|
||||||
for _, system := range systems {
|
|
||||||
system.Set("status", "down")
|
|
||||||
err = hub.SaveNoValidate(system)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
// after 30 seconds, should have 4 alerts in the pendingAlerts map, no triggered alerts
|
|
||||||
time.Sleep(time.Second * 30)
|
|
||||||
assert.EqualValues(t, 4, hub.GetPendingAlertsCount(), "should have 4 alerts in the pendingAlerts map")
|
|
||||||
triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 0, triggeredCount, "should have 0 alert triggered")
|
|
||||||
assert.EqualValues(t, 0, hub.TestMailer.TotalSend(), "should have 0 messages sent")
|
|
||||||
// after 1:30 seconds, should have 1 triggered alert and 3 pending alerts
|
|
||||||
time.Sleep(time.Second * 60)
|
|
||||||
assert.EqualValues(t, 3, hub.GetPendingAlertsCount(), "should have 3 alerts in the pendingAlerts map")
|
|
||||||
triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, triggeredCount, "should have 1 alert triggered")
|
|
||||||
assert.EqualValues(t, 1, hub.TestMailer.TotalSend(), "should have 1 messages sent")
|
|
||||||
// after 2:30 seconds, should have 2 triggered alerts and 2 pending alerts
|
|
||||||
time.Sleep(time.Second * 60)
|
|
||||||
assert.EqualValues(t, 2, hub.GetPendingAlertsCount(), "should have 2 alerts in the pendingAlerts map")
|
|
||||||
triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 2, triggeredCount, "should have 2 alert triggered")
|
|
||||||
assert.EqualValues(t, 2, hub.TestMailer.TotalSend(), "should have 2 messages sent")
|
|
||||||
// now we will bring the remaning systems back up
|
|
||||||
for _, system := range systems {
|
|
||||||
system.Set("status", "up")
|
|
||||||
err = hub.SaveNoValidate(system)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
// should have 0 alerts in the pendingAlerts map and 0 alerts triggered
|
|
||||||
assert.EqualValues(t, 0, hub.GetPendingAlertsCount(), "should have 0 alerts in the pendingAlerts map")
|
|
||||||
triggeredCount, err = hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Zero(t, triggeredCount, "should have 0 alert triggered")
|
|
||||||
// 4 messages sent, 2 down alerts and 2 up alerts for first 2 systems
|
|
||||||
assert.EqualValues(t, 4, hub.TestMailer.TotalSend(), "should have 4 messages sent")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAlertsHistory(t *testing.T) {
|
func TestAlertsHistory(t *testing.T) {
|
||||||
synctest.Test(t, func(t *testing.T) {
|
synctest.Test(t, func(t *testing.T) {
|
||||||
hub, user := beszelTests.GetHubWithUser(t)
|
hub, user := beszelTests.GetHubWithUser(t)
|
||||||
@@ -578,102 +496,3 @@ func TestAlertsHistory(t *testing.T) {
|
|||||||
assert.EqualValues(t, 2, totalHistoryCount, "Should have 2 total alert history records")
|
assert.EqualValues(t, 2, totalHistoryCount, "Should have 2 total alert history records")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
func TestResolveStatusAlerts(t *testing.T) {
|
|
||||||
hub, user := beszelTests.GetHubWithUser(t)
|
|
||||||
defer hub.Cleanup()
|
|
||||||
|
|
||||||
// Create a systemUp
|
|
||||||
systemUp, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
|
|
||||||
"name": "test-system",
|
|
||||||
"users": []string{user.Id},
|
|
||||||
"host": "127.0.0.1",
|
|
||||||
"status": "up",
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
systemDown, err := beszelTests.CreateRecord(hub, "systems", map[string]any{
|
|
||||||
"name": "test-system-2",
|
|
||||||
"users": []string{user.Id},
|
|
||||||
"host": "127.0.0.2",
|
|
||||||
"status": "up",
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Create a status alertUp for the system
|
|
||||||
alertUp, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
|
||||||
"name": "Status",
|
|
||||||
"system": systemUp.Id,
|
|
||||||
"user": user.Id,
|
|
||||||
"min": 1,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
alertDown, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
|
|
||||||
"name": "Status",
|
|
||||||
"system": systemDown.Id,
|
|
||||||
"user": user.Id,
|
|
||||||
"min": 1,
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Verify alert is not triggered initially
|
|
||||||
assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered initially")
|
|
||||||
|
|
||||||
// Set the system to 'up' (this should not trigger the alert)
|
|
||||||
systemUp.Set("status", "up")
|
|
||||||
err = hub.SaveNoValidate(systemUp)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
systemDown.Set("status", "down")
|
|
||||||
err = hub.SaveNoValidate(systemDown)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Wait a moment for any processing
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
// Verify alertUp is still not triggered after setting system to up
|
|
||||||
alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered when system is up")
|
|
||||||
|
|
||||||
// Manually set both alerts triggered to true
|
|
||||||
alertUp.Set("triggered", true)
|
|
||||||
err = hub.SaveNoValidate(alertUp)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
alertDown.Set("triggered", true)
|
|
||||||
err = hub.SaveNoValidate(alertDown)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Verify we have exactly one alert with triggered true
|
|
||||||
triggeredCount, err := hub.CountRecords("alerts", dbx.HashExp{"triggered": true})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 2, triggeredCount, "Should have exactly two alerts with triggered true")
|
|
||||||
|
|
||||||
// Verify the specific alertUp is triggered
|
|
||||||
alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.True(t, alertUp.GetBool("triggered"), "Alert should be triggered")
|
|
||||||
|
|
||||||
// Verify we have two unresolved alert history records
|
|
||||||
alertHistoryCount, err := hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 2, alertHistoryCount, "Should have exactly two unresolved alert history records")
|
|
||||||
|
|
||||||
err = alerts.ResolveStatusAlerts(hub)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Verify alertUp is not triggered after resolving
|
|
||||||
alertUp, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertUp.Id})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.False(t, alertUp.GetBool("triggered"), "Alert should not be triggered after resolving")
|
|
||||||
// Verify alertDown is still triggered
|
|
||||||
alertDown, err = hub.FindFirstRecordByFilter("alerts", "id={:id}", dbx.Params{"id": alertDown.Id})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.True(t, alertDown.GetBool("triggered"), "Alert should still be triggered after resolving")
|
|
||||||
|
|
||||||
// Verify we have one unresolved alert history record
|
|
||||||
alertHistoryCount, err = hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record")
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -9,6 +9,12 @@ import (
|
|||||||
"github.com/pocketbase/pocketbase/core"
|
"github.com/pocketbase/pocketbase/core"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func NewTestAlertManagerWithoutWorker(app hubLike) *AlertManager {
|
||||||
|
return &AlertManager{
|
||||||
|
hub: app,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (am *AlertManager) GetAlertManager() *AlertManager {
|
func (am *AlertManager) GetAlertManager() *AlertManager {
|
||||||
return am
|
return am
|
||||||
}
|
}
|
||||||
@@ -34,12 +40,11 @@ func (am *AlertManager) ProcessPendingAlerts() ([]*core.Record, error) {
|
|||||||
am.pendingAlerts.Range(func(key, value any) bool {
|
am.pendingAlerts.Range(func(key, value any) bool {
|
||||||
info := value.(*alertInfo)
|
info := value.(*alertInfo)
|
||||||
if now.After(info.expireTime) {
|
if now.After(info.expireTime) {
|
||||||
// Downtime delay has passed, process alert
|
if info.timer != nil {
|
||||||
if err := am.sendStatusAlert("down", info.systemName, info.alertRecord); err != nil {
|
info.timer.Stop()
|
||||||
lastErr = err
|
|
||||||
}
|
}
|
||||||
|
am.processPendingAlert(key.(string))
|
||||||
processedAlerts = append(processedAlerts, info.alertRecord)
|
processedAlerts = append(processedAlerts, info.alertRecord)
|
||||||
am.pendingAlerts.Delete(key)
|
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
@@ -56,6 +61,27 @@ func (am *AlertManager) ForceExpirePendingAlerts() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (am *AlertManager) ResetPendingAlertTimer(alertID string, delay time.Duration) bool {
|
||||||
|
value, loaded := am.pendingAlerts.Load(alertID)
|
||||||
|
if !loaded {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
info := value.(*alertInfo)
|
||||||
|
if info.timer != nil {
|
||||||
|
info.timer.Stop()
|
||||||
|
}
|
||||||
|
info.expireTime = time.Now().Add(delay)
|
||||||
|
info.timer = time.AfterFunc(delay, func() {
|
||||||
|
am.processPendingAlert(alertID)
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func ResolveStatusAlerts(app core.App) error {
|
func ResolveStatusAlerts(app core.App) error {
|
||||||
return resolveStatusAlerts(app)
|
return resolveStatusAlerts(app)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (am *AlertManager) RestorePendingStatusAlerts() error {
|
||||||
|
return am.restorePendingStatusAlerts()
|
||||||
|
}
|
||||||
|
|||||||
@@ -98,7 +98,7 @@ func ClearCollection(t testing.TB, app core.App, collectionName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *TestHub) Cleanup() {
|
func (h *TestHub) Cleanup() {
|
||||||
h.GetAlertManager().StopWorker()
|
h.GetAlertManager().Stop()
|
||||||
h.GetSystemManager().RemoveAllSystems()
|
h.GetSystemManager().RemoveAllSystems()
|
||||||
h.TestApp.Cleanup()
|
h.TestApp.Cleanup()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user