refactor: add alertsCache to maintain active alert data in memory

This commit is contained in:
henrygd
2026-03-17 18:32:57 -04:00
parent 48ddc96a0d
commit c6c3950fb0
8 changed files with 628 additions and 57 deletions

View File

@@ -23,6 +23,7 @@ type AlertManager struct {
hub hubLike
stopOnce sync.Once
pendingAlerts sync.Map
alertsCache *AlertsCache
}
type AlertMessageData struct {
@@ -63,7 +64,7 @@ type SystemAlertGPUData struct {
type SystemAlertData struct {
systemRecord *core.Record
alertRecord *core.Record
alertRecord CachedAlertData
name string
unit string
val float64
@@ -97,7 +98,8 @@ var supportsTitle = map[string]struct{}{
// NewAlertManager creates a new AlertManager instance.
func NewAlertManager(app hubLike) *AlertManager {
am := &AlertManager{
hub: app,
hub: app,
alertsCache: NewAlertsCache(app),
}
am.bindEvents()
return am
@@ -110,6 +112,9 @@ func (am *AlertManager) bindEvents() {
am.hub.OnRecordAfterUpdateSuccess("smart_devices").BindFunc(am.handleSmartDeviceAlert)
am.hub.OnServe().BindFunc(func(e *core.ServeEvent) error {
// Populate all alerts into cache on startup
_ = am.alertsCache.PopulateFromDB(true)
if err := resolveStatusAlerts(e.App); err != nil {
e.App.Logger().Error("Failed to resolve stale status alerts", "err", err)
}
@@ -311,3 +316,13 @@ func (am *AlertManager) SendTestNotification(e *core.RequestEvent) error {
}
return e.JSON(200, map[string]bool{"err": false})
}
// setAlertTriggered updates the "triggered" status of an alert record in the database
func (am *AlertManager) setAlertTriggered(alert CachedAlertData, triggered bool) error {
alertRecord, err := am.hub.FindRecordById("alerts", alert.Id)
if err != nil {
return err
}
alertRecord.Set("triggered", triggered)
return am.hub.Save(alertRecord)
}

View File

@@ -5,13 +5,12 @@ import (
"strings"
"time"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
)
type alertInfo struct {
systemName string
alertRecord *core.Record
alertRecord CachedAlertData
expireTime time.Time
timer *time.Timer
}
@@ -36,10 +35,7 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, systemRecord *core.
return nil
}
alertRecords, err := am.getSystemStatusAlerts(systemRecord.Id)
if err != nil {
return err
}
alertRecords := am.alertsCache.GetAlertsByName(systemRecord.Id, "Status")
if len(alertRecords) == 0 {
return nil
}
@@ -53,29 +49,17 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, systemRecord *core.
return nil
}
// getSystemStatusAlerts retrieves all "Status" alert records for a given system ID.
func (am *AlertManager) getSystemStatusAlerts(systemID string) ([]*core.Record, error) {
alertRecords, err := am.hub.FindAllRecords("alerts", dbx.HashExp{
"system": systemID,
"name": "Status",
})
if err != nil {
return nil, err
}
return alertRecords, nil
}
// handleSystemDown manages the logic when a system status changes to "down". It schedules pending alerts for each alert record.
func (am *AlertManager) handleSystemDown(systemName string, alertRecords []*core.Record) {
func (am *AlertManager) handleSystemDown(systemName string, alertRecords []CachedAlertData) {
for _, alertRecord := range alertRecords {
min := max(1, alertRecord.GetInt("min"))
min := max(1, int(alertRecord.Min))
am.schedulePendingStatusAlert(systemName, alertRecord, time.Duration(min)*time.Minute)
}
}
// schedulePendingStatusAlert sets up a timer to send a "down" alert after the specified delay if the system is still down.
// It returns true if the alert was scheduled, or false if an alert was already pending for the given alert record.
func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecord *core.Record, delay time.Duration) bool {
func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecord CachedAlertData, delay time.Duration) bool {
alert := &alertInfo{
systemName: systemName,
alertRecord: alertRecord,
@@ -96,13 +80,13 @@ func (am *AlertManager) schedulePendingStatusAlert(systemName string, alertRecor
// handleSystemUp manages the logic when a system status changes to "up".
// It cancels any pending alerts and sends "up" alerts.
func (am *AlertManager) handleSystemUp(systemName string, alertRecords []*core.Record) {
func (am *AlertManager) handleSystemUp(systemName string, alertRecords []CachedAlertData) {
for _, alertRecord := range alertRecords {
// If alert exists for record, delete and continue (down alert not sent)
if am.cancelPendingAlert(alertRecord.Id) {
continue
}
if !alertRecord.GetBool("triggered") {
if !alertRecord.Triggered {
continue
}
if err := am.sendStatusAlert("up", systemName, alertRecord); err != nil {
@@ -133,23 +117,22 @@ func (am *AlertManager) processPendingAlert(alertID string) {
}
info := value.(*alertInfo)
if info.alertRecord.GetBool("triggered") {
alertRecord, ok := am.alertsCache.Refresh(info.alertRecord)
if !ok || alertRecord.Triggered {
return
}
if err := am.sendStatusAlert("down", info.systemName, info.alertRecord); err != nil {
if err := am.sendStatusAlert("down", info.systemName, alertRecord); err != nil {
am.hub.Logger().Error("Failed to send alert", "err", err)
}
}
// sendStatusAlert sends a status alert ("up" or "down") to the users associated with the alert records.
func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord *core.Record) error {
switch alertStatus {
case "up":
alertRecord.Set("triggered", false)
case "down":
alertRecord.Set("triggered", true)
func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, alertRecord CachedAlertData) error {
// Update trigger state for alert record before sending alert
triggered := alertStatus == "down"
if err := am.setAlertTriggered(alertRecord, triggered); err != nil {
return err
}
am.hub.Save(alertRecord)
var emoji string
if alertStatus == "up" {
@@ -162,10 +145,10 @@ func (am *AlertManager) sendStatusAlert(alertStatus string, systemName string, a
message := strings.TrimSuffix(title, emoji)
// Get system ID for the link
systemID := alertRecord.GetString("system")
systemID := alertRecord.SystemID
return am.SendAlert(AlertMessageData{
UserID: alertRecord.GetString("user"),
UserID: alertRecord.UserID,
SystemID: systemID,
Title: title,
Message: message,
@@ -211,12 +194,13 @@ func resolveStatusAlerts(app core.App) error {
func (am *AlertManager) restorePendingStatusAlerts() error {
type pendingStatusAlert struct {
AlertID string `db:"alert_id"`
SystemID string `db:"system_id"`
SystemName string `db:"system_name"`
}
var pending []pendingStatusAlert
err := am.hub.DB().NewQuery(`
SELECT a.id AS alert_id, s.name AS system_name
SELECT a.id AS alert_id, a.system AS system_id, s.name AS system_name
FROM alerts a
JOIN systems s ON a.system = s.id
WHERE a.name = 'Status'
@@ -227,12 +211,15 @@ func (am *AlertManager) restorePendingStatusAlerts() error {
return err
}
// Make sure cache is populated before trying to restore pending alerts
_ = am.alertsCache.PopulateFromDB(false)
for _, item := range pending {
alertRecord, err := am.hub.FindRecordById("alerts", item.AlertID)
if err != nil {
return err
alertRecord, ok := am.alertsCache.GetAlert(item.SystemID, item.AlertID)
if !ok {
continue
}
min := max(1, alertRecord.GetInt("min"))
min := max(1, int(alertRecord.Min))
am.schedulePendingStatusAlert(item.SystemName, alertRecord, time.Duration(min)*time.Minute)
}

View File

@@ -626,3 +626,130 @@ func TestResolveStatusAlerts(t *testing.T) {
assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record")
}
func TestAlertsHistoryStatus(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
hub, user := beszelTests.GetHubWithUser(t)
defer hub.Cleanup()
// Create a system
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
assert.NoError(t, err)
system := systems[0]
// Create a status alertRecord for the system
alertRecord, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "Status",
"system": system.Id,
"user": user.Id,
"min": 1,
})
assert.NoError(t, err)
// Verify alert is not triggered initially
assert.False(t, alertRecord.GetBool("triggered"), "Alert should not be triggered initially")
// Set the system to 'down' (this should trigger the alert)
system.Set("status", "down")
err = hub.Save(system)
assert.NoError(t, err)
time.Sleep(time.Second * 30)
synctest.Wait()
alertFresh, _ := hub.FindRecordById("alerts", alertRecord.Id)
assert.False(t, alertFresh.GetBool("triggered"), "Alert should not be triggered after 30 seconds")
time.Sleep(time.Minute)
synctest.Wait()
// Verify alert is triggered after setting system to down
alertFresh, err = hub.FindRecordById("alerts", alertRecord.Id)
assert.NoError(t, err)
assert.True(t, alertFresh.GetBool("triggered"), "Alert should be triggered after one minute")
// Verify we have one unresolved alert history record
alertHistoryCount, err := hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
assert.NoError(t, err)
assert.EqualValues(t, 1, alertHistoryCount, "Should have exactly one unresolved alert history record")
// Set the system back to 'up' (this should resolve the alert)
system.Set("status", "up")
err = hub.Save(system)
assert.NoError(t, err)
time.Sleep(time.Second)
synctest.Wait()
// Verify alert is not triggered after setting system back to up
alertFresh, err = hub.FindRecordById("alerts", alertRecord.Id)
assert.NoError(t, err)
assert.False(t, alertFresh.GetBool("triggered"), "Alert should not be triggered after system recovers")
// Verify the alert history record is resolved
alertHistoryCount, err = hub.CountRecords("alerts_history", dbx.HashExp{"resolved": ""})
assert.NoError(t, err)
assert.EqualValues(t, 0, alertHistoryCount, "Should have no unresolved alert history records")
})
}
func TestStatusAlertClearedBeforeSend(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
hub, user := beszelTests.GetHubWithUser(t)
defer hub.Cleanup()
// Create a system
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
assert.NoError(t, err)
system := systems[0]
// Ensure user settings have an email
userSettings, _ := hub.FindFirstRecordByFilter("user_settings", "user={:user}", map[string]any{"user": user.Id})
userSettings.Set("settings", `{"emails":["test@example.com"],"webhooks":[]}`)
hub.Save(userSettings)
// Initial email count
initialEmailCount := hub.TestMailer.TotalSend()
// Create a status alertRecord for the system
alertRecord, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "Status",
"system": system.Id,
"user": user.Id,
"min": 1,
})
assert.NoError(t, err)
// Verify alert is not triggered initially
assert.False(t, alertRecord.GetBool("triggered"), "Alert should not be triggered initially")
// Set the system to 'down' (this should trigger the alert)
system.Set("status", "down")
err = hub.Save(system)
assert.NoError(t, err)
time.Sleep(time.Second * 30)
synctest.Wait()
// Set system back up to clear the pending alert before it triggers
system.Set("status", "up")
err = hub.Save(system)
assert.NoError(t, err)
time.Sleep(time.Minute)
synctest.Wait()
// Verify that we have not sent any emails since the system recovered before the alert triggered
assert.Equal(t, initialEmailCount, hub.TestMailer.TotalSend(), "No email should be sent if system recovers before alert triggers")
// Verify alert is not triggered after setting system back to up
alertFresh, err := hub.FindRecordById("alerts", alertRecord.Id)
assert.NoError(t, err)
assert.False(t, alertFresh.GetBool("triggered"), "Alert should not be triggered after system recovers")
// Verify that no alert history record was created since the alert never triggered
alertHistoryCount, err := hub.CountRecords("alerts_history")
assert.NoError(t, err)
assert.EqualValues(t, 0, alertHistoryCount, "Should have no unresolved alert history records since alert never triggered")
})
}

View File

@@ -14,11 +14,8 @@ import (
)
func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *system.CombinedData) error {
alertRecords, err := am.hub.FindAllRecords("alerts",
dbx.NewExp("system={:system} AND name!='Status'", dbx.Params{"system": systemRecord.Id}),
)
if err != nil || len(alertRecords) == 0 {
// log.Println("no alerts found for system")
alertRecords := am.alertsCache.GetAlertsExcludingNames(systemRecord.Id, "Status")
if len(alertRecords) == 0 {
return nil
}
@@ -27,7 +24,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
oldestTime := now
for _, alertRecord := range alertRecords {
name := alertRecord.GetString("name")
name := alertRecord.Name
var val float64
unit := "%"
@@ -72,8 +69,8 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
val = float64(data.Stats.Battery[0])
}
triggered := alertRecord.GetBool("triggered")
threshold := alertRecord.GetFloat("value")
triggered := alertRecord.Triggered
threshold := alertRecord.Value
// Battery alert has inverted logic: trigger when value is BELOW threshold
lowAlert := isLowAlert(name)
@@ -91,7 +88,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
}
}
min := max(1, uint8(alertRecord.GetInt("min")))
min := max(1, alertRecord.Min)
alert := SystemAlertData{
systemRecord: systemRecord,
@@ -128,7 +125,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
Created types.DateTime `db:"created"`
}{}
err = am.hub.DB().
err := am.hub.DB().
Select("stats", "created").
From("system_stats").
Where(dbx.NewExp(
@@ -343,13 +340,12 @@ func (am *AlertManager) sendSystemAlert(alert SystemAlertData) {
}
body := fmt.Sprintf("%s averaged %.2f%s for the previous %v %s.", alert.descriptor, alert.val, alert.unit, alert.min, minutesLabel)
alert.alertRecord.Set("triggered", alert.triggered)
if err := am.hub.Save(alert.alertRecord); err != nil {
if err := am.setAlertTriggered(alert.alertRecord, alert.triggered); err != nil {
// app.Logger().Error("failed to save alert record", "err", err)
return
}
am.SendAlert(AlertMessageData{
UserID: alert.alertRecord.GetString("user"),
UserID: alert.alertRecord.UserID,
SystemID: alert.systemRecord.Id,
Title: subject,
Message: body,

View File

@@ -0,0 +1,177 @@
package alerts
import (
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/store"
)
// CachedAlertData represents the relevant fields of an alert record for status checking and updates.
type CachedAlertData struct {
Id string
SystemID string
UserID string
Name string
Value float64
Triggered bool
Min uint8
// Created types.DateTime
}
func (a *CachedAlertData) PopulateFromRecord(record *core.Record) {
a.Id = record.Id
a.SystemID = record.GetString("system")
a.UserID = record.GetString("user")
a.Name = record.GetString("name")
a.Value = record.GetFloat("value")
a.Triggered = record.GetBool("triggered")
a.Min = uint8(record.GetInt("min"))
// a.Created = record.GetDateTime("created")
}
// AlertsCache provides an in-memory cache for system alerts.
type AlertsCache struct {
app core.App
store *store.Store[string, *store.Store[string, CachedAlertData]]
populated bool
}
// NewAlertsCache creates a new instance of SystemAlertsCache.
func NewAlertsCache(app core.App) *AlertsCache {
c := AlertsCache{
app: app,
store: store.New(map[string]*store.Store[string, CachedAlertData]{}),
}
return c.bindEvents()
}
// bindEvents sets up event listeners to keep the cache in sync with database changes.
func (c *AlertsCache) bindEvents() *AlertsCache {
c.app.OnRecordAfterUpdateSuccess("alerts").BindFunc(func(e *core.RecordEvent) error {
// c.Delete(e.Record.Original()) // this would be needed if the system field on an existing alert was changed, however we don't currently allow that in the UI so we'll leave it commented out
c.Update(e.Record)
return e.Next()
})
c.app.OnRecordAfterDeleteSuccess("alerts").BindFunc(func(e *core.RecordEvent) error {
c.Delete(e.Record)
return e.Next()
})
c.app.OnRecordAfterCreateSuccess("alerts").BindFunc(func(e *core.RecordEvent) error {
c.Update(e.Record)
return e.Next()
})
return c
}
// PopulateFromDB clears current entries and loads all alerts from the database into the cache.
func (c *AlertsCache) PopulateFromDB(force bool) error {
if !force && c.populated {
return nil
}
records, err := c.app.FindAllRecords("alerts")
if err != nil {
return err
}
c.store.RemoveAll()
for _, record := range records {
c.Update(record)
}
c.populated = true
return nil
}
// Update adds or updates an alert record in the cache.
func (c *AlertsCache) Update(record *core.Record) {
systemID := record.GetString("system")
if systemID == "" {
return
}
systemStore, ok := c.store.GetOk(systemID)
if !ok {
systemStore = store.New(map[string]CachedAlertData{})
c.store.Set(systemID, systemStore)
}
var ca CachedAlertData
ca.PopulateFromRecord(record)
systemStore.Set(record.Id, ca)
}
// Delete removes an alert record from the cache.
func (c *AlertsCache) Delete(record *core.Record) {
systemID := record.GetString("system")
if systemID == "" {
return
}
if systemStore, ok := c.store.GetOk(systemID); ok {
systemStore.Remove(record.Id)
}
}
// GetSystemAlerts returns all alerts for the specified system, lazy-loading if necessary.
func (c *AlertsCache) GetSystemAlerts(systemID string) []CachedAlertData {
systemStore, ok := c.store.GetOk(systemID)
if !ok {
// Populate cache for this system
records, err := c.app.FindAllRecords("alerts", dbx.NewExp("system={:system}", dbx.Params{"system": systemID}))
if err != nil {
return nil
}
systemStore = store.New(map[string]CachedAlertData{})
for _, record := range records {
var ca CachedAlertData
ca.PopulateFromRecord(record)
systemStore.Set(record.Id, ca)
}
c.store.Set(systemID, systemStore)
}
all := systemStore.GetAll()
records := make([]CachedAlertData, 0, len(all))
for _, alert := range all {
records = append(records, alert)
}
return records
}
// GetAlert returns a specific alert by its ID from the cache.
func (c *AlertsCache) GetAlert(systemID, alertID string) (CachedAlertData, bool) {
if systemStore, ok := c.store.GetOk(systemID); ok {
return systemStore.GetOk(alertID)
}
return CachedAlertData{}, false
}
// GetAlertsByName returns all alerts of a specific type for the specified system.
func (c *AlertsCache) GetAlertsByName(systemID, alertName string) []CachedAlertData {
allAlerts := c.GetSystemAlerts(systemID)
var alertRecords []CachedAlertData
for _, record := range allAlerts {
if record.Name == alertName {
alertRecords = append(alertRecords, record)
}
}
return alertRecords
}
// GetAlertsExcludingNames returns all alerts for the specified system excluding the given types.
func (c *AlertsCache) GetAlertsExcludingNames(systemID string, excludedNames ...string) []CachedAlertData {
excludeMap := make(map[string]struct{})
for _, name := range excludedNames {
excludeMap[name] = struct{}{}
}
allAlerts := c.GetSystemAlerts(systemID)
var alertRecords []CachedAlertData
for _, record := range allAlerts {
if _, excluded := excludeMap[record.Name]; !excluded {
alertRecords = append(alertRecords, record)
}
}
return alertRecords
}
// Refresh returns the latest cached copy for an alert snapshot if it still exists.
func (c *AlertsCache) Refresh(alert CachedAlertData) (CachedAlertData, bool) {
if alert.Id == "" {
return CachedAlertData{}, false
}
return c.GetAlert(alert.SystemID, alert.Id)
}

View File

@@ -0,0 +1,215 @@
//go:build testing
package alerts_test
import (
"testing"
"github.com/henrygd/beszel/internal/alerts"
beszelTests "github.com/henrygd/beszel/internal/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSystemAlertsCachePopulateAndFilter(t *testing.T) {
hub, user := beszelTests.GetHubWithUser(t)
defer hub.Cleanup()
systems, err := beszelTests.CreateSystems(hub, 2, user.Id, "up")
require.NoError(t, err)
system1 := systems[0]
system2 := systems[1]
statusAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "Status",
"system": system1.Id,
"user": user.Id,
"min": 1,
})
require.NoError(t, err)
cpuAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "CPU",
"system": system1.Id,
"user": user.Id,
"value": 80,
"min": 1,
})
require.NoError(t, err)
memoryAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "Memory",
"system": system2.Id,
"user": user.Id,
"value": 90,
"min": 1,
})
require.NoError(t, err)
cache := alerts.NewAlertsCache(hub)
cache.PopulateFromDB(false)
statusAlerts := cache.GetAlertsByName(system1.Id, "Status")
require.Len(t, statusAlerts, 1)
assert.Equal(t, statusAlert.Id, statusAlerts[0].Id)
nonStatusAlerts := cache.GetAlertsExcludingNames(system1.Id, "Status")
require.Len(t, nonStatusAlerts, 1)
assert.Equal(t, cpuAlert.Id, nonStatusAlerts[0].Id)
system2Alerts := cache.GetSystemAlerts(system2.Id)
require.Len(t, system2Alerts, 1)
assert.Equal(t, memoryAlert.Id, system2Alerts[0].Id)
}
func TestSystemAlertsCacheLazyLoadUpdateAndDelete(t *testing.T) {
hub, user := beszelTests.GetHubWithUser(t)
defer hub.Cleanup()
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
require.NoError(t, err)
systemRecord := systems[0]
statusAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "Status",
"system": systemRecord.Id,
"user": user.Id,
"min": 1,
})
require.NoError(t, err)
cache := alerts.NewAlertsCache(hub)
require.Len(t, cache.GetSystemAlerts(systemRecord.Id), 1, "first lookup should lazy-load alerts for the system")
cpuAlert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "CPU",
"system": systemRecord.Id,
"user": user.Id,
"value": 80,
"min": 1,
})
require.NoError(t, err)
cache.Update(cpuAlert)
nonStatusAlerts := cache.GetAlertsExcludingNames(systemRecord.Id, "Status")
require.Len(t, nonStatusAlerts, 1)
assert.Equal(t, cpuAlert.Id, nonStatusAlerts[0].Id)
cache.Delete(statusAlert)
assert.Empty(t, cache.GetAlertsByName(systemRecord.Id, "Status"), "deleted alerts should be removed from the in-memory cache")
}
func TestSystemAlertsCacheRefreshReturnsLatestCopy(t *testing.T) {
hub, user := beszelTests.GetHubWithUser(t)
defer hub.Cleanup()
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
require.NoError(t, err)
system := systems[0]
alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "Status",
"system": system.Id,
"user": user.Id,
"min": 1,
"triggered": false,
})
require.NoError(t, err)
cache := alerts.NewAlertsCache(hub)
snapshot := cache.GetSystemAlerts(system.Id)[0]
assert.False(t, snapshot.Triggered)
alert.Set("triggered", true)
require.NoError(t, hub.Save(alert))
refreshed, ok := cache.Refresh(snapshot)
require.True(t, ok)
assert.Equal(t, snapshot.Id, refreshed.Id)
assert.True(t, refreshed.Triggered, "refresh should return the updated cached value rather than the stale snapshot")
require.NoError(t, hub.Delete(alert))
_, ok = cache.Refresh(snapshot)
assert.False(t, ok, "refresh should report false when the cached alert no longer exists")
}
func TestAlertManagerCacheLifecycle(t *testing.T) {
hub, user := beszelTests.GetHubWithUser(t)
defer hub.Cleanup()
systems, err := beszelTests.CreateSystems(hub, 1, user.Id, "up")
require.NoError(t, err)
system := systems[0]
// Create an alert
alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "CPU",
"system": system.Id,
"user": user.Id,
"value": 80,
"min": 1,
})
require.NoError(t, err)
am := hub.AlertManager
cache := am.GetSystemAlertsCache()
// Verify it's in cache (it should be since CreateRecord triggers the event)
assert.Len(t, cache.GetSystemAlerts(system.Id), 1)
assert.Equal(t, alert.Id, cache.GetSystemAlerts(system.Id)[0].Id)
assert.EqualValues(t, 80, cache.GetSystemAlerts(system.Id)[0].Value)
// Update the alert through PocketBase to trigger events
alert.Set("value", 85)
require.NoError(t, hub.Save(alert))
// Check if updated value is reflected (or at least that it's still there)
cachedAlerts := cache.GetSystemAlerts(system.Id)
assert.Len(t, cachedAlerts, 1)
assert.EqualValues(t, 85, cachedAlerts[0].Value)
// Delete the alert through PocketBase to trigger events
require.NoError(t, hub.Delete(alert))
// Verify it's removed from cache
assert.Empty(t, cache.GetSystemAlerts(system.Id), "alert should be removed from cache after PocketBase delete")
}
// func TestAlertManagerCacheMovesAlertToNewSystemOnUpdate(t *testing.T) {
// hub, user := beszelTests.GetHubWithUser(t)
// defer hub.Cleanup()
// systems, err := beszelTests.CreateSystems(hub, 2, user.Id, "up")
// require.NoError(t, err)
// system1 := systems[0]
// system2 := systems[1]
// alert, err := beszelTests.CreateRecord(hub, "alerts", map[string]any{
// "name": "CPU",
// "system": system1.Id,
// "user": user.Id,
// "value": 80,
// "min": 1,
// })
// require.NoError(t, err)
// am := hub.AlertManager
// cache := am.GetSystemAlertsCache()
// // Initially in system1 cache
// assert.Len(t, cache.Get(system1.Id), 1)
// assert.Empty(t, cache.Get(system2.Id))
// // Move alert to system2
// alert.Set("system", system2.Id)
// require.NoError(t, hub.Save(alert))
// // DEBUG: print if it is found
// // fmt.Printf("system1 alerts after update: %v\n", cache.Get(system1.Id))
// // Should be removed from system1 and present in system2
// assert.Empty(t, cache.GetType(system1.Id, "CPU"), "updated alerts should be evicted from the previous system cache")
// require.Len(t, cache.Get(system2.Id), 1)
// assert.Equal(t, alert.Id, cache.Get(system2.Id)[0].Id)
// }

View File

@@ -14,6 +14,7 @@ import (
beszelTests "github.com/henrygd/beszel/internal/tests"
"github.com/henrygd/beszel/internal/alerts"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
pbTests "github.com/pocketbase/pocketbase/tests"
@@ -496,3 +497,46 @@ func TestAlertsHistory(t *testing.T) {
assert.EqualValues(t, 2, totalHistoryCount, "Should have 2 total alert history records")
})
}
func TestSetAlertTriggered(t *testing.T) {
hub, _ := beszelTests.NewTestHub(t.TempDir())
defer hub.Cleanup()
hub.StartHub()
user, _ := beszelTests.CreateUser(hub, "test@example.com", "password")
system, _ := beszelTests.CreateRecord(hub, "systems", map[string]any{
"name": "test-system",
"users": []string{user.Id},
"host": "127.0.0.1",
})
alertRecord, _ := beszelTests.CreateRecord(hub, "alerts", map[string]any{
"name": "CPU",
"system": system.Id,
"user": user.Id,
"value": 80,
"triggered": false,
})
am := alerts.NewAlertManager(hub)
var alert alerts.CachedAlertData
alert.PopulateFromRecord(alertRecord)
// Test triggering the alert
err := am.SetAlertTriggered(alert, true)
assert.NoError(t, err)
updatedRecord, err := hub.FindRecordById("alerts", alert.Id)
assert.NoError(t, err)
assert.True(t, updatedRecord.GetBool("triggered"))
// Test un-triggering the alert
err = am.SetAlertTriggered(alert, false)
assert.NoError(t, err)
updatedRecord, err = hub.FindRecordById("alerts", alert.Id)
assert.NoError(t, err)
assert.False(t, updatedRecord.GetBool("triggered"))
}

View File

@@ -11,10 +11,16 @@ import (
func NewTestAlertManagerWithoutWorker(app hubLike) *AlertManager {
return &AlertManager{
hub: app,
hub: app,
alertsCache: NewAlertsCache(app),
}
}
// GetSystemAlertsCache returns the internal system alerts cache.
func (am *AlertManager) GetSystemAlertsCache() *AlertsCache {
return am.alertsCache
}
func (am *AlertManager) GetAlertManager() *AlertManager {
return am
}
@@ -33,10 +39,10 @@ func (am *AlertManager) GetPendingAlertsCount() int {
}
// ProcessPendingAlerts manually processes all expired alerts (for testing)
func (am *AlertManager) ProcessPendingAlerts() ([]*core.Record, error) {
func (am *AlertManager) ProcessPendingAlerts() ([]CachedAlertData, error) {
now := time.Now()
var lastErr error
var processedAlerts []*core.Record
var processedAlerts []CachedAlertData
am.pendingAlerts.Range(func(key, value any) bool {
info := value.(*alertInfo)
if now.After(info.expireTime) {
@@ -85,3 +91,7 @@ func ResolveStatusAlerts(app core.App) error {
func (am *AlertManager) RestorePendingStatusAlerts() error {
return am.restorePendingStatusAlerts()
}
func (am *AlertManager) SetAlertTriggered(alert CachedAlertData, triggered bool) error {
return am.setAlertTriggered(alert, triggered)
}