Compare commits

...

2 Commits

Author SHA1 Message Date
henrygd
e3e453140e fix(agent): isolate container network rate tracking per cache interval
Previously, the agent shared a single PrevReadTime timestamp across all
collection intervals (e.g., 1s and 60s). This caused the 60s collector
to divide its accumulated 60s byte delta by the tiny time elapsed since
the last 1s collection, resulting in astronomically inflated network
rates. The fix introduces per-cache-time read time tracking, ensuring
calculations for each interval use their own independent timing context.
2026-03-24 13:07:56 -04:00
henrygd
7a64da9f65 hub: add guard to WSConn.Ping to ensure no nil conn ptr 2026-03-23 15:25:43 -04:00
3 changed files with 111 additions and 24 deletions

View File

@@ -77,6 +77,7 @@ type dockerManager struct {
// cacheTimeMs -> DeltaTracker for network bytes sent/received
networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
lastNetworkReadTime map[uint16]map[string]time.Time // cacheTimeMs -> containerId -> last network read time
retrySleep func(time.Duration)
}
@@ -285,7 +286,7 @@ func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
}
// calculateNetworkStats calculates network sent/receive deltas using DeltaTracker
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, name string, cacheTimeMs uint16) (uint64, uint64) {
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, name string, cacheTimeMs uint16) (uint64, uint64) {
var total_sent, total_recv uint64
for _, v := range apiStats.Networks {
total_sent += v.TxBytes
@@ -304,10 +305,11 @@ func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats
sent_delta_raw := sentTracker.Delta(ctr.IdShort)
recv_delta_raw := recvTracker.Delta(ctr.IdShort)
// Calculate bytes per second independently for Tx and Rx if we have previous data
// Calculate bytes per second using per-cache-time read time to avoid
// interference between different cache intervals (e.g. 1000ms vs 60000ms)
var sent_delta, recv_delta uint64
if initialized {
millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds())
if prevReadTime, ok := dm.lastNetworkReadTime[cacheTimeMs][ctr.IdShort]; ok {
millisecondsElapsed := uint64(time.Since(prevReadTime).Milliseconds())
if millisecondsElapsed > 0 {
if sent_delta_raw > 0 {
sent_delta = sent_delta_raw * 1000 / millisecondsElapsed
@@ -542,7 +544,13 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
}
// Calculate network stats using DeltaTracker
sent_delta, recv_delta := dm.calculateNetworkStats(ctr, res, stats, initialized, name, cacheTimeMs)
sent_delta, recv_delta := dm.calculateNetworkStats(ctr, res, name, cacheTimeMs)
// Store per-cache-time network read time for next rate calculation
if dm.lastNetworkReadTime[cacheTimeMs] == nil {
dm.lastNetworkReadTime[cacheTimeMs] = make(map[string]time.Time)
}
dm.lastNetworkReadTime[cacheTimeMs][ctr.IdShort] = time.Now()
// Store current network values for legacy compatibility
var total_sent, total_recv uint64
@@ -574,6 +582,9 @@ func (dm *dockerManager) deleteContainerStatsSync(id string) {
for ct := range dm.lastCpuReadTime {
delete(dm.lastCpuReadTime[ct], id)
}
for ct := range dm.lastNetworkReadTime {
delete(dm.lastNetworkReadTime[ct], id)
}
}
// Creates a new http client for Docker or Podman API
@@ -659,6 +670,7 @@ func newDockerManager() *dockerManager {
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
retrySleep: time.Sleep,
}

View File

@@ -408,6 +408,7 @@ func TestCalculateNetworkStats(t *testing.T) {
dm := &dockerManager{
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
}
cacheTimeMs := uint16(30000)
@@ -423,6 +424,11 @@ func TestCalculateNetworkStats(t *testing.T) {
dm.networkSentTrackers[cacheTimeMs] = sentTracker
dm.networkRecvTrackers[cacheTimeMs] = recvTracker
// Set per-cache-time network read time (1 second ago)
dm.lastNetworkReadTime[cacheTimeMs] = map[string]time.Time{
"container1": time.Now().Add(-time.Second),
}
ctr := &container.ApiInfo{
IdShort: "container1",
}
@@ -433,12 +439,8 @@ func TestCalculateNetworkStats(t *testing.T) {
},
}
stats := &container.Stats{
PrevReadTime: time.Now().Add(-time.Second), // 1 second ago
}
// Test with initialized container
sent, recv := dm.calculateNetworkStats(ctr, apiStats, stats, true, "test-container", cacheTimeMs)
sent, recv := dm.calculateNetworkStats(ctr, apiStats, "test-container", cacheTimeMs)
// Should return calculated byte rates per second
assert.GreaterOrEqual(t, sent, uint64(0))
@@ -446,12 +448,76 @@ func TestCalculateNetworkStats(t *testing.T) {
// Cycle and test one-direction change (Tx only) is reflected independently
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
dm.lastNetworkReadTime[cacheTimeMs]["container1"] = time.Now().Add(-time.Second)
apiStats.Networks["eth0"] = container.NetworkStats{TxBytes: 2500, RxBytes: 1800} // +500 Tx only
sent, recv = dm.calculateNetworkStats(ctr, apiStats, stats, true, "test-container", cacheTimeMs)
sent, recv = dm.calculateNetworkStats(ctr, apiStats, "test-container", cacheTimeMs)
assert.Greater(t, sent, uint64(0))
assert.Equal(t, uint64(0), recv)
}
// TestNetworkStatsCacheTimeIsolation verifies that frequent collections at one cache time
// (e.g. 1000ms) don't cause inflated rates at another cache time (e.g. 60000ms).
// This was a bug where PrevReadTime was shared, so the 60000ms tracker would see a
// large byte delta divided by a tiny elapsed time (set by the 1000ms path).
func TestNetworkStatsCacheTimeIsolation(t *testing.T) {
dm := &dockerManager{
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
}
ctr := &container.ApiInfo{IdShort: "container1"}
fastCache := uint16(1000)
slowCache := uint16(60000)
// Baseline for both cache times at T=0 with 100 bytes total
baseline := &container.ApiStats{
Networks: map[string]container.NetworkStats{
"eth0": {TxBytes: 100, RxBytes: 100},
},
}
dm.calculateNetworkStats(ctr, baseline, "test", fastCache)
dm.calculateNetworkStats(ctr, baseline, "test", slowCache)
// Record read times and cycle both
now := time.Now()
dm.lastNetworkReadTime[fastCache] = map[string]time.Time{"container1": now}
dm.lastNetworkReadTime[slowCache] = map[string]time.Time{"container1": now}
dm.cycleNetworkDeltasForCacheTime(fastCache)
dm.cycleNetworkDeltasForCacheTime(slowCache)
// Simulate many fast (1000ms) collections over ~5 seconds, each adding 10 bytes
totalBytes := uint64(100)
for i := 0; i < 5; i++ {
totalBytes += 10
stats := &container.ApiStats{
Networks: map[string]container.NetworkStats{
"eth0": {TxBytes: totalBytes, RxBytes: totalBytes},
},
}
// Set fast cache read time to 1 second ago
dm.lastNetworkReadTime[fastCache]["container1"] = time.Now().Add(-time.Second)
sent, _ := dm.calculateNetworkStats(ctr, stats, "test", fastCache)
// Fast cache should see ~10 bytes/sec per interval
assert.LessOrEqual(t, sent, uint64(100), "fast cache rate should be reasonable")
dm.cycleNetworkDeltasForCacheTime(fastCache)
}
// Now do slow cache collection — total delta is 50 bytes over ~5 seconds
// Set slow cache read time to 5 seconds ago (the actual elapsed time)
dm.lastNetworkReadTime[slowCache]["container1"] = time.Now().Add(-5 * time.Second)
finalStats := &container.ApiStats{
Networks: map[string]container.NetworkStats{
"eth0": {TxBytes: totalBytes, RxBytes: totalBytes},
},
}
sent, _ := dm.calculateNetworkStats(ctr, finalStats, "test", slowCache)
// Slow cache rate should be ~10 bytes/sec (50 bytes / 5 seconds), NOT 100x inflated
assert.LessOrEqual(t, sent, uint64(100), "slow cache rate should NOT be inflated by fast cache collections")
assert.GreaterOrEqual(t, sent, uint64(1), "slow cache should still report some traffic")
}
func TestDockerManagerCreation(t *testing.T) {
// Test that dockerManager can be created without panicking
dm := &dockerManager{
@@ -460,6 +526,7 @@ func TestDockerManagerCreation(t *testing.T) {
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
}
assert.NotNil(t, dm)
@@ -467,6 +534,7 @@ func TestDockerManagerCreation(t *testing.T) {
assert.NotNil(t, dm.lastCpuSystem)
assert.NotNil(t, dm.networkSentTrackers)
assert.NotNil(t, dm.networkRecvTrackers)
assert.NotNil(t, dm.lastNetworkReadTime)
}
func TestCheckDockerVersion(t *testing.T) {
@@ -651,6 +719,7 @@ func TestDockerStatsWithMockData(t *testing.T) {
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
containerStatsMap: make(map[string]*container.Stats),
}
@@ -796,23 +865,22 @@ func TestNetworkStatsCalculationWithRealData(t *testing.T) {
dm := &dockerManager{
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
}
ctr := &container.ApiInfo{IdShort: "test-container"}
cacheTimeMs := uint16(30000) // Test with 30 second cache
// Use exact timing for deterministic results
exactly1000msAgo := time.Now().Add(-1000 * time.Millisecond)
stats := &container.Stats{
PrevReadTime: exactly1000msAgo,
}
// First call sets baseline
sent1, recv1 := dm.calculateNetworkStats(ctr, apiStats1, stats, true, "test", cacheTimeMs)
// First call sets baseline (no previous read time, so rates should be 0)
sent1, recv1 := dm.calculateNetworkStats(ctr, apiStats1, "test", cacheTimeMs)
assert.Equal(t, uint64(0), sent1)
assert.Equal(t, uint64(0), recv1)
// Cycle to establish baseline for this cache time
// Record read time and cycle to establish baseline for this cache time
exactly1000msAgo := time.Now().Add(-1000 * time.Millisecond)
dm.lastNetworkReadTime[cacheTimeMs] = map[string]time.Time{
"test-container": exactly1000msAgo,
}
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
// Calculate expected results precisely
@@ -823,7 +891,7 @@ func TestNetworkStatsCalculationWithRealData(t *testing.T) {
expectedRecvRate := deltaRecv * 1000 / expectedElapsedMs // Should be exactly 1000000
// Second call with changed data
sent2, recv2 := dm.calculateNetworkStats(ctr, apiStats2, stats, true, "test", cacheTimeMs)
sent2, recv2 := dm.calculateNetworkStats(ctr, apiStats2, "test", cacheTimeMs)
// Should be exactly the expected rates (no tolerance needed)
assert.Equal(t, expectedSentRate, sent2)
@@ -831,12 +899,13 @@ func TestNetworkStatsCalculationWithRealData(t *testing.T) {
// Bad speed cap: set absurd delta over 1ms and expect 0 due to cap
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
stats.PrevReadTime = time.Now().Add(-1 * time.Millisecond)
dm.lastNetworkReadTime[cacheTimeMs]["test-container"] = time.Now().Add(-1 * time.Millisecond)
apiStats1.Networks["eth0"] = container.NetworkStats{TxBytes: 0, RxBytes: 0}
apiStats2.Networks["eth0"] = container.NetworkStats{TxBytes: 10 * 1024 * 1024 * 1024, RxBytes: 0} // 10GB delta
_, _ = dm.calculateNetworkStats(ctr, apiStats1, stats, true, "test", cacheTimeMs) // baseline
_, _ = dm.calculateNetworkStats(ctr, apiStats1, "test", cacheTimeMs) // baseline
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
sent3, recv3 := dm.calculateNetworkStats(ctr, apiStats2, stats, true, "test", cacheTimeMs)
dm.lastNetworkReadTime[cacheTimeMs]["test-container"] = time.Now().Add(-1 * time.Millisecond)
sent3, recv3 := dm.calculateNetworkStats(ctr, apiStats2, "test", cacheTimeMs)
assert.Equal(t, uint64(0), sent3)
assert.Equal(t, uint64(0), recv3)
}
@@ -857,6 +926,7 @@ func TestContainerStatsEndToEndWithRealData(t *testing.T) {
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
containerStatsMap: make(map[string]*container.Stats),
}
@@ -978,6 +1048,7 @@ func TestDockerStatsWorkflow(t *testing.T) {
lastCpuSystem: make(map[uint16]map[string]uint64),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
containerStatsMap: make(map[string]*container.Stats),
}
@@ -1242,6 +1313,7 @@ func TestUpdateContainerStatsUsesPodmanInspectHealthFallback(t *testing.T) {
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
lastNetworkReadTime: make(map[uint16]map[string]time.Time),
}
ctr := &container.ApiInfo{

View File

@@ -111,6 +111,9 @@ func (ws *WsConn) Close(msg []byte) {
// Ping sends a ping frame to keep the connection alive.
func (ws *WsConn) Ping() error {
if ws.conn == nil {
return gws.ErrConnClosed
}
ws.conn.SetDeadline(time.Now().Add(deadline))
return ws.conn.WritePing(nil)
}