From 845369ab54f785475cd63f9ebb8c96c4ec3f9482 Mon Sep 17 00:00:00 2001 From: henrygd Date: Tue, 10 Feb 2026 18:58:52 -0500 Subject: [PATCH] reset Docker client connections after repeated old-engine list failures (#1728) --- agent/docker.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/agent/docker.go b/agent/docker.go index f89e7497..10e16b2d 100644 --- a/agent/docker.go +++ b/agent/docker.go @@ -32,6 +32,10 @@ var ansiEscapePattern = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]|\x1b\][^\x07]* const ( // Docker API timeout in milliseconds dockerTimeoutMs = 2100 + // Number of consecutive /containers/json failures before forcing a client reset on old Docker versions + dockerClientResetFailureThreshold = 3 + // Minimum time between Docker client resets to avoid reset flapping + dockerClientResetCooldown = 30 * time.Second // Maximum realistic network speed (5 GB/s) to detect bad deltas maxNetworkSpeedBps uint64 = 5e9 // Maximum conceivable memory usage of a container (100TB) to detect bad memory stats @@ -55,12 +59,16 @@ type dockerManager struct { containerStatsMap map[string]*container.Stats // Keeps track of container stats validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly) + versionChecked bool // Whether docker version detection completed successfully isWindows bool // Whether the Docker Engine API is running on Windows buf *bytes.Buffer // Buffer to store and read response bodies decoder *json.Decoder // Reusable JSON decoder that reads from buf apiStats *container.ApiStats // Reusable API stats object excludeContainers []string // Patterns to exclude containers by name usingPodman bool // Whether the Docker Engine API is running on Podman + transport *http.Transport // Base transport used by client for connection resets + consecutiveListFailures int // Number of consecutive /containers/json request failures + lastClientReset time.Time // Last time the Docker client connections were reset // Cache-time-aware tracking for CPU stats (similar to cpu.go) // Maps cache time intervals to container-specific CPU usage tracking @@ -119,8 +127,10 @@ func (dm *dockerManager) shouldExcludeContainer(name string) bool { func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats, error) { resp, err := dm.client.Get("http://localhost/containers/json") if err != nil { + dm.handleContainerListError(err) return nil, err } + dm.consecutiveListFailures = 0 dm.apiContainerList = dm.apiContainerList[:0] if err := dm.decode(resp, &dm.apiContainerList); err != nil { @@ -204,6 +214,50 @@ func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats, return stats, nil } +func (dm *dockerManager) handleContainerListError(err error) { + dm.consecutiveListFailures++ + if !dm.shouldResetDockerClient(err) { + return + } + dm.resetDockerClientConnections() +} + +func (dm *dockerManager) shouldResetDockerClient(err error) bool { + if !dm.versionChecked || dm.goodDockerVersion { + return false + } + if dm.consecutiveListFailures < dockerClientResetFailureThreshold { + return false + } + if !dm.lastClientReset.IsZero() && time.Since(dm.lastClientReset) < dockerClientResetCooldown { + return false + } + return isDockerApiOverloadError(err) +} + +func isDockerApiOverloadError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + msg := err.Error() + return strings.Contains(msg, "Client.Timeout exceeded") || + strings.Contains(msg, "request canceled") || + strings.Contains(msg, "context deadline exceeded") || + strings.Contains(msg, "EOF") +} + +func (dm *dockerManager) resetDockerClientConnections() { + if dm.transport == nil { + return + } + dm.transport.CloseIdleConnections() + dm.lastClientReset = time.Now() + slog.Warn("Reset Docker client connections after repeated /containers/json failures", "failures", dm.consecutiveListFailures) +} + // initializeCpuTracking initializes CPU tracking maps for a specific cache time interval func (dm *dockerManager) initializeCpuTracking(cacheTimeMs uint16) { // Initialize cache time maps if they don't exist @@ -553,6 +607,7 @@ func newDockerManager() *dockerManager { Timeout: timeout, Transport: userAgentTransport, }, + transport: transport, containerStatsMap: make(map[string]*container.Stats), sem: make(chan struct{}, 5), apiContainerList: []*container.ApiInfo{}, @@ -611,6 +666,7 @@ func (dm *dockerManager) checkDockerVersion() { if err := dm.decode(resp, &versionInfo); err != nil { return } + dm.versionChecked = true // if version > 24, one-shot works correctly and we can limit concurrent operations if dockerVersion, err := semver.Parse(versionInfo.Version); err == nil && dockerVersion.Major > 24 { dm.goodDockerVersion = true