This commit is contained in:
henrygd
2026-04-29 18:21:39 -04:00
parent aaa8eb773f
commit 526a2c6aab

View File

@@ -20,7 +20,7 @@ import (
// Probes run at user-defined intervals (e.g., every 10s). // Probes run at user-defined intervals (e.g., every 10s).
// To keep memory usage low and constant, data is stored in two layers: // To keep memory usage low and constant, data is stored in two layers:
// 1. Raw samples: The most recent individual results (kept for probeRawRetention). // 1. Raw samples: The most recent individual results (kept for probeRawRetention).
// 2. Minute buckets: A fixed-size ring buffer of 61 buckets, each representing one // 2. Minute buckets: A ring buffer of 61 buckets, each representing one
// wall-clock minute. Samples collected within the same minute are aggregated // wall-clock minute. Samples collected within the same minute are aggregated
// (sum, min, max, count) into a single bucket. // (sum, min, max, count) into a single bucket.
// //
@@ -29,8 +29,8 @@ import (
// of individual data points. // of individual data points.
const ( const (
// probeRawRetention is the duration to keep individual samples for high-precision short-term requests // probeRawRetention is the duration to keep individual samples
probeRawRetention = 70 * time.Second probeRawRetention = 61 * time.Second
// probeMinuteBucketLen is the number of 1-minute buckets to keep (1 hour + 1 for partials) // probeMinuteBucketLen is the number of 1-minute buckets to keep (1 hour + 1 for partials)
probeMinuteBucketLen int32 = 61 probeMinuteBucketLen int32 = 61
) )
@@ -381,16 +381,6 @@ func (pm *ProbeManager) runProbeNow(task *probeTask) *probe.Result {
return &result return &result
} }
// aggregateLocked collects probe data for the requested time window.
func (task *probeTask) aggregateLocked(duration time.Duration, now time.Time) probeAggregate {
cutoff := now.Add(-duration)
// Keep short windows exact; longer windows read from minute buckets to avoid raw-sample retention.
if duration <= probeRawRetention {
return aggregateSamplesSince(task.samples, cutoff)
}
return aggregateBucketsSince(task.buckets[:], cutoff, now)
}
// resultLocked returns the aggregated probe result for the requested duration along with a bool indicating whether any data was available. // resultLocked returns the aggregated probe result for the requested duration along with a bool indicating whether any data was available.
func (task *probeTask) resultLocked(duration time.Duration, now time.Time) (probe.Result, bool) { func (task *probeTask) resultLocked(duration time.Duration, now time.Time) (probe.Result, bool) {
agg := task.aggregateLocked(duration, now) agg := task.aggregateLocked(duration, now)
@@ -412,6 +402,16 @@ func (task *probeTask) resultLocked(duration time.Duration, now time.Time) (prob
return result, true return result, true
} }
// aggregateLocked collects probe data for the requested time window.
func (task *probeTask) aggregateLocked(duration time.Duration, now time.Time) probeAggregate {
cutoff := now.Add(-duration)
// Keep short windows exact; longer windows read from minute buckets to avoid raw-sample retention.
if duration <= probeRawRetention {
return aggregateSamplesSince(task.samples, cutoff)
}
return aggregateBucketsSince(task.buckets[:], cutoff, now)
}
// aggregateSamplesSince aggregates raw samples newer than the cutoff. // aggregateSamplesSince aggregates raw samples newer than the cutoff.
func aggregateSamplesSince(samples []probeSample, cutoff time.Time) probeAggregate { func aggregateSamplesSince(samples []probeSample, cutoff time.Time) probeAggregate {
agg := newProbeAggregate() agg := newProbeAggregate()