Compare commits

..

2 Commits

Author SHA1 Message Date
henrygd
c1c1cd1bcb ui: fix temperature chart filtering 2026-04-20 20:45:16 -04:00
Yvan Wang
cd9ea51039 fix(hub): use rfcEmail validator to allow IDN/Punycode email addresses (#1935)
valibot's email() action uses a domain-label regex that disallows the
-- sequence, so Punycode ACE labels like xn--mnchen-3ya.de (the ASCII
form of münchen.de) are incorrectly rejected.

Switching to rfcEmail() applies the RFC 5321 domain-label pattern,
which allows hyphens within labels and therefore accepts both standard
and internationalized domain names.
2026-04-18 12:23:14 -04:00
44 changed files with 94 additions and 2828 deletions

View File

@@ -48,7 +48,6 @@ type Agent struct {
keys []gossh.PublicKey // SSH public keys
smartManager *SmartManager // Manages SMART data
systemdManager *systemdManager // Manages systemd services
probeManager *ProbeManager // Manages network probes
}
// NewAgent creates a new agent with the given data directory for persisting data.
@@ -122,9 +121,6 @@ func NewAgent(dataDir ...string) (agent *Agent, err error) {
// initialize handler registry
agent.handlerRegistry = NewHandlerRegistry()
// initialize probe manager
agent.probeManager = newProbeManager()
// initialize disk info
agent.initializeDiskInfo()
@@ -182,11 +178,6 @@ func (a *Agent) gatherStats(options common.DataRequestOptions) *system.CombinedD
}
}
if a.probeManager != nil {
data.Probes = a.probeManager.GetResults(cacheTimeMs)
slog.Debug("Probes", "data", data.Probes)
}
// skip updating systemd services if cache time is not the default 60sec interval
if a.systemdManager != nil && cacheTimeMs == defaultDataCacheTimeMs {
totalCount := uint16(a.systemdManager.getServiceStatsCount())

View File

@@ -141,7 +141,6 @@ func (c *ConnectionManager) Start(serverOptions ServerOptions) error {
// }
func (c *ConnectionManager) stop() error {
_ = c.agent.StopServer()
c.agent.probeManager.Stop()
c.closeWebSocket()
return health.CleanUp()
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/smart"
"log/slog"
@@ -52,7 +51,6 @@ func NewHandlerRegistry() *HandlerRegistry {
registry.Register(common.GetContainerInfo, &GetContainerInfoHandler{})
registry.Register(common.GetSmartData, &GetSmartDataHandler{})
registry.Register(common.GetSystemdInfo, &GetSystemdInfoHandler{})
registry.Register(common.SyncNetworkProbes, &SyncNetworkProbesHandler{})
return registry
}
@@ -205,19 +203,3 @@ func (h *GetSystemdInfoHandler) Handle(hctx *HandlerContext) error {
return hctx.SendResponse(details, hctx.RequestID)
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// SyncNetworkProbesHandler handles probe configuration sync from hub
type SyncNetworkProbesHandler struct{}
func (h *SyncNetworkProbesHandler) Handle(hctx *HandlerContext) error {
var configs []probe.Config
if err := cbor.Unmarshal(hctx.Request.Data, &configs); err != nil {
return err
}
hctx.Agent.probeManager.SyncProbes(configs)
slog.Info("network probes synced", "count", len(configs))
return hctx.SendResponse("ok", hctx.RequestID)
}

View File

@@ -1,237 +0,0 @@
package agent
import (
"fmt"
"math"
"net"
"net/http"
"sync"
"time"
"log/slog"
"github.com/henrygd/beszel/internal/entities/probe"
)
// ProbeManager manages network probe tasks.
type ProbeManager struct {
mu sync.RWMutex
probes map[string]*probeTask // key = probe.Config.Key()
httpClient *http.Client
}
type probeTask struct {
config probe.Config
cancel chan struct{}
mu sync.Mutex
samples []probeSample
}
type probeSample struct {
latencyMs float64 // -1 means loss
timestamp time.Time
}
func newProbeManager() *ProbeManager {
return &ProbeManager{
probes: make(map[string]*probeTask),
httpClient: &http.Client{Timeout: 10 * time.Second},
}
}
// SyncProbes replaces all probe tasks with the given configs.
func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
pm.mu.Lock()
defer pm.mu.Unlock()
// Build set of new keys
newKeys := make(map[string]probe.Config, len(configs))
for _, cfg := range configs {
newKeys[cfg.Key()] = cfg
}
// Stop removed probes
for key, task := range pm.probes {
if _, exists := newKeys[key]; !exists {
close(task.cancel)
delete(pm.probes, key)
}
}
// Start new probes (skip existing ones with same key)
for key, cfg := range newKeys {
if _, exists := pm.probes[key]; exists {
continue
}
task := &probeTask{
config: cfg,
cancel: make(chan struct{}),
samples: make([]probeSample, 0, 64),
}
pm.probes[key] = task
go pm.runProbe(task)
}
}
// GetResults returns aggregated results for all probes over the last supplied duration in ms.
func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result {
pm.mu.RLock()
defer pm.mu.RUnlock()
results := make(map[string]probe.Result, len(pm.probes))
cutoff := time.Now().Add(-time.Duration(durationMs) * time.Millisecond)
for key, task := range pm.probes {
task.mu.Lock()
var sum, minMs, maxMs float64
var count, lossCount int
minMs = math.MaxFloat64
for _, s := range task.samples {
if s.timestamp.Before(cutoff) {
continue
}
count++
if s.latencyMs < 0 {
lossCount++
continue
}
sum += s.latencyMs
if s.latencyMs < minMs {
minMs = s.latencyMs
}
if s.latencyMs > maxMs {
maxMs = s.latencyMs
}
}
task.mu.Unlock()
if count == 0 {
continue
}
successCount := count - lossCount
var avg float64
if successCount > 0 {
avg = math.Round(sum/float64(successCount)*100) / 100
}
if minMs == math.MaxFloat64 {
minMs = 0
}
results[key] = probe.Result{
avg, // average latency in ms
math.Round(minMs*100) / 100, // min latency in ms
math.Round(maxMs*100) / 100, // max latency in ms
math.Round(float64(lossCount)/float64(count)*10000) / 100, // packet loss percentage
}
}
return results
}
// Stop stops all probe tasks.
func (pm *ProbeManager) Stop() {
pm.mu.Lock()
defer pm.mu.Unlock()
for key, task := range pm.probes {
close(task.cancel)
delete(pm.probes, key)
}
}
// runProbe executes a single probe task in a loop.
func (pm *ProbeManager) runProbe(task *probeTask) {
interval := time.Duration(task.config.Interval) * time.Second
if interval < time.Second {
interval = 10 * time.Second
}
ticker := time.Tick(interval)
// Run immediately on start
pm.executeProbe(task)
for {
select {
case <-task.cancel:
return
case <-ticker:
pm.executeProbe(task)
}
}
}
func (pm *ProbeManager) executeProbe(task *probeTask) {
var latencyMs float64
switch task.config.Protocol {
case "icmp":
latencyMs = probeICMP(task.config.Target)
case "tcp":
latencyMs = probeTCP(task.config.Target, task.config.Port)
case "http":
latencyMs = probeHTTP(pm.httpClient, task.config.Target)
default:
slog.Warn("unknown probe protocol", "protocol", task.config.Protocol)
return
}
sample := probeSample{
latencyMs: latencyMs,
timestamp: time.Now(),
}
task.mu.Lock()
// Trim old samples beyond 120s to bound memory
cutoff := time.Now().Add(-120 * time.Second)
start := 0
for i := range task.samples {
if task.samples[i].timestamp.After(cutoff) {
start = i
break
}
if i == len(task.samples)-1 {
start = len(task.samples)
}
}
if start > 0 {
size := copy(task.samples, task.samples[start:])
task.samples = task.samples[:size]
}
task.samples = append(task.samples, sample)
task.mu.Unlock()
}
// probeTCP measures pure TCP handshake latency (excluding DNS resolution).
// Returns -1 on failure.
func probeTCP(target string, port uint16) float64 {
// Resolve DNS first, outside the timing window
ips, err := net.LookupHost(target)
if err != nil || len(ips) == 0 {
return -1
}
addr := net.JoinHostPort(ips[0], fmt.Sprintf("%d", port))
// Measure only the TCP handshake
start := time.Now()
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
if err != nil {
return -1
}
conn.Close()
return float64(time.Since(start).Microseconds()) / 1000.0
}
// probeHTTP measures HTTP GET request latency. Returns -1 on failure.
func probeHTTP(client *http.Client, url string) float64 {
start := time.Now()
resp, err := client.Get(url)
if err != nil {
return -1
}
resp.Body.Close()
if resp.StatusCode >= 400 {
return -1
}
return float64(time.Since(start).Microseconds()) / 1000.0
}

View File

@@ -1,242 +0,0 @@
package agent
import (
"net"
"os"
"os/exec"
"regexp"
"runtime"
"strconv"
"sync"
"time"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"log/slog"
)
var pingTimeRegex = regexp.MustCompile(`time[=<]([\d.]+)\s*ms`)
type icmpPacketConn interface {
Close() error
}
// icmpMethod tracks which ICMP approach to use. Once a method succeeds or
// all native methods fail, the choice is cached so subsequent probes skip
// the trial-and-error overhead.
type icmpMethod int
const (
icmpUntried icmpMethod = iota // haven't tried yet
icmpRaw // privileged raw socket
icmpDatagram // unprivileged datagram socket
icmpExecFallback // shell out to system ping command
)
// icmpFamily holds the network parameters and cached detection result for one address family.
type icmpFamily struct {
rawNetwork string // e.g. "ip4:icmp" or "ip6:ipv6-icmp"
dgramNetwork string // e.g. "udp4" or "udp6"
listenAddr string // "0.0.0.0" or "::"
echoType icmp.Type // outgoing echo request type
replyType icmp.Type // expected echo reply type
proto int // IANA protocol number for parsing replies
isIPv6 bool
mode icmpMethod // cached detection result (guarded by icmpModeMu)
}
var (
icmpV4 = icmpFamily{
rawNetwork: "ip4:icmp",
dgramNetwork: "udp4",
listenAddr: "0.0.0.0",
echoType: ipv4.ICMPTypeEcho,
replyType: ipv4.ICMPTypeEchoReply,
proto: 1,
}
icmpV6 = icmpFamily{
rawNetwork: "ip6:ipv6-icmp",
dgramNetwork: "udp6",
listenAddr: "::",
echoType: ipv6.ICMPTypeEchoRequest,
replyType: ipv6.ICMPTypeEchoReply,
proto: 58,
isIPv6: true,
}
icmpModeMu sync.Mutex
icmpListen = func(network, listenAddr string) (icmpPacketConn, error) {
return icmp.ListenPacket(network, listenAddr)
}
)
// probeICMP sends an ICMP echo request and measures round-trip latency.
// Supports both IPv4 and IPv6 targets. The ICMP method (raw socket,
// unprivileged datagram, or exec fallback) is detected once per address
// family and cached for subsequent probes.
// Returns latency in milliseconds, or -1 on failure.
func probeICMP(target string) float64 {
family, ip := resolveICMPTarget(target)
if family == nil {
return -1
}
icmpModeMu.Lock()
if family.mode == icmpUntried {
family.mode = detectICMPMode(family, icmpListen)
}
mode := family.mode
icmpModeMu.Unlock()
switch mode {
case icmpRaw:
return probeICMPNative(family.rawNetwork, family, &net.IPAddr{IP: ip})
case icmpDatagram:
return probeICMPNative(family.dgramNetwork, family, &net.UDPAddr{IP: ip})
case icmpExecFallback:
return probeICMPExec(target, family.isIPv6)
default:
return -1
}
}
// resolveICMPTarget resolves a target hostname or IP to determine the address
// family and concrete IP address. Prefers IPv4 for dual-stack hostnames.
func resolveICMPTarget(target string) (*icmpFamily, net.IP) {
if ip := net.ParseIP(target); ip != nil {
if ip.To4() != nil {
return &icmpV4, ip.To4()
}
return &icmpV6, ip
}
ips, err := net.LookupIP(target)
if err != nil || len(ips) == 0 {
return nil, nil
}
for _, ip := range ips {
if v4 := ip.To4(); v4 != nil {
return &icmpV4, v4
}
}
return &icmpV6, ips[0]
}
func detectICMPMode(family *icmpFamily, listen func(network, listenAddr string) (icmpPacketConn, error)) icmpMethod {
label := "IPv4"
if family.isIPv6 {
label = "IPv6"
}
if conn, err := listen(family.rawNetwork, family.listenAddr); err == nil {
conn.Close()
slog.Info("ICMP probe using raw socket", "family", label)
return icmpRaw
} else {
slog.Debug("ICMP raw socket unavailable", "family", label, "err", err)
}
if conn, err := listen(family.dgramNetwork, family.listenAddr); err == nil {
conn.Close()
slog.Info("ICMP probe using unprivileged datagram socket", "family", label)
return icmpDatagram
} else {
slog.Debug("ICMP datagram socket unavailable", "family", label, "err", err)
}
slog.Info("ICMP probe falling back to system ping command", "family", label)
return icmpExecFallback
}
// probeICMPNative sends an ICMP echo request using Go's x/net/icmp package.
func probeICMPNative(network string, family *icmpFamily, dst net.Addr) float64 {
conn, err := icmp.ListenPacket(network, family.listenAddr)
if err != nil {
return -1
}
defer conn.Close()
// Build ICMP echo request
msg := &icmp.Message{
Type: family.echoType,
Code: 0,
Body: &icmp.Echo{
ID: os.Getpid() & 0xffff,
Seq: 1,
Data: []byte("beszel-probe"),
},
}
msgBytes, err := msg.Marshal(nil)
if err != nil {
return -1
}
// Set deadline before sending
conn.SetDeadline(time.Now().Add(3 * time.Second))
start := time.Now()
if _, err := conn.WriteTo(msgBytes, dst); err != nil {
return -1
}
// Read reply
buf := make([]byte, 1500)
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
return -1
}
reply, err := icmp.ParseMessage(family.proto, buf[:n])
if err != nil {
return -1
}
if reply.Type == family.replyType {
return float64(time.Since(start).Microseconds()) / 1000.0
}
// Ignore non-echo-reply messages (e.g. destination unreachable) and keep reading
}
}
// probeICMPExec falls back to the system ping command. Returns -1 on failure.
func probeICMPExec(target string, isIPv6 bool) float64 {
var cmd *exec.Cmd
switch runtime.GOOS {
case "windows":
if isIPv6 {
cmd = exec.Command("ping", "-6", "-n", "1", "-w", "3000", target)
} else {
cmd = exec.Command("ping", "-n", "1", "-w", "3000", target)
}
default: // linux, darwin, freebsd
if isIPv6 {
cmd = exec.Command("ping", "-6", "-c", "1", "-W", "3", target)
} else {
cmd = exec.Command("ping", "-c", "1", "-W", "3", target)
}
}
start := time.Now()
output, err := cmd.Output()
if err != nil {
// If ping fails but we got output, still try to parse
if len(output) == 0 {
return -1
}
}
matches := pingTimeRegex.FindSubmatch(output)
if len(matches) >= 2 {
if ms, err := strconv.ParseFloat(string(matches[1]), 64); err == nil {
return ms
}
}
// Fallback: use wall clock time if ping succeeded but parsing failed
if err == nil {
return float64(time.Since(start).Microseconds()) / 1000.0
}
return -1
}

View File

@@ -1,118 +0,0 @@
//go:build testing
package agent
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testICMPPacketConn struct{}
func (testICMPPacketConn) Close() error { return nil }
func TestDetectICMPMode(t *testing.T) {
tests := []struct {
name string
family *icmpFamily
rawErr error
udpErr error
want icmpMethod
wantNetworks []string
}{
{
name: "IPv4 prefers raw socket when available",
family: &icmpV4,
want: icmpRaw,
wantNetworks: []string{"ip4:icmp"},
},
{
name: "IPv4 uses datagram when raw unavailable",
family: &icmpV4,
rawErr: errors.New("operation not permitted"),
want: icmpDatagram,
wantNetworks: []string{"ip4:icmp", "udp4"},
},
{
name: "IPv4 falls back to exec when both unavailable",
family: &icmpV4,
rawErr: errors.New("operation not permitted"),
udpErr: errors.New("protocol not supported"),
want: icmpExecFallback,
wantNetworks: []string{"ip4:icmp", "udp4"},
},
{
name: "IPv6 prefers raw socket when available",
family: &icmpV6,
want: icmpRaw,
wantNetworks: []string{"ip6:ipv6-icmp"},
},
{
name: "IPv6 uses datagram when raw unavailable",
family: &icmpV6,
rawErr: errors.New("operation not permitted"),
want: icmpDatagram,
wantNetworks: []string{"ip6:ipv6-icmp", "udp6"},
},
{
name: "IPv6 falls back to exec when both unavailable",
family: &icmpV6,
rawErr: errors.New("operation not permitted"),
udpErr: errors.New("protocol not supported"),
want: icmpExecFallback,
wantNetworks: []string{"ip6:ipv6-icmp", "udp6"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
calls := make([]string, 0, 2)
listen := func(network, listenAddr string) (icmpPacketConn, error) {
require.Equal(t, tt.family.listenAddr, listenAddr)
calls = append(calls, network)
switch network {
case tt.family.rawNetwork:
if tt.rawErr != nil {
return nil, tt.rawErr
}
case tt.family.dgramNetwork:
if tt.udpErr != nil {
return nil, tt.udpErr
}
default:
t.Fatalf("unexpected network %q", network)
}
return testICMPPacketConn{}, nil
}
assert.Equal(t, tt.want, detectICMPMode(tt.family, listen))
assert.Equal(t, tt.wantNetworks, calls)
})
}
}
func TestResolveICMPTarget(t *testing.T) {
t.Run("IPv4 literal", func(t *testing.T) {
family, ip := resolveICMPTarget("127.0.0.1")
require.NotNil(t, family)
assert.False(t, family.isIPv6)
assert.Equal(t, "127.0.0.1", ip.String())
})
t.Run("IPv6 literal", func(t *testing.T) {
family, ip := resolveICMPTarget("::1")
require.NotNil(t, family)
assert.True(t, family.isIPv6)
assert.Equal(t, "::1", ip.String())
})
t.Run("IPv4-mapped IPv6 resolves as IPv4", func(t *testing.T) {
family, ip := resolveICMPTarget("::ffff:127.0.0.1")
require.NotNil(t, family)
assert.False(t, family.isIPv6)
assert.Equal(t, "127.0.0.1", ip.String())
})
}

2
go.mod
View File

@@ -20,7 +20,6 @@ require (
github.com/stretchr/testify v1.11.1
golang.org/x/crypto v0.49.0
golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90
golang.org/x/net v0.52.0
golang.org/x/sys v0.42.0
gopkg.in/yaml.v3 v3.0.1
howett.net/plist v1.0.1
@@ -57,6 +56,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/image v0.38.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/term v0.41.0 // indirect

View File

@@ -22,8 +22,6 @@ const (
GetSmartData
// Request detailed systemd service info from agent
GetSystemdInfo
// Sync network probe configuration to agent
SyncNetworkProbes
// Add new actions here...
)

View File

@@ -1,32 +0,0 @@
package probe
import "strconv"
// Config defines a network probe task sent from hub to agent.
type Config struct {
Target string `cbor:"0,keyasint" json:"target"`
Protocol string `cbor:"1,keyasint" json:"protocol"` // "icmp", "tcp", or "http"
Port uint16 `cbor:"2,keyasint,omitempty" json:"port,omitempty"`
Interval uint16 `cbor:"3,keyasint" json:"interval"` // seconds
}
// Result holds aggregated probe results for a single target.
//
// 0: avg latency in ms
//
// 1: min latency in ms
//
// 2: max latency in ms
//
// 3: packet loss percentage (0-100)
type Result []float64
// Key returns the map key used for this probe config (e.g. "icmp:1.1.1.1", "tcp:host:443", "http:https://example.com").
func (c Config) Key() string {
switch c.Protocol {
case "tcp":
return c.Protocol + ":" + c.Target + ":" + strconv.FormatUint(uint64(c.Port), 10)
default:
return c.Protocol + ":" + c.Target
}
}

View File

@@ -7,7 +7,6 @@ import (
"time"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/systemd"
)
@@ -180,5 +179,4 @@ type CombinedData struct {
Containers []*container.Stats `json:"container" cbor:"2,keyasint"`
SystemdServices []*systemd.Service `json:"systemd,omitempty" cbor:"3,keyasint,omitempty"`
Details *Details `cbor:"4,keyasint,omitempty"`
Probes map[string]probe.Result `cbor:"5,keyasint,omitempty"`
}

View File

@@ -78,7 +78,7 @@ func setCollectionAuthSettings(app core.App) error {
return err
}
if err := applyCollectionRules(app, []string{"containers", "container_stats", "system_stats", "systemd_services", "network_probe_stats"}, collectionRules{
if err := applyCollectionRules(app, []string{"containers", "container_stats", "system_stats", "systemd_services"}, collectionRules{
list: &systemScopedReadRule,
}); err != nil {
return err
@@ -92,7 +92,7 @@ func setCollectionAuthSettings(app core.App) error {
return err
}
if err := applyCollectionRules(app, []string{"fingerprints", "network_probes"}, collectionRules{
if err := applyCollectionRules(app, []string{"fingerprints"}, collectionRules{
list: &systemScopedReadRule,
view: &systemScopedReadRule,
create: &systemScopedWriteRule,

View File

@@ -81,7 +81,6 @@ func (h *Hub) StartHub() error {
}
// register middlewares
h.registerMiddlewares(e)
// bind events that aren't set up in different
// register api routes
if err := h.registerApiRoutes(e); err != nil {
return err
@@ -110,8 +109,6 @@ func (h *Hub) StartHub() error {
h.App.OnRecordCreate("users").BindFunc(h.um.InitializeUserRole)
h.App.OnRecordCreate("user_settings").BindFunc(h.um.InitializeUserSettings)
bindNetworkProbesEvents(h)
pb, ok := h.App.(*pocketbase.PocketBase)
if !ok {
return errors.New("not a pocketbase app")

View File

@@ -1,54 +0,0 @@
package hub
import (
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/hub/systems"
"github.com/pocketbase/pocketbase/core"
)
func bindNetworkProbesEvents(h *Hub) {
// on create, make sure the id is set to a stable hash
h.OnRecordCreate("network_probes").BindFunc(func(e *core.RecordEvent) error {
systemID := e.Record.GetString("system")
config := &probe.Config{
Target: e.Record.GetString("target"),
Protocol: e.Record.GetString("protocol"),
Port: uint16(e.Record.GetInt("port")),
Interval: uint16(e.Record.GetInt("interval")),
}
key := config.Key()
id := systems.MakeStableHashId(systemID, key)
e.Record.Set("id", id)
return e.Next()
})
// sync probe to agent on creation
h.OnRecordAfterCreateSuccess("network_probes").BindFunc(func(e *core.RecordEvent) error {
systemID := e.Record.GetString("system")
h.syncProbesToAgent(systemID)
return e.Next()
})
// sync probe to agent on delete
h.OnRecordAfterDeleteSuccess("network_probes").BindFunc(func(e *core.RecordEvent) error {
systemID := e.Record.GetString("system")
h.syncProbesToAgent(systemID)
return e.Next()
})
// TODO: if enabled changes, sync to agent
}
// syncProbesToAgent fetches enabled probes for a system and sends them to the agent.
func (h *Hub) syncProbesToAgent(systemID string) {
system, err := h.sm.GetSystem(systemID)
if err != nil {
return
}
configs := h.sm.GetProbeConfigsForSystem(systemID)
go func() {
if err := system.SyncNetworkProbes(configs); err != nil {
h.Logger().Warn("failed to sync probes to agent", "system", systemID, "err", err)
}
}()
}

View File

@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"hash/fnv"
"log/slog"
"math/rand"
"net"
"strings"
@@ -19,7 +18,6 @@ import (
"github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/smart"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/entities/systemd"
@@ -31,7 +29,6 @@ import (
"github.com/lxzan/gws"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/types"
"golang.org/x/crypto/ssh"
)
@@ -241,12 +238,6 @@ func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error
}
}
if data.Probes != nil {
if err := updateNetworkProbesRecords(txApp, data.Probes, sys.Id); err != nil {
return err
}
}
// update system record (do this last because it triggers alerts and we need above records to be inserted first)
systemRecord.Set("status", up)
systemRecord.Set("info", data.Info)
@@ -298,7 +289,7 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s
for i, service := range data {
suffix := fmt.Sprintf("%d", i)
valueStrings = append(valueStrings, fmt.Sprintf("({:id%[1]s}, {:system}, {:name%[1]s}, {:state%[1]s}, {:sub%[1]s}, {:cpu%[1]s}, {:cpuPeak%[1]s}, {:memory%[1]s}, {:memPeak%[1]s}, {:updated})", suffix))
params["id"+suffix] = MakeStableHashId(systemId, service.Name)
params["id"+suffix] = makeStableHashId(systemId, service.Name)
params["name"+suffix] = service.Name
params["state"+suffix] = service.State
params["sub"+suffix] = service.Sub
@@ -315,84 +306,6 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s
return err
}
func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, systemId string) error {
if len(data) == 0 {
return nil
}
var err error
collectionName := "network_probes"
// If realtime updates are active, we save via PocketBase records to trigger realtime events.
// Otherwise we can do a more efficient direct update via SQL
realtimeActive := utils.RealtimeActiveForCollection(app, collectionName, func(filterQuery string) bool {
slog.Info("Checking realtime subscription filter for network probes", "filterQuery", filterQuery)
return !strings.Contains(filterQuery, "system") || strings.Contains(filterQuery, systemId)
})
var db dbx.Builder
var nowString string
var updateQuery *dbx.Query
if !realtimeActive {
db = app.DB()
nowString = time.Now().UTC().Format(types.DefaultDateLayout)
sql := fmt.Sprintf("UPDATE %s SET latency={:latency}, loss={:loss}, updated={:updated} WHERE id={:id}", collectionName)
updateQuery = db.NewQuery(sql)
}
// insert network probe stats records
switch realtimeActive {
case true:
collection, _ := app.FindCachedCollectionByNameOrId("network_probe_stats")
record := core.NewRecord(collection)
record.Set("system", systemId)
record.Set("stats", data)
record.Set("type", "1m")
err = app.SaveNoValidate(record)
default:
if dataJson, e := json.Marshal(data); e == nil {
sql := "INSERT INTO network_probe_stats (system, stats, type, created) VALUES ({:system}, {:stats}, {:type}, {:created})"
insertQuery := db.NewQuery(sql)
_, err = insertQuery.Bind(dbx.Params{
"system": systemId,
"stats": dataJson,
"type": "1m",
"created": nowString,
}).Execute()
}
}
if err != nil {
app.Logger().Error("Failed to update probe stats", "system", systemId, "err", err)
}
// update network_probes records
for key := range data {
probe := data[key]
id := MakeStableHashId(systemId, key)
switch realtimeActive {
case true:
var record *core.Record
record, err = app.FindRecordById(collectionName, id)
if err == nil {
record.Set("latency", probe[0])
record.Set("loss", probe[3])
err = app.SaveNoValidate(record)
}
default:
_, err = updateQuery.Bind(dbx.Params{
"id": id,
"latency": probe[0],
"loss": probe[3],
"updated": nowString,
}).Execute()
}
if err != nil {
app.Logger().Warn("Failed to update probe", "system", systemId, "probe", key, "err", err)
}
}
return nil
}
// createContainerRecords creates container records
func createContainerRecords(app core.App, data []*container.Stats, systemId string) error {
if len(data) == 0 {
@@ -627,7 +540,7 @@ func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error)
return result, err
}
func MakeStableHashId(strings ...string) string {
func makeStableHashId(strings ...string) string {
hash := fnv.New32a()
for _, str := range strings {
hash.Write([]byte(str))

View File

@@ -7,7 +7,6 @@ import (
"github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/hub/expirymap"
@@ -16,7 +15,6 @@ import (
"github.com/henrygd/beszel"
"github.com/blang/semver"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/store"
"golang.org/x/crypto/ssh"
@@ -319,17 +317,6 @@ func (sm *SystemManager) AddWebSocketSystem(systemId string, agentVersion semver
if err := sm.AddRecord(systemRecord, system); err != nil {
return err
}
// Sync network probes to the newly connected agent
go func() {
configs := sm.GetProbeConfigsForSystem(systemId)
if len(configs) > 0 {
if err := system.SyncNetworkProbes(configs); err != nil {
sm.hub.Logger().Warn("failed to sync probes on connect", "system", systemId, "err", err)
}
}
}()
return nil
}
@@ -342,31 +329,6 @@ func (sm *SystemManager) resetFailedSmartFetchState(systemID string) {
}
}
// GetProbeConfigsForSystem returns all enabled probe configs for a system.
func (sm *SystemManager) GetProbeConfigsForSystem(systemID string) []probe.Config {
records, err := sm.hub.FindRecordsByFilter(
"network_probes",
"system = {:system} && enabled = true",
"",
0, 0,
dbx.Params{"system": systemID},
)
if err != nil || len(records) == 0 {
return nil
}
configs := make([]probe.Config, 0, len(records))
for _, r := range records {
configs = append(configs, probe.Config{
Target: r.GetString("target"),
Protocol: r.GetString("protocol"),
Port: uint16(r.GetInt("port")),
Interval: uint16(r.GetInt("interval")),
})
}
return configs
}
// createSSHClientConfig initializes the SSH client configuration for connecting to an agent's server
func (sm *SystemManager) createSSHClientConfig() error {
privateKey, err := sm.hub.GetSSHKey("")

View File

@@ -1,57 +0,0 @@
package systems
import (
"context"
"time"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/probe"
)
// SyncNetworkProbes sends probe configurations to the agent.
func (sys *System) SyncNetworkProbes(configs []probe.Config) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var result string
return sys.request(ctx, common.SyncNetworkProbes, configs, &result)
}
// FetchNetworkProbeResults fetches probe results from the agent.
// func (sys *System) FetchNetworkProbeResults() (map[string]probe.Result, error) {
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// defer cancel()
// var results map[string]probe.Result
// err := sys.request(ctx, common.GetNetworkProbeResults, nil, &results)
// return results, err
// }
// hasEnabledProbes returns true if this system has any enabled network probes.
// func (sys *System) hasEnabledProbes() bool {
// count, err := sys.manager.hub.CountRecords("network_probes",
// dbx.NewExp("system = {:system} AND enabled = true", dbx.Params{"system": sys.Id}))
// return err == nil && count > 0
// }
// fetchAndSaveProbeResults fetches probe results and saves them to the database.
// func (sys *System) fetchAndSaveProbeResults() {
// hub := sys.manager.hub
// results, err := sys.FetchNetworkProbeResults()
// if err != nil || len(results) == 0 {
// return
// }
// collection, err := hub.FindCachedCollectionByNameOrId("network_probe_stats")
// if err != nil {
// return
// }
// record := core.NewRecord(collection)
// record.Set("system", sys.Id)
// record.Set("stats", results)
// record.Set("type", "1m")
// if err := hub.SaveNoValidate(record); err != nil {
// hub.Logger().Warn("failed to save probe stats", "system", sys.Id, "err", err)
// }
// }

View File

@@ -84,7 +84,7 @@ func (sys *System) saveSmartDevices(smartData map[string]smart.SmartData) error
func (sys *System) upsertSmartDeviceRecord(collection *core.Collection, deviceKey string, device smart.SmartData) error {
hub := sys.manager.hub
recordID := MakeStableHashId(sys.Id, deviceKey)
recordID := makeStableHashId(sys.Id, deviceKey)
record, err := hub.FindRecordById(collection, recordID)
if err != nil {

View File

@@ -14,9 +14,9 @@ func TestGetSystemdServiceId(t *testing.T) {
serviceName := "nginx.service"
// Call multiple times and ensure same result
id1 := MakeStableHashId(systemId, serviceName)
id2 := MakeStableHashId(systemId, serviceName)
id3 := MakeStableHashId(systemId, serviceName)
id1 := makeStableHashId(systemId, serviceName)
id2 := makeStableHashId(systemId, serviceName)
id3 := makeStableHashId(systemId, serviceName)
assert.Equal(t, id1, id2)
assert.Equal(t, id2, id3)
@@ -29,10 +29,10 @@ func TestGetSystemdServiceId(t *testing.T) {
serviceName1 := "nginx.service"
serviceName2 := "apache.service"
id1 := MakeStableHashId(systemId1, serviceName1)
id2 := MakeStableHashId(systemId2, serviceName1)
id3 := MakeStableHashId(systemId1, serviceName2)
id4 := MakeStableHashId(systemId2, serviceName2)
id1 := makeStableHashId(systemId1, serviceName1)
id2 := makeStableHashId(systemId2, serviceName1)
id3 := makeStableHashId(systemId1, serviceName2)
id4 := makeStableHashId(systemId2, serviceName2)
// All IDs should be different
assert.NotEqual(t, id1, id2)
@@ -56,14 +56,14 @@ func TestGetSystemdServiceId(t *testing.T) {
}
for _, tc := range testCases {
id := MakeStableHashId(tc.systemId, tc.serviceName)
id := makeStableHashId(tc.systemId, tc.serviceName)
// FNV-32 produces 8 hex characters
assert.Len(t, id, 8, "ID should be 8 characters for systemId='%s', serviceName='%s'", tc.systemId, tc.serviceName)
}
})
t.Run("hexadecimal output", func(t *testing.T) {
id := MakeStableHashId("test-system", "test-service")
id := makeStableHashId("test-system", "test-service")
assert.NotEmpty(t, id)
// Should only contain hexadecimal characters

View File

@@ -1,11 +1,7 @@
// Package utils provides utility functions for the hub.
package utils
import (
"os"
"github.com/pocketbase/pocketbase/core"
)
import "os"
// GetEnv retrieves an environment variable with a "BESZEL_HUB_" prefix, or falls back to the unprefixed key.
func GetEnv(key string) (value string, exists bool) {
@@ -14,26 +10,3 @@ func GetEnv(key string) (value string, exists bool) {
}
return os.LookupEnv(key)
}
// realtimeActiveForCollection checks if there are active WebSocket subscriptions for the given collection.
func RealtimeActiveForCollection(app core.App, collectionName string, validateFn func(filterQuery string) bool) bool {
broker := app.SubscriptionsBroker()
if broker.TotalClients() == 0 {
return false
}
for _, client := range broker.Clients() {
subs := client.Subscriptions(collectionName)
if len(subs) > 0 {
if validateFn == nil {
return true
}
for k := range subs {
filter := subs[k].Query["filter"]
if validateFn(filter) {
return true
}
}
}
}
return false
}

View File

@@ -1699,223 +1699,6 @@ func init() {
"type": "base",
"updateRule": null,
"viewRule": null
},
{
"id": "np_probes_001",
"listRule": null,
"viewRule": null,
"createRule": null,
"updateRule": null,
"deleteRule": null,
"name": "network_probes",
"type": "base",
"fields": [
{
"autogeneratePattern": "[a-z0-9]{15}",
"hidden": false,
"id": "text3208210256",
"max": 15,
"min": 15,
"name": "id",
"pattern": "^[a-z0-9]+$",
"presentable": false,
"primaryKey": true,
"required": true,
"system": true,
"type": "text"
},
{
"cascadeDelete": true,
"collectionId": "2hz5ncl8tizk5nx",
"hidden": false,
"id": "np_system",
"maxSelect": 1,
"minSelect": 0,
"name": "system",
"presentable": false,
"required": true,
"system": false,
"type": "relation"
},
{
"hidden": false,
"id": "np_name",
"max": 200,
"min": 0,
"name": "name",
"pattern": "",
"presentable": false,
"primaryKey": false,
"required": false,
"system": false,
"type": "text"
},
{
"hidden": false,
"id": "np_target",
"max": 500,
"min": 1,
"name": "target",
"pattern": "",
"presentable": false,
"primaryKey": false,
"required": true,
"system": false,
"type": "text"
},
{
"hidden": false,
"id": "np_protocol",
"maxSelect": 1,
"name": "protocol",
"presentable": false,
"required": true,
"system": false,
"type": "select",
"values": ["icmp", "tcp", "http"]
},
{
"hidden": false,
"id": "np_port",
"max": 65535,
"min": 0,
"name": "port",
"onlyInt": true,
"presentable": false,
"required": false,
"system": false,
"type": "number"
},
{
"hidden": false,
"id": "np_interval",
"max": 3600,
"min": 1,
"name": "interval",
"onlyInt": true,
"presentable": false,
"required": true,
"system": false,
"type": "number"
},
{
"hidden": false,
"id": "np_enabled",
"name": "enabled",
"presentable": false,
"required": false,
"system": false,
"type": "bool"
},
{
"hidden": false,
"id": "autodate2990389176",
"name": "created",
"onCreate": true,
"onUpdate": false,
"presentable": false,
"system": false,
"type": "autodate"
},
{
"hidden": false,
"id": "autodate3332085495",
"name": "updated",
"onCreate": true,
"onUpdate": true,
"presentable": false,
"system": false,
"type": "autodate"
}
],
"indexes": [
"CREATE INDEX ` + "`" + `idx_np_system_enabled` + "`" + ` ON ` + "`" + `network_probes` + "`" + ` (\n ` + "`" + `system` + "`" + `,\n ` + "`" + `enabled` + "`" + `\n)"
],
"system": false
},
{
"id": "np_stats_001",
"listRule": null,
"viewRule": null,
"createRule": null,
"updateRule": null,
"deleteRule": null,
"name": "network_probe_stats",
"type": "base",
"fields": [
{
"autogeneratePattern": "[a-z0-9]{15}",
"hidden": false,
"id": "text3208210256",
"max": 15,
"min": 15,
"name": "id",
"pattern": "^[a-z0-9]+$",
"presentable": false,
"primaryKey": true,
"required": true,
"system": true,
"type": "text"
},
{
"cascadeDelete": true,
"collectionId": "2hz5ncl8tizk5nx",
"hidden": false,
"id": "nps_system",
"maxSelect": 1,
"minSelect": 0,
"name": "system",
"presentable": false,
"required": true,
"system": false,
"type": "relation"
},
{
"hidden": false,
"id": "nps_stats",
"maxSize": 2000000,
"name": "stats",
"presentable": false,
"required": true,
"system": false,
"type": "json"
},
{
"hidden": false,
"id": "nps_type",
"maxSelect": 1,
"name": "type",
"presentable": false,
"required": true,
"system": false,
"type": "select",
"values": ["1m", "10m", "20m", "120m", "480m"]
},
{
"hidden": false,
"id": "autodate2990389176",
"name": "created",
"onCreate": true,
"onUpdate": false,
"presentable": false,
"system": false,
"type": "autodate"
},
{
"hidden": false,
"id": "autodate3332085495",
"name": "updated",
"onCreate": true,
"onUpdate": true,
"presentable": false,
"system": false,
"type": "autodate"
}
],
"indexes": [
"CREATE INDEX ` + "`" + `idx_nps_system_type_created` + "`" + ` ON ` + "`" + `network_probe_stats` + "`" + ` (\n ` + "`" + `system` + "`" + `,\n ` + "`" + `type` + "`" + `,\n ` + "`" + `created` + "`" + `\n)"
],
"system": false
}
]`

View File

@@ -1,62 +0,0 @@
package migrations
import (
"github.com/pocketbase/pocketbase/core"
m "github.com/pocketbase/pocketbase/migrations"
)
func init() {
m.Register(func(app core.App) error {
collection, err := app.FindCollectionByNameOrId("np_probes_001")
if err != nil {
return err
}
// add field
if err := collection.Fields.AddMarshaledJSONAt(7, []byte(`{
"hidden": false,
"id": "number926446584",
"max": null,
"min": null,
"name": "latency",
"onlyInt": false,
"presentable": false,
"required": false,
"system": false,
"type": "number"
}`)); err != nil {
return err
}
// add field
if err := collection.Fields.AddMarshaledJSONAt(8, []byte(`{
"hidden": false,
"id": "number3726709001",
"max": null,
"min": null,
"name": "loss",
"onlyInt": false,
"presentable": false,
"required": false,
"system": false,
"type": "number"
}`)); err != nil {
return err
}
return app.Save(collection)
}, func(app core.App) error {
collection, err := app.FindCollectionByNameOrId("np_probes_001")
if err != nil {
return err
}
// remove field
collection.Fields.RemoveById("number926446584")
// remove field
collection.Fields.RemoveById("number3726709001")
return app.Save(collection)
})
}

View File

@@ -1,245 +0,0 @@
package migrations
import (
"github.com/pocketbase/pocketbase/core"
m "github.com/pocketbase/pocketbase/migrations"
)
func init() {
m.Register(func(app core.App) error {
jsonData := `[
{
"id": "np_probes_001",
"listRule": null,
"viewRule": null,
"createRule": null,
"updateRule": null,
"deleteRule": null,
"name": "network_probes",
"type": "base",
"fields": [
{
"autogeneratePattern": "[a-z0-9]{15}",
"hidden": false,
"id": "text3208210256",
"max": 15,
"min": 15,
"name": "id",
"pattern": "^[a-z0-9]+$",
"presentable": false,
"primaryKey": true,
"required": true,
"system": true,
"type": "text"
},
{
"cascadeDelete": true,
"collectionId": "2hz5ncl8tizk5nx",
"hidden": false,
"id": "np_system",
"maxSelect": 1,
"minSelect": 0,
"name": "system",
"presentable": false,
"required": true,
"system": false,
"type": "relation"
},
{
"hidden": false,
"id": "np_name",
"max": 200,
"min": 0,
"name": "name",
"pattern": "",
"presentable": false,
"primaryKey": false,
"required": false,
"system": false,
"type": "text"
},
{
"hidden": false,
"id": "np_target",
"max": 500,
"min": 1,
"name": "target",
"pattern": "",
"presentable": false,
"primaryKey": false,
"required": true,
"system": false,
"type": "text"
},
{
"hidden": false,
"id": "np_protocol",
"maxSelect": 1,
"name": "protocol",
"presentable": false,
"required": true,
"system": false,
"type": "select",
"values": ["icmp", "tcp", "http"]
},
{
"hidden": false,
"id": "np_port",
"max": 65535,
"min": 0,
"name": "port",
"onlyInt": true,
"presentable": false,
"required": false,
"system": false,
"type": "number"
},
{
"hidden": false,
"id": "np_interval",
"max": 3600,
"min": 1,
"name": "interval",
"onlyInt": true,
"presentable": false,
"required": true,
"system": false,
"type": "number"
},
{
"hidden": false,
"id": "np_enabled",
"name": "enabled",
"presentable": false,
"required": false,
"system": false,
"type": "bool"
},
{
"hidden": false,
"id": "autodate2990389176",
"name": "created",
"onCreate": true,
"onUpdate": false,
"presentable": false,
"system": false,
"type": "autodate"
},
{
"hidden": false,
"id": "autodate3332085495",
"name": "updated",
"onCreate": true,
"onUpdate": true,
"presentable": false,
"system": false,
"type": "autodate"
}
],
"indexes": [
"CREATE INDEX ` + "`" + `idx_np_system_enabled` + "`" + ` ON ` + "`" + `network_probes` + "`" + ` (\n ` + "`" + `system` + "`" + `,\n ` + "`" + `enabled` + "`" + `\n)"
],
"system": false
},
{
"id": "np_stats_001",
"listRule": null,
"viewRule": null,
"createRule": null,
"updateRule": null,
"deleteRule": null,
"name": "network_probe_stats",
"type": "base",
"fields": [
{
"autogeneratePattern": "[a-z0-9]{15}",
"hidden": false,
"id": "text3208210256",
"max": 15,
"min": 15,
"name": "id",
"pattern": "^[a-z0-9]+$",
"presentable": false,
"primaryKey": true,
"required": true,
"system": true,
"type": "text"
},
{
"cascadeDelete": true,
"collectionId": "2hz5ncl8tizk5nx",
"hidden": false,
"id": "nps_system",
"maxSelect": 1,
"minSelect": 0,
"name": "system",
"presentable": false,
"required": true,
"system": false,
"type": "relation"
},
{
"hidden": false,
"id": "nps_stats",
"maxSize": 2000000,
"name": "stats",
"presentable": false,
"required": true,
"system": false,
"type": "json"
},
{
"hidden": false,
"id": "nps_type",
"maxSelect": 1,
"name": "type",
"presentable": false,
"required": true,
"system": false,
"type": "select",
"values": ["1m", "10m", "20m", "120m", "480m"]
},
{
"hidden": false,
"id": "autodate2990389176",
"name": "created",
"onCreate": true,
"onUpdate": false,
"presentable": false,
"system": false,
"type": "autodate"
},
{
"hidden": false,
"id": "autodate3332085495",
"name": "updated",
"onCreate": true,
"onUpdate": true,
"presentable": false,
"system": false,
"type": "autodate"
}
],
"indexes": [
"CREATE INDEX ` + "`" + `idx_nps_system_type_created` + "`" + ` ON ` + "`" + `network_probe_stats` + "`" + ` (\n ` + "`" + `system` + "`" + `,\n ` + "`" + `type` + "`" + `,\n ` + "`" + `created` + "`" + `\n)"
],
"system": false
}
]`
return app.ImportCollectionsByMarshaledJSON([]byte(jsonData), false)
}, func(app core.App) error {
// down: remove the network probe collections
if c, err := app.FindCollectionByNameOrId("network_probes"); err == nil {
if err := app.Delete(c); err != nil {
return err
}
}
if c, err := app.FindCollectionByNameOrId("network_probe_stats"); err == nil {
if err := app.Delete(c); err != nil {
return err
}
}
return nil
})
}

View File

@@ -8,7 +8,6 @@ import (
"time"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/pocketbase/dbx"
@@ -71,7 +70,7 @@ func (rm *RecordManager) CreateLongerRecords() {
// wrap the operations in a transaction
rm.app.RunInTransaction(func(txApp core.App) error {
var err error
collections := [3]*core.Collection{}
collections := [2]*core.Collection{}
collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats")
if err != nil {
return err
@@ -80,10 +79,6 @@ func (rm *RecordManager) CreateLongerRecords() {
if err != nil {
return err
}
collections[2], err = txApp.FindCachedCollectionByNameOrId("network_probe_stats")
if err != nil {
return err
}
var systems RecordIds
db := txApp.DB()
@@ -143,9 +138,8 @@ func (rm *RecordManager) CreateLongerRecords() {
case "system_stats":
longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds))
case "container_stats":
longerRecord.Set("stats", rm.AverageContainerStats(db, recordIds))
case "network_probe_stats":
longerRecord.Set("stats", rm.AverageProbeStats(db, recordIds))
}
if err := txApp.SaveNoValidate(longerRecord); err != nil {
log.Println("failed to save longer record", "err", err)
@@ -506,63 +500,6 @@ func AverageContainerStatsSlice(records [][]container.Stats) []container.Stats {
return result
}
// AverageProbeStats averages probe stats across multiple records.
// For each probe key: avg of avgs, min of mins, max of maxes, avg of losses.
func (rm *RecordManager) AverageProbeStats(db dbx.Builder, records RecordIds) map[string]probe.Result {
type probeValues struct {
sums probe.Result
count float64
}
query := db.NewQuery("SELECT stats FROM network_probe_stats WHERE id = {:id}")
// accumulate sums for each probe key across records
sums := make(map[string]*probeValues)
var row StatsRecord
for _, rec := range records {
row.Stats = row.Stats[:0]
query.Bind(dbx.Params{"id": rec.Id}).One(&row)
var rawStats map[string]probe.Result
if err := json.Unmarshal(row.Stats, &rawStats); err != nil {
continue
}
for key, vals := range rawStats {
s, ok := sums[key]
if !ok {
s = &probeValues{sums: make(probe.Result, len(vals))}
sums[key] = s
}
for i := range vals {
switch i {
case 1: // min fields
if s.count == 0 || vals[i] < s.sums[i] {
s.sums[i] = vals[i]
}
case 2: // max fields
if vals[i] > s.sums[i] {
s.sums[i] = vals[i]
}
default: // average fields
s.sums[i] += vals[i]
}
}
s.count++
}
}
// compute final averages
result := make(map[string]probe.Result, len(sums))
for key, s := range sums {
if s.count == 0 {
continue
}
s.sums[0] = twoDecimals(s.sums[0] / s.count) // avg latency
s.sums[3] = twoDecimals(s.sums[3] / s.count) // packet loss
result[key] = s.sums
}
return result
}
/* Round float to two decimals */
func twoDecimals(value float64) float64 {
return math.Round(value*100) / 100

View File

@@ -59,7 +59,7 @@ func deleteOldAlertsHistory(app core.App, countToKeep, countBeforeDeletion int)
// Deletes system_stats records older than what is displayed in the UI
func deleteOldSystemStats(app core.App) error {
// Collections to process
collections := [3]string{"system_stats", "container_stats", "network_probe_stats"}
collections := [2]string{"system_stats", "container_stats"}
// Record types and their retention periods
type RecordDeletionData struct {

View File

@@ -146,7 +146,7 @@ export default function AreaChartDefault({
axisLine={false}
/>
)}
{xAxis(chartData.chartTime, displayData.at(-1)?.created as number)}
{xAxis(chartData)}
<ChartTooltip
animationEasing="ease-out"
animationDuration={150}

View File

@@ -41,7 +41,6 @@ export default function LineChartDefault({
filter,
truncate = false,
chartProps,
connectNulls,
}: {
chartData: ChartData
// biome-ignore lint/suspicious/noExplicitAny: accepts different data source types (systemStats or containerData)
@@ -63,7 +62,6 @@ export default function LineChartDefault({
filter?: string
truncate?: boolean
chartProps?: Omit<React.ComponentProps<typeof LineChart>, "data" | "margin">
connectNulls?: boolean
}) {
const { yAxisWidth, updateYAxisWidth } = useYAxisWidth()
const { isIntersecting, ref } = useIntersectionObserver({ freeze: false })
@@ -107,7 +105,6 @@ export default function LineChartDefault({
// stackId={dataPoint.stackId}
order={dataPoint.order || i}
activeDot={dataPoint.activeDot ?? true}
connectNulls={connectNulls}
/>
)
})
@@ -148,7 +145,7 @@ export default function LineChartDefault({
axisLine={false}
/>
)}
{xAxis(chartData.chartTime, displayData.at(-1)?.created as number)}
{xAxis(chartData)}
<ChartTooltip
animationEasing="ease-out"
animationDuration={150}

View File

@@ -17,7 +17,7 @@ import { toast } from "../ui/use-toast"
import { OtpInputForm } from "./otp-forms"
const honeypot = v.literal("")
const emailSchema = v.pipe(v.string(), v.email(t`Invalid email address.`))
const emailSchema = v.pipe(v.string(), v.rfcEmail(t`Invalid email address.`))
const passwordSchema = v.pipe(
v.string(),
v.minLength(8, t`Password must be at least 8 characters.`),

View File

@@ -8,7 +8,6 @@ import {
LogOutIcon,
LogsIcon,
MenuIcon,
NetworkIcon,
PlusIcon,
SearchIcon,
ServerIcon,
@@ -110,10 +109,6 @@ export default function Navbar() {
<HardDriveIcon className="h-4 w-4 me-2.5" strokeWidth={1.5} />
<span>S.M.A.R.T.</span>
</DropdownMenuItem>
<DropdownMenuItem onClick={() => navigate(getPagePath($router, "probes"))} className="flex items-center">
<NetworkIcon className="h-4 w-4 me-2.5" strokeWidth={1.5} />
<Trans>Network Probes</Trans>
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => navigate(getPagePath($router, "settings", { name: "general" }))}
className="flex items-center"
@@ -185,21 +180,6 @@ export default function Navbar() {
</TooltipTrigger>
<TooltipContent>S.M.A.R.T.</TooltipContent>
</Tooltip>
<Tooltip>
<TooltipTrigger asChild>
<Link
href={getPagePath($router, "probes")}
className={cn("hidden md:grid", buttonVariants({ variant: "ghost", size: "icon" }))}
aria-label="Network Probes"
onMouseEnter={() => import("@/components/routes/probes")}
>
<NetworkIcon className="h-[1.2rem] w-[1.2rem]" strokeWidth={1.5} />
</Link>
</TooltipTrigger>
<TooltipContent>
<Trans>Network Probes</Trans>
</TooltipContent>
</Tooltip>
<LangToggle />
<ModeToggle />
<Tooltip>

View File

@@ -1,216 +0,0 @@
import type { Column, ColumnDef } from "@tanstack/react-table"
import { Button } from "@/components/ui/button"
import { cn, decimalString, hourWithSeconds } from "@/lib/utils"
import {
GlobeIcon,
TimerIcon,
ActivityIcon,
WifiOffIcon,
Trash2Icon,
ArrowLeftRightIcon,
MoreHorizontalIcon,
ServerIcon,
ClockIcon,
NetworkIcon,
} from "lucide-react"
import { t } from "@lingui/core/macro"
import type { NetworkProbeRecord } from "@/types"
import { DropdownMenu, DropdownMenuContent, DropdownMenuItem, DropdownMenuTrigger } from "@/components/ui/dropdown-menu"
import { Trans } from "@lingui/react/macro"
import { pb } from "@/lib/api"
import { toast } from "../ui/use-toast"
import { $allSystemsById } from "@/lib/stores"
import { useStore } from "@nanostores/react"
const protocolColors: Record<string, string> = {
icmp: "bg-blue-500/15 text-blue-400",
tcp: "bg-purple-500/15 text-purple-400",
http: "bg-green-500/15 text-green-400",
}
async function deleteProbe(id: string) {
try {
await pb.collection("network_probes").delete(id)
} catch (err: unknown) {
toast({ variant: "destructive", title: t`Error`, description: (err as Error)?.message })
}
}
export function getProbeColumns(longestName = 0, longestTarget = 0): ColumnDef<NetworkProbeRecord>[] {
return [
{
id: "name",
sortingFn: (a, b) => (a.original.name || a.original.target).localeCompare(b.original.name || b.original.target),
accessorFn: (record) => record.name || record.target,
header: ({ column }) => <HeaderButton column={column} name={t`Name`} Icon={NetworkIcon} />,
cell: ({ getValue }) => (
<div className="ms-1.5 max-w-40 block truncate tabular-nums" style={{ width: `${longestName / 1.05}ch` }}>
{getValue() as string}
</div>
),
},
{
id: "system",
accessorFn: (record) => record.system,
sortingFn: (a, b) => {
const allSystems = $allSystemsById.get()
const systemNameA = allSystems[a.original.system]?.name ?? ""
const systemNameB = allSystems[b.original.system]?.name ?? ""
return systemNameA.localeCompare(systemNameB)
},
header: ({ column }) => <HeaderButton column={column} name={t`System`} Icon={ServerIcon} />,
cell: ({ getValue }) => {
const allSystems = useStore($allSystemsById)
return <span className="ms-1.5 xl:w-34 block truncate">{allSystems[getValue() as string]?.name ?? ""}</span>
},
},
{
id: "target",
sortingFn: (a, b) => a.original.target.localeCompare(b.original.target),
accessorFn: (record) => record.target,
header: ({ column }) => <HeaderButton column={column} name={t`Target`} Icon={GlobeIcon} />,
cell: ({ getValue }) => (
<div className="ms-1.5 tabular-nums block truncate max-w-44" style={{ width: `${longestTarget / 1.05}ch` }}>
{getValue() as string}
</div>
),
},
{
id: "protocol",
accessorFn: (record) => record.protocol,
header: ({ column }) => <HeaderButton column={column} name={t`Protocol`} Icon={ArrowLeftRightIcon} />,
cell: ({ getValue }) => {
const protocol = getValue() as string
return (
<span className={cn("ms-1.5 px-2 py-0.5 rounded text-xs font-medium uppercase", protocolColors[protocol])}>
{protocol}
</span>
)
},
},
{
id: "interval",
accessorFn: (record) => record.interval,
header: ({ column }) => <HeaderButton column={column} name={t`Interval`} Icon={TimerIcon} />,
cell: ({ getValue }) => <span className="ms-1.5 tabular-nums">{getValue() as number}s</span>,
},
{
id: "latency",
accessorFn: (record) => record.latency,
invertSorting: true,
header: ({ column }) => <HeaderButton column={column} name={t`Latency`} Icon={ActivityIcon} />,
cell: ({ row }) => {
const val = row.original.latency
if (!val) {
return <span className="ms-1.5 text-muted-foreground">-</span>
}
let color = "bg-green-500"
if (val > 200) {
color = "bg-yellow-500"
}
if (val > 2000) {
color = "bg-red-500"
}
return (
<span className="ms-1.5 tabular-nums flex gap-2 items-center">
<span className={cn("shrink-0 size-2 rounded-full", color)} />
{decimalString(val, val < 100 ? 2 : 1).toLocaleString()} ms
</span>
)
},
},
{
id: "loss",
accessorFn: (record) => record.loss,
invertSorting: true,
header: ({ column }) => <HeaderButton column={column} name={t`Loss`} Icon={WifiOffIcon} />,
cell: ({ row }) => {
const { loss, latency } = row.original
if (loss === undefined || (!latency && !loss)) {
return <span className="ms-1.5 text-muted-foreground">-</span>
}
let color = "bg-green-500"
if (loss) {
color = loss > 20 ? "bg-red-500" : "bg-yellow-500"
}
return (
<span className="ms-1.5 tabular-nums flex gap-2 items-center">
<span className={cn("shrink-0 size-2 rounded-full", color)} />
{loss}%
</span>
)
},
},
{
id: "updated",
invertSorting: true,
accessorFn: (record) => record.updated,
header: ({ column }) => <HeaderButton column={column} name={t`Updated`} Icon={ClockIcon} />,
cell: ({ getValue }) => {
const timestamp = getValue() as number
return <span className="ms-1.5 tabular-nums">{hourWithSeconds(new Date(timestamp).toISOString())}</span>
},
},
{
id: "actions",
enableSorting: false,
header: () => null,
size: 40,
cell: ({ row }) => (
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button
variant="ghost"
size="icon"
className="size-10"
onClick={(event) => event.stopPropagation()}
onMouseDown={(event) => event.stopPropagation()}
>
<span className="sr-only">
<Trans>Open menu</Trans>
</span>
<MoreHorizontalIcon className="w-5" />
</Button>
</DropdownMenuTrigger>
<DropdownMenuContent align="end" onClick={(event) => event.stopPropagation()}>
<DropdownMenuItem
onClick={(event) => {
event.stopPropagation()
deleteProbe(row.original.id)
}}
>
<Trash2Icon className="me-2.5 size-4" />
<Trans>Delete</Trans>
</DropdownMenuItem>
</DropdownMenuContent>
</DropdownMenu>
),
},
]
}
function HeaderButton({
column,
name,
Icon,
}: {
column: Column<NetworkProbeRecord>
name: string
Icon: React.ElementType
}) {
const isSorted = column.getIsSorted()
return (
<Button
className={cn(
"h-9 px-3 flex items-center gap-2 duration-50",
isSorted && "bg-accent/70 light:bg-accent text-accent-foreground/90"
)}
variant="ghost"
onClick={() => column.toggleSorting(column.getIsSorted() === "asc")}
>
{Icon && <Icon className="size-4" />}
{name}
{/* <ArrowUpDownIcon className="size-4" /> */}
</Button>
)
}

View File

@@ -1,225 +0,0 @@
import { t } from "@lingui/core/macro"
import { Trans } from "@lingui/react/macro"
import {
type ColumnFiltersState,
flexRender,
getCoreRowModel,
getFilteredRowModel,
getSortedRowModel,
type Row,
type SortingState,
type Table as TableType,
useReactTable,
type VisibilityState,
} from "@tanstack/react-table"
import { useVirtualizer, type VirtualItem } from "@tanstack/react-virtual"
import { memo, useMemo, useRef, useState } from "react"
import { getProbeColumns } from "@/components/network-probes-table/network-probes-columns"
import { Card, CardHeader, CardTitle } from "@/components/ui/card"
import { Input } from "@/components/ui/input"
import { TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table"
import { isReadOnlyUser } from "@/lib/api"
import { $allSystemsById } from "@/lib/stores"
import { cn, getVisualStringWidth, useBrowserStorage } from "@/lib/utils"
import type { NetworkProbeRecord } from "@/types"
import { AddProbeDialog } from "./probe-dialog"
export default function NetworkProbesTableNew({
systemId,
probes,
}: {
systemId?: string
probes: NetworkProbeRecord[]
}) {
const [sorting, setSorting] = useBrowserStorage<SortingState>(
`sort-np-${systemId ? 1 : 0}`,
[{ id: systemId ? "name" : "system", desc: false }],
sessionStorage
)
const [columnFilters, setColumnFilters] = useState<ColumnFiltersState>([])
const [columnVisibility, setColumnVisibility] = useState<VisibilityState>({})
const [globalFilter, setGlobalFilter] = useState("")
const { longestName, longestTarget } = useMemo(() => {
let longestName = 0
let longestTarget = 0
for (const p of probes) {
longestName = Math.max(longestName, getVisualStringWidth(p.name || p.target))
longestTarget = Math.max(longestTarget, getVisualStringWidth(p.target))
}
return { longestName, longestTarget }
}, [probes])
// Filter columns based on whether systemId is provided
const columns = useMemo(() => {
let columns = getProbeColumns(longestName, longestTarget)
columns = systemId ? columns.filter((col) => col.id !== "system") : columns
columns = isReadOnlyUser() ? columns.filter((col) => col.id !== "actions") : columns
return columns
}, [systemId, longestName, longestTarget])
const table = useReactTable({
data: probes,
columns,
getCoreRowModel: getCoreRowModel(),
getSortedRowModel: getSortedRowModel(),
getFilteredRowModel: getFilteredRowModel(),
onSortingChange: setSorting,
onColumnFiltersChange: setColumnFilters,
onColumnVisibilityChange: setColumnVisibility,
defaultColumn: {
sortUndefined: "last",
size: 900,
minSize: 0,
},
state: {
sorting,
columnFilters,
columnVisibility,
globalFilter,
},
onGlobalFilterChange: setGlobalFilter,
globalFilterFn: (row, _columnId, filterValue) => {
const probe = row.original
const systemName = $allSystemsById.get()[probe.system]?.name ?? ""
const searchString = `${probe.name}${probe.target}${probe.protocol}${systemName}`.toLocaleLowerCase()
return (filterValue as string)
.toLowerCase()
.split(" ")
.every((term) => searchString.includes(term))
},
})
const rows = table.getRowModel().rows
const visibleColumns = table.getVisibleLeafColumns()
return (
<Card className="@container w-full px-3 py-5 sm:py-6 sm:px-6">
<CardHeader className="p-0 mb-3 sm:mb-4">
<div className="grid md:flex gap-x-5 gap-y-3 w-full items-end">
<div className="px-2 sm:px-1">
<CardTitle className="mb-2">
<Trans>Network Probes</Trans>
</CardTitle>
<div className="text-sm text-muted-foreground flex items-center flex-wrap">
<Trans>ICMP/TCP/HTTP latency monitoring from agents</Trans>
</div>
</div>
<div className="md:ms-auto flex items-center gap-2">
{probes.length > 0 && (
<Input
placeholder={t`Filter...`}
value={globalFilter}
onChange={(e) => setGlobalFilter(e.target.value)}
className="ms-auto px-4 w-full max-w-full md:w-64"
/>
)}
{!isReadOnlyUser() ? <AddProbeDialog systemId={systemId} /> : null}
</div>
</div>
</CardHeader>
<div className="rounded-md">
<NetworkProbesTable table={table} rows={rows} colLength={visibleColumns.length} />
</div>
</Card>
)
}
const NetworkProbesTable = memo(function NetworkProbeTable({
table,
rows,
colLength,
}: {
table: TableType<NetworkProbeRecord>
rows: Row<NetworkProbeRecord>[]
colLength: number
}) {
// The virtualizer will need a reference to the scrollable container element
const scrollRef = useRef<HTMLDivElement>(null)
const virtualizer = useVirtualizer<HTMLDivElement, HTMLTableRowElement>({
count: rows.length,
estimateSize: () => 54,
getScrollElement: () => scrollRef.current,
overscan: 5,
})
const virtualRows = virtualizer.getVirtualItems()
const paddingTop = Math.max(0, virtualRows[0]?.start ?? 0 - virtualizer.options.scrollMargin)
const paddingBottom = Math.max(0, virtualizer.getTotalSize() - (virtualRows[virtualRows.length - 1]?.end ?? 0))
return (
<div
className={cn(
"h-min max-h-[calc(100dvh-17rem)] max-w-full relative overflow-auto border rounded-md",
// don't set min height if there are less than 2 rows, do set if we need to display the empty state
(!rows.length || rows.length > 2) && "min-h-50"
)}
ref={scrollRef}
>
{/* add header height to table size */}
<div style={{ height: `${virtualizer.getTotalSize() + 48}px`, paddingTop, paddingBottom }}>
<table className="text-sm w-full h-full text-nowrap">
<NetworkProbeTableHead table={table} />
<TableBody>
{rows.length ? (
virtualRows.map((virtualRow) => {
const row = rows[virtualRow.index]
return <NetworkProbeTableRow key={row.id} row={row} virtualRow={virtualRow} />
})
) : (
<TableRow>
<TableCell colSpan={colLength} className="h-37 text-center pointer-events-none">
<Trans>No results.</Trans>
</TableCell>
</TableRow>
)}
</TableBody>
</table>
</div>
</div>
)
})
function NetworkProbeTableHead({ table }: { table: TableType<NetworkProbeRecord> }) {
return (
<TableHeader className="sticky top-0 z-50 w-full border-b-2">
{table.getHeaderGroups().map((headerGroup) => (
<tr key={headerGroup.id}>
{headerGroup.headers.map((header) => {
return (
<TableHead className="px-2" key={header.id}>
{header.isPlaceholder ? null : flexRender(header.column.columnDef.header, header.getContext())}
</TableHead>
)
})}
</tr>
))}
</TableHeader>
)
}
const NetworkProbeTableRow = memo(function NetworkProbeTableRow({
row,
virtualRow,
}: {
row: Row<NetworkProbeRecord>
virtualRow: VirtualItem
}) {
return (
<TableRow data-state={row.getIsSelected() && "selected"} className="transition-opacity">
{row.getVisibleCells().map((cell) => (
<TableCell
key={cell.id}
className="py-0"
style={{
width: `${cell.column.getSize()}px`,
height: virtualRow.size,
}}
>
{flexRender(cell.column.columnDef.cell, cell.getContext())}
</TableCell>
))}
</TableRow>
)
})

View File

@@ -1,178 +0,0 @@
import { useState } from "react"
import { Trans, useLingui } from "@lingui/react/macro"
import { useStore } from "@nanostores/react"
import { pb } from "@/lib/api"
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
DialogTrigger,
} from "@/components/ui/dialog"
import { Button } from "@/components/ui/button"
import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label"
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"
import { PlusIcon } from "lucide-react"
import { useToast } from "@/components/ui/use-toast"
import { $systems } from "@/lib/stores"
export function AddProbeDialog({ systemId }: { systemId?: string }) {
const [open, setOpen] = useState(false)
const [protocol, setProtocol] = useState<string>("icmp")
const [target, setTarget] = useState("")
const [port, setPort] = useState("")
const [probeInterval, setProbeInterval] = useState("30")
const [name, setName] = useState("")
const [loading, setLoading] = useState(false)
const [selectedSystemId, setSelectedSystemId] = useState("")
const systems = useStore($systems)
const { toast } = useToast()
const { t } = useLingui()
const targetName = target.replace(/^https?:\/\//, "")
const resetForm = () => {
setProtocol("icmp")
setTarget("")
setPort("")
setProbeInterval("30")
setName("")
setSelectedSystemId("")
}
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault()
setLoading(true)
try {
await pb.collection("network_probes").create({
system: systemId ?? selectedSystemId,
name: name || targetName,
target,
protocol,
port: protocol === "tcp" ? Number(port) : 0,
interval: Number(probeInterval),
enabled: true,
})
resetForm()
setOpen(false)
} catch (err: unknown) {
toast({ variant: "destructive", title: t`Error`, description: (err as Error)?.message })
} finally {
setLoading(false)
}
}
return (
<Dialog open={open} onOpenChange={setOpen}>
<DialogTrigger asChild>
<Button variant="outline">
<PlusIcon className="size-4 me-1" />
<Trans>Add {{ foo: t`Probe` }}</Trans>
</Button>
</DialogTrigger>
<DialogContent className="max-w-md">
<DialogHeader>
<DialogTitle>
<Trans>Add {{ foo: t`Network Probe` }}</Trans>
</DialogTitle>
<DialogDescription>
<Trans>Configure latency monitoring from this agent.</Trans>
</DialogDescription>
</DialogHeader>
<form onSubmit={handleSubmit} className="grid gap-4 tabular-nums">
{!systemId && (
<div className="grid gap-2">
<Label>
<Trans>System</Trans>
</Label>
<Select value={selectedSystemId} onValueChange={setSelectedSystemId} required>
<SelectTrigger>
<SelectValue placeholder={t`Select a system`} />
</SelectTrigger>
<SelectContent>
{systems.map((sys) => (
<SelectItem key={sys.id} value={sys.id}>
{sys.name}
</SelectItem>
))}
</SelectContent>
</Select>
</div>
)}
<div className="grid gap-2">
<Label>
<Trans>Target</Trans>
</Label>
<Input
value={target}
onChange={(e) => setTarget(e.target.value)}
placeholder={protocol === "http" ? "https://example.com" : "1.1.1.1"}
required
/>
</div>
<div className="grid gap-2">
<Label>
<Trans>Protocol</Trans>
</Label>
<Select value={protocol} onValueChange={setProtocol}>
<SelectTrigger>
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="icmp">ICMP</SelectItem>
<SelectItem value="tcp">TCP</SelectItem>
<SelectItem value="http">HTTP</SelectItem>
</SelectContent>
</Select>
</div>
{protocol === "tcp" && (
<div className="grid gap-2">
<Label>
<Trans>Port</Trans>
</Label>
<Input
type="number"
value={port}
onChange={(e) => setPort(e.target.value)}
placeholder="443"
min={1}
max={65535}
required
/>
</div>
)}
<div className="grid gap-2">
<Label>
<Trans>Interval (seconds)</Trans>
</Label>
<Input
type="number"
value={probeInterval}
onChange={(e) => setProbeInterval(e.target.value)}
min={1}
max={3600}
required
/>
</div>
<div className="grid gap-2">
<Label>
<Trans>Name (optional)</Trans>
</Label>
<Input
value={name}
onChange={(e) => setName(e.target.value)}
placeholder={targetName || t`e.g. Cloudflare DNS`}
/>
</div>
<DialogFooter>
<Button type="submit" disabled={loading || (!systemId && !selectedSystemId)}>
{loading ? <Trans>Creating...</Trans> : <Trans>Add {{ foo: t`Probe` }}</Trans>}
</Button>
</DialogFooter>
</form>
</DialogContent>
</Dialog>
)
}

View File

@@ -4,7 +4,6 @@ const routes = {
home: "/",
containers: "/containers",
smart: "/smart",
probes: "/probes",
system: `/system/:id`,
settings: `/settings/:name?`,
forgot_password: `/forgot-password`,

View File

@@ -1,25 +0,0 @@
import { useLingui } from "@lingui/react/macro"
import { memo, useEffect } from "react"
import NetworkProbesTableNew from "@/components/network-probes-table/network-probes-table"
import { ActiveAlerts } from "@/components/active-alerts"
import { FooterRepoLink } from "@/components/footer-repo-link"
import { useNetworkProbesData } from "@/lib/use-network-probes"
export default memo(() => {
const { t } = useLingui()
const { probes } = useNetworkProbesData({})
useEffect(() => {
document.title = `${t`Network Probes`} / Beszel`
}, [t])
return (
<>
<div className="grid gap-4">
<ActiveAlerts />
<NetworkProbesTableNew probes={probes} />
</div>
<FooterRepoLink />
</>
)
})

View File

@@ -24,7 +24,7 @@ interface ShoutrrrUrlCardProps {
}
const NotificationSchema = v.object({
emails: v.array(v.pipe(v.string(), v.email())),
emails: v.array(v.pipe(v.string(), v.rfcEmail())),
webhooks: v.array(v.pipe(v.string(), v.url())),
})

View File

@@ -11,9 +11,9 @@ import { RootDiskCharts, ExtraFsCharts } from "./system/charts/disk-charts"
import { BandwidthChart, ContainerNetworkChart } from "./system/charts/network-charts"
import { TemperatureChart, BatteryChart } from "./system/charts/sensor-charts"
import { GpuPowerChart, GpuDetailCharts } from "./system/charts/gpu-charts"
import { LazyContainersTable, LazySmartTable, LazySystemdTable, LazyNetworkProbesTable } from "./system/lazy-tables"
import { LazyContainersTable, LazySmartTable, LazySystemdTable } from "./system/lazy-tables"
import { LoadAverageChart } from "./system/charts/load-average-chart"
import { ContainerIcon, CpuIcon, HardDriveIcon, NetworkIcon, TerminalSquareIcon } from "lucide-react"
import { ContainerIcon, CpuIcon, HardDriveIcon, TerminalSquareIcon } from "lucide-react"
import { GpuIcon } from "../ui/icons"
import SystemdTable from "../systemd-table/systemd-table"
import ContainersTable from "../containers-table/containers-table"
@@ -65,7 +65,7 @@ export default memo(function SystemDetail({ id }: { id: string }) {
const hasGpu = hasGpuData || hasGpuPowerData
// keep tabsRef in sync for keyboard navigation
const tabs = ["core", "network", "disk"]
const tabs = ["core", "disk"]
if (hasGpu) tabs.push("gpu")
if (hasContainers) tabs.push("containers")
if (hasSystemd) tabs.push("services")
@@ -145,8 +145,6 @@ export default memo(function SystemDetail({ id }: { id: string }) {
{hasContainersTable && <LazyContainersTable systemId={system.id} />}
{hasSystemd && <LazySystemdTable systemId={system.id} />}
<LazyNetworkProbesTable systemId={system.id} systemData={systemData} />
</>
)
}
@@ -159,10 +157,6 @@ export default memo(function SystemDetail({ id }: { id: string }) {
<CpuIcon className="size-3.5" />
<Trans context="Core system metrics">Core</Trans>
</TabsTrigger>
<TabsTrigger value="network" className="w-full flex items-center gap-1.5">
<NetworkIcon className="size-3.5" />
<Trans>Network</Trans>
</TabsTrigger>
<TabsTrigger value="disk" className="w-full flex items-center gap-1.5">
<HardDriveIcon className="size-3.5" />
<Trans>Disk</Trans>
@@ -190,26 +184,16 @@ export default memo(function SystemDetail({ id }: { id: string }) {
<TabsContent value="core" forceMount className={activeTab === "core" ? "contents" : "hidden"}>
<div className="grid xl:grid-cols-2 gap-4">
<CpuChart {...coreProps} />
<LoadAverageChart chartData={chartData} grid={grid} dataEmpty={dataEmpty} />
<MemoryChart {...coreProps} />
<SwapChart chartData={chartData} grid={grid} dataEmpty={dataEmpty} systemStats={systemStats} />
<LoadAverageChart chartData={chartData} grid={grid} dataEmpty={dataEmpty} />
<BandwidthChart {...coreProps} systemStats={systemStats} />
<TemperatureChart {...coreProps} setPageBottomExtraMargin={setPageBottomExtraMargin} />
<BatteryChart {...coreProps} />
<SwapChart chartData={chartData} grid={grid} dataEmpty={dataEmpty} systemStats={systemStats} />
{pageBottomExtraMargin > 0 && <div style={{ marginBottom: pageBottomExtraMargin }}></div>}
</div>
</TabsContent>
<TabsContent value="network" forceMount className={activeTab === "network" ? "contents" : "hidden"}>
{mountedTabs.has("network") && (
<>
<div className="grid xl:grid-cols-2 gap-4">
<BandwidthChart {...coreProps} systemStats={systemStats} />
</div>
<LazyNetworkProbesTable systemId={system.id} systemData={systemData} />
</>
)}
</TabsContent>
<TabsContent value="disk" forceMount className={activeTab === "disk" ? "contents" : "hidden"}>
{mountedTabs.has("disk") && (
<>

View File

@@ -1,6 +1,7 @@
import { timeTicks } from "d3-time"
import { getPbTimestamp, pb } from "@/lib/api"
import { chartTimeData } from "@/lib/utils"
import type { ChartData, ChartTimes, ContainerStatsRecord, NetworkProbeStatsRecord, SystemStatsRecord } from "@/types"
import type { ChartData, ChartTimes, ContainerStatsRecord, SystemStatsRecord } from "@/types"
type ChartTimeData = {
time: number
@@ -16,6 +17,27 @@ export const cache = new Map<
ChartTimeData | SystemStatsRecord[] | ContainerStatsRecord[] | ChartData["containerData"]
>()
// create ticks and domain for charts
export function getTimeData(chartTime: ChartTimes, lastCreated: number) {
const cached = cache.get("td") as ChartTimeData | undefined
if (cached && cached.chartTime === chartTime) {
if (!lastCreated || cached.time >= lastCreated) {
return cached.data
}
}
// const buffer = chartTime === "1m" ? 400 : 20_000
const now = new Date(Date.now())
const startTime = chartTimeData[chartTime].getOffset(now)
const ticks = timeTicks(startTime, now, chartTimeData[chartTime].ticks ?? 12).map((date) => date.getTime())
const data = {
ticks,
domain: [chartTimeData[chartTime].getOffset(now).getTime(), now.getTime()],
}
cache.set("td", { time: now.getTime(), data, chartTime })
return data
}
/** Append new records onto prev with gap detection. Converts string `created` values to ms timestamps in place.
* Pass `maxLen` to cap the result length in one copy instead of slicing again after the call. */
export function appendData<T extends { created: string | number | null }>(
@@ -44,12 +66,12 @@ export function appendData<T extends { created: string | number | null }>(
return result
}
export async function getStats<T extends SystemStatsRecord | ContainerStatsRecord | NetworkProbeStatsRecord>(
export async function getStats<T extends SystemStatsRecord | ContainerStatsRecord>(
collection: string,
systemId: string,
chartTime: ChartTimes,
cachedStats?: { created: string | number | null }[]
chartTime: ChartTimes
): Promise<T[]> {
const cachedStats = cache.get(`${systemId}_${chartTime}_${collection}`) as T[] | undefined
const lastCached = cachedStats?.at(-1)?.created as number
return await pb.collection<T>(collection).getFullList({
filter: pb.filter("system={:id} && created > {:created} && type={:type}", {

View File

@@ -1,153 +0,0 @@
import LineChartDefault from "@/components/charts/line-chart"
import type { DataPoint } from "@/components/charts/line-chart"
import { toFixedFloat, decimalString } from "@/lib/utils"
import { useLingui } from "@lingui/react/macro"
import { ChartCard, FilterBar } from "../chart-card"
import type { ChartData, NetworkProbeRecord, NetworkProbeStatsRecord } from "@/types"
import { useMemo } from "react"
import { atom } from "nanostores"
import { useStore } from "@nanostores/react"
import { probeKey } from "@/lib/use-network-probes"
const $filter = atom("")
type ProbeChartProps = {
probeStats: NetworkProbeStatsRecord[]
grid?: boolean
probes: NetworkProbeRecord[]
chartData: ChartData
empty: boolean
}
type ProbeChartBaseProps = ProbeChartProps & {
valueIndex: number
title: string
description: string
tickFormatter: (value: number) => string
contentFormatter: ({ value }: { value: number | string }) => string | number
domain?: [number | "auto", number | "auto"]
}
function ProbeChart({
probeStats,
grid,
probes,
chartData,
empty,
valueIndex,
title,
description,
tickFormatter,
contentFormatter,
domain,
}: ProbeChartBaseProps) {
const filter = useStore($filter)
const { dataPoints, visibleKeys } = useMemo(() => {
const sortedProbes = [...probes].sort((a, b) => b.latency - a.latency)
const count = sortedProbes.length
const points: DataPoint<NetworkProbeStatsRecord>[] = []
const visibleKeys: string[] = []
const filterTerms = filter
? filter
.toLowerCase()
.split(" ")
.filter((term) => term.length > 0)
: []
for (let i = 0; i < count; i++) {
const p = sortedProbes[i]
const key = probeKey(p)
const filtered = filterTerms.length > 0 && !filterTerms.some((term) => key.toLowerCase().includes(term))
if (filtered) {
continue
}
visibleKeys.push(key)
points.push({
order: i,
label: p.name || p.target,
dataKey: (record: NetworkProbeStatsRecord) => record.stats?.[key]?.[valueIndex] ?? "-",
color: count <= 5 ? i + 1 : `hsl(${(i * 360) / count}, var(--chart-saturation), var(--chart-lightness))`,
})
}
return { dataPoints: points, visibleKeys }
}, [probes, filter, valueIndex])
const filteredProbeStats = useMemo(() => {
if (!visibleKeys.length) return probeStats
return probeStats.filter((record) => visibleKeys.some((key) => record.stats?.[key] != null))
}, [probeStats, visibleKeys])
const legend = dataPoints.length < 10
return (
<ChartCard
legend={legend}
cornerEl={<FilterBar store={$filter} />}
empty={empty}
title={title}
description={description}
grid={grid}
>
<LineChartDefault
chartData={chartData}
customData={filteredProbeStats}
dataPoints={dataPoints}
domain={domain ?? ["auto", "auto"]}
connectNulls
tickFormatter={tickFormatter}
contentFormatter={contentFormatter}
legend={legend}
filter={filter}
/>
</ChartCard>
)
}
export function LatencyChart({ probeStats, grid, probes, chartData, empty }: ProbeChartProps) {
const { t } = useLingui()
return (
<ProbeChart
probeStats={probeStats}
grid={grid}
probes={probes}
chartData={chartData}
empty={empty}
valueIndex={0}
title={t`Latency`}
description={t`Average round-trip time (ms)`}
tickFormatter={(value) => `${toFixedFloat(value, value >= 10 ? 0 : 1)} ms`}
contentFormatter={({ value }) => {
if (typeof value !== "number") {
return value
}
return `${decimalString(value, 2)} ms`
}}
/>
)
}
export function LossChart({ probeStats, grid, probes, chartData, empty }: ProbeChartProps) {
const { t } = useLingui()
return (
<ProbeChart
probeStats={probeStats}
grid={grid}
probes={probes}
chartData={chartData}
empty={empty}
valueIndex={3}
title={t`Loss`}
description={t`Packet loss (%)`}
domain={[0, 100]}
tickFormatter={(value) => `${toFixedFloat(value, value >= 10 ? 0 : 1)}%`}
contentFormatter={({ value }) => {
if (typeof value !== "number") {
return value
}
return `${decimalString(value, 2)}%`
}}
/>
)
}

View File

@@ -120,7 +120,8 @@ export function TemperatureChart({
label: key,
dataKey: dataKeys[key],
color: colorMap[key],
opacity: strokeOpacity,
strokeOpacity,
activeDot: !filtered,
}
})
}, [sortedKeys, filter, dataKeys, colorMap])
@@ -134,7 +135,7 @@ export function TemperatureChart({
// label: `Test ${++i}`,
// dataKey: () => 0,
// color: "red",
// opacity: 1,
// strokeOpacity: 1,
// })
// }
// }
@@ -202,6 +203,7 @@ export function TemperatureChart({
return `${decimalString(value)} ${unit}`
}}
dataPoints={dataPoints}
filter={filter}
></LineChartDefault>
</ChartCard>
</div>

View File

@@ -1,11 +1,6 @@
import { lazy } from "react"
import { useIntersectionObserver } from "@/lib/use-intersection-observer"
import { cn } from "@/lib/utils"
import { LatencyChart, LossChart } from "./charts/probes-charts"
import type { SystemData } from "./use-system-data"
import { $chartTime } from "@/lib/stores"
import { useStore } from "@nanostores/react"
import { useNetworkProbesData } from "@/lib/use-network-probes"
const ContainersTable = lazy(() => import("../../containers-table/containers-table"))
@@ -39,46 +34,3 @@ export function LazySystemdTable({ systemId }: { systemId: string }) {
</div>
)
}
const NetworkProbesTable = lazy(() => import("@/components/network-probes-table/network-probes-table"))
export function LazyNetworkProbesTable({ systemId, systemData }: { systemId: string; systemData: SystemData }) {
const { isIntersecting, ref } = useIntersectionObserver()
return (
<div ref={ref} className={cn(isIntersecting && "contents")}>
{isIntersecting && <ProbesTable systemId={systemId} systemData={systemData} />}
</div>
)
}
function ProbesTable({ systemId, systemData }: { systemId: string; systemData: SystemData }) {
const { grid, chartData } = systemData ?? {}
const chartTime = useStore($chartTime)
const { probes, probeStats } = useNetworkProbesData({ systemId, loadStats: !!chartData, chartTime })
return (
<>
<NetworkProbesTable systemId={systemId} probes={probes} />
{!!chartData && !!probes.length && (
<div className="grid xl:grid-cols-2 gap-4">
<LatencyChart
probeStats={probeStats}
grid={grid}
probes={probes}
chartData={chartData}
empty={!probeStats.length}
/>
<LossChart
probeStats={probeStats}
grid={grid}
probes={probes}
chartData={chartData}
empty={!probeStats.length}
/>
</div>
)}
</>
)
}

View File

@@ -26,7 +26,7 @@ import type {
SystemStatsRecord,
} from "@/types"
import { $router, navigate } from "../../router"
import { appendData, cache, getStats, makeContainerData, makeContainerPoint } from "./chart-data"
import { appendData, cache, getStats, getTimeData, makeContainerData, makeContainerPoint } from "./chart-data"
export type SystemData = ReturnType<typeof useSystemData>
@@ -151,11 +151,16 @@ export function useSystemData(id: string) {
const agentVersion = useMemo(() => parseSemVer(system?.info?.v), [system?.info?.v])
const chartData: ChartData = useMemo(() => {
const lastCreated = Math.max(
(systemStats.at(-1)?.created as number) ?? 0,
(containerData.at(-1)?.created as number) ?? 0
)
return {
systemStats,
containerData,
chartTime,
orientation: direction === "rtl" ? "right" : "left",
...getTimeData(chartTime, lastCreated),
agentVersion,
}
}, [systemStats, containerData, direction])
@@ -195,8 +200,8 @@ export function useSystemData(id: string) {
}
Promise.allSettled([
getStats<SystemStatsRecord>("system_stats", systemId, chartTime, cachedSystemStats),
getStats<ContainerStatsRecord>("container_stats", systemId, chartTime, cachedContainerData),
getStats<SystemStatsRecord>("system_stats", systemId, chartTime),
getStats<ContainerStatsRecord>("container_stats", systemId, chartTime),
]).then(([systemStats, containerStats]) => {
// If another request has been made since this one, ignore the results
if (requestId !== statsRequestId.current) {

View File

@@ -3,10 +3,9 @@ import { useLingui } from "@lingui/react/macro"
import * as React from "react"
import * as RechartsPrimitive from "recharts"
import { chartTimeData, cn } from "@/lib/utils"
import type { ChartTimes } from "@/types"
import type { ChartData } from "@/types"
import { Separator } from "./separator"
import { AxisDomain } from "recharts/types/util/types"
import { timeTicks } from "d3-time"
// Format: { THEME_NAME: CSS_SELECTOR }
const THEMES = { light: "", dark: ".dark" } as const
@@ -401,22 +400,12 @@ function getPayloadConfigFromPayload(config: ChartConfig, payload: unknown, key:
return configLabelKey in config ? config[configLabelKey] : config[key as keyof typeof config]
}
let cachedAxis: {
time: number
el: JSX.Element
let cachedAxis: JSX.Element
const xAxis = ({ domain, ticks, chartTime }: ChartData) => {
if (cachedAxis && domain[0] === cachedAxis.props.domain[0]) {
return cachedAxis
}
const xAxis = (chartTime: ChartTimes, lastCreationTime: number) => {
if (Math.abs(lastCreationTime - cachedAxis?.time) < 1000) {
return cachedAxis.el
}
const now = new Date(lastCreationTime + 1000)
const startTime = chartTimeData[chartTime].getOffset(now)
const ticks = timeTicks(startTime, now, chartTimeData[chartTime].ticks ?? 12).map((date) => date.getTime())
const domain = [chartTimeData[chartTime].getOffset(now).getTime(), now.getTime()]
cachedAxis = {
time: lastCreationTime,
el: (
cachedAxis = (
<RechartsPrimitive.XAxis
dataKey="created"
domain={domain}
@@ -429,9 +418,8 @@ const xAxis = (chartTime: ChartTimes, lastCreationTime: number) => {
axisLine={false}
tickFormatter={chartTimeData[chartTime].format}
/>
),
}
return cachedAxis.el
)
return cachedAxis
}
export {

View File

@@ -1,319 +0,0 @@
import { chartTimeData } from "@/lib/utils"
import type { ChartTimes, NetworkProbeRecord, NetworkProbeStatsRecord } from "@/types"
import { useEffect, useRef, useState } from "react"
import { getStats, appendData } from "@/components/routes/system/chart-data"
import { pb } from "@/lib/api"
import { toast } from "@/components/ui/use-toast"
import type { RecordListOptions, RecordSubscription } from "pocketbase"
const cache = new Map<string, NetworkProbeStatsRecord[]>()
function getCacheValue(systemId: string, chartTime: ChartTimes | "rt") {
return cache.get(`${systemId}${chartTime}`) || []
}
function appendCacheValue(
systemId: string,
chartTime: ChartTimes | "rt",
newStats: NetworkProbeStatsRecord[],
maxPoints = 100
) {
const cache_key = `${systemId}${chartTime}`
const existingStats = getCacheValue(systemId, chartTime)
if (existingStats) {
const { expectedInterval } = chartTimeData[chartTime]
const updatedStats = appendData(existingStats, newStats, expectedInterval, maxPoints)
cache.set(cache_key, updatedStats)
return updatedStats
} else {
cache.set(cache_key, newStats)
return newStats
}
}
const NETWORK_PROBE_FIELDS = "id,name,system,target,protocol,port,interval,latency,loss,enabled,updated"
interface UseNetworkProbesProps {
systemId?: string
loadStats?: boolean
chartTime?: ChartTimes
existingProbes?: NetworkProbeRecord[]
}
export function useNetworkProbesData(props: UseNetworkProbesProps) {
const { systemId, loadStats, chartTime, existingProbes } = props
const [p, setProbes] = useState<NetworkProbeRecord[]>([])
const [probeStats, setProbeStats] = useState<NetworkProbeStatsRecord[]>([])
const statsRequestId = useRef(0)
const pendingProbeEvents = useRef(new Map<string, RecordSubscription<NetworkProbeRecord>>())
const probeBatchTimeout = useRef<ReturnType<typeof setTimeout> | null>(null)
const probes = existingProbes ?? p
// clear old data when systemId changes
// useEffect(() => {
// return setProbes([])
// }, [systemId])
// initial load - fetch probes if not provided by caller
useEffect(() => {
if (!existingProbes) {
fetchProbes(systemId).then((probes) => setProbes(probes))
}
}, [systemId])
// Subscribe to updates if probes not provided by caller
useEffect(() => {
if (existingProbes) {
return
}
let unsubscribe: (() => void) | undefined
function flushPendingProbeEvents() {
probeBatchTimeout.current = null
if (!pendingProbeEvents.current.size) {
return
}
const events = pendingProbeEvents.current
pendingProbeEvents.current = new Map()
setProbes((currentProbes) => {
return applyProbeEvents(currentProbes ?? [], events.values(), systemId)
})
}
const pbOptions: RecordListOptions = { fields: NETWORK_PROBE_FIELDS }
if (systemId) {
pbOptions.filter = pb.filter("system = {:system}", { system: systemId })
}
;(async () => {
try {
unsubscribe = await pb.collection<NetworkProbeRecord>("network_probes").subscribe(
"*",
(event) => {
pendingProbeEvents.current.set(event.record.id, event)
if (!probeBatchTimeout.current) {
probeBatchTimeout.current = setTimeout(flushPendingProbeEvents, 50)
}
},
pbOptions
)
} catch (error) {
console.error("Failed to subscribe to probes", error)
}
})()
return () => {
if (probeBatchTimeout.current !== null) {
clearTimeout(probeBatchTimeout.current)
probeBatchTimeout.current = null
}
pendingProbeEvents.current.clear()
unsubscribe?.()
}
}, [systemId])
// Subscribe to new probe stats
useEffect(() => {
if (!loadStats || !systemId) {
return
}
let unsubscribe: (() => void) | undefined
const pbOptions = {
fields: "stats,created,type",
filter: pb.filter("system = {:system}", { system: systemId }),
}
;(async () => {
try {
unsubscribe = await pb.collection<NetworkProbeStatsRecord>("network_probe_stats").subscribe(
"*",
(event) => {
if (!chartTime || event.action !== "create") {
return
}
// if (typeof event.record.created === "string") {
// event.record.created = new Date(event.record.created).getTime()
// }
// return if not current chart time
// we could append to other chart times, but we would need to check the timestamps
// to make sure they fit in correctly, so for simplicity just ignore non-chart-time updates
// and fetch them via API when the user switches to that chart time
const chartTimeRecordType = chartTimeData[chartTime].type as ChartTimes
if (event.record.type !== chartTimeRecordType) {
// const lastCreated = getCacheValue(systemId, chartTime)?.at(-1)?.created ?? 0
// if (lastCreated) {
// // if the new record is close enough to the last cached record, append it to the cache so it's available immediately if the user switches to that chart time
// const { expectedInterval } = chartTimeData[chartTime]
// if (event.record.created - lastCreated < expectedInterval * 1.5) {
// console.log(
// `Caching out-of-chart-time probe stats record for chart time ${chartTime} (record type: ${event.record.type})`
// )
// const newStats = appendCacheValue(systemId, chartTime, [event.record])
// cache.set(`${systemId}${chartTime}`, newStats)
// }
// }
// console.log(`Received probe stats for non-current chart time (${event.record.type}), ignoring for now`)
return
}
// console.log("Appending new probe stats to chart:", event.record)
const newStats = appendCacheValue(systemId, chartTime, [event.record])
setProbeStats(newStats)
},
pbOptions
)
} catch (error) {
console.error("Failed to subscribe to probe stats:", error)
}
})()
return () => unsubscribe?.()
}, [systemId])
// fetch missing probe stats on load and when chart time changes
useEffect(() => {
if (!loadStats || !systemId || !chartTime || chartTime === "1m") {
return
}
const { expectedInterval } = chartTimeData[chartTime]
const requestId = ++statsRequestId.current
const cachedProbeStats = getCacheValue(systemId, chartTime)
// Render from cache immediately if available
if (cachedProbeStats.length) {
setProbeStats(cachedProbeStats)
// Skip the fetch if the latest cached point is recent enough that no new point is expected yet
const lastCreated = cachedProbeStats.at(-1)?.created
if (lastCreated && Date.now() - lastCreated < expectedInterval * 0.9) {
console.log("Using cached probe stats, skipping fetch")
return
}
}
getStats<NetworkProbeStatsRecord>("network_probe_stats", systemId, chartTime, cachedProbeStats).then(
(probeStats) => {
// If another request has been made since this one, ignore the results
if (requestId !== statsRequestId.current) {
return
}
const newStats = appendCacheValue(systemId, chartTime, probeStats)
setProbeStats(newStats)
}
)
}, [chartTime])
// subscribe to realtime metrics if chart time is 1m
useEffect(() => {
if (!loadStats || !systemId || chartTime !== "1m") {
return
}
let unsubscribe: (() => void) | undefined
const cache_key = `${systemId}rt`
pb.realtime
.subscribe(
`rt_metrics`,
(data: { Probes: NetworkProbeStatsRecord["stats"] }) => {
let prev = getCacheValue(systemId, "rt")
const now = Date.now()
// if no previous data or the last data point is older than 1min,
// create a new data set starting with a point 1 second ago to seed the chart data
if (!prev || (prev.at(-1)?.created ?? 0) < now - 60_000) {
prev = [{ created: now - 1000, stats: probesToStats(probes) }]
}
const stats = { created: now, stats: data.Probes } as NetworkProbeStatsRecord
const newStats = appendData(prev, [stats], 1000, 120)
setProbeStats(() => newStats)
cache.set(cache_key, newStats)
},
{ query: { system: systemId } }
)
.then((us) => {
unsubscribe = us
})
return () => unsubscribe?.()
}, [chartTime, systemId])
return {
probes,
probeStats,
}
}
export function probeKey(p: NetworkProbeRecord) {
if (p.protocol === "tcp") return `${p.protocol}:${p.target}:${p.port}`
return `${p.protocol}:${p.target}`
}
function probesToStats(probes: NetworkProbeRecord[]): NetworkProbeStatsRecord["stats"] {
const stats: NetworkProbeStatsRecord["stats"] = {}
for (const probe of probes) {
const key = probeKey(probe)
stats[key] = [probe.latency, 0, 0, probe.loss]
}
return stats
}
async function fetchProbes(systemId?: string) {
try {
const res = await pb.collection<NetworkProbeRecord>("network_probes").getList(0, 2000, {
fields: NETWORK_PROBE_FIELDS,
filter: systemId ? pb.filter("system={:system}", { system: systemId }) : undefined,
})
return res.items
} catch (error) {
toast({
title: "Error",
description: (error as Error)?.message,
variant: "destructive",
})
return []
}
}
function applyProbeEvents(
probes: NetworkProbeRecord[],
events: Iterable<RecordSubscription<NetworkProbeRecord>>,
systemId?: string
) {
// Use a map to handle updates/deletes in constant time
const probeById = new Map(probes.map((probe) => [probe.id, probe]))
const createdProbes: NetworkProbeRecord[] = []
for (const { action, record } of events) {
const matchesSystemScope = !systemId || record.system === systemId
if (action === "delete" || !matchesSystemScope) {
probeById.delete(record.id)
continue
}
if (!probeById.has(record.id)) {
createdProbes.push(record)
}
probeById.set(record.id, record)
}
const nextProbes: NetworkProbeRecord[] = []
// Prepend brand new probes (matching previous behavior)
for (let index = createdProbes.length - 1; index >= 0; index -= 1) {
nextProbes.push(createdProbes[index])
}
// Rebuild the final list while preserving original order for existing probes
for (const probe of probes) {
const nextProbe = probeById.get(probe.id)
if (!nextProbe) {
continue
}
nextProbes.push(nextProbe)
probeById.delete(probe.id)
}
return nextProbes
}

View File

@@ -30,7 +30,6 @@ const LoginPage = lazy(() => import("@/components/login/login.tsx"))
const Home = lazy(() => import("@/components/routes/home.tsx"))
const Containers = lazy(() => import("@/components/routes/containers.tsx"))
const Smart = lazy(() => import("@/components/routes/smart.tsx"))
const Probes = lazy(() => import("@/components/routes/probes.tsx"))
const SystemDetail = lazy(() => import("@/components/routes/system.tsx"))
const CopyToClipboardDialog = lazy(() => import("@/components/copy-to-clipboard.tsx"))
@@ -80,8 +79,6 @@ const App = memo(() => {
return <Containers />
} else if (page.route === "smart") {
return <Smart />
} else if (page.route === "probes") {
return <Probes />
} else if (page.route === "settings") {
return <Settings />
}

View File

@@ -316,6 +316,8 @@ export interface ChartData {
systemStats: SystemStatsRecord[]
containerData: ChartDataContainer[]
orientation: "right" | "left"
ticks: number[]
domain: number[]
chartTime: ChartTimes
}
@@ -544,35 +546,3 @@ export interface UpdateInfo {
v: string // new version
url: string // url to new version
}
export interface NetworkProbeRecord {
id: string
system: string
name: string
target: string
protocol: "icmp" | "tcp" | "http"
port: number
latency: number
loss: number
interval: number
enabled: boolean
updated: string
}
/**
* 0: avg latency in ms
*
* 1: min latency in ms
*
* 2: max latency in ms
*
* 3: packet loss in %
*/
type ProbeResult = number[]
export interface NetworkProbeStatsRecord {
id?: string
type?: string
stats: Record<string, ProbeResult>
created: number // unix timestamp (ms) for Recharts xAxis
}