This commit is contained in:
henrygd
2026-04-26 15:37:00 -04:00
parent 0378023b6f
commit af49ebf2df
3 changed files with 63 additions and 13 deletions

View File

@@ -4,8 +4,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
"math/rand"
"net" "net"
"net/http" "net/http"
// "strconv"
"sync" "sync"
"time" "time"
@@ -208,7 +211,7 @@ func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
} }
task = newProbeTaskFromExisting(cfg, task) task = newProbeTaskFromExisting(cfg, task)
pm.probes[key] = task pm.probes[key] = task
go pm.runProbe(task, true) go pm.runProbe(task, false)
} }
} }
@@ -270,7 +273,7 @@ func (pm *ProbeManager) UpsertProbe(config probe.Config, runNow bool) (*probe.Re
return result, nil return result, nil
} }
if startTask { if startTask {
go pm.runProbe(task, true) go pm.runProbe(task, false)
} }
return nil, nil return nil, nil
} }
@@ -325,25 +328,49 @@ func (pm *ProbeManager) Stop() {
func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) { func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) {
interval := time.Duration(task.config.Interval) * time.Second interval := time.Duration(task.config.Interval) * time.Second
if interval < time.Second { if interval < time.Second {
interval = 10 * time.Second interval = 30 * time.Second
} }
ticker := time.NewTicker(interval)
defer ticker.Stop() stagger := getStagger(interval.Milliseconds())
slog.Info("starting probe task", "id", task.config.ID, "initial_delay", stagger.String(), "interval", interval.String())
if runNow { if runNow {
pm.executeProbe(task) pm.executeProbe(task)
} }
select {
case <-task.cancel:
slog.Info("removed probe", "id", task.config.ID)
return
case <-time.After(stagger):
slog.Info("initial probe execution", "id", task.config.ID)
pm.executeProbe(task)
}
ticker := time.Tick(interval)
for { for {
select { select {
case <-task.cancel: case <-task.cancel:
slog.Info("removed probe", "id", task.config.ID)
return return
case <-ticker.C: case <-ticker:
slog.Info("running probe in main loop", "id", task.config.ID, "interval", interval.String())
pm.executeProbe(task) pm.executeProbe(task)
} }
} }
} }
// getStagger returns a random duration between intervalSeconds/2 and intervalSeconds to stagger probe executions
func getStagger(intervalMilli int64) time.Duration {
intervalMilliInt := int(intervalMilli)
randomDelayInt := rand.Intn(intervalMilliInt)
if randomDelayInt < intervalMilliInt/2 {
randomDelayInt += intervalMilliInt / 2
}
return time.Duration(randomDelayInt) * time.Millisecond
}
func (pm *ProbeManager) runProbeNow(task *probeTask) *probe.Result { func (pm *ProbeManager) runProbeNow(task *probeTask) *probe.Result {
pm.executeProbe(task) pm.executeProbe(task)
task.mu.Lock() task.mu.Lock()

View File

@@ -281,6 +281,14 @@ func TestProbeManagerApplySyncDeleteRemovesTask(t *testing.T) {
} }
} }
func TestProbeManagerGetRandomDelay(t *testing.T) {
for i := 1000; i < 360_000; i += 1000 {
delay := getStagger(int64(i))
assert.GreaterOrEqual(t, delay, time.Duration(i/2)*time.Millisecond)
assert.LessOrEqual(t, delay, time.Duration(i)*time.Millisecond)
}
}
func TestProbeHTTP(t *testing.T) { func TestProbeHTTP(t *testing.T) {
t.Run("success", func(t *testing.T) { t.Run("success", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@@ -24,7 +24,7 @@ import {
AlertDialogHeader, AlertDialogHeader,
AlertDialogTitle, AlertDialogTitle,
} from "@/components/ui/alert-dialog" } from "@/components/ui/alert-dialog"
import { buttonVariants } from "@/components/ui/button" import { Button, buttonVariants } from "@/components/ui/button"
import { memo, useCallback, useMemo, useRef, useState } from "react" import { memo, useCallback, useMemo, useRef, useState } from "react"
import { getProbeColumns } from "@/components/network-probes-table/network-probes-columns" import { getProbeColumns } from "@/components/network-probes-table/network-probes-columns"
import { Card, CardHeader, CardTitle } from "@/components/ui/card" import { Card, CardHeader, CardTitle } from "@/components/ui/card"
@@ -37,6 +37,7 @@ import { $allSystemsById } from "@/lib/stores"
import { cn, getVisualStringWidth, useBrowserStorage } from "@/lib/utils" import { cn, getVisualStringWidth, useBrowserStorage } from "@/lib/utils"
import type { NetworkProbeRecord } from "@/types" import type { NetworkProbeRecord } from "@/types"
import { AddProbeDialog, EditProbeDialog } from "./probe-dialog" import { AddProbeDialog, EditProbeDialog } from "./probe-dialog"
import { XIcon } from "lucide-react"
export default function NetworkProbesTableNew({ export default function NetworkProbesTableNew({
systemId, systemId,
@@ -232,12 +233,26 @@ export default function NetworkProbesTableNew({
</div> </div>
<div className="md:ms-auto flex items-center gap-2"> <div className="md:ms-auto flex items-center gap-2">
{probes.length > 0 && ( {probes.length > 0 && (
<Input <div className="relative">
placeholder={t`Filter...`} <Input
value={globalFilter} placeholder={t`Filter...`}
onChange={(e) => setGlobalFilter(e.target.value)} value={globalFilter}
className="ms-auto px-4 w-full max-w-full md:w-50" onChange={(e) => setGlobalFilter(e.target.value)}
/> className="ms-auto px-4 w-full max-w-full md:w-50"
/>
{globalFilter && (
<Button
type="button"
variant="ghost"
size="icon"
aria-label={t`Clear`}
className="absolute right-1 top-1/2 -translate-y-1/2 h-7 w-7 text-muted-foreground"
onClick={() => setGlobalFilter("")}
>
<XIcon className="h-4 w-4" />
</Button>
)}
</div>
)} )}
{canManageProbes ? <AddProbeDialog systemId={systemId} /> : null} {canManageProbes ? <AddProbeDialog systemId={systemId} /> : null}
{canManageProbes ? ( {canManageProbes ? (