This commit is contained in:
henrygd
2026-04-29 17:59:30 -04:00
parent d2eb3b259a
commit 099935e78e
6 changed files with 130 additions and 112 deletions

View File

@@ -221,6 +221,5 @@ func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error {
if err != nil {
return err
}
slog.Info("network probes synced", "action", req.Action)
return hctx.SendResponse(resp, hctx.RequestID)
}

View File

@@ -333,7 +333,8 @@ func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) {
}
stagger := getStagger(interval.Milliseconds())
slog.Info("starting probe task", "id", task.config.ID, "initial_delay", stagger.String(), "interval", interval.String())
slog.Debug("starting probe task", "target", task.config.Target, "delay", stagger.String(), "interval", interval.String())
if runNow {
pm.executeProbe(task)
@@ -341,10 +342,9 @@ func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) {
select {
case <-task.cancel:
slog.Info("removed probe", "id", task.config.ID)
// slog.Info("removed probe", "target", task.config.Target)
return
case <-time.After(stagger):
slog.Info("initial probe execution", "id", task.config.ID)
pm.executeProbe(task)
}
@@ -353,16 +353,15 @@ func (pm *ProbeManager) runProbe(task *probeTask, runNow bool) {
for {
select {
case <-task.cancel:
slog.Info("removed probe", "id", task.config.ID)
// slog.Info("removed probe", "target", task.config.Target)
return
case <-ticker:
slog.Info("running probe in main loop", "id", task.config.ID, "interval", interval.String())
pm.executeProbe(task)
}
}
}
// getStagger returns a random duration between intervalSeconds/2 and intervalSeconds to stagger probe executions
// getStagger returns a random duration between intervalSeconds/2 and intervalSeconds to stagger initial probe executions
func getStagger(intervalMilli int64) time.Duration {
intervalMilliInt := int(intervalMilli)
randomDelayInt := rand.Intn(intervalMilliInt)
@@ -472,20 +471,26 @@ func (task *probeTask) addSampleLocked(sample probeSample) {
// executeProbe runs the configured probe and records the sample.
func (pm *ProbeManager) executeProbe(task *probeTask) {
// slog.Info("running probe", "id", task.config.ID, "interval", task.config.Interval)
var responseUs int64
var err error
switch task.config.Protocol {
case "icmp":
responseUs = probeICMP(task.config.Target)
responseUs, err = probeICMP(task.config.Target)
case "tcp":
responseUs = probeTCP(task.config.Target, task.config.Port)
responseUs, err = probeTCP(task.config.Target, task.config.Port)
case "http":
responseUs = probeHTTP(pm.httpClient, task.config.Target)
responseUs, err = probeHTTP(pm.httpClient, task.config.Target)
default:
slog.Warn("unknown probe protocol", "protocol", task.config.Protocol)
return
}
if err != nil {
slog.Warn("probe failed", "err", err, "target", task.config.Target, "protocol", task.config.Protocol)
}
sample := probeSample{
responseUs: responseUs,
timestamp: time.Now(),
@@ -497,12 +502,12 @@ func (pm *ProbeManager) executeProbe(task *probeTask) {
}
// probeTCP measures pure TCP handshake response (excluding DNS resolution).
// Returns -1 on failure.
func probeTCP(target string, port uint16) int64 {
// Returns -1 and an error on failure.
func probeTCP(target string, port uint16) (int64, error) {
// Resolve DNS first, outside the timing window
ips, err := net.LookupHost(target)
if err != nil || len(ips) == 0 {
return -1
return -1, err
}
addr := net.JoinHostPort(ips[0], fmt.Sprintf("%d", port))
@@ -510,25 +515,25 @@ func probeTCP(target string, port uint16) int64 {
start := time.Now()
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
if err != nil {
return -1
return -1, err
}
conn.Close()
return time.Since(start).Microseconds()
return time.Since(start).Microseconds(), nil
}
// probeHTTP measures HTTP GET request response in microseconds. Returns -1 on failure.
func probeHTTP(client *http.Client, url string) int64 {
// probeHTTP measures HTTP GET request response in microseconds. Returns -1 and an error on failure.
func probeHTTP(client *http.Client, url string) (int64, error) {
if client == nil {
client = http.DefaultClient
}
start := time.Now()
resp, err := client.Get(url)
if err != nil {
return -1
return -1, err
}
resp.Body.Close()
if resp.StatusCode >= 400 {
return -1
return -1, fmt.Errorf("HTTP error: %s", resp.Status)
}
return time.Since(start).Microseconds()
return time.Since(start).Microseconds(), nil
}

View File

@@ -1,6 +1,7 @@
package agent
import (
"errors"
"math"
"net"
"os"
@@ -27,7 +28,7 @@ type icmpPacketConn interface {
// icmpMethod tracks which ICMP approach to use. Once a method succeeds or
// all native methods fail, the choice is cached so subsequent probes skip
// the trial-and-error overhead.
type icmpMethod int
type icmpMethod uint8
const (
icmpUntried icmpMethod = iota // haven't tried yet
@@ -76,11 +77,11 @@ var (
// Supports both IPv4 and IPv6 targets. The ICMP method (raw socket,
// unprivileged datagram, or exec fallback) is detected once per address
// family and cached for subsequent probes.
// Returns response in microseconds, or -1 on failure.
func probeICMP(target string) int64 {
family, ip := resolveICMPTarget(target)
if family == nil {
return -1
// Returns response in microseconds, or -1 and an error on failure.
func probeICMP(target string) (int64, error) {
family, ip, err := resolveICMPTarget(target)
if err != nil {
return -1, err
}
icmpModeMu.Lock()
@@ -98,30 +99,30 @@ func probeICMP(target string) int64 {
case icmpExecFallback:
return probeICMPExec(target, family.isIPv6)
default:
return -1
return -1, errors.New("unsupported ICMP mode")
}
}
// resolveICMPTarget resolves a target hostname or IP to determine the address
// family and concrete IP address. Prefers IPv4 for dual-stack hostnames.
func resolveICMPTarget(target string) (*icmpFamily, net.IP) {
func resolveICMPTarget(target string) (*icmpFamily, net.IP, error) {
if ip := net.ParseIP(target); ip != nil {
if ip.To4() != nil {
return &icmpV4, ip.To4()
return &icmpV4, ip.To4(), nil
}
return &icmpV6, ip
return &icmpV6, ip, nil
}
ips, err := net.LookupIP(target)
if err != nil || len(ips) == 0 {
return nil, nil
return nil, nil, err
}
for _, ip := range ips {
if v4 := ip.To4(); v4 != nil {
return &icmpV4, v4
return &icmpV4, v4, nil
}
}
return &icmpV6, ips[0]
return &icmpV6, ips[0], nil
}
func detectICMPMode(family *icmpFamily, listen func(network, listenAddr string) (icmpPacketConn, error)) icmpMethod {
@@ -130,31 +131,28 @@ func detectICMPMode(family *icmpFamily, listen func(network, listenAddr string)
label = "IPv6"
}
if conn, err := listen(family.rawNetwork, family.listenAddr); err == nil {
conn, err := listen(family.rawNetwork, family.listenAddr)
slog.Debug("ICMP raw socket test", "family", label, "err", err)
if err == nil {
conn.Close()
slog.Info("ICMP probe using raw socket", "family", label)
return icmpRaw
} else {
slog.Debug("ICMP raw socket unavailable", "family", label, "err", err)
}
if conn, err := listen(family.dgramNetwork, family.listenAddr); err == nil {
conn, err = listen(family.dgramNetwork, family.listenAddr)
slog.Debug("ICMP datagram socket test", "family", label, "err", err)
if err == nil {
conn.Close()
slog.Info("ICMP probe using unprivileged datagram socket", "family", label)
return icmpDatagram
} else {
slog.Debug("ICMP datagram socket unavailable", "family", label, "err", err)
}
slog.Info("ICMP probe falling back to system ping command", "family", label)
return icmpExecFallback
}
// probeICMPNative sends an ICMP echo request using Go's x/net/icmp package.
func probeICMPNative(network string, family *icmpFamily, dst net.Addr) int64 {
func probeICMPNative(network string, family *icmpFamily, dst net.Addr) (int64, error) {
conn, err := icmp.ListenPacket(network, family.listenAddr)
if err != nil {
return -1
return -1, err
}
defer conn.Close()
@@ -170,7 +168,7 @@ func probeICMPNative(network string, family *icmpFamily, dst net.Addr) int64 {
}
msgBytes, err := msg.Marshal(nil)
if err != nil {
return -1
return -1, err
}
// Set deadline before sending
@@ -178,7 +176,7 @@ func probeICMPNative(network string, family *icmpFamily, dst net.Addr) int64 {
start := time.Now()
if _, err := conn.WriteTo(msgBytes, dst); err != nil {
return -1
return -1, err
}
// Read reply
@@ -186,23 +184,23 @@ func probeICMPNative(network string, family *icmpFamily, dst net.Addr) int64 {
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
return -1
return -1, err
}
reply, err := icmp.ParseMessage(family.proto, buf[:n])
if err != nil {
return -1
return -1, err
}
if reply.Type == family.replyType {
return time.Since(start).Microseconds()
return time.Since(start).Microseconds(), nil
}
// Ignore non-echo-reply messages (e.g. destination unreachable) and keep reading
}
}
// probeICMPExec falls back to the system ping command. Returns -1 on failure.
func probeICMPExec(target string, isIPv6 bool) int64 {
// probeICMPExec falls back to the system ping command. Returns -1 and an error on failure.
func probeICMPExec(target string, isIPv6 bool) (int64, error) {
var cmd *exec.Cmd
switch runtime.GOOS {
case "windows":
@@ -211,7 +209,7 @@ func probeICMPExec(target string, isIPv6 bool) int64 {
} else {
cmd = exec.Command("ping", "-n", "1", "-w", "3000", target)
}
default: // linux, darwin, freebsd
default:
if isIPv6 {
cmd = exec.Command("ping", "-6", "-c", "1", "-W", "3", target)
} else {
@@ -224,20 +222,20 @@ func probeICMPExec(target string, isIPv6 bool) int64 {
if err != nil {
// If ping fails but we got output, still try to parse
if len(output) == 0 {
return -1
return -1, err
}
}
matches := pingTimeRegex.FindSubmatch(output)
if len(matches) >= 2 {
if ms, err := strconv.ParseFloat(string(matches[1]), 64); err == nil {
return int64(math.Round(ms * 1000))
return int64(math.Round(ms * 1000)), nil
}
}
// Fallback: use wall clock time if ping succeeded but parsing failed
if err == nil {
return time.Since(start).Microseconds()
return time.Since(start).Microseconds(), nil
}
return -1
return -1, err
}

View File

@@ -303,7 +303,8 @@ func TestProbeHTTP(t *testing.T) {
}))
defer server.Close()
responseUs := probeHTTP(server.Client(), server.URL)
responseUs, err := probeHTTP(server.Client(), server.URL)
require.NoError(t, err)
assert.GreaterOrEqual(t, responseUs, int64(0))
})
@@ -313,7 +314,9 @@ func TestProbeHTTP(t *testing.T) {
}))
defer server.Close()
assert.Equal(t, int64(-1), probeHTTP(server.Client(), server.URL))
responseUs, err := probeHTTP(server.Client(), server.URL)
assert.Equal(t, int64(-1), responseUs)
require.Error(t, err)
})
}
@@ -333,7 +336,8 @@ func TestProbeTCP(t *testing.T) {
}()
port := uint16(listener.Addr().(*net.TCPAddr).Port)
responseUs := probeTCP("127.0.0.1", port)
responseUs, err := probeTCP("127.0.0.1", port)
require.NoError(t, err)
assert.GreaterOrEqual(t, responseUs, int64(0))
<-accepted
})
@@ -345,6 +349,8 @@ func TestProbeTCP(t *testing.T) {
port := uint16(listener.Addr().(*net.TCPAddr).Port)
require.NoError(t, listener.Close())
assert.Equal(t, int64(-1), probeTCP("127.0.0.1", port))
responseUs, err := probeTCP("127.0.0.1", port)
assert.Equal(t, int64(-1), responseUs)
require.Error(t, err)
})
}