Compare commits

...

2 Commits

Author SHA1 Message Date
henrygd
35d0e792ad refactor(expirymap): optimize performance and add StopCleaner method 2026-03-08 19:09:41 -04:00
henrygd
654cd06b19 respect SMART_INTERVAL across agent reconnects (#1800)
Move tracking of the last SMART data fetch from individual System
instances to the SystemManager using a TTL-based ExpiryMap.

This ensures that the SMART_INTERVAL is respected even if an
agent connection is dropped and re-established, preventing
redundant data collection on every reconnect.
2026-03-08 19:03:50 -04:00
5 changed files with 91 additions and 24 deletions

View File

@@ -1,29 +1,33 @@
// Package expirymap provides a thread-safe map with expiring entries.
// It supports TTL-based expiration with both lazy cleanup on access
// and periodic background cleanup.
package expirymap package expirymap
import ( import (
"reflect" "sync"
"time" "time"
"github.com/pocketbase/pocketbase/tools/store" "github.com/pocketbase/pocketbase/tools/store"
) )
type val[T any] struct { type val[T comparable] struct {
value T value T
expires time.Time expires time.Time
} }
type ExpiryMap[T any] struct { type ExpiryMap[T comparable] struct {
store *store.Store[string, *val[T]] store *store.Store[string, *val[T]]
cleanupInterval time.Duration stopChan chan struct{}
stopOnce sync.Once
} }
// New creates a new expiry map with custom cleanup interval // New creates a new expiry map with custom cleanup interval
func New[T any](cleanupInterval time.Duration) *ExpiryMap[T] { func New[T comparable](cleanupInterval time.Duration) *ExpiryMap[T] {
m := &ExpiryMap[T]{ m := &ExpiryMap[T]{
store: store.New(map[string]*val[T]{}), store: store.New(map[string]*val[T]{}),
cleanupInterval: cleanupInterval, stopChan: make(chan struct{}),
} }
m.startCleaner() go m.startCleaner(cleanupInterval)
return m return m
} }
@@ -55,7 +59,7 @@ func (m *ExpiryMap[T]) GetOk(key string) (T, bool) {
// GetByValue retrieves a value by value // GetByValue retrieves a value by value
func (m *ExpiryMap[T]) GetByValue(val T) (key string, value T, ok bool) { func (m *ExpiryMap[T]) GetByValue(val T) (key string, value T, ok bool) {
for key, v := range m.store.GetAll() { for key, v := range m.store.GetAll() {
if reflect.DeepEqual(v.value, val) { if v.value == val {
// check if expired // check if expired
if v.expires.Before(time.Now()) { if v.expires.Before(time.Now()) {
m.store.Remove(key) m.store.Remove(key)
@@ -75,7 +79,7 @@ func (m *ExpiryMap[T]) Remove(key string) {
// RemovebyValue removes a value by value // RemovebyValue removes a value by value
func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) { func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) {
for key, val := range m.store.GetAll() { for key, val := range m.store.GetAll() {
if reflect.DeepEqual(val.value, value) { if val.value == value {
m.store.Remove(key) m.store.Remove(key)
return val.value, true return val.value, true
} }
@@ -84,13 +88,23 @@ func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) {
} }
// startCleaner runs the background cleanup process // startCleaner runs the background cleanup process
func (m *ExpiryMap[T]) startCleaner() { func (m *ExpiryMap[T]) startCleaner(interval time.Duration) {
go func() { tick := time.Tick(interval)
tick := time.Tick(m.cleanupInterval) for {
for range tick { select {
case <-tick:
m.cleanup() m.cleanup()
case <-m.stopChan:
return
} }
}() }
}
// StopCleaner stops the background cleanup process
func (m *ExpiryMap[T]) StopCleaner() {
m.stopOnce.Do(func() {
close(m.stopChan)
})
} }
// cleanup removes all expired entries // cleanup removes all expired entries

View File

@@ -4,6 +4,7 @@ package expirymap
import ( import (
"testing" "testing"
"testing/synctest"
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -473,3 +474,52 @@ func TestExpiryMap_ValueOperations_Integration(t *testing.T) {
assert.Equal(t, "unique", value) assert.Equal(t, "unique", value)
assert.Equal(t, "key2", key) assert.Equal(t, "key2", key)
} }
func TestExpiryMap_Cleaner(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
em := New[string](time.Second)
defer em.StopCleaner()
em.Set("test", "value", 500*time.Millisecond)
// Wait 600ms, value is expired but cleaner hasn't run yet (interval is 1s)
time.Sleep(600 * time.Millisecond)
synctest.Wait()
// Map should still hold the value in its internal store before lazy access or cleaner
assert.Equal(t, 1, len(em.store.GetAll()), "store should still have 1 item before cleaner runs")
// Wait another 500ms so cleaner (1s interval) runs
time.Sleep(500 * time.Millisecond)
synctest.Wait() // Wait for background goroutine to process the tick
assert.Equal(t, 0, len(em.store.GetAll()), "store should be empty after cleaner runs")
})
}
func TestExpiryMap_StopCleaner(t *testing.T) {
em := New[string](time.Hour)
// Initially, stopChan is open, reading would block
select {
case <-em.stopChan:
t.Fatal("stopChan should be open initially")
default:
// success
}
em.StopCleaner()
// After StopCleaner, stopChan is closed, reading returns immediately
select {
case <-em.stopChan:
// success
default:
t.Fatal("stopChan was not closed by StopCleaner")
}
// Calling StopCleaner again should NOT panic thanks to sync.Once
assert.NotPanics(t, func() {
em.StopCleaner()
})
}

View File

@@ -48,7 +48,6 @@ type System struct {
detailsFetched atomic.Bool // True if static system details have been fetched and saved detailsFetched atomic.Bool // True if static system details have been fetched and saved
smartFetching atomic.Bool // True if SMART devices are currently being fetched smartFetching atomic.Bool // True if SMART devices are currently being fetched
smartInterval time.Duration // Interval for periodic SMART data updates smartInterval time.Duration // Interval for periodic SMART data updates
lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch
} }
func (sm *SystemManager) NewSystem(systemId string) *System { func (sm *SystemManager) NewSystem(systemId string) *System {
@@ -142,11 +141,11 @@ func (sys *System) update() error {
if sys.smartInterval <= 0 { if sys.smartInterval <= 0 {
sys.smartInterval = time.Hour sys.smartInterval = time.Hour
} }
lastFetch := sys.lastSmartFetch.Load() lastFetch, _ := sys.manager.smartFetchMap.GetOk(sys.Id)
if time.Since(time.UnixMilli(lastFetch)) >= sys.smartInterval && sys.smartFetching.CompareAndSwap(false, true) { if time.Since(time.UnixMilli(lastFetch)) >= sys.smartInterval && sys.smartFetching.CompareAndSwap(false, true) {
go func() { go func() {
defer sys.smartFetching.Store(false) defer sys.smartFetching.Store(false)
sys.lastSmartFetch.Store(time.Now().UnixMilli()) sys.manager.smartFetchMap.Set(sys.Id, time.Now().UnixMilli(), sys.smartInterval+time.Minute)
_ = sys.FetchAndSaveSmartDevices() _ = sys.FetchAndSaveSmartDevices()
}() }()
} }

View File

@@ -8,6 +8,7 @@ import (
"github.com/henrygd/beszel/internal/hub/ws" "github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/system" "github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/hub/expirymap"
"github.com/henrygd/beszel/internal/common" "github.com/henrygd/beszel/internal/common"
@@ -40,9 +41,10 @@ var errSystemExists = errors.New("system exists")
// SystemManager manages a collection of monitored systems and their connections. // SystemManager manages a collection of monitored systems and their connections.
// It handles system lifecycle, status updates, and maintains both SSH and WebSocket connections. // It handles system lifecycle, status updates, and maintains both SSH and WebSocket connections.
type SystemManager struct { type SystemManager struct {
hub hubLike // Hub interface for database and alert operations hub hubLike // Hub interface for database and alert operations
systems *store.Store[string, *System] // Thread-safe store of active systems systems *store.Store[string, *System] // Thread-safe store of active systems
sshConfig *ssh.ClientConfig // SSH client configuration for system connections sshConfig *ssh.ClientConfig // SSH client configuration for system connections
smartFetchMap *expirymap.ExpiryMap[int64] // Stores last SMART fetch time per system ID
} }
// hubLike defines the interface requirements for the hub dependency. // hubLike defines the interface requirements for the hub dependency.
@@ -58,8 +60,9 @@ type hubLike interface {
// The hub must implement the hubLike interface to provide database and alert functionality. // The hub must implement the hubLike interface to provide database and alert functionality.
func NewSystemManager(hub hubLike) *SystemManager { func NewSystemManager(hub hubLike) *SystemManager {
return &SystemManager{ return &SystemManager{
systems: store.New(map[string]*System{}), systems: store.New(map[string]*System{}),
hub: hub, hub: hub,
smartFetchMap: expirymap.New[int64](time.Hour),
} }
} }

View File

@@ -113,4 +113,5 @@ func (sm *SystemManager) RemoveAllSystems() {
for _, system := range sm.systems.GetAll() { for _, system := range sm.systems.GetAll() {
sm.RemoveSystem(system.Id) sm.RemoveSystem(system.Id)
} }
sm.smartFetchMap.StopCleaner()
} }