mirror of
https://github.com/henrygd/beszel.git
synced 2026-03-26 07:26:16 +01:00
Previously, the agent shared a single PrevReadTime timestamp across all collection intervals (e.g., 1s and 60s). This caused the 60s collector to divide its accumulated 60s byte delta by the tiny time elapsed since the last 1s collection, resulting in astronomically inflated network rates. The fix introduces per-cache-time read time tracking, ensuring calculations for each interval use their own independent timing context.
944 lines
30 KiB
Go
944 lines
30 KiB
Go
package agent
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/henrygd/beszel/agent/deltatracker"
|
|
"github.com/henrygd/beszel/agent/utils"
|
|
"github.com/henrygd/beszel/internal/entities/container"
|
|
|
|
"github.com/blang/semver"
|
|
)
|
|
|
|
// ansiEscapePattern matches ANSI escape sequences (colors, cursor movement, etc.)
// Covers three forms: CSI sequences like \x1b[...m, OSC sequences terminated
// by BEL (\x1b]...\x07), and single-character escapes (\x1b@ through \x1b_).
var ansiEscapePattern = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]|\x1b\][^\x07]*\x07|\x1b[@-Z\\-_]`)

// dockerContainerIDPattern validates container IDs before they are embedded
// in API paths: 12-64 hex characters (short or full Docker/Podman IDs).
var dockerContainerIDPattern = regexp.MustCompile(`^[a-fA-F0-9]{12,64}$`)
|
|
|
|
const (
	// Docker API timeout in milliseconds (default; overridable via DOCKER_TIMEOUT)
	dockerTimeoutMs = 2100
	// Maximum realistic network speed (5 GB/s) to detect bad deltas
	maxNetworkSpeedBps uint64 = 5e9
	// Maximum conceivable memory usage of a container (100TB) to detect bad memory stats
	maxMemoryUsage uint64 = 100 * 1024 * 1024 * 1024 * 1024
	// Number of log lines to request when fetching container logs
	dockerLogsTail = 200
	// Maximum size of a single log frame (1MB) to prevent memory exhaustion
	// A single log line larger than 1MB is likely an error or misconfiguration
	maxLogFrameSize = 1024 * 1024
	// Maximum total log content size (5MB) to prevent memory exhaustion
	// This provides a reasonable limit for network transfer and browser rendering
	maxTotalLogSize = 5 * 1024 * 1024
)
|
|
|
|
// dockerManager polls the Docker (or Podman) Engine API for container stats
// and holds the per-interval state needed to turn cumulative counters into
// rates. The buf/decoder/apiStats fields are reused across requests and are
// only safe because updateContainerStats serializes access via
// containerStatsMutex.
type dockerManager struct {
	client              *http.Client                // Client to query Docker API
	wg                  sync.WaitGroup              // WaitGroup to wait for all goroutines to finish
	sem                 chan struct{}               // Semaphore to limit concurrent container requests
	containerStatsMutex sync.RWMutex                // Mutex to prevent concurrent access to containerStatsMap
	apiContainerList    []*container.ApiInfo        // List of containers from Docker API
	containerStatsMap   map[string]*container.Stats // Keeps track of container stats
	validIds            map[string]struct{}         // Map of valid container ids, used to prune invalid containers from containerStatsMap
	goodDockerVersion   bool                        // Whether docker version is at least 25.0.0 (one-shot works correctly)
	isWindows           bool                        // Whether the Docker Engine API is running on Windows
	buf                 *bytes.Buffer               // Buffer to store and read response bodies
	decoder             *json.Decoder               // Reusable JSON decoder that reads from buf
	apiStats            *container.ApiStats         // Reusable API stats object
	excludeContainers   []string                    // Patterns to exclude containers by name
	usingPodman         bool                        // Whether the Docker Engine API is running on Podman

	// Cache-time-aware tracking for CPU stats (similar to cpu.go)
	// Maps cache time intervals to container-specific CPU usage tracking
	lastCpuContainer map[uint16]map[string]uint64    // cacheTimeMs -> containerId -> last cpu container usage
	lastCpuSystem    map[uint16]map[string]uint64    // cacheTimeMs -> containerId -> last cpu system usage
	lastCpuReadTime  map[uint16]map[string]time.Time // cacheTimeMs -> containerId -> last read time (Windows)

	// Network delta trackers - one per cache time to avoid interference
	// cacheTimeMs -> DeltaTracker for network bytes sent/received
	networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
	networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
	lastNetworkReadTime map[uint16]map[string]time.Time // cacheTimeMs -> containerId -> last network read time

	// retrySleep is swappable for tests; production uses time.Sleep
	retrySleep func(time.Duration)
}
|
|
|
|
// userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent
// header to all requests before delegating to the wrapped transport.
type userAgentRoundTripper struct {
	rt        http.RoundTripper // underlying transport that performs the request
	userAgent string            // value stamped into the User-Agent header
}
|
|
|
|
// RoundTrip implements the http.RoundTripper interface.
// It sets the configured User-Agent on every outgoing request, then forwards
// the request to the wrapped transport.
func (u *userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req.Header.Set("User-Agent", u.userAgent)
	return u.rt.RoundTrip(req)
}
|
|
|
|
// Add goroutine to the queue
|
|
func (d *dockerManager) queue() {
|
|
d.wg.Add(1)
|
|
if d.goodDockerVersion {
|
|
d.sem <- struct{}{}
|
|
}
|
|
}
|
|
|
|
// Remove goroutine from the queue
|
|
func (d *dockerManager) dequeue() {
|
|
d.wg.Done()
|
|
if d.goodDockerVersion {
|
|
<-d.sem
|
|
}
|
|
}
|
|
|
|
// shouldExcludeContainer checks if a container name matches any exclusion pattern
|
|
func (dm *dockerManager) shouldExcludeContainer(name string) bool {
|
|
if len(dm.excludeContainers) == 0 {
|
|
return false
|
|
}
|
|
for _, pattern := range dm.excludeContainers {
|
|
if match, _ := path.Match(pattern, name); match {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// getDockerStats returns stats for all running containers with
// cache-time-aware delta tracking. cacheTimeMs identifies the polling
// interval (e.g. 1000ms vs 60000ms) so each interval keeps its own
// independent CPU and network deltas.
func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats, error) {
	resp, err := dm.client.Get("http://localhost/containers/json")
	if err != nil {
		return nil, err
	}

	// reuse the backing array of the previous container list
	dm.apiContainerList = dm.apiContainerList[:0]
	if err := dm.decode(resp, &dm.apiContainerList); err != nil {
		return nil, err
	}

	// detect Windows engine from the Server header; set before goroutines start
	dm.isWindows = strings.Contains(resp.Header.Get("Server"), "windows")

	containersLength := len(dm.apiContainerList)

	// store valid ids to clean up old container ids from map
	if dm.validIds == nil {
		dm.validIds = make(map[string]struct{}, containersLength)
	} else {
		clear(dm.validIds)
	}

	// containers whose stats request failed; retried once below.
	// Appends are guarded by containerStatsMutex inside the goroutines.
	var failedContainers []*container.ApiInfo

	for _, ctr := range dm.apiContainerList {
		ctr.IdShort = ctr.Id[:12]

		// Skip this container if it matches the exclusion pattern
		if dm.shouldExcludeContainer(ctr.Names[0][1:]) {
			slog.Debug("Excluding container", "name", ctr.Names[0][1:])
			continue
		}

		dm.validIds[ctr.IdShort] = struct{}{}
		// check if container is less than 1 minute old (possible restart)
		// note: can't use Created field because it's not updated on restart
		if strings.Contains(ctr.Status, "second") {
			// if so, remove old container data
			dm.deleteContainerStatsSync(ctr.IdShort)
		}
		dm.queue()
		go func(ctr *container.ApiInfo) {
			defer dm.dequeue()
			err := dm.updateContainerStats(ctr, cacheTimeMs)
			// if error, delete from map and add to failed list to retry
			if err != nil {
				dm.containerStatsMutex.Lock()
				delete(dm.containerStatsMap, ctr.IdShort)
				failedContainers = append(failedContainers, ctr)
				dm.containerStatsMutex.Unlock()
			}
		}(ctr)
	}

	dm.wg.Wait()

	// retry failed containers separately so we can run them in parallel (docker 24 bug)
	if len(failedContainers) > 0 {
		slog.Debug("Retrying failed containers", "count", len(failedContainers))
		for i := range failedContainers {
			ctr := failedContainers[i]
			dm.queue()
			go func(ctr *container.ApiInfo) {
				defer dm.dequeue()
				if err2 := dm.updateContainerStats(ctr, cacheTimeMs); err2 != nil {
					slog.Error("Error getting container stats", "err", err2)
				}
			}(ctr)
		}
		dm.wg.Wait()
	}

	// populate final stats and remove old / invalid container stats
	stats := make([]*container.Stats, 0, containersLength)
	for id, v := range dm.containerStatsMap {
		if _, exists := dm.validIds[id]; !exists {
			delete(dm.containerStatsMap, id)
		} else {
			stats = append(stats, v)
		}
	}

	// prepare network trackers for next interval for this cache time
	dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)

	return stats, nil
}
|
|
|
|
// initializeCpuTracking initializes CPU tracking maps for a specific cache time interval
|
|
func (dm *dockerManager) initializeCpuTracking(cacheTimeMs uint16) {
|
|
// Initialize cache time maps if they don't exist
|
|
if dm.lastCpuContainer[cacheTimeMs] == nil {
|
|
dm.lastCpuContainer[cacheTimeMs] = make(map[string]uint64)
|
|
}
|
|
if dm.lastCpuSystem[cacheTimeMs] == nil {
|
|
dm.lastCpuSystem[cacheTimeMs] = make(map[string]uint64)
|
|
}
|
|
// Ensure the outer map exists before indexing
|
|
if dm.lastCpuReadTime == nil {
|
|
dm.lastCpuReadTime = make(map[uint16]map[string]time.Time)
|
|
}
|
|
if dm.lastCpuReadTime[cacheTimeMs] == nil {
|
|
dm.lastCpuReadTime[cacheTimeMs] = make(map[string]time.Time)
|
|
}
|
|
}
|
|
|
|
// getCpuPreviousValues returns the previously recorded container and system
// CPU counters for this container and cache time interval. Missing entries
// yield zero values, which the percent calculations treat as "no history".
func (dm *dockerManager) getCpuPreviousValues(cacheTimeMs uint16, containerId string) (uint64, uint64) {
	return dm.lastCpuContainer[cacheTimeMs][containerId], dm.lastCpuSystem[cacheTimeMs][containerId]
}
|
|
|
|
// setCpuCurrentValues stores current CPU values for a container and cache
// time interval. Callers must run initializeCpuTracking(cacheTimeMs) first
// so the inner maps exist; assigning into a nil inner map would panic.
func (dm *dockerManager) setCpuCurrentValues(cacheTimeMs uint16, containerId string, cpuContainer, cpuSystem uint64) {
	dm.lastCpuContainer[cacheTimeMs][containerId] = cpuContainer
	dm.lastCpuSystem[cacheTimeMs][containerId] = cpuSystem
}
|
|
|
|
// calculateMemoryUsage calculates memory usage from Docker API stats
|
|
func calculateMemoryUsage(apiStats *container.ApiStats, isWindows bool) (uint64, error) {
|
|
if isWindows {
|
|
return apiStats.MemoryStats.PrivateWorkingSet, nil
|
|
}
|
|
|
|
memCache := apiStats.MemoryStats.Stats.InactiveFile
|
|
if memCache == 0 {
|
|
memCache = apiStats.MemoryStats.Stats.Cache
|
|
}
|
|
|
|
usedDelta := apiStats.MemoryStats.Usage - memCache
|
|
if usedDelta <= 0 || usedDelta > maxMemoryUsage {
|
|
return 0, fmt.Errorf("bad memory stats")
|
|
}
|
|
|
|
return usedDelta, nil
|
|
}
|
|
|
|
// getNetworkTracker returns the DeltaTracker for a specific cache time, creating it if needed
|
|
func (dm *dockerManager) getNetworkTracker(cacheTimeMs uint16, isSent bool) *deltatracker.DeltaTracker[string, uint64] {
|
|
var trackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
|
if isSent {
|
|
trackers = dm.networkSentTrackers
|
|
} else {
|
|
trackers = dm.networkRecvTrackers
|
|
}
|
|
|
|
if trackers[cacheTimeMs] == nil {
|
|
trackers[cacheTimeMs] = deltatracker.NewDeltaTracker[string, uint64]()
|
|
}
|
|
|
|
return trackers[cacheTimeMs]
|
|
}
|
|
|
|
// cycleNetworkDeltasForCacheTime cycles the network delta trackers for a specific cache time
|
|
func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
|
|
if dm.networkSentTrackers[cacheTimeMs] != nil {
|
|
dm.networkSentTrackers[cacheTimeMs].Cycle()
|
|
}
|
|
if dm.networkRecvTrackers[cacheTimeMs] != nil {
|
|
dm.networkRecvTrackers[cacheTimeMs].Cycle()
|
|
}
|
|
}
|
|
|
|
// calculateNetworkStats calculates network sent/receive rates in bytes per
// second for one container, using the DeltaTracker dedicated to this cache
// interval. Returns (0, 0) on the first observation for an interval (no
// prior read time) or when the elapsed time rounds to zero milliseconds.
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, name string, cacheTimeMs uint16) (uint64, uint64) {
	// sum cumulative byte counters across all of the container's interfaces
	var total_sent, total_recv uint64
	for _, v := range apiStats.Networks {
		total_sent += v.TxBytes
		total_recv += v.RxBytes
	}

	// Get the DeltaTracker for this specific cache time
	sentTracker := dm.getNetworkTracker(cacheTimeMs, true)
	recvTracker := dm.getNetworkTracker(cacheTimeMs, false)

	// Set current values in the cache-time-specific DeltaTracker
	sentTracker.Set(ctr.IdShort, total_sent)
	recvTracker.Set(ctr.IdShort, total_recv)

	// Get deltas (bytes since last measurement)
	sent_delta_raw := sentTracker.Delta(ctr.IdShort)
	recv_delta_raw := recvTracker.Delta(ctr.IdShort)

	// Calculate bytes per second using per-cache-time read time to avoid
	// interference between different cache intervals (e.g. 1000ms vs 60000ms)
	var sent_delta, recv_delta uint64
	if prevReadTime, ok := dm.lastNetworkReadTime[cacheTimeMs][ctr.IdShort]; ok {
		millisecondsElapsed := uint64(time.Since(prevReadTime).Milliseconds())
		if millisecondsElapsed > 0 {
			if sent_delta_raw > 0 {
				sent_delta = sent_delta_raw * 1000 / millisecondsElapsed
				// rates above maxNetworkSpeedBps indicate a bogus delta; drop to 0
				if sent_delta > maxNetworkSpeedBps {
					slog.Warn("Bad network delta", "container", name)
					sent_delta = 0
				}
			}
			if recv_delta_raw > 0 {
				recv_delta = recv_delta_raw * 1000 / millisecondsElapsed
				if recv_delta > maxNetworkSpeedBps {
					slog.Warn("Bad network delta", "container", name)
					recv_delta = 0
				}
			}
		}
	}

	return sent_delta, recv_delta
}
|
|
|
|
// validateCpuPercentage rejects CPU percentages above 100, which indicate a
// bad delta calculation rather than a real reading.
func validateCpuPercentage(cpuPct float64, containerName string) error {
	if cpuPct <= 100 {
		return nil
	}
	return fmt.Errorf("%s cpu pct greater than 100: %+v", containerName, cpuPct)
}
|
|
|
|
// updateContainerStatsValues updates the final stats values
|
|
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta uint64, readTime time.Time) {
|
|
stats.Cpu = utils.TwoDecimals(cpuPct)
|
|
stats.Mem = utils.BytesToMegabytes(float64(usedMemory))
|
|
stats.Bandwidth = [2]uint64{sent_delta, recv_delta}
|
|
// TODO(0.19+): stop populating NetworkSent/NetworkRecv (deprecated in 0.18.3)
|
|
stats.NetworkSent = utils.BytesToMegabytes(float64(sent_delta))
|
|
stats.NetworkRecv = utils.BytesToMegabytes(float64(recv_delta))
|
|
stats.PrevReadTime = readTime
|
|
}
|
|
|
|
// convertContainerPortsToString formats the ports of a container into a sorted, deduplicated string.
|
|
// ctr.Ports is nilled out after processing so the slice is not accidentally reused.
|
|
func convertContainerPortsToString(ctr *container.ApiInfo) string {
|
|
if len(ctr.Ports) == 0 {
|
|
return ""
|
|
}
|
|
sort.Slice(ctr.Ports, func(i, j int) bool {
|
|
return ctr.Ports[i].PublicPort < ctr.Ports[j].PublicPort
|
|
})
|
|
var builder strings.Builder
|
|
seenPorts := make(map[uint16]struct{})
|
|
for _, p := range ctr.Ports {
|
|
_, ok := seenPorts[p.PublicPort]
|
|
if p.PublicPort == 0 || ok {
|
|
continue
|
|
}
|
|
seenPorts[p.PublicPort] = struct{}{}
|
|
if builder.Len() > 0 {
|
|
builder.WriteString(", ")
|
|
}
|
|
switch p.IP {
|
|
case "0.0.0.0", "::":
|
|
default:
|
|
builder.WriteString(p.IP)
|
|
builder.WriteByte(':')
|
|
}
|
|
builder.WriteString(strconv.Itoa(int(p.PublicPort)))
|
|
}
|
|
// clear ports slice so it doesn't get reused and blend into next response
|
|
ctr.Ports = nil
|
|
return builder.String()
|
|
}
|
|
|
|
// parseDockerStatus splits a Docker status string such as
// "Up 3 hours (healthy)" into the display text ("Up 3 hours") and the parsed
// health state. Statuses without a recognized parenthesized health suffix are
// returned unchanged with DockerHealthNone.
func parseDockerStatus(status string) (string, container.DockerHealth) {
	trimmed := strings.TrimSpace(status)
	if trimmed == "" {
		return "", container.DockerHealthNone
	}

	// Remove "About " from status
	trimmed = strings.Replace(trimmed, "About ", "", 1)

	// no trailing "(...)" suffix means no embedded health info
	openIdx := strings.LastIndex(trimmed, "(")
	if openIdx == -1 || !strings.HasSuffix(trimmed, ")") {
		return trimmed, container.DockerHealthNone
	}

	statusText := strings.TrimSpace(trimmed[:openIdx])
	if statusText == "" {
		// status was only "(...)"; keep the full string as the display text
		statusText = trimmed
	}

	healthText := strings.TrimSpace(strings.TrimSuffix(trimmed[openIdx+1:], ")"))
	// Some Docker statuses include a "health:" prefix inside the parentheses.
	// Strip it so it maps correctly to the known health states.
	if colonIdx := strings.IndexRune(healthText, ':'); colonIdx != -1 {
		prefix := strings.ToLower(strings.TrimSpace(healthText[:colonIdx]))
		if prefix == "health" || prefix == "health status" {
			healthText = strings.TrimSpace(healthText[colonIdx+1:])
		}
	}
	if health, ok := parseDockerHealthStatus(healthText); ok {
		return statusText, health
	}

	// parenthesized text was not a health state (e.g. "(Paused)"):
	// return the original trimmed status untouched
	return trimmed, container.DockerHealthNone
}
|
|
|
|
// parseDockerHealthStatus maps Docker health status strings to container.DockerHealth values
|
|
func parseDockerHealthStatus(status string) (container.DockerHealth, bool) {
|
|
health, ok := container.DockerHealthStrings[strings.ToLower(strings.TrimSpace(status))]
|
|
return health, ok
|
|
}
|
|
|
|
// getPodmanContainerHealth fetches container health status from the container inspect endpoint.
// Used for Podman which doesn't provide health status in the /containers/json endpoint as of March 2026.
// https://github.com/containers/podman/issues/27786
// Unrecognized or absent health strings yield DockerHealthNone with no error.
func (dm *dockerManager) getPodmanContainerHealth(containerID string) (container.DockerHealth, error) {
	resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/json", url.PathEscape(containerID)))
	if err != nil {
		return container.DockerHealthNone, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return container.DockerHealthNone, fmt.Errorf("container inspect request failed: %s", resp.Status)
	}

	// decode only the State.Health.Status field from the inspect payload
	var inspectInfo struct {
		State struct {
			Health struct {
				Status string
			}
		}
	}
	if err := json.NewDecoder(resp.Body).Decode(&inspectInfo); err != nil {
		return container.DockerHealthNone, err
	}

	if health, ok := parseDockerHealthStatus(inspectInfo.State.Health.Status); ok {
		return health, nil
	}

	return container.DockerHealthNone, nil
}
|
|
|
|
// updateContainerStats fetches one container's stats from the engine and
// updates its entry in containerStatsMap with cache-time-aware delta
// tracking. Called concurrently from getDockerStats; all shared state
// (containerStatsMap, apiStats, buf/decoder, tracking maps) is accessed
// under containerStatsMutex.
func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeMs uint16) error {
	// names from the API are prefixed with "/"
	name := ctr.Names[0][1:]

	resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/stats?stream=0&one-shot=1", ctr.IdShort))
	if err != nil {
		return err
	}

	statusText, health := parseDockerStatus(ctr.Status)

	// Docker exposes Health.Status on /containers/json in API 1.52+.
	// Podman currently requires falling back to the inspect endpoint as of March 2026.
	// https://github.com/containers/podman/issues/27786
	if ctr.Health.Status != "" {
		if h, ok := parseDockerHealthStatus(ctr.Health.Status); ok {
			health = h
		}
	} else if dm.usingPodman {
		if podmanHealth, err := dm.getPodmanContainerHealth(ctr.IdShort); err == nil {
			health = podmanHealth
		}
	}

	dm.containerStatsMutex.Lock()
	defer dm.containerStatsMutex.Unlock()

	// add empty values if they doesn't exist in map
	stats, initialized := dm.containerStatsMap[ctr.IdShort]
	if !initialized {
		stats = &container.Stats{Name: name, Id: ctr.IdShort, Image: ctr.Image}
		dm.containerStatsMap[ctr.IdShort] = stats
	}

	stats.Id = ctr.IdShort
	stats.Status = statusText
	stats.Health = health

	if len(ctr.Ports) > 0 {
		stats.Ports = convertContainerPortsToString(ctr)
	}

	// reset current stats
	stats.Cpu = 0
	stats.Mem = 0
	stats.Bandwidth = [2]uint64{0, 0}
	// TODO(0.19+): stop populating NetworkSent/NetworkRecv (deprecated in 0.18.3)
	stats.NetworkSent = 0
	stats.NetworkRecv = 0

	// decode into the shared reusable ApiStats object (safe: mutex held)
	res := dm.apiStats
	res.Networks = nil
	if err := dm.decode(resp, res); err != nil {
		return err
	}

	// Initialize CPU tracking for this cache time interval
	dm.initializeCpuTracking(cacheTimeMs)

	// Get previous CPU values
	prevCpuContainer, prevCpuSystem := dm.getCpuPreviousValues(cacheTimeMs, ctr.IdShort)

	// Calculate CPU percentage based on platform
	var cpuPct float64
	if dm.isWindows {
		prevRead := dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort]
		cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, prevRead)
	} else {
		cpuPct = res.CalculateCpuPercentLinux(prevCpuContainer, prevCpuSystem)
	}

	// Calculate memory usage
	usedMemory, err := calculateMemoryUsage(res, dm.isWindows)
	if err != nil {
		return fmt.Errorf("%s - %w - see https://github.com/henrygd/beszel/issues/144", name, err)
	}

	// Store current CPU stats for next calculation
	currentCpuContainer := res.CPUStats.CPUUsage.TotalUsage
	currentCpuSystem := res.CPUStats.SystemUsage
	dm.setCpuCurrentValues(cacheTimeMs, ctr.IdShort, currentCpuContainer, currentCpuSystem)

	// Validate CPU percentage
	if err := validateCpuPercentage(cpuPct, name); err != nil {
		return err
	}

	// Calculate network stats using DeltaTracker
	sent_delta, recv_delta := dm.calculateNetworkStats(ctr, res, name, cacheTimeMs)

	// Store per-cache-time network read time for next rate calculation
	if dm.lastNetworkReadTime[cacheTimeMs] == nil {
		dm.lastNetworkReadTime[cacheTimeMs] = make(map[string]time.Time)
	}
	dm.lastNetworkReadTime[cacheTimeMs][ctr.IdShort] = time.Now()

	// Store current network values for legacy compatibility
	var total_sent, total_recv uint64
	for _, v := range res.Networks {
		total_sent += v.TxBytes
		total_recv += v.RxBytes
	}
	stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv

	// Update final stats values
	updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, res.Read)
	// store per-cache-time read time for Windows CPU percent calc
	dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort] = res.Read

	return nil
}
|
|
|
|
// Delete container stats from map using mutex
|
|
func (dm *dockerManager) deleteContainerStatsSync(id string) {
|
|
dm.containerStatsMutex.Lock()
|
|
defer dm.containerStatsMutex.Unlock()
|
|
delete(dm.containerStatsMap, id)
|
|
for ct := range dm.lastCpuContainer {
|
|
delete(dm.lastCpuContainer[ct], id)
|
|
}
|
|
for ct := range dm.lastCpuSystem {
|
|
delete(dm.lastCpuSystem[ct], id)
|
|
}
|
|
for ct := range dm.lastCpuReadTime {
|
|
delete(dm.lastCpuReadTime[ct], id)
|
|
}
|
|
for ct := range dm.lastNetworkReadTime {
|
|
delete(dm.lastNetworkReadTime[ct], id)
|
|
}
|
|
}
|
|
|
|
// newDockerManager creates a dockerManager with an http client configured for
// the Docker or Podman API socket. Returns nil when DOCKER_HOST is explicitly
// set to an empty string (monitoring disabled). Exits the process on an
// unparsable DOCKER_HOST, unsupported scheme, or invalid DOCKER_TIMEOUT.
func newDockerManager() *dockerManager {
	dockerHost, exists := utils.GetEnv("DOCKER_HOST")
	if exists {
		// return nil if set to empty string
		if dockerHost == "" {
			return nil
		}
	} else {
		// probe the default docker/podman socket locations
		dockerHost = getDockerHost()
	}

	parsedURL, err := url.Parse(dockerHost)
	if err != nil {
		os.Exit(1)
	}

	transport := &http.Transport{
		DisableCompression: true,
		MaxConnsPerHost:    0,
	}

	// dial the configured socket/host regardless of the per-request URL;
	// request URLs use the placeholder host "localhost"
	switch parsedURL.Scheme {
	case "unix":
		transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path)
		}
	case "tcp", "http", "https":
		transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host)
		}
	default:
		slog.Error("Invalid DOCKER_HOST", "scheme", parsedURL.Scheme)
		os.Exit(1)
	}

	// configurable timeout
	timeout := time.Millisecond * time.Duration(dockerTimeoutMs)
	if t, set := utils.GetEnv("DOCKER_TIMEOUT"); set {
		timeout, err = time.ParseDuration(t)
		if err != nil {
			slog.Error(err.Error())
			os.Exit(1)
		}
		slog.Info("DOCKER_TIMEOUT", "timeout", timeout)
	}

	// Custom user-agent to avoid docker bug: https://github.com/docker/for-mac/issues/7575
	userAgentTransport := &userAgentRoundTripper{
		rt:        transport,
		userAgent: "Docker-Client/",
	}

	// Read container exclusion patterns from environment variable
	var excludeContainers []string
	if excludeStr, set := utils.GetEnv("EXCLUDE_CONTAINERS"); set && excludeStr != "" {
		parts := strings.SplitSeq(excludeStr, ",")
		for part := range parts {
			trimmed := strings.TrimSpace(part)
			if trimmed != "" {
				excludeContainers = append(excludeContainers, trimmed)
			}
		}
		slog.Info("EXCLUDE_CONTAINERS", "patterns", excludeContainers)
	}

	manager := &dockerManager{
		client: &http.Client{
			Timeout:   timeout,
			Transport: userAgentTransport,
		},
		containerStatsMap: make(map[string]*container.Stats),
		sem:               make(chan struct{}, 5),
		apiContainerList:  []*container.ApiInfo{},
		apiStats:          &container.ApiStats{},
		excludeContainers: excludeContainers,

		// Initialize cache-time-aware tracking structures
		lastCpuContainer:    make(map[uint16]map[string]uint64),
		lastCpuSystem:       make(map[uint16]map[string]uint64),
		lastCpuReadTime:     make(map[uint16]map[string]time.Time),
		networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
		networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
		lastNetworkReadTime: make(map[uint16]map[string]time.Time),
		retrySleep:          time.Sleep,
	}

	// If using podman, return client
	if strings.Contains(dockerHost, "podman") {
		manager.usingPodman = true
		// Podman is not affected by the docker 24 one-shot bug
		manager.goodDockerVersion = true
		return manager
	}

	// run version check in goroutine to avoid blocking (server may not be ready and requires retries)
	go manager.checkDockerVersion()

	// give version check a chance to complete before returning
	time.Sleep(50 * time.Millisecond)

	return manager
}
|
|
|
|
// checkDockerVersion checks Docker version and sets goodDockerVersion if at least 25.0.0.
// Versions before 25.0.0 have a bug with one-shot which requires all requests to be made in one batch.
// Retries once after a delay (via retrySleep, swappable in tests) since the
// engine may not be ready at agent startup. Errors leave goodDockerVersion false.
func (dm *dockerManager) checkDockerVersion() {
	var err error
	var resp *http.Response
	var versionInfo struct {
		Version string `json:"Version"`
	}
	const versionMaxTries = 2
	for i := 1; i <= versionMaxTries; i++ {
		resp, err = dm.client.Get("http://localhost/version")
		if err == nil && resp.StatusCode == http.StatusOK {
			// success: body is left open for decode below, which closes it
			break
		}
		// failed attempt: close any body before a possible retry
		if resp != nil {
			resp.Body.Close()
		}
		if i < versionMaxTries {
			slog.Debug("Failed to get Docker version; retrying", "attempt", i, "err", err, "response", resp)
			dm.retrySleep(5 * time.Second)
		}
	}
	// note: err != nil is checked first, so resp may safely be nil here
	if err != nil || resp.StatusCode != http.StatusOK {
		return
	}
	if err := dm.decode(resp, &versionInfo); err != nil {
		return
	}
	// if version > 24, one-shot works correctly and we can limit concurrent operations
	if dockerVersion, err := semver.Parse(versionInfo.Version); err == nil && dockerVersion.Major > 24 {
		dm.goodDockerVersion = true
	} else {
		slog.Info(fmt.Sprintf("Docker %s is outdated. Upgrade if possible. See https://github.com/henrygd/beszel/issues/58", versionInfo.Version))
	}
}
|
|
|
|
// decode reads a Docker API response body into the reusable buffer and
// decodes it into d with the reusable JSON decoder. Closes the response body
// and resets the buffer when done. Not thread safe: callers must not invoke
// decode concurrently (getDockerStats calls it before spawning goroutines;
// updateContainerStats calls it under containerStatsMutex).
func (dm *dockerManager) decode(resp *http.Response, d any) error {
	if dm.buf == nil {
		// initialize buffer with 256kb starting size
		dm.buf = bytes.NewBuffer(make([]byte, 0, 1024*256))
		dm.decoder = json.NewDecoder(dm.buf)
	}
	defer resp.Body.Close()
	defer dm.buf.Reset()
	_, err := dm.buf.ReadFrom(resp.Body)
	if err != nil {
		return err
	}
	return dm.decoder.Decode(d)
}
|
|
|
|
// getDockerHost probes the standard Docker and rootless-Podman socket paths
// and returns the first that exists as a unix:// URL. Falls back to the
// Docker socket path when neither is present.
func getDockerHost() string {
	const scheme = "unix://"
	candidates := []string{
		"/var/run/docker.sock",
		fmt.Sprintf("/run/user/%v/podman/podman.sock", os.Getuid()),
	}
	for _, candidate := range candidates {
		if _, err := os.Stat(candidate); err == nil {
			return scheme + candidate
		}
	}
	return scheme + candidates[0]
}
|
|
|
|
func validateContainerID(containerID string) error {
|
|
if !dockerContainerIDPattern.MatchString(containerID) {
|
|
return fmt.Errorf("invalid container id")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func buildDockerContainerEndpoint(containerID, action string, query url.Values) (string, error) {
|
|
if err := validateContainerID(containerID); err != nil {
|
|
return "", err
|
|
}
|
|
u := &url.URL{
|
|
Scheme: "http",
|
|
Host: "localhost",
|
|
Path: fmt.Sprintf("/containers/%s/%s", url.PathEscape(containerID), action),
|
|
}
|
|
if len(query) > 0 {
|
|
u.RawQuery = query.Encode()
|
|
}
|
|
return u.String(), nil
|
|
}
|
|
|
|
// getContainerInfo fetches the inspection data for a container and returns it
// as JSON with the Config.Env field removed, since environment variables
// commonly contain secrets.
func (dm *dockerManager) getContainerInfo(ctx context.Context, containerID string) ([]byte, error) {
	endpoint, err := buildDockerContainerEndpoint(containerID, "json", nil)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}

	resp, err := dm.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// include up to 1KB of the error body for context
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
		return nil, fmt.Errorf("container info request failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
	}

	// Remove sensitive environment variables from Config.Env
	var containerInfo map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&containerInfo); err != nil {
		return nil, err
	}
	if config, ok := containerInfo["Config"].(map[string]any); ok {
		delete(config, "Env")
	}

	return json.Marshal(containerInfo)
}
|
|
|
|
// getLogs fetches the most recent stdout/stderr logs for a container
// (dockerLogsTail lines), demultiplexes Docker's stream framing when present,
// and strips ANSI escape sequences for clean display in the web UI.
func (dm *dockerManager) getLogs(ctx context.Context, containerID string) (string, error) {
	query := url.Values{
		"stdout": []string{"1"},
		"stderr": []string{"1"},
		"tail":   []string{fmt.Sprintf("%d", dockerLogsTail)},
	}
	endpoint, err := buildDockerContainerEndpoint(containerID, "logs", query)
	if err != nil {
		return "", err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return "", err
	}

	resp, err := dm.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
		return "", fmt.Errorf("logs request failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
	}

	var builder strings.Builder
	// Docker signals multiplexed streams via the Content-Type header
	contentType := resp.Header.Get("Content-Type")
	multiplexed := strings.HasSuffix(contentType, "multiplexed-stream")
	logReader := io.Reader(resp.Body)
	if !multiplexed {
		// Podman may return multiplexed logs without Content-Type. Sniff the first frame header
		// with a small buffered reader only when the header check fails.
		bufferedReader := bufio.NewReaderSize(resp.Body, 8)
		multiplexed = detectDockerMultiplexedStream(bufferedReader)
		logReader = bufferedReader
	}
	if err := decodeDockerLogStream(logReader, &builder, multiplexed); err != nil {
		return "", err
	}

	// Strip ANSI escape sequences from logs for clean display in web UI
	logs := builder.String()
	if strings.Contains(logs, "\x1b") {
		logs = ansiEscapePattern.ReplaceAllString(logs, "")
	}
	return logs, nil
}
|
|
|
|
func detectDockerMultiplexedStream(reader *bufio.Reader) bool {
|
|
const headerSize = 8
|
|
header, err := reader.Peek(headerSize)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if header[0] != 0x01 && header[0] != 0x02 {
|
|
return false
|
|
}
|
|
// Docker's stream framing header reserves bytes 1-3 as zero.
|
|
if header[1] != 0 || header[2] != 0 || header[3] != 0 {
|
|
return false
|
|
}
|
|
frameLen := binary.BigEndian.Uint32(header[4:])
|
|
return frameLen <= maxLogFrameSize
|
|
}
|
|
|
|
// decodeDockerLogStream copies log content from reader into builder. For a
// plain stream it copies at most maxTotalLogSize bytes. For a multiplexed
// stream it walks Docker's 8-byte frame headers, appending each frame's
// payload (stdout and stderr are interleaved, not separated). Truncated
// trailing frames (EOF mid-frame) are treated as a clean end of stream.
func decodeDockerLogStream(reader io.Reader, builder *strings.Builder, multiplexed bool) error {
	if !multiplexed {
		_, err := io.Copy(builder, io.LimitReader(reader, maxTotalLogSize))
		return err
	}
	const headerSize = 8
	var header [headerSize]byte
	totalBytesRead := 0

	for {
		if _, err := io.ReadFull(reader, header[:]); err != nil {
			// a clean EOF (or partial trailing header) ends the stream
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
				return nil
			}
			return err
		}

		// frame length lives in bytes 4-7, big-endian
		frameLen := binary.BigEndian.Uint32(header[4:])
		if frameLen == 0 {
			continue
		}

		// Prevent memory exhaustion from excessively large frames
		if frameLen > maxLogFrameSize {
			return fmt.Errorf("log frame size (%d) exceeds maximum (%d)", frameLen, maxLogFrameSize)
		}

		// Check if reading this frame would exceed total log size limit
		if totalBytesRead+int(frameLen) > maxTotalLogSize {
			// Read and discard remaining data to avoid blocking
			_, _ = io.CopyN(io.Discard, reader, int64(frameLen))
			slog.Debug("Truncating logs: limit reached", "read", totalBytesRead, "limit", maxTotalLogSize)
			return nil
		}

		n, err := io.CopyN(builder, reader, int64(frameLen))
		if err != nil {
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
				return nil
			}
			return err
		}
		totalBytesRead += int(n)
	}
}
|
|
|
|
// GetHostInfo fetches the system info from Docker
|
|
func (dm *dockerManager) GetHostInfo() (info container.HostInfo, err error) {
|
|
resp, err := dm.client.Get("http://localhost/info")
|
|
if err != nil {
|
|
return info, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
|
|
return info, err
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
// IsPodman reports whether the agent is talking to a Podman socket rather
// than a Docker engine (detected from the DOCKER_HOST path at construction).
func (dm *dockerManager) IsPodman() bool {
	return dm.usingPodman
}
|