Compare commits

...

2 Commits

Author SHA1 Message Date
henrygd
35d0e792ad refactor(expirymap): optimize performance and add StopCleaner method 2026-03-08 19:09:41 -04:00
henrygd
654cd06b19 respect SMART_INTERVAL across agent reconnects (#1800)
Move tracking of the last SMART data fetch from individual System
instances to the SystemManager using a TTL-based ExpiryMap.

This ensures that the SMART_INTERVAL is respected even if an
agent connection is dropped and re-established, preventing
redundant data collection on every reconnect.
2026-03-08 19:03:50 -04:00
5 changed files with 91 additions and 24 deletions

View File

@@ -1,29 +1,33 @@
// Package expirymap provides a thread-safe map with expiring entries.
// It supports TTL-based expiration with both lazy cleanup on access
// and periodic background cleanup.
package expirymap
import (
"reflect"
"sync"
"time"
"github.com/pocketbase/pocketbase/tools/store"
)
type val[T any] struct {
type val[T comparable] struct {
value T
expires time.Time
}
type ExpiryMap[T any] struct {
store *store.Store[string, *val[T]]
cleanupInterval time.Duration
type ExpiryMap[T comparable] struct {
store *store.Store[string, *val[T]]
stopChan chan struct{}
stopOnce sync.Once
}
// New creates a new expiry map with custom cleanup interval
func New[T any](cleanupInterval time.Duration) *ExpiryMap[T] {
func New[T comparable](cleanupInterval time.Duration) *ExpiryMap[T] {
m := &ExpiryMap[T]{
store: store.New(map[string]*val[T]{}),
cleanupInterval: cleanupInterval,
store: store.New(map[string]*val[T]{}),
stopChan: make(chan struct{}),
}
m.startCleaner()
go m.startCleaner(cleanupInterval)
return m
}
@@ -55,7 +59,7 @@ func (m *ExpiryMap[T]) GetOk(key string) (T, bool) {
// GetByValue retrieves a value by value
func (m *ExpiryMap[T]) GetByValue(val T) (key string, value T, ok bool) {
for key, v := range m.store.GetAll() {
if reflect.DeepEqual(v.value, val) {
if v.value == val {
// check if expired
if v.expires.Before(time.Now()) {
m.store.Remove(key)
@@ -75,7 +79,7 @@ func (m *ExpiryMap[T]) Remove(key string) {
// RemovebyValue removes a value by value
func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) {
for key, val := range m.store.GetAll() {
if reflect.DeepEqual(val.value, value) {
if val.value == value {
m.store.Remove(key)
return val.value, true
}
@@ -84,13 +88,23 @@ func (m *ExpiryMap[T]) RemovebyValue(value T) (T, bool) {
}
// startCleaner runs the background cleanup process
func (m *ExpiryMap[T]) startCleaner() {
go func() {
tick := time.Tick(m.cleanupInterval)
for range tick {
func (m *ExpiryMap[T]) startCleaner(interval time.Duration) {
tick := time.Tick(interval)
for {
select {
case <-tick:
m.cleanup()
case <-m.stopChan:
return
}
}()
}
}
// StopCleaner stops the background cleanup process
func (m *ExpiryMap[T]) StopCleaner() {
m.stopOnce.Do(func() {
close(m.stopChan)
})
}
// cleanup removes all expired entries

View File

@@ -4,6 +4,7 @@ package expirymap
import (
"testing"
"testing/synctest"
"time"
"github.com/stretchr/testify/assert"
@@ -473,3 +474,52 @@ func TestExpiryMap_ValueOperations_Integration(t *testing.T) {
assert.Equal(t, "unique", value)
assert.Equal(t, "key2", key)
}
func TestExpiryMap_Cleaner(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
em := New[string](time.Second)
defer em.StopCleaner()
em.Set("test", "value", 500*time.Millisecond)
// Wait 600ms, value is expired but cleaner hasn't run yet (interval is 1s)
time.Sleep(600 * time.Millisecond)
synctest.Wait()
// Map should still hold the value in its internal store before lazy access or cleaner
assert.Equal(t, 1, len(em.store.GetAll()), "store should still have 1 item before cleaner runs")
// Wait another 500ms so cleaner (1s interval) runs
time.Sleep(500 * time.Millisecond)
synctest.Wait() // Wait for background goroutine to process the tick
assert.Equal(t, 0, len(em.store.GetAll()), "store should be empty after cleaner runs")
})
}
func TestExpiryMap_StopCleaner(t *testing.T) {
em := New[string](time.Hour)
// Initially, stopChan is open, reading would block
select {
case <-em.stopChan:
t.Fatal("stopChan should be open initially")
default:
// success
}
em.StopCleaner()
// After StopCleaner, stopChan is closed, reading returns immediately
select {
case <-em.stopChan:
// success
default:
t.Fatal("stopChan was not closed by StopCleaner")
}
// Calling StopCleaner again should NOT panic thanks to sync.Once
assert.NotPanics(t, func() {
em.StopCleaner()
})
}

View File

@@ -48,7 +48,6 @@ type System struct {
detailsFetched atomic.Bool // True if static system details have been fetched and saved
smartFetching atomic.Bool // True if SMART devices are currently being fetched
smartInterval time.Duration // Interval for periodic SMART data updates
lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch
}
func (sm *SystemManager) NewSystem(systemId string) *System {
@@ -142,11 +141,11 @@ func (sys *System) update() error {
if sys.smartInterval <= 0 {
sys.smartInterval = time.Hour
}
lastFetch := sys.lastSmartFetch.Load()
lastFetch, _ := sys.manager.smartFetchMap.GetOk(sys.Id)
if time.Since(time.UnixMilli(lastFetch)) >= sys.smartInterval && sys.smartFetching.CompareAndSwap(false, true) {
go func() {
defer sys.smartFetching.Store(false)
sys.lastSmartFetch.Store(time.Now().UnixMilli())
sys.manager.smartFetchMap.Set(sys.Id, time.Now().UnixMilli(), sys.smartInterval+time.Minute)
_ = sys.FetchAndSaveSmartDevices()
}()
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/hub/expirymap"
"github.com/henrygd/beszel/internal/common"
@@ -40,9 +41,10 @@ var errSystemExists = errors.New("system exists")
// SystemManager manages a collection of monitored systems and their connections.
// It handles system lifecycle, status updates, and maintains both SSH and WebSocket connections.
type SystemManager struct {
hub hubLike // Hub interface for database and alert operations
systems *store.Store[string, *System] // Thread-safe store of active systems
sshConfig *ssh.ClientConfig // SSH client configuration for system connections
hub hubLike // Hub interface for database and alert operations
systems *store.Store[string, *System] // Thread-safe store of active systems
sshConfig *ssh.ClientConfig // SSH client configuration for system connections
smartFetchMap *expirymap.ExpiryMap[int64] // Stores last SMART fetch time per system ID
}
// hubLike defines the interface requirements for the hub dependency.
@@ -58,8 +60,9 @@ type hubLike interface {
// The hub must implement the hubLike interface to provide database and alert functionality.
func NewSystemManager(hub hubLike) *SystemManager {
return &SystemManager{
systems: store.New(map[string]*System{}),
hub: hub,
systems: store.New(map[string]*System{}),
hub: hub,
smartFetchMap: expirymap.New[int64](time.Hour),
}
}

View File

@@ -113,4 +113,5 @@ func (sm *SystemManager) RemoveAllSystems() {
for _, system := range sm.systems.GetAll() {
sm.RemoveSystem(system.Id)
}
sm.smartFetchMap.StopCleaner()
}