From 2d8739052b9987331bada4fa191a52f7f7167bcb Mon Sep 17 00:00:00 2001 From: henrygd Date: Fri, 17 Oct 2025 11:54:52 -0400 Subject: [PATCH] likely fix for huge net traffic on interface reset (#1267) --- agent/deltatracker/deltatracker.go | 19 ++++++++++++ agent/network.go | 24 ++++++++++++++-- agent/network_test.go | 46 ++++++++++++++++++++++++++++-- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/agent/deltatracker/deltatracker.go b/agent/deltatracker/deltatracker.go index c57ff24e..16b8ca44 100644 --- a/agent/deltatracker/deltatracker.go +++ b/agent/deltatracker/deltatracker.go @@ -37,6 +37,16 @@ func (t *DeltaTracker[K, V]) Set(id K, value V) { t.current[id] = value } +// Snapshot returns a copy of the current map. +// func (t *DeltaTracker[K, V]) Snapshot() map[K]V { +// t.RLock() +// defer t.RUnlock() + +// copyMap := make(map[K]V, len(t.current)) +// maps.Copy(copyMap, t.current) +// return copyMap +// } + // Deltas returns a map of all calculated deltas for the current interval. func (t *DeltaTracker[K, V]) Deltas() map[K]V { t.RLock() @@ -53,6 +63,15 @@ func (t *DeltaTracker[K, V]) Deltas() map[K]V { return deltas } +// Previous returns the previously recorded value for the given key, if it exists. +func (t *DeltaTracker[K, V]) Previous(id K) (V, bool) { + t.RLock() + defer t.RUnlock() + + value, ok := t.previous[id] + return value, ok +} + // Delta returns the delta for a single key. // Returns 0 if the key doesn't exist or has no previous value. func (t *DeltaTracker[K, V]) Delta(id K) V { diff --git a/agent/network.go b/agent/network.go index 90bb67bd..9b58489a 100644 --- a/agent/network.go +++ b/agent/network.go @@ -172,8 +172,24 @@ func (a *Agent) sumAndTrackPerNicDeltas(cacheTimeMs uint16, msElapsed uint64, ne tracker.Set(upKey, v.BytesSent) tracker.Set(downKey, v.BytesRecv) if msElapsed > 0 { - upDelta = tracker.Delta(upKey) * 1000 / msElapsed - downDelta = tracker.Delta(downKey) * 1000 / msElapsed + if prevVal, ok := tracker.Previous(upKey); ok { + var deltaBytes uint64 + if v.BytesSent >= prevVal { + deltaBytes = v.BytesSent - prevVal + } else { + deltaBytes = v.BytesSent + } + upDelta = deltaBytes * 1000 / msElapsed + } + if prevVal, ok := tracker.Previous(downKey); ok { + var deltaBytes uint64 + if v.BytesRecv >= prevVal { + deltaBytes = v.BytesRecv - prevVal + } else { + deltaBytes = v.BytesRecv + } + downDelta = deltaBytes * 1000 / msElapsed + } } systemStats.NetworkInterfaces[v.Name] = [4]uint64{upDelta, downDelta, v.BytesSent, v.BytesRecv} } @@ -212,6 +228,10 @@ func (a *Agent) applyNetworkTotals( a.initializeNetIoStats() delete(a.netIoStats, cacheTimeMs) delete(a.netInterfaceDeltaTrackers, cacheTimeMs) + systemStats.NetworkSent = 0 + systemStats.NetworkRecv = 0 + systemStats.Bandwidth[0], systemStats.Bandwidth[1] = 0, 0 + return } systemStats.NetworkSent = networkSentPs diff --git a/agent/network_test.go b/agent/network_test.go index bc3833fa..99b46c38 100644 --- a/agent/network_test.go +++ b/agent/network_test.go @@ -338,6 +338,43 @@ func TestSumAndTrackPerNicDeltas(t *testing.T) { assert.Equal(t, uint64(7000), ni[1]) } +func TestSumAndTrackPerNicDeltasHandlesCounterReset(t *testing.T) { + a := &Agent{ + netInterfaces: map[string]struct{}{"eth0": {}}, + netInterfaceDeltaTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]), + } + + cache := uint16(77) + + // First interval establishes baseline values + initial := []psutilNet.IOCountersStat{{Name: "eth0", BytesSent: 4_000, BytesRecv: 6_000}} + statsInitial := &system.Stats{} + a.ensureNetworkInterfacesMap(statsInitial) + _, _ = a.sumAndTrackPerNicDeltas(cache, 0, initial, statsInitial) + + // Second interval increments counters normally so previous snapshot gets populated + increment := []psutilNet.IOCountersStat{{Name: "eth0", BytesSent: 9_000, BytesRecv: 11_000}} + statsIncrement := &system.Stats{} + a.ensureNetworkInterfacesMap(statsIncrement) + _, _ = a.sumAndTrackPerNicDeltas(cache, 1_000, increment, statsIncrement) + + niIncrement, ok := statsIncrement.NetworkInterfaces["eth0"] + require.True(t, ok) + assert.Equal(t, uint64(5_000), niIncrement[0]) + assert.Equal(t, uint64(5_000), niIncrement[1]) + + // Third interval simulates counter reset (values drop below previous totals) + reset := []psutilNet.IOCountersStat{{Name: "eth0", BytesSent: 1_200, BytesRecv: 1_500}} + statsReset := &system.Stats{} + a.ensureNetworkInterfacesMap(statsReset) + _, _ = a.sumAndTrackPerNicDeltas(cache, 1_000, reset, statsReset) + + niReset, ok := statsReset.NetworkInterfaces["eth0"] + require.True(t, ok) + assert.Equal(t, uint64(1_200), niReset[0], "upload delta should match new counter value after reset") + assert.Equal(t, uint64(1_500), niReset[1], "download delta should match new counter value after reset") +} + func TestApplyNetworkTotals(t *testing.T) { tests := []struct { name string @@ -441,10 +478,13 @@ func TestApplyNetworkTotals(t *testing.T) { ) if tt.expectReset { - // Should have reset network tracking state - delta trackers should be cleared - // Note: initializeNetIoStats resets the maps, then applyNetworkTotals sets nis back - assert.Contains(t, a.netIoStats, cacheTimeMs, "cache entry should exist after reset") + // Should have reset network tracking state - maps cleared and stats zeroed + assert.NotContains(t, a.netIoStats, cacheTimeMs, "cache entry should be cleared after reset") assert.NotContains(t, a.netInterfaceDeltaTrackers, cacheTimeMs, "tracker should be cleared on reset") + assert.Zero(t, systemStats.NetworkSent) + assert.Zero(t, systemStats.NetworkRecv) + assert.Zero(t, systemStats.Bandwidth[0]) + assert.Zero(t, systemStats.Bandwidth[1]) } else { // Should have applied stats assert.Equal(t, tt.expectedNetworkSent, systemStats.NetworkSent)