From 6472af1ba4f24853d37835819345e21024d836be Mon Sep 17 00:00:00 2001 From: henrygd Date: Tue, 21 Apr 2026 21:57:24 -0400 Subject: [PATCH] updates --- agent/agent.go | 2 +- agent/probe.go | 6 +- .../site/src/components/charts/area-chart.tsx | 2 +- .../site/src/components/charts/line-chart.tsx | 2 +- .../components/routes/system/chart-data.ts | 22 --- .../routes/system/charts/probes-charts.tsx | 9 +- .../routes/system/use-system-data.ts | 7 +- internal/site/src/components/ui/chart.tsx | 52 ++++--- internal/site/src/lib/use-network-probes.ts | 146 ++++++++++++++++-- internal/site/src/types.d.ts | 4 +- 10 files changed, 177 insertions(+), 75 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 888a4d79..0a5cbf10 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -183,7 +183,7 @@ func (a *Agent) gatherStats(options common.DataRequestOptions) *system.CombinedD } if a.probeManager != nil { - data.Probes = a.probeManager.GetResults() + data.Probes = a.probeManager.GetResults(cacheTimeMs) slog.Debug("Probes", "data", data.Probes) } diff --git a/agent/probe.go b/agent/probe.go index 1e28589b..e16be83e 100644 --- a/agent/probe.go +++ b/agent/probe.go @@ -73,13 +73,13 @@ func (pm *ProbeManager) SyncProbes(configs []probe.Config) { } } -// GetResults returns aggregated results for all probes over the last 60s window. -func (pm *ProbeManager) GetResults() map[string]probe.Result { +// GetResults returns aggregated results for all probes over the last supplied duration in ms. +func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result { pm.mu.RLock() defer pm.mu.RUnlock() results := make(map[string]probe.Result, len(pm.probes)) - cutoff := time.Now().Add(-60 * time.Second) + cutoff := time.Now().Add(-time.Duration(durationMs) * time.Millisecond) for key, task := range pm.probes { task.mu.Lock() diff --git a/internal/site/src/components/charts/area-chart.tsx b/internal/site/src/components/charts/area-chart.tsx index 052de9c7..09844c88 100644 --- a/internal/site/src/components/charts/area-chart.tsx +++ b/internal/site/src/components/charts/area-chart.tsx @@ -146,7 +146,7 @@ export default function AreaChartDefault({ axisLine={false} /> )} - {xAxis(chartData)} + {xAxis(chartData.chartTime, displayData.at(-1)?.created as number)} )} - {xAxis(chartData)} + {xAxis(chartData.chartTime, displayData.at(-1)?.created as number)} () -// create ticks and domain for charts -export function getTimeData(chartTime: ChartTimes, lastCreated: number) { - const cached = cache.get("td") as ChartTimeData | undefined - if (cached && cached.chartTime === chartTime) { - if (!lastCreated || cached.time >= lastCreated) { - return cached.data - } - } - - // const buffer = chartTime === "1m" ? 400 : 20_000 - const now = new Date(Date.now()) - const startTime = chartTimeData[chartTime].getOffset(now) - const ticks = timeTicks(startTime, now, chartTimeData[chartTime].ticks ?? 12).map((date) => date.getTime()) - const data = { - ticks, - domain: [chartTimeData[chartTime].getOffset(now).getTime(), now.getTime()], - } - cache.set("td", { time: now.getTime(), data, chartTime }) - return data -} - /** Append new records onto prev with gap detection. Converts string `created` values to ms timestamps in place. * Pass `maxLen` to cap the result length in one copy instead of slicing again after the call. */ export function appendData( diff --git a/internal/site/src/components/routes/system/charts/probes-charts.tsx b/internal/site/src/components/routes/system/charts/probes-charts.tsx index 8cd91e11..a0cacf43 100644 --- a/internal/site/src/components/routes/system/charts/probes-charts.tsx +++ b/internal/site/src/components/routes/system/charts/probes-charts.tsx @@ -7,11 +7,7 @@ import type { ChartData, NetworkProbeRecord, NetworkProbeStatsRecord } from "@/t import { useMemo } from "react" import { atom } from "nanostores" import { useStore } from "@nanostores/react" - -function probeKey(p: NetworkProbeRecord) { - if (p.protocol === "tcp") return `${p.protocol}:${p.target}:${p.port}` - return `${p.protocol}:${p.target}` -} +import { probeKey } from "@/lib/use-network-probes" const $filter = atom("") @@ -48,7 +44,7 @@ function ProbeChart({ const filter = useStore($filter) const { dataPoints, visibleKeys } = useMemo(() => { - const sortedProbes = [...probes].sort((a, b) => a.name.localeCompare(b.name)) + const sortedProbes = [...probes].sort((a, b) => b.latency - a.latency) const count = sortedProbes.length const points: DataPoint[] = [] const visibleKeys: string[] = [] @@ -67,6 +63,7 @@ function ProbeChart({ } visibleKeys.push(key) points.push({ + order: i, label: p.name || p.target, dataKey: (record: NetworkProbeStatsRecord) => record.stats?.[key]?.[valueIndex] ?? "-", color: count <= 5 ? i + 1 : `hsl(${(i * 360) / count}, var(--chart-saturation), var(--chart-lightness))`, diff --git a/internal/site/src/components/routes/system/use-system-data.ts b/internal/site/src/components/routes/system/use-system-data.ts index 27aeaec3..e1258abe 100644 --- a/internal/site/src/components/routes/system/use-system-data.ts +++ b/internal/site/src/components/routes/system/use-system-data.ts @@ -26,7 +26,7 @@ import type { SystemStatsRecord, } from "@/types" import { $router, navigate } from "../../router" -import { appendData, cache, getStats, getTimeData, makeContainerData, makeContainerPoint } from "./chart-data" +import { appendData, cache, getStats, makeContainerData, makeContainerPoint } from "./chart-data" export type SystemData = ReturnType @@ -151,16 +151,11 @@ export function useSystemData(id: string) { const agentVersion = useMemo(() => parseSemVer(system?.info?.v), [system?.info?.v]) const chartData: ChartData = useMemo(() => { - const lastCreated = Math.max( - (systemStats.at(-1)?.created as number) ?? 0, - (containerData.at(-1)?.created as number) ?? 0 - ) return { systemStats, containerData, chartTime, orientation: direction === "rtl" ? "right" : "left", - ...getTimeData(chartTime, lastCreated), agentVersion, } }, [systemStats, containerData, direction]) diff --git a/internal/site/src/components/ui/chart.tsx b/internal/site/src/components/ui/chart.tsx index 43ea9445..c5829605 100644 --- a/internal/site/src/components/ui/chart.tsx +++ b/internal/site/src/components/ui/chart.tsx @@ -3,9 +3,10 @@ import { useLingui } from "@lingui/react/macro" import * as React from "react" import * as RechartsPrimitive from "recharts" import { chartTimeData, cn } from "@/lib/utils" -import type { ChartData } from "@/types" +import type { ChartTimes } from "@/types" import { Separator } from "./separator" import { AxisDomain } from "recharts/types/util/types" +import { timeTicks } from "d3-time" // Format: { THEME_NAME: CSS_SELECTOR } const THEMES = { light: "", dark: ".dark" } as const @@ -400,26 +401,37 @@ function getPayloadConfigFromPayload(config: ChartConfig, payload: unknown, key: return configLabelKey in config ? config[configLabelKey] : config[key as keyof typeof config] } -let cachedAxis: JSX.Element -const xAxis = ({ domain, ticks, chartTime }: ChartData) => { - if (cachedAxis && ticks === cachedAxis.props.ticks) { - return cachedAxis +let cachedAxis: { + time: number + el: JSX.Element +} + +const xAxis = (chartTime: ChartTimes, lastCreationTime: number) => { + if (Math.abs(lastCreationTime - cachedAxis?.time) < 1000) { + return cachedAxis.el } - cachedAxis = ( - - ) - return cachedAxis + const now = new Date(lastCreationTime + 1000) + const startTime = chartTimeData[chartTime].getOffset(now) + const ticks = timeTicks(startTime, now, chartTimeData[chartTime].ticks ?? 12).map((date) => date.getTime()) + const domain = [chartTimeData[chartTime].getOffset(now).getTime(), now.getTime()] + cachedAxis = { + time: lastCreationTime, + el: ( + + ), + } + return cachedAxis.el } export { diff --git a/internal/site/src/lib/use-network-probes.ts b/internal/site/src/lib/use-network-probes.ts index 1b02ca22..24ed9735 100644 --- a/internal/site/src/lib/use-network-probes.ts +++ b/internal/site/src/lib/use-network-probes.ts @@ -8,6 +8,29 @@ import type { RecordListOptions, RecordSubscription } from "pocketbase" const cache = new Map() +function getCacheValue(systemId: string, chartTime: ChartTimes | "rt") { + return cache.get(`${systemId}${chartTime}`) || [] +} + +function appendCacheValue( + systemId: string, + chartTime: ChartTimes | "rt", + newStats: NetworkProbeStatsRecord[], + maxPoints = 100 +) { + const cache_key = `${systemId}${chartTime}` + const existingStats = getCacheValue(systemId, chartTime) + if (existingStats) { + const { expectedInterval } = chartTimeData[chartTime] + const updatedStats = appendData(existingStats, newStats, expectedInterval, maxPoints) + cache.set(cache_key, updatedStats) + return updatedStats + } else { + cache.set(cache_key, newStats) + return newStats + } +} + const NETWORK_PROBE_FIELDS = "id,name,system,target,protocol,port,interval,latency,loss,enabled,updated" interface UseNetworkProbesProps { @@ -91,25 +114,83 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) { } }, [systemId]) - // fetch probe stats when probes update + // Subscribe to new probe stats + useEffect(() => { + if (!loadStats || !systemId) { + return + } + let unsubscribe: (() => void) | undefined + const pbOptions = { + fields: "stats,created,type", + filter: pb.filter("system = {:system}", { system: systemId }), + } + + ;(async () => { + try { + unsubscribe = await pb.collection("network_probe_stats").subscribe( + "*", + (event) => { + if (!chartTime || event.action !== "create") { + return + } + // if (typeof event.record.created === "string") { + // event.record.created = new Date(event.record.created).getTime() + // } + // return if not current chart time + // we could append to other chart times, but we would need to check the timestamps + // to make sure they fit in correctly, so for simplicity just ignore non-chart-time updates + // and fetch them via API when the user switches to that chart time + const chartTimeRecordType = chartTimeData[chartTime].type as ChartTimes + if (event.record.type !== chartTimeRecordType) { + // const lastCreated = getCacheValue(systemId, chartTime)?.at(-1)?.created ?? 0 + // if (lastCreated) { + // // if the new record is close enough to the last cached record, append it to the cache so it's available immediately if the user switches to that chart time + // const { expectedInterval } = chartTimeData[chartTime] + // if (event.record.created - lastCreated < expectedInterval * 1.5) { + // console.log( + // `Caching out-of-chart-time probe stats record for chart time ${chartTime} (record type: ${event.record.type})` + // ) + // const newStats = appendCacheValue(systemId, chartTime, [event.record]) + // cache.set(`${systemId}${chartTime}`, newStats) + // } + // } + // console.log(`Received probe stats for non-current chart time (${event.record.type}), ignoring for now`) + return + } + + // console.log("Appending new probe stats to chart:", event.record) + const newStats = appendCacheValue(systemId, chartTime, [event.record]) + setProbeStats(newStats) + }, + pbOptions + ) + } catch (error) { + console.error("Failed to subscribe to probe stats:", error) + } + })() + + return () => unsubscribe?.() + }, [systemId]) + + // fetch missing probe stats on load and when chart time changes useEffect(() => { if (!loadStats || !systemId || !chartTime || chartTime === "1m") { return } const { expectedInterval } = chartTimeData[chartTime] - const cache_key = `${systemId}${chartTime}` const requestId = ++statsRequestId.current - const cachedProbeStats = cache.get(cache_key) as NetworkProbeStatsRecord[] | undefined + const cachedProbeStats = getCacheValue(systemId, chartTime) // Render from cache immediately if available - if (cachedProbeStats?.length) { + if (cachedProbeStats.length) { setProbeStats(cachedProbeStats) // Skip the fetch if the latest cached point is recent enough that no new point is expected yet const lastCreated = cachedProbeStats.at(-1)?.created if (lastCreated && Date.now() - lastCreated < expectedInterval * 0.9) { + console.log("Using cached probe stats, skipping fetch") return } } @@ -120,17 +201,42 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) { if (requestId !== statsRequestId.current) { return } - - // make new system stats - let probeStatsData = (cache.get(cache_key) || []) as NetworkProbeStatsRecord[] - if (probeStats.length) { - probeStatsData = appendData(probeStatsData, probeStats, expectedInterval, 100) - cache.set(cache_key, probeStatsData) - } - setProbeStats(probeStatsData) + const newStats = appendCacheValue(systemId, chartTime, probeStats) + setProbeStats(newStats) } ) - }, [chartTime, probes]) + }, [chartTime]) + + // subscribe to realtime metrics if chart time is 1m + useEffect(() => { + if (!loadStats || !systemId || chartTime !== "1m") { + return + } + let unsubscribe: (() => void) | undefined + const cache_key = `${systemId}rt` + pb.realtime + .subscribe( + `rt_metrics`, + (data: { Probes: NetworkProbeStatsRecord["stats"] }) => { + let prev = getCacheValue(systemId, "rt") + const now = Date.now() + // if no previous data or the last data point is older than 1min, + // create a new data set starting with a point 1 second ago to seed the chart data + if (!prev || (prev.at(-1)?.created ?? 0) < now - 60_000) { + prev = [{ created: now - 1000, stats: probesToStats(probes) }] + } + const stats = { created: now, stats: data.Probes } as NetworkProbeStatsRecord + const newStats = appendData(prev, [stats], 1000, 120) + setProbeStats(() => newStats) + cache.set(cache_key, newStats) + }, + { query: { system: systemId } } + ) + .then((us) => { + unsubscribe = us + }) + return () => unsubscribe?.() + }, [chartTime, systemId]) return { probes, @@ -138,6 +244,20 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) { } } +export function probeKey(p: NetworkProbeRecord) { + if (p.protocol === "tcp") return `${p.protocol}:${p.target}:${p.port}` + return `${p.protocol}:${p.target}` +} + +function probesToStats(probes: NetworkProbeRecord[]): NetworkProbeStatsRecord["stats"] { + const stats: NetworkProbeStatsRecord["stats"] = {} + for (const probe of probes) { + const key = probeKey(probe) + stats[key] = [probe.latency, 0, 0, probe.loss] + } + return stats +} + async function fetchProbes(systemId?: string) { try { const res = await pb.collection("network_probes").getList(0, 2000, { diff --git a/internal/site/src/types.d.ts b/internal/site/src/types.d.ts index c476fa1c..f1429528 100644 --- a/internal/site/src/types.d.ts +++ b/internal/site/src/types.d.ts @@ -316,8 +316,6 @@ export interface ChartData { systemStats: SystemStatsRecord[] containerData: ChartDataContainer[] orientation: "right" | "left" - ticks: number[] - domain: number[] chartTime: ChartTimes } @@ -573,6 +571,8 @@ export interface NetworkProbeRecord { type ProbeResult = number[] export interface NetworkProbeStatsRecord { + id?: string + type?: string stats: Record created: number // unix timestamp (ms) for Recharts xAxis }