add one minute chart + refactor rpc

- add one minute charts
- update disk io to use bytes
- update hub and agent connection interfaces / handlers to be more
flexible
- change agent cache to use cache time instead of session id
- refactor collection of metrics which require deltas to track
separately per cache time
This commit is contained in:
henrygd
2025-10-02 17:56:51 -04:00
parent f9a39c6004
commit 7d6230de74
44 changed files with 3892 additions and 551 deletions

View File

@@ -14,17 +14,25 @@ import (
"sync"
"time"
"github.com/henrygd/beszel/agent/deltatracker"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/blang/semver"
)
const (
// Docker API timeout in milliseconds
dockerTimeoutMs = 2100
// Maximum realistic network speed (5 GB/s) to detect bad deltas
maxNetworkSpeedBps uint64 = 5e9
)
type dockerManager struct {
client *http.Client // Client to query Docker API
wg sync.WaitGroup // WaitGroup to wait for all goroutines to finish
sem chan struct{} // Semaphore to limit concurrent container requests
containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to containerStatsMap
apiContainerList []*container.ApiInfo // List of containers from Docker API (no pointer)
apiContainerList []*container.ApiInfo // List of containers from Docker API
containerStatsMap map[string]*container.Stats // Keeps track of container stats
validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap
goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly)
@@ -32,6 +40,17 @@ type dockerManager struct {
buf *bytes.Buffer // Buffer to store and read response bodies
decoder *json.Decoder // Reusable JSON decoder that reads from buf
apiStats *container.ApiStats // Reusable API stats object
// Cache-time-aware tracking for CPU stats (similar to cpu.go)
// Maps cache time intervals to container-specific CPU usage tracking
lastCpuContainer map[uint16]map[string]uint64 // cacheTimeMs -> containerId -> last cpu container usage
lastCpuSystem map[uint16]map[string]uint64 // cacheTimeMs -> containerId -> last cpu system usage
lastCpuReadTime map[uint16]map[string]time.Time // cacheTimeMs -> containerId -> last read time (Windows)
// Network delta trackers - one per cache time to avoid interference
// cacheTimeMs -> DeltaTracker for network bytes sent/received
networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
}
// userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent header to all requests
@@ -62,8 +81,8 @@ func (d *dockerManager) dequeue() {
}
}
// Returns stats for all running containers
func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
// Returns stats for all running containers with cache-time-aware delta tracking
func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats, error) {
resp, err := dm.client.Get("http://localhost/containers/json")
if err != nil {
return nil, err
@@ -87,8 +106,7 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
var failedContainers []*container.ApiInfo
for i := range dm.apiContainerList {
ctr := dm.apiContainerList[i]
for _, ctr := range dm.apiContainerList {
ctr.IdShort = ctr.Id[:12]
dm.validIds[ctr.IdShort] = struct{}{}
// check if container is less than 1 minute old (possible restart)
@@ -98,9 +116,9 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
dm.deleteContainerStatsSync(ctr.IdShort)
}
dm.queue()
go func() {
go func(ctr *container.ApiInfo) {
defer dm.dequeue()
err := dm.updateContainerStats(ctr)
err := dm.updateContainerStats(ctr, cacheTimeMs)
// if error, delete from map and add to failed list to retry
if err != nil {
dm.containerStatsMutex.Lock()
@@ -108,7 +126,7 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
failedContainers = append(failedContainers, ctr)
dm.containerStatsMutex.Unlock()
}
}()
}(ctr)
}
dm.wg.Wait()
@@ -119,13 +137,12 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
for i := range failedContainers {
ctr := failedContainers[i]
dm.queue()
go func() {
go func(ctr *container.ApiInfo) {
defer dm.dequeue()
err = dm.updateContainerStats(ctr)
if err != nil {
slog.Error("Error getting container stats", "err", err)
if err2 := dm.updateContainerStats(ctr, cacheTimeMs); err2 != nil {
slog.Error("Error getting container stats", "err", err2)
}
}()
}(ctr)
}
dm.wg.Wait()
}
@@ -140,18 +157,156 @@ func (dm *dockerManager) getDockerStats() ([]*container.Stats, error) {
}
}
// prepare network trackers for next interval for this cache time
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
return stats, nil
}
// Updates stats for individual container
func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error {
// initializeCpuTracking initializes CPU tracking maps for a specific cache time interval
func (dm *dockerManager) initializeCpuTracking(cacheTimeMs uint16) {
// Initialize cache time maps if they don't exist
if dm.lastCpuContainer[cacheTimeMs] == nil {
dm.lastCpuContainer[cacheTimeMs] = make(map[string]uint64)
}
if dm.lastCpuSystem[cacheTimeMs] == nil {
dm.lastCpuSystem[cacheTimeMs] = make(map[string]uint64)
}
// Ensure the outer map exists before indexing
if dm.lastCpuReadTime == nil {
dm.lastCpuReadTime = make(map[uint16]map[string]time.Time)
}
if dm.lastCpuReadTime[cacheTimeMs] == nil {
dm.lastCpuReadTime[cacheTimeMs] = make(map[string]time.Time)
}
}
// getCpuPreviousValues returns previous CPU values for a container and cache time interval
func (dm *dockerManager) getCpuPreviousValues(cacheTimeMs uint16, containerId string) (uint64, uint64) {
return dm.lastCpuContainer[cacheTimeMs][containerId], dm.lastCpuSystem[cacheTimeMs][containerId]
}
// setCpuCurrentValues stores current CPU values for a container and cache time interval
func (dm *dockerManager) setCpuCurrentValues(cacheTimeMs uint16, containerId string, cpuContainer, cpuSystem uint64) {
dm.lastCpuContainer[cacheTimeMs][containerId] = cpuContainer
dm.lastCpuSystem[cacheTimeMs][containerId] = cpuSystem
}
// calculateMemoryUsage calculates memory usage from Docker API stats
func calculateMemoryUsage(apiStats *container.ApiStats, isWindows bool) (uint64, error) {
if isWindows {
return apiStats.MemoryStats.PrivateWorkingSet, nil
}
// Check if container has valid data, otherwise may be in restart loop (#103)
if apiStats.MemoryStats.Usage == 0 {
return 0, fmt.Errorf("no memory stats available")
}
memCache := apiStats.MemoryStats.Stats.InactiveFile
if memCache == 0 {
memCache = apiStats.MemoryStats.Stats.Cache
}
return apiStats.MemoryStats.Usage - memCache, nil
}
// getNetworkTracker returns the DeltaTracker for a specific cache time, creating it if needed
func (dm *dockerManager) getNetworkTracker(cacheTimeMs uint16, isSent bool) *deltatracker.DeltaTracker[string, uint64] {
var trackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
if isSent {
trackers = dm.networkSentTrackers
} else {
trackers = dm.networkRecvTrackers
}
if trackers[cacheTimeMs] == nil {
trackers[cacheTimeMs] = deltatracker.NewDeltaTracker[string, uint64]()
}
return trackers[cacheTimeMs]
}
// cycleNetworkDeltasForCacheTime cycles the network delta trackers for a specific cache time
func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
if dm.networkSentTrackers[cacheTimeMs] != nil {
dm.networkSentTrackers[cacheTimeMs].Cycle()
}
if dm.networkRecvTrackers[cacheTimeMs] != nil {
dm.networkRecvTrackers[cacheTimeMs].Cycle()
}
}
// calculateNetworkStats calculates network sent/receive deltas using DeltaTracker
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, name string, cacheTimeMs uint16) (uint64, uint64) {
var total_sent, total_recv uint64
for _, v := range apiStats.Networks {
total_sent += v.TxBytes
total_recv += v.RxBytes
}
// Get the DeltaTracker for this specific cache time
sentTracker := dm.getNetworkTracker(cacheTimeMs, true)
recvTracker := dm.getNetworkTracker(cacheTimeMs, false)
// Set current values in the cache-time-specific DeltaTracker
sentTracker.Set(ctr.IdShort, total_sent)
recvTracker.Set(ctr.IdShort, total_recv)
// Get deltas (bytes since last measurement)
sent_delta_raw := sentTracker.Delta(ctr.IdShort)
recv_delta_raw := recvTracker.Delta(ctr.IdShort)
// Calculate bytes per second independently for Tx and Rx if we have previous data
var sent_delta, recv_delta uint64
if initialized {
millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds())
if millisecondsElapsed > 0 {
if sent_delta_raw > 0 {
sent_delta = sent_delta_raw * 1000 / millisecondsElapsed
if sent_delta > maxNetworkSpeedBps {
slog.Warn("Bad network delta", "container", name)
sent_delta = 0
}
}
if recv_delta_raw > 0 {
recv_delta = recv_delta_raw * 1000 / millisecondsElapsed
if recv_delta > maxNetworkSpeedBps {
slog.Warn("Bad network delta", "container", name)
recv_delta = 0
}
}
}
}
return sent_delta, recv_delta
}
// validateCpuPercentage checks if CPU percentage is within valid range
func validateCpuPercentage(cpuPct float64, containerName string) error {
if cpuPct > 100 {
return fmt.Errorf("%s cpu pct greater than 100: %+v", containerName, cpuPct)
}
return nil
}
// updateContainerStatsValues updates the final stats values
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta uint64, readTime time.Time) {
stats.Cpu = twoDecimals(cpuPct)
stats.Mem = bytesToMegabytes(float64(usedMemory))
stats.NetworkSent = bytesToMegabytes(float64(sent_delta))
stats.NetworkRecv = bytesToMegabytes(float64(recv_delta))
stats.PrevReadTime = readTime
}
// Updates stats for individual container with cache-time-aware delta tracking
func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeMs uint16) error {
name := ctr.Names[0][1:]
resp, err := dm.client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
if err != nil {
return err
}
defer resp.Body.Close()
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
@@ -169,72 +324,58 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo) error {
stats.NetworkSent = 0
stats.NetworkRecv = 0
// docker host container stats response
// res := dm.getApiStats()
// defer dm.putApiStats(res)
//
res := dm.apiStats
res.Networks = nil
if err := dm.decode(resp, res); err != nil {
return err
}
// calculate cpu and memory stats
var usedMemory uint64
// Initialize CPU tracking for this cache time interval
dm.initializeCpuTracking(cacheTimeMs)
// Get previous CPU values
prevCpuContainer, prevCpuSystem := dm.getCpuPreviousValues(cacheTimeMs, ctr.IdShort)
// Calculate CPU percentage based on platform
var cpuPct float64
// store current cpu stats
prevCpuContainer, prevCpuSystem := stats.CpuContainer, stats.CpuSystem
stats.CpuContainer = res.CPUStats.CPUUsage.TotalUsage
stats.CpuSystem = res.CPUStats.SystemUsage
if dm.isWindows {
usedMemory = res.MemoryStats.PrivateWorkingSet
cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, stats.PrevReadTime)
prevRead := dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort]
cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, prevRead)
} else {
// check if container has valid data, otherwise may be in restart loop (#103)
if res.MemoryStats.Usage == 0 {
return fmt.Errorf("%s - no memory stats - see https://github.com/henrygd/beszel/issues/144", name)
}
memCache := res.MemoryStats.Stats.InactiveFile
if memCache == 0 {
memCache = res.MemoryStats.Stats.Cache
}
usedMemory = res.MemoryStats.Usage - memCache
cpuPct = res.CalculateCpuPercentLinux(prevCpuContainer, prevCpuSystem)
}
if cpuPct > 100 {
return fmt.Errorf("%s cpu pct greater than 100: %+v", name, cpuPct)
// Calculate memory usage
usedMemory, err := calculateMemoryUsage(res, dm.isWindows)
if err != nil {
return fmt.Errorf("%s - %w - see https://github.com/henrygd/beszel/issues/144", name, err)
}
// network
// Store current CPU stats for next calculation
currentCpuContainer := res.CPUStats.CPUUsage.TotalUsage
currentCpuSystem := res.CPUStats.SystemUsage
dm.setCpuCurrentValues(cacheTimeMs, ctr.IdShort, currentCpuContainer, currentCpuSystem)
// Validate CPU percentage
if err := validateCpuPercentage(cpuPct, name); err != nil {
return err
}
// Calculate network stats using DeltaTracker
sent_delta, recv_delta := dm.calculateNetworkStats(ctr, res, stats, initialized, name, cacheTimeMs)
// Store current network values for legacy compatibility
var total_sent, total_recv uint64
for _, v := range res.Networks {
total_sent += v.TxBytes
total_recv += v.RxBytes
}
var sent_delta, recv_delta uint64
millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds())
if initialized && millisecondsElapsed > 0 {
// get bytes per second
sent_delta = (total_sent - stats.PrevNet.Sent) * 1000 / millisecondsElapsed
recv_delta = (total_recv - stats.PrevNet.Recv) * 1000 / millisecondsElapsed
// check for unrealistic network values (> 5GB/s)
if sent_delta > 5e9 || recv_delta > 5e9 {
slog.Warn("Bad network delta", "container", name)
sent_delta, recv_delta = 0, 0
}
}
stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv
stats.Cpu = twoDecimals(cpuPct)
stats.Mem = bytesToMegabytes(float64(usedMemory))
stats.NetworkSent = bytesToMegabytes(float64(sent_delta))
stats.NetworkRecv = bytesToMegabytes(float64(recv_delta))
stats.PrevReadTime = res.Read
// Update final stats values
updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, res.Read)
// store per-cache-time read time for Windows CPU percent calc
dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort] = res.Read
return nil
}
@@ -244,6 +385,15 @@ func (dm *dockerManager) deleteContainerStatsSync(id string) {
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
delete(dm.containerStatsMap, id)
for ct := range dm.lastCpuContainer {
delete(dm.lastCpuContainer[ct], id)
}
for ct := range dm.lastCpuSystem {
delete(dm.lastCpuSystem[ct], id)
}
for ct := range dm.lastCpuReadTime {
delete(dm.lastCpuReadTime[ct], id)
}
}
// Creates a new http client for Docker or Podman API
@@ -283,7 +433,7 @@ func newDockerManager(a *Agent) *dockerManager {
}
// configurable timeout
timeout := time.Millisecond * 2100
timeout := time.Millisecond * time.Duration(dockerTimeoutMs)
if t, set := GetEnv("DOCKER_TIMEOUT"); set {
timeout, err = time.ParseDuration(t)
if err != nil {
@@ -308,6 +458,13 @@ func newDockerManager(a *Agent) *dockerManager {
sem: make(chan struct{}, 5),
apiContainerList: []*container.ApiInfo{},
apiStats: &container.ApiStats{},
// Initialize cache-time-aware tracking structures
lastCpuContainer: make(map[uint16]map[string]uint64),
lastCpuSystem: make(map[uint16]map[string]uint64),
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
}
// If using podman, return client