likely fix for huge net traffic on interface reset (#1267)

This commit is contained in:
henrygd
2025-10-17 11:54:52 -04:00
parent 1e32d13650
commit 2d8739052b
3 changed files with 84 additions and 5 deletions

View File

@@ -37,6 +37,16 @@ func (t *DeltaTracker[K, V]) Set(id K, value V) {
t.current[id] = value t.current[id] = value
} }
// Snapshot returns a copy of the current map.
// func (t *DeltaTracker[K, V]) Snapshot() map[K]V {
// t.RLock()
// defer t.RUnlock()
// copyMap := make(map[K]V, len(t.current))
// maps.Copy(copyMap, t.current)
// return copyMap
// }
// Deltas returns a map of all calculated deltas for the current interval. // Deltas returns a map of all calculated deltas for the current interval.
func (t *DeltaTracker[K, V]) Deltas() map[K]V { func (t *DeltaTracker[K, V]) Deltas() map[K]V {
t.RLock() t.RLock()
@@ -53,6 +63,15 @@ func (t *DeltaTracker[K, V]) Deltas() map[K]V {
return deltas return deltas
} }
// Previous returns the previously recorded value for the given key, if it exists.
func (t *DeltaTracker[K, V]) Previous(id K) (V, bool) {
t.RLock()
defer t.RUnlock()
value, ok := t.previous[id]
return value, ok
}
// Delta returns the delta for a single key. // Delta returns the delta for a single key.
// Returns 0 if the key doesn't exist or has no previous value. // Returns 0 if the key doesn't exist or has no previous value.
func (t *DeltaTracker[K, V]) Delta(id K) V { func (t *DeltaTracker[K, V]) Delta(id K) V {

View File

@@ -172,8 +172,24 @@ func (a *Agent) sumAndTrackPerNicDeltas(cacheTimeMs uint16, msElapsed uint64, ne
tracker.Set(upKey, v.BytesSent) tracker.Set(upKey, v.BytesSent)
tracker.Set(downKey, v.BytesRecv) tracker.Set(downKey, v.BytesRecv)
if msElapsed > 0 { if msElapsed > 0 {
upDelta = tracker.Delta(upKey) * 1000 / msElapsed if prevVal, ok := tracker.Previous(upKey); ok {
downDelta = tracker.Delta(downKey) * 1000 / msElapsed var deltaBytes uint64
if v.BytesSent >= prevVal {
deltaBytes = v.BytesSent - prevVal
} else {
deltaBytes = v.BytesSent
}
upDelta = deltaBytes * 1000 / msElapsed
}
if prevVal, ok := tracker.Previous(downKey); ok {
var deltaBytes uint64
if v.BytesRecv >= prevVal {
deltaBytes = v.BytesRecv - prevVal
} else {
deltaBytes = v.BytesRecv
}
downDelta = deltaBytes * 1000 / msElapsed
}
} }
systemStats.NetworkInterfaces[v.Name] = [4]uint64{upDelta, downDelta, v.BytesSent, v.BytesRecv} systemStats.NetworkInterfaces[v.Name] = [4]uint64{upDelta, downDelta, v.BytesSent, v.BytesRecv}
} }
@@ -212,6 +228,10 @@ func (a *Agent) applyNetworkTotals(
a.initializeNetIoStats() a.initializeNetIoStats()
delete(a.netIoStats, cacheTimeMs) delete(a.netIoStats, cacheTimeMs)
delete(a.netInterfaceDeltaTrackers, cacheTimeMs) delete(a.netInterfaceDeltaTrackers, cacheTimeMs)
systemStats.NetworkSent = 0
systemStats.NetworkRecv = 0
systemStats.Bandwidth[0], systemStats.Bandwidth[1] = 0, 0
return
} }
systemStats.NetworkSent = networkSentPs systemStats.NetworkSent = networkSentPs

View File

@@ -338,6 +338,43 @@ func TestSumAndTrackPerNicDeltas(t *testing.T) {
assert.Equal(t, uint64(7000), ni[1]) assert.Equal(t, uint64(7000), ni[1])
} }
func TestSumAndTrackPerNicDeltasHandlesCounterReset(t *testing.T) {
a := &Agent{
netInterfaces: map[string]struct{}{"eth0": {}},
netInterfaceDeltaTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
}
cache := uint16(77)
// First interval establishes baseline values
initial := []psutilNet.IOCountersStat{{Name: "eth0", BytesSent: 4_000, BytesRecv: 6_000}}
statsInitial := &system.Stats{}
a.ensureNetworkInterfacesMap(statsInitial)
_, _ = a.sumAndTrackPerNicDeltas(cache, 0, initial, statsInitial)
// Second interval increments counters normally so previous snapshot gets populated
increment := []psutilNet.IOCountersStat{{Name: "eth0", BytesSent: 9_000, BytesRecv: 11_000}}
statsIncrement := &system.Stats{}
a.ensureNetworkInterfacesMap(statsIncrement)
_, _ = a.sumAndTrackPerNicDeltas(cache, 1_000, increment, statsIncrement)
niIncrement, ok := statsIncrement.NetworkInterfaces["eth0"]
require.True(t, ok)
assert.Equal(t, uint64(5_000), niIncrement[0])
assert.Equal(t, uint64(5_000), niIncrement[1])
// Third interval simulates counter reset (values drop below previous totals)
reset := []psutilNet.IOCountersStat{{Name: "eth0", BytesSent: 1_200, BytesRecv: 1_500}}
statsReset := &system.Stats{}
a.ensureNetworkInterfacesMap(statsReset)
_, _ = a.sumAndTrackPerNicDeltas(cache, 1_000, reset, statsReset)
niReset, ok := statsReset.NetworkInterfaces["eth0"]
require.True(t, ok)
assert.Equal(t, uint64(1_200), niReset[0], "upload delta should match new counter value after reset")
assert.Equal(t, uint64(1_500), niReset[1], "download delta should match new counter value after reset")
}
func TestApplyNetworkTotals(t *testing.T) { func TestApplyNetworkTotals(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@@ -441,10 +478,13 @@ func TestApplyNetworkTotals(t *testing.T) {
) )
if tt.expectReset { if tt.expectReset {
// Should have reset network tracking state - delta trackers should be cleared // Should have reset network tracking state - maps cleared and stats zeroed
// Note: initializeNetIoStats resets the maps, then applyNetworkTotals sets nis back assert.NotContains(t, a.netIoStats, cacheTimeMs, "cache entry should be cleared after reset")
assert.Contains(t, a.netIoStats, cacheTimeMs, "cache entry should exist after reset")
assert.NotContains(t, a.netInterfaceDeltaTrackers, cacheTimeMs, "tracker should be cleared on reset") assert.NotContains(t, a.netInterfaceDeltaTrackers, cacheTimeMs, "tracker should be cleared on reset")
assert.Zero(t, systemStats.NetworkSent)
assert.Zero(t, systemStats.NetworkRecv)
assert.Zero(t, systemStats.Bandwidth[0])
assert.Zero(t, systemStats.Bandwidth[1])
} else { } else {
// Should have applied stats // Should have applied stats
assert.Equal(t, tt.expectedNetworkSent, systemStats.NetworkSent) assert.Equal(t, tt.expectedNetworkSent, systemStats.NetworkSent)