mirror of
https://github.com/henrygd/beszel.git
synced 2026-03-23 14:06:18 +01:00
[Feature] Improve Container monitoring (#928)
* align docker colors * change the name of the chart * Add tabs * Add volumes, health, stack and uptime * remove tests * fix table * Add stack filtering * search in filter * add container id * fix uptime * feat add ability to exclude container * remove stack colors, didnt look clear * better table * add disk io * Sync with main * undo locale * undo locale
This commit is contained in:
198
agent/docker.go
198
agent/docker.go
@@ -25,6 +25,16 @@ const (
|
||||
dockerTimeoutMs = 2100
|
||||
// Maximum realistic network speed (5 GB/s) to detect bad deltas
|
||||
maxNetworkSpeedBps uint64 = 5e9
|
||||
// Container and health constants
|
||||
composeProjectLabel = "com.docker.compose.project"
|
||||
healthStatusNone = "none"
|
||||
containerStateRunning = "running"
|
||||
containerStateUnknown = "unknown"
|
||||
volumeTypeVolume = "volume"
|
||||
diskOpRead = "read"
|
||||
diskOpReadCap = "Read"
|
||||
diskOpWrite = "write"
|
||||
diskOpWriteCap = "Write"
|
||||
// Maximum conceivable memory usage of a container (100TB) to detect bad memory stats
|
||||
maxMemoryUsage uint64 = 100 * 1024 * 1024 * 1024 * 1024
|
||||
)
|
||||
@@ -42,6 +52,8 @@ type dockerManager struct {
|
||||
buf *bytes.Buffer // Buffer to store and read response bodies
|
||||
decoder *json.Decoder // Reusable JSON decoder that reads from buf
|
||||
apiStats *container.ApiStats // Reusable API stats object
|
||||
volumeSizeCache map[string]float64 // Cached volume sizes (name -> size in MB)
|
||||
volumeSizeUpdated time.Time // Last time volume sizes were updated
|
||||
|
||||
// Cache-time-aware tracking for CPU stats (similar to cpu.go)
|
||||
// Maps cache time intervals to container-specific CPU usage tracking
|
||||
@@ -53,6 +65,11 @@ type dockerManager struct {
|
||||
// cacheTimeMs -> DeltaTracker for network bytes sent/received
|
||||
networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
||||
networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
||||
|
||||
// Disk I/O delta trackers - one per cache time to avoid interference
|
||||
// cacheTimeMs -> DeltaTracker for disk bytes read/written
|
||||
diskReadTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
||||
diskWriteTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
||||
}
|
||||
|
||||
// userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent header to all requests
|
||||
@@ -159,8 +176,9 @@ func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats,
|
||||
}
|
||||
}
|
||||
|
||||
// prepare network trackers for next interval for this cache time
|
||||
// prepare network and disk trackers for next interval for this cache time
|
||||
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
|
||||
dm.cycleDiskDeltasForCacheTime(cacheTimeMs)
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
@@ -239,6 +257,32 @@ func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
|
||||
}
|
||||
}
|
||||
|
||||
// getDiskTracker returns the DeltaTracker for disk I/O for a specific cache time, creating it if needed
|
||||
func (dm *dockerManager) getDiskTracker(cacheTimeMs uint16, isRead bool) *deltatracker.DeltaTracker[string, uint64] {
|
||||
var trackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
||||
if isRead {
|
||||
trackers = dm.diskReadTrackers
|
||||
} else {
|
||||
trackers = dm.diskWriteTrackers
|
||||
}
|
||||
|
||||
if trackers[cacheTimeMs] == nil {
|
||||
trackers[cacheTimeMs] = deltatracker.NewDeltaTracker[string, uint64]()
|
||||
}
|
||||
|
||||
return trackers[cacheTimeMs]
|
||||
}
|
||||
|
||||
// cycleDiskDeltasForCacheTime cycles the disk delta trackers for a specific cache time
|
||||
func (dm *dockerManager) cycleDiskDeltasForCacheTime(cacheTimeMs uint16) {
|
||||
if dm.diskReadTrackers[cacheTimeMs] != nil {
|
||||
dm.diskReadTrackers[cacheTimeMs].Cycle()
|
||||
}
|
||||
if dm.diskWriteTrackers[cacheTimeMs] != nil {
|
||||
dm.diskWriteTrackers[cacheTimeMs].Cycle()
|
||||
}
|
||||
}
|
||||
|
||||
// calculateNetworkStats calculates network sent/receive deltas using DeltaTracker
|
||||
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, name string, cacheTimeMs uint16) (uint64, uint64) {
|
||||
var total_sent, total_recv uint64
|
||||
@@ -284,6 +328,50 @@ func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats
|
||||
return sent_delta, recv_delta
|
||||
}
|
||||
|
||||
// calculateDiskStats calculates disk read/write deltas using DeltaTracker
|
||||
func (dm *dockerManager) calculateDiskStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, cacheTimeMs uint16) (uint64, uint64) {
|
||||
var total_read, total_write uint64
|
||||
for _, entry := range apiStats.BlkioStats.IoServiceBytesRecursive {
|
||||
switch entry.Op {
|
||||
case diskOpRead, diskOpReadCap:
|
||||
total_read += entry.Value
|
||||
case diskOpWrite, diskOpWriteCap:
|
||||
total_write += entry.Value
|
||||
}
|
||||
}
|
||||
|
||||
// Get the DeltaTracker for this specific cache time
|
||||
readTracker := dm.getDiskTracker(cacheTimeMs, true)
|
||||
writeTracker := dm.getDiskTracker(cacheTimeMs, false)
|
||||
|
||||
// Set current values in the cache-time-specific DeltaTracker
|
||||
readTracker.Set(ctr.IdShort, total_read)
|
||||
writeTracker.Set(ctr.IdShort, total_write)
|
||||
|
||||
// Get deltas (bytes since last measurement)
|
||||
read_delta_raw := readTracker.Delta(ctr.IdShort)
|
||||
write_delta_raw := writeTracker.Delta(ctr.IdShort)
|
||||
|
||||
// Calculate bytes per second if we have previous data
|
||||
var read_delta, write_delta uint64
|
||||
if initialized {
|
||||
millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds())
|
||||
if millisecondsElapsed > 0 {
|
||||
if read_delta_raw > 0 {
|
||||
read_delta = read_delta_raw * 1000 / millisecondsElapsed
|
||||
}
|
||||
if write_delta_raw > 0 {
|
||||
write_delta = write_delta_raw * 1000 / millisecondsElapsed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Store current disk values for legacy compatibility
|
||||
stats.PrevDisk.Read, stats.PrevDisk.Write = total_read, total_write
|
||||
|
||||
return read_delta, write_delta
|
||||
}
|
||||
|
||||
// validateCpuPercentage checks if CPU percentage is within valid range
|
||||
func validateCpuPercentage(cpuPct float64, containerName string) error {
|
||||
if cpuPct > 100 {
|
||||
@@ -293,11 +381,13 @@ func validateCpuPercentage(cpuPct float64, containerName string) error {
|
||||
}
|
||||
|
||||
// updateContainerStatsValues updates the final stats values
|
||||
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta uint64, readTime time.Time) {
|
||||
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta, read_delta, write_delta uint64, readTime time.Time) {
|
||||
stats.Cpu = twoDecimals(cpuPct)
|
||||
stats.Mem = bytesToMegabytes(float64(usedMemory))
|
||||
stats.NetworkSent = bytesToMegabytes(float64(sent_delta))
|
||||
stats.NetworkRecv = bytesToMegabytes(float64(recv_delta))
|
||||
stats.DiskRead = bytesToMegabytes(float64(read_delta))
|
||||
stats.DiskWrite = bytesToMegabytes(float64(write_delta))
|
||||
stats.PrevReadTime = readTime
|
||||
}
|
||||
|
||||
@@ -320,11 +410,64 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
|
||||
dm.containerStatsMap[ctr.IdShort] = stats
|
||||
}
|
||||
|
||||
// Update name in case it changed
|
||||
stats.Name = name
|
||||
|
||||
// Set container metadata
|
||||
stats.IdShort = ctr.IdShort
|
||||
stats.Status = ctr.State
|
||||
if stats.Status == "" {
|
||||
stats.Status = containerStateUnknown
|
||||
}
|
||||
|
||||
// Set health status
|
||||
stats.Health = healthStatusNone
|
||||
if ctr.Health != "" {
|
||||
stats.Health = ctr.Health
|
||||
}
|
||||
|
||||
// Set Docker Compose project name
|
||||
if ctr.Labels != nil {
|
||||
if projectName, exists := ctr.Labels[composeProjectLabel]; exists {
|
||||
stats.Project = projectName
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate uptime for running containers
|
||||
if ctr.StartedAt > 0 && stats.Status == containerStateRunning {
|
||||
startedTime := time.Unix(ctr.StartedAt, 0)
|
||||
stats.Uptime = twoDecimals(time.Since(startedTime).Seconds())
|
||||
} else {
|
||||
stats.Uptime = 0
|
||||
}
|
||||
|
||||
// Collect volume information and fetch sizes
|
||||
volumeCount := 0
|
||||
for _, mount := range ctr.Mounts {
|
||||
if mount.Type == volumeTypeVolume && mount.Name != "" {
|
||||
volumeCount++
|
||||
}
|
||||
}
|
||||
if volumeCount > 0 {
|
||||
stats.Volumes = make(map[string]float64, volumeCount)
|
||||
for _, mount := range ctr.Mounts {
|
||||
if mount.Type == volumeTypeVolume && mount.Name != "" {
|
||||
// Fetch volume size using Docker system df API
|
||||
size := dm.getVolumeSize(mount.Name)
|
||||
stats.Volumes[mount.Name] = size
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stats.Volumes = nil
|
||||
}
|
||||
|
||||
// reset current stats
|
||||
stats.Cpu = 0
|
||||
stats.Mem = 0
|
||||
stats.NetworkSent = 0
|
||||
stats.NetworkRecv = 0
|
||||
stats.DiskRead = 0
|
||||
stats.DiskWrite = 0
|
||||
|
||||
res := dm.apiStats
|
||||
res.Networks = nil
|
||||
@@ -374,8 +517,11 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
|
||||
}
|
||||
stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv
|
||||
|
||||
// Calculate disk I/O stats using DeltaTracker
|
||||
read_delta, write_delta := dm.calculateDiskStats(ctr, res, stats, initialized, cacheTimeMs)
|
||||
|
||||
// Update final stats values
|
||||
updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, res.Read)
|
||||
updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, read_delta, write_delta, res.Read)
|
||||
// store per-cache-time read time for Windows CPU percent calc
|
||||
dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort] = res.Read
|
||||
|
||||
@@ -460,6 +606,7 @@ func newDockerManager(a *Agent) *dockerManager {
|
||||
sem: make(chan struct{}, 5),
|
||||
apiContainerList: []*container.ApiInfo{},
|
||||
apiStats: &container.ApiStats{},
|
||||
volumeSizeCache: make(map[string]float64),
|
||||
|
||||
// Initialize cache-time-aware tracking structures
|
||||
lastCpuContainer: make(map[uint16]map[string]uint64),
|
||||
@@ -467,6 +614,8 @@ func newDockerManager(a *Agent) *dockerManager {
|
||||
lastCpuReadTime: make(map[uint16]map[string]time.Time),
|
||||
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
|
||||
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
|
||||
diskReadTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
|
||||
diskWriteTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
|
||||
}
|
||||
|
||||
// If using podman, return client
|
||||
@@ -521,6 +670,49 @@ func (dm *dockerManager) checkDockerVersion() {
|
||||
}
|
||||
}
|
||||
|
||||
// getVolumeSize returns the cached size of a Docker volume
|
||||
// Refreshes the cache every 5 minutes using the system df API
|
||||
// Returns size in MB (megabytes)
|
||||
func (dm *dockerManager) getVolumeSize(volumeName string) float64 {
|
||||
// Refresh cache if older than 5 minutes
|
||||
if time.Since(dm.volumeSizeUpdated) > 5*time.Minute {
|
||||
dm.refreshVolumeSizes()
|
||||
}
|
||||
|
||||
return dm.volumeSizeCache[volumeName]
|
||||
}
|
||||
|
||||
// refreshVolumeSizes fetches all volume sizes from Docker and updates the cache
|
||||
func (dm *dockerManager) refreshVolumeSizes() {
|
||||
type volumeInfo struct {
|
||||
Name string
|
||||
UsageData struct {
|
||||
Size int64
|
||||
}
|
||||
}
|
||||
type systemDfResponse struct {
|
||||
Volumes []volumeInfo
|
||||
}
|
||||
|
||||
resp, err := dm.client.Get("http://localhost/system/df")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var dfData systemDfResponse
|
||||
if err := dm.decode(resp, &dfData); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Update all volume sizes in cache
|
||||
for _, vol := range dfData.Volumes {
|
||||
// Convert bytes to MB (megabytes)
|
||||
dm.volumeSizeCache[vol.Name] = float64(vol.UsageData.Size) / 1_000_000
|
||||
}
|
||||
|
||||
dm.volumeSizeUpdated = time.Now()
|
||||
}
|
||||
|
||||
// Decodes Docker API JSON response using a reusable buffer and decoder. Not thread safe.
|
||||
func (dm *dockerManager) decode(resp *http.Response, d any) error {
|
||||
if dm.buf == nil {
|
||||
|
||||
Reference in New Issue
Block a user