This commit is contained in:
henrygd
2026-04-22 17:42:11 -04:00
parent 6472af1ba4
commit 16e0f6c4a2
16 changed files with 421 additions and 417 deletions

View File

@@ -13,6 +13,13 @@ import (
"github.com/henrygd/beszel/internal/entities/probe"
)
// Retention settings for probe measurements.
const (
	// probeRawRetention is how long individual samples are kept. Samples older
	// than this (relative to the newest sample) are trimmed, and request
	// windows up to this length are aggregated from the raw samples.
	probeRawRetention = 120 * time.Second

	// probeMinuteBucketTTL is the time span covered by the per-minute buckets.
	probeMinuteBucketTTL = time.Hour

	// probeMinuteBucketLen is the number of slots in the per-minute ring: one
	// per minute of probeMinuteBucketTTL plus one, because a window of that
	// length touches that many distinct wall-clock minutes (both the first
	// and the last minute are included).
	probeMinuteBucketLen = int(probeMinuteBucketTTL/time.Minute) + 1

	// probeHourWindow is the fixed window aggregated from the minute buckets
	// in addition to the caller-requested window.
	probeHourWindow = time.Hour
)
// ProbeManager manages network probe tasks.
type ProbeManager struct {
mu sync.RWMutex
@@ -20,16 +27,35 @@ type ProbeManager struct {
httpClient *http.Client
}
// probeTask owns retention buffers and cancellation for a single probe config.
// It keeps two views of the collected data: recent raw samples and a
// fixed-size ring of per-minute aggregates.
type probeTask struct {
// config is the probe definition; executeProbe reads its Protocol, Target and Port.
config probe.Config
// NOTE(review): presumably signals the probe's goroutine to stop; the
// code that consumes this channel is not visible here.
cancel chan struct{}
// mu is held while samples and buckets are read or modified.
mu sync.Mutex
// samples holds raw probe attempts, trimmed to probeRawRetention as new samples arrive.
samples []probeSample
// buckets is a ring of per-minute aggregates; a sample's slot is its
// Unix minute modulo probeMinuteBucketLen.
buckets [probeMinuteBucketLen]probeBucket
}
// probeSample stores one probe attempt and its collection time.
type probeSample struct {
latencyMs float64 // -1 means loss
timestamp time.Time
responseMs float64 // -1 means loss
timestamp time.Time
}
// probeBucket stores one minute of aggregated probe data.
// Buckets live in a fixed-size ring, so a slot is reset and reused when a
// sample for a different minute maps to it.
type probeBucket struct {
// minute is the wall-clock minute this bucket covers (Unix seconds / 60).
minute int64
// filled is false until the first sample is stored, which distinguishes
// an unused slot from one that really covers minute 0.
filled bool
// stats aggregates every sample recorded during this minute.
stats probeAggregate
}
// probeAggregate accumulates successful response stats and total sample counts.
// Lost probes (negative response values) only increase totalCount; the
// sum/min/max fields describe successful samples exclusively.
type probeAggregate struct {
// sumMs is the sum of all successful response times in milliseconds.
sumMs float64
// minMs is the smallest successful response time; newProbeAggregate
// seeds it with math.MaxFloat64 to mean "no success yet".
minMs float64
// maxMs is the largest successful response time (0 when there are none).
maxMs float64
// totalCount is the number of samples seen, successful or lost.
totalCount int
// successCount is the number of samples with a non-negative response time.
successCount int
}
func newProbeManager() *ProbeManager {
@@ -39,6 +65,74 @@ func newProbeManager() *ProbeManager {
}
}
// newProbeAggregate returns an empty aggregate whose minimum is preset to
// math.MaxFloat64, so that any real response time compares lower than it.
func newProbeAggregate() probeAggregate {
	var empty probeAggregate
	empty.minMs = math.MaxFloat64
	return empty
}
// addResponse folds a single probe sample into the aggregate.
//
// Every call increments totalCount. A negative responseMs marks a lost probe
// and changes nothing else. Otherwise the value is added to the success
// count and sum, and min/max are widened to include it.
//
// The first successful sample seeds min and max directly instead of relying
// on a preset sentinel, so a zero-value probeAggregate behaves the same as
// one created by newProbeAggregate.
func (agg *probeAggregate) addResponse(responseMs float64) {
	agg.totalCount++
	if responseMs < 0 {
		return
	}
	firstSuccess := agg.successCount == 0
	agg.successCount++
	agg.sumMs += responseMs
	if firstSuccess || responseMs < agg.minMs {
		agg.minMs = responseMs
	}
	if firstSuccess || responseMs > agg.maxMs {
		agg.maxMs = responseMs
	}
}
// addAggregate merges another aggregate into this one.
//
// An empty other (totalCount == 0) is ignored. Counts and sums are always
// added; min and max are only touched when other contains at least one
// successful sample, so a loss-only aggregate cannot disturb them.
//
// Whether the receiver already holds a successful sample is decided from its
// success count rather than from a sentinel value in minMs, so merging into
// a zero-value probeAggregate yields the correct minimum.
func (agg *probeAggregate) addAggregate(other probeAggregate) {
	if other.totalCount == 0 {
		return
	}
	// Capture this before the counts are merged below.
	hadSuccess := agg.successCount > 0
	agg.totalCount += other.totalCount
	agg.successCount += other.successCount
	agg.sumMs += other.sumMs
	if other.successCount == 0 {
		return
	}
	if !hadSuccess || other.minMs < agg.minMs {
		agg.minMs = other.minMs
	}
	if !hadSuccess || other.maxMs > agg.maxMs {
		agg.maxMs = other.maxMs
	}
}
// hasData reports whether at least one sample, successful or lost, has been
// recorded in the aggregate.
func (agg probeAggregate) hasData() bool {
	return agg.totalCount >= 1
}
// result converts the aggregate into the probe result slice format:
// average, minimum, maximum and loss percentage, each rounded to two
// decimal places.
//
// Average, minimum and maximum describe successful samples only and are 0
// when there were none. The loss percentage is the share of samples that
// failed. For an aggregate without any samples it is reported as 0, because
// the unguarded 0/0 division would otherwise put NaN into the result.
func (agg probeAggregate) result() probe.Result {
	minMs, lossPct := 0.0, 0.0
	if agg.successCount > 0 {
		minMs = math.Round(agg.minMs*100) / 100
	}
	if agg.totalCount != 0 {
		lost := agg.totalCount - agg.successCount
		lossPct = math.Round(float64(lost)/float64(agg.totalCount)*10000) / 100
	}
	return probe.Result{
		agg.avgResponse(),
		minMs,
		math.Round(agg.maxMs*100) / 100,
		lossPct,
	}
}
// avgResponse returns the mean response time of the successful samples,
// rounded to two decimal places, or 0 when no sample succeeded.
func (agg probeAggregate) avgResponse() float64 {
	if n := agg.successCount; n != 0 {
		mean := agg.sumMs / float64(n)
		return math.Round(mean*100) / 100
	}
	return 0
}
// SyncProbes replaces all probe tasks with the given configs.
func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
pm.mu.Lock()
@@ -79,52 +173,34 @@ func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result {
defer pm.mu.RUnlock()
results := make(map[string]probe.Result, len(pm.probes))
cutoff := time.Now().Add(-time.Duration(durationMs) * time.Millisecond)
now := time.Now()
duration := time.Duration(durationMs) * time.Millisecond
for key, task := range pm.probes {
task.mu.Lock()
var sum, minMs, maxMs float64
var count, lossCount int
minMs = math.MaxFloat64
for _, s := range task.samples {
if s.timestamp.Before(cutoff) {
continue
}
count++
if s.latencyMs < 0 {
lossCount++
continue
}
sum += s.latencyMs
if s.latencyMs < minMs {
minMs = s.latencyMs
}
if s.latencyMs > maxMs {
maxMs = s.latencyMs
}
}
agg := task.aggregateLocked(duration, now)
// The live request window still controls avg/loss, but the range fields are always 1h.
hourAgg := task.aggregateLocked(probeHourWindow, now)
task.mu.Unlock()
if count == 0 {
if !agg.hasData() {
continue
}
successCount := count - lossCount
var avg float64
if successCount > 0 {
avg = math.Round(sum/float64(successCount)*100) / 100
}
if minMs == math.MaxFloat64 {
minMs = 0
}
results[key] = probe.Result{
avg, // average latency in ms
math.Round(minMs*100) / 100, // min latency in ms
math.Round(maxMs*100) / 100, // max latency in ms
math.Round(float64(lossCount)/float64(count)*10000) / 100, // packet loss percentage
result := agg.result()
hourAvg := hourAgg.avgResponse()
if hourAgg.successCount > 0 {
result = probe.Result{
result[0],
hourAvg,
math.Round(hourAgg.minMs*100) / 100,
math.Round(hourAgg.maxMs*100) / 100,
result[3],
}
} else {
result = probe.Result{result[0], hourAvg, 0, 0, result[3]}
}
results[key] = result
}
return results
@@ -161,32 +237,48 @@ func (pm *ProbeManager) runProbe(task *probeTask) {
}
}
func (pm *ProbeManager) executeProbe(task *probeTask) {
var latencyMs float64
switch task.config.Protocol {
case "icmp":
latencyMs = probeICMP(task.config.Target)
case "tcp":
latencyMs = probeTCP(task.config.Target, task.config.Port)
case "http":
latencyMs = probeHTTP(pm.httpClient, task.config.Target)
default:
slog.Warn("unknown probe protocol", "protocol", task.config.Protocol)
return
// aggregateLocked summarizes the probe data recorded during the window of
// the given duration that ends at now. It reads task.samples and
// task.buckets without locking, so the caller is expected to hold task.mu.
func (task *probeTask) aggregateLocked(duration time.Duration, now time.Time) probeAggregate {
	windowStart := now.Add(-duration)
	if duration > probeRawRetention {
		// Raw samples are not kept this long; fall back to the per-minute buckets.
		return aggregateBucketsSince(task.buckets[:], windowStart, now)
	}
	// Short windows fit inside the raw retention period and stay exact.
	return aggregateSamplesSince(task.samples, windowStart)
}
sample := probeSample{
latencyMs: latencyMs,
timestamp: time.Now(),
// aggregateSamplesSince builds an aggregate from every raw sample whose
// timestamp is at or after cutoff; older samples are ignored.
func aggregateSamplesSince(samples []probeSample, cutoff time.Time) probeAggregate {
	total := newProbeAggregate()
	for i := range samples {
		if s := &samples[i]; !s.timestamp.Before(cutoff) {
			total.addResponse(s.responseMs)
		}
	}
	return total
}
task.mu.Lock()
// Trim old samples beyond 120s to bound memory
cutoff := time.Now().Add(-120 * time.Second)
// aggregateBucketsSince merges every filled minute bucket whose minute lies
// between the minute containing cutoff and the minute containing now, both
// inclusive. Buckets are matched by whole minutes, so the bucket holding the
// cutoff is included in full even if the cutoff falls mid-minute.
func aggregateBucketsSince(buckets []probeBucket, cutoff, now time.Time) probeAggregate {
	firstMinute, lastMinute := cutoff.Unix()/60, now.Unix()/60
	total := newProbeAggregate()
	for i := range buckets {
		b := &buckets[i]
		inWindow := b.filled && b.minute >= firstMinute && b.minute <= lastMinute
		if inWindow {
			total.addAggregate(b.stats)
		}
	}
	return total
}
// addSampleLocked stores a fresh sample in both raw and per-minute retention buffers.
func (task *probeTask) addSampleLocked(sample probeSample) {
cutoff := sample.timestamp.Add(-probeRawRetention)
start := 0
for i := range task.samples {
if task.samples[i].timestamp.After(cutoff) {
if !task.samples[i].timestamp.Before(cutoff) {
start = i
break
}
@@ -199,10 +291,45 @@ func (pm *ProbeManager) executeProbe(task *probeTask) {
task.samples = task.samples[:size]
}
task.samples = append(task.samples, sample)
minute := sample.timestamp.Unix() / 60
// Each slot stores one wall-clock minute, so the ring stays fixed-size at ~1h per probe.
bucket := &task.buckets[int(minute%int64(probeMinuteBucketLen))]
if !bucket.filled || bucket.minute != minute {
bucket.minute = minute
bucket.filled = true
bucket.stats = newProbeAggregate()
}
bucket.stats.addResponse(sample.responseMs)
}
// executeProbe performs one measurement for task with the probe function
// matching its configured protocol ("icmp", "tcp" or "http") and stores the
// outcome, stamped with the current time, in the task's retention buffers.
// An unrecognized protocol is logged as a warning and nothing is recorded.
func (pm *ProbeManager) executeProbe(task *probeTask) {
	var measuredMs float64
	switch protocol := task.config.Protocol; protocol {
	case "icmp":
		measuredMs = probeICMP(task.config.Target)
	case "tcp":
		measuredMs = probeTCP(task.config.Target, task.config.Port)
	case "http":
		measuredMs = probeHTTP(pm.httpClient, task.config.Target)
	default:
		slog.Warn("unknown probe protocol", "protocol", protocol)
		return
	}

	// The timestamp is taken before acquiring the lock.
	collected := probeSample{responseMs: measuredMs, timestamp: time.Now()}

	task.mu.Lock()
	task.addSampleLocked(collected)
	task.mu.Unlock()
}
// probeTCP measures pure TCP handshake latency (excluding DNS resolution).
// probeTCP measures the TCP handshake response time in milliseconds (excluding DNS resolution).
// Returns -1 on failure.
func probeTCP(target string, port uint16) float64 {
// Resolve DNS first, outside the timing window
@@ -222,7 +349,7 @@ func probeTCP(target string, port uint16) float64 {
return float64(time.Since(start).Microseconds()) / 1000.0
}
// probeHTTP measures HTTP GET request latency. Returns -1 on failure.
// probeHTTP measures the response time of an HTTP GET request. Returns -1 on failure.
func probeHTTP(client *http.Client, url string) float64 {
start := time.Now()
resp, err := client.Get(url)