mirror of
https://github.com/henrygd/beszel.git
synced 2026-04-25 22:11:49 +02:00
update
This commit is contained in:
@@ -13,11 +13,23 @@ import (
|
||||
"github.com/henrygd/beszel/internal/entities/probe"
|
||||
)
|
||||
|
||||
// Probe functionality overview:
|
||||
// Probes run at user-defined intervals (e.g., every 10s).
|
||||
// To keep memory usage low and constant, data is stored in two layers:
|
||||
// 1. Raw samples: The most recent individual results (kept for probeRawRetention).
|
||||
// 2. Minute buckets: A fixed-size ring buffer of 61 buckets, each representing one
|
||||
// wall-clock minute. Samples collected within the same minute are aggregated
|
||||
// (sum, min, max, count) into a single bucket.
|
||||
//
|
||||
// Short-term requests (<= 2m) use raw samples for perfect accuracy.
|
||||
// Long-term requests (up to 1h) use the minute buckets to avoid storing thousands
|
||||
// of individual data points.
|
||||
|
||||
const (
|
||||
probeRawRetention = 2 * time.Minute
|
||||
probeMinuteBucketTTL = time.Hour
|
||||
probeMinuteBucketLen = int(probeMinuteBucketTTL/time.Minute) + 1
|
||||
probeHourWindow = time.Hour
|
||||
// probeRawRetention is the duration to keep individual samples for high-precision short-term requests
|
||||
probeRawRetention = 80 * time.Second
|
||||
// probeMinuteBucketLen is the number of 1-minute buckets to keep (1 hour + 1 for partials)
|
||||
probeMinuteBucketLen int32 = 61
|
||||
)
|
||||
|
||||
// ProbeManager manages network probe tasks.
|
||||
@@ -44,7 +56,7 @@ type probeSample struct {
|
||||
|
||||
// probeBucket stores one minute of aggregated probe data.
|
||||
type probeBucket struct {
|
||||
minute int64
|
||||
minute int32
|
||||
filled bool
|
||||
stats probeAggregate
|
||||
}
|
||||
@@ -121,7 +133,7 @@ func (agg probeAggregate) result() probe.Result {
|
||||
avg,
|
||||
minMs,
|
||||
math.Round(agg.maxMs*100) / 100,
|
||||
math.Round(float64(agg.totalCount-agg.successCount)/float64(agg.totalCount)*10000) / 100,
|
||||
agg.lossPercentage(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +145,14 @@ func (agg probeAggregate) avgResponse() float64 {
|
||||
return math.Round(agg.sumMs/float64(agg.successCount)*100) / 100
|
||||
}
|
||||
|
||||
// lossPercentage returns the rounded failure rate for the aggregate.
|
||||
func (agg probeAggregate) lossPercentage() float64 {
|
||||
if agg.totalCount == 0 {
|
||||
return 0
|
||||
}
|
||||
return math.Round(float64(agg.totalCount-agg.successCount)/float64(agg.totalCount)*10000) / 100
|
||||
}
|
||||
|
||||
// SyncProbes replaces all probe tasks with the given configs.
|
||||
func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
|
||||
pm.mu.Lock()
|
||||
@@ -180,7 +200,7 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result {
|
||||
task.mu.Lock()
|
||||
agg := task.aggregateLocked(duration, now)
|
||||
// The live request window still controls avg/loss, but the range fields are always 1h.
|
||||
hourAgg := task.aggregateLocked(probeHourWindow, now)
|
||||
hourAgg := task.aggregateLocked(time.Hour, now)
|
||||
task.mu.Unlock()
|
||||
|
||||
if !agg.hasData() {
|
||||
@@ -189,16 +209,17 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result {
|
||||
|
||||
result := agg.result()
|
||||
hourAvg := hourAgg.avgResponse()
|
||||
hourLoss := hourAgg.lossPercentage()
|
||||
if hourAgg.successCount > 0 {
|
||||
result = probe.Result{
|
||||
result[0],
|
||||
hourAvg,
|
||||
math.Round(hourAgg.minMs*100) / 100,
|
||||
math.Round(hourAgg.maxMs*100) / 100,
|
||||
result[3],
|
||||
hourLoss,
|
||||
}
|
||||
} else {
|
||||
result = probe.Result{result[0], hourAvg, 0, 0, result[3]}
|
||||
result = probe.Result{result[0], hourAvg, 0, 0, hourLoss}
|
||||
}
|
||||
results[key] = result
|
||||
}
|
||||
@@ -262,8 +283,8 @@ func aggregateSamplesSince(samples []probeSample, cutoff time.Time) probeAggrega
|
||||
// aggregateBucketsSince aggregates minute buckets overlapping the requested window.
|
||||
func aggregateBucketsSince(buckets []probeBucket, cutoff, now time.Time) probeAggregate {
|
||||
agg := newProbeAggregate()
|
||||
startMinute := cutoff.Unix() / 60
|
||||
endMinute := now.Unix() / 60
|
||||
startMinute := int32(cutoff.Unix() / 60)
|
||||
endMinute := int32(now.Unix() / 60)
|
||||
for _, bucket := range buckets {
|
||||
if !bucket.filled || bucket.minute < startMinute || bucket.minute > endMinute {
|
||||
continue
|
||||
@@ -292,9 +313,9 @@ func (task *probeTask) addSampleLocked(sample probeSample) {
|
||||
}
|
||||
task.samples = append(task.samples, sample)
|
||||
|
||||
minute := sample.timestamp.Unix() / 60
|
||||
minute := int32(sample.timestamp.Unix() / 60)
|
||||
// Each slot stores one wall-clock minute, so the ring stays fixed-size at ~1h per probe.
|
||||
bucket := &task.buckets[int(minute%int64(probeMinuteBucketLen))]
|
||||
bucket := &task.buckets[minute%probeMinuteBucketLen]
|
||||
if !bucket.filled || bucket.minute != minute {
|
||||
bucket.minute = minute
|
||||
bucket.filled = true
|
||||
|
||||
@@ -85,5 +85,5 @@ func TestProbeManagerGetResultsIncludesHourResponseRange(t *testing.T) {
|
||||
assert.Equal(t, 25.0, result[1])
|
||||
assert.Equal(t, 10.0, result[2])
|
||||
assert.Equal(t, 40.0, result[3])
|
||||
assert.Equal(t, 0.0, result[4])
|
||||
assert.Equal(t, 20.0, result[4])
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user