This commit is contained in:
henrygd
2026-04-21 21:57:24 -04:00
parent e931165566
commit 6472af1ba4
10 changed files with 177 additions and 75 deletions

View File

@@ -183,7 +183,7 @@ func (a *Agent) gatherStats(options common.DataRequestOptions) *system.CombinedD
}
if a.probeManager != nil {
data.Probes = a.probeManager.GetResults()
data.Probes = a.probeManager.GetResults(cacheTimeMs)
slog.Debug("Probes", "data", data.Probes)
}

View File

@@ -73,13 +73,13 @@ func (pm *ProbeManager) SyncProbes(configs []probe.Config) {
}
}
// GetResults returns aggregated results for all probes over the last 60s window.
func (pm *ProbeManager) GetResults() map[string]probe.Result {
// GetResults returns aggregated results for all probes over the last supplied duration in ms.
func (pm *ProbeManager) GetResults(durationMs uint16) map[string]probe.Result {
pm.mu.RLock()
defer pm.mu.RUnlock()
results := make(map[string]probe.Result, len(pm.probes))
cutoff := time.Now().Add(-60 * time.Second)
cutoff := time.Now().Add(-time.Duration(durationMs) * time.Millisecond)
for key, task := range pm.probes {
task.mu.Lock()

View File

@@ -146,7 +146,7 @@ export default function AreaChartDefault({
axisLine={false}
/>
)}
{xAxis(chartData)}
{xAxis(chartData.chartTime, displayData.at(-1)?.created as number)}
<ChartTooltip
animationEasing="ease-out"
animationDuration={150}

View File

@@ -148,7 +148,7 @@ export default function LineChartDefault({
axisLine={false}
/>
)}
{xAxis(chartData)}
{xAxis(chartData.chartTime, displayData.at(-1)?.created as number)}
<ChartTooltip
animationEasing="ease-out"
animationDuration={150}

View File

@@ -1,4 +1,3 @@
import { timeTicks } from "d3-time"
import { getPbTimestamp, pb } from "@/lib/api"
import { chartTimeData } from "@/lib/utils"
import type { ChartData, ChartTimes, ContainerStatsRecord, NetworkProbeStatsRecord, SystemStatsRecord } from "@/types"
@@ -17,27 +16,6 @@ export const cache = new Map<
ChartTimeData | SystemStatsRecord[] | ContainerStatsRecord[] | ChartData["containerData"]
>()
// create ticks and domain for charts
export function getTimeData(chartTime: ChartTimes, lastCreated: number) {
const cached = cache.get("td") as ChartTimeData | undefined
if (cached && cached.chartTime === chartTime) {
if (!lastCreated || cached.time >= lastCreated) {
return cached.data
}
}
// const buffer = chartTime === "1m" ? 400 : 20_000
const now = new Date(Date.now())
const startTime = chartTimeData[chartTime].getOffset(now)
const ticks = timeTicks(startTime, now, chartTimeData[chartTime].ticks ?? 12).map((date) => date.getTime())
const data = {
ticks,
domain: [chartTimeData[chartTime].getOffset(now).getTime(), now.getTime()],
}
cache.set("td", { time: now.getTime(), data, chartTime })
return data
}
/** Append new records onto prev with gap detection. Converts string `created` values to ms timestamps in place.
* Pass `maxLen` to cap the result length in one copy instead of slicing again after the call. */
export function appendData<T extends { created: string | number | null }>(

View File

@@ -7,11 +7,7 @@ import type { ChartData, NetworkProbeRecord, NetworkProbeStatsRecord } from "@/t
import { useMemo } from "react"
import { atom } from "nanostores"
import { useStore } from "@nanostores/react"
function probeKey(p: NetworkProbeRecord) {
if (p.protocol === "tcp") return `${p.protocol}:${p.target}:${p.port}`
return `${p.protocol}:${p.target}`
}
import { probeKey } from "@/lib/use-network-probes"
const $filter = atom("")
@@ -48,7 +44,7 @@ function ProbeChart({
const filter = useStore($filter)
const { dataPoints, visibleKeys } = useMemo(() => {
const sortedProbes = [...probes].sort((a, b) => a.name.localeCompare(b.name))
const sortedProbes = [...probes].sort((a, b) => b.latency - a.latency)
const count = sortedProbes.length
const points: DataPoint<NetworkProbeStatsRecord>[] = []
const visibleKeys: string[] = []
@@ -67,6 +63,7 @@ function ProbeChart({
}
visibleKeys.push(key)
points.push({
order: i,
label: p.name || p.target,
dataKey: (record: NetworkProbeStatsRecord) => record.stats?.[key]?.[valueIndex] ?? "-",
color: count <= 5 ? i + 1 : `hsl(${(i * 360) / count}, var(--chart-saturation), var(--chart-lightness))`,

View File

@@ -26,7 +26,7 @@ import type {
SystemStatsRecord,
} from "@/types"
import { $router, navigate } from "../../router"
import { appendData, cache, getStats, getTimeData, makeContainerData, makeContainerPoint } from "./chart-data"
import { appendData, cache, getStats, makeContainerData, makeContainerPoint } from "./chart-data"
export type SystemData = ReturnType<typeof useSystemData>
@@ -151,16 +151,11 @@ export function useSystemData(id: string) {
const agentVersion = useMemo(() => parseSemVer(system?.info?.v), [system?.info?.v])
const chartData: ChartData = useMemo(() => {
const lastCreated = Math.max(
(systemStats.at(-1)?.created as number) ?? 0,
(containerData.at(-1)?.created as number) ?? 0
)
return {
systemStats,
containerData,
chartTime,
orientation: direction === "rtl" ? "right" : "left",
...getTimeData(chartTime, lastCreated),
agentVersion,
}
}, [systemStats, containerData, direction])

View File

@@ -3,9 +3,10 @@ import { useLingui } from "@lingui/react/macro"
import * as React from "react"
import * as RechartsPrimitive from "recharts"
import { chartTimeData, cn } from "@/lib/utils"
import type { ChartData } from "@/types"
import type { ChartTimes } from "@/types"
import { Separator } from "./separator"
import { AxisDomain } from "recharts/types/util/types"
import { timeTicks } from "d3-time"
// Format: { THEME_NAME: CSS_SELECTOR }
const THEMES = { light: "", dark: ".dark" } as const
@@ -400,26 +401,37 @@ function getPayloadConfigFromPayload(config: ChartConfig, payload: unknown, key:
return configLabelKey in config ? config[configLabelKey] : config[key as keyof typeof config]
}
let cachedAxis: JSX.Element
const xAxis = ({ domain, ticks, chartTime }: ChartData) => {
if (cachedAxis && ticks === cachedAxis.props.ticks) {
return cachedAxis
let cachedAxis: {
time: number
el: JSX.Element
}
const xAxis = (chartTime: ChartTimes, lastCreationTime: number) => {
if (Math.abs(lastCreationTime - cachedAxis?.time) < 1000) {
return cachedAxis.el
}
cachedAxis = (
<RechartsPrimitive.XAxis
dataKey="created"
domain={domain}
ticks={ticks}
allowDataOverflow
type="number"
scale="time"
minTickGap={12}
tickMargin={8}
axisLine={false}
tickFormatter={chartTimeData[chartTime].format}
/>
)
return cachedAxis
const now = new Date(lastCreationTime + 1000)
const startTime = chartTimeData[chartTime].getOffset(now)
const ticks = timeTicks(startTime, now, chartTimeData[chartTime].ticks ?? 12).map((date) => date.getTime())
const domain = [chartTimeData[chartTime].getOffset(now).getTime(), now.getTime()]
cachedAxis = {
time: lastCreationTime,
el: (
<RechartsPrimitive.XAxis
dataKey="created"
domain={domain}
ticks={ticks}
allowDataOverflow
type="number"
scale="time"
minTickGap={12}
tickMargin={8}
axisLine={false}
tickFormatter={chartTimeData[chartTime].format}
/>
),
}
return cachedAxis.el
}
export {

View File

@@ -8,6 +8,29 @@ import type { RecordListOptions, RecordSubscription } from "pocketbase"
const cache = new Map<string, NetworkProbeStatsRecord[]>()
function getCacheValue(systemId: string, chartTime: ChartTimes | "rt") {
return cache.get(`${systemId}${chartTime}`) || []
}
function appendCacheValue(
systemId: string,
chartTime: ChartTimes | "rt",
newStats: NetworkProbeStatsRecord[],
maxPoints = 100
) {
const cache_key = `${systemId}${chartTime}`
const existingStats = getCacheValue(systemId, chartTime)
if (existingStats) {
const { expectedInterval } = chartTimeData[chartTime]
const updatedStats = appendData(existingStats, newStats, expectedInterval, maxPoints)
cache.set(cache_key, updatedStats)
return updatedStats
} else {
cache.set(cache_key, newStats)
return newStats
}
}
const NETWORK_PROBE_FIELDS = "id,name,system,target,protocol,port,interval,latency,loss,enabled,updated"
interface UseNetworkProbesProps {
@@ -91,25 +114,83 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) {
}
}, [systemId])
// fetch probe stats when probes update
// Subscribe to new probe stats
useEffect(() => {
if (!loadStats || !systemId) {
return
}
let unsubscribe: (() => void) | undefined
const pbOptions = {
fields: "stats,created,type",
filter: pb.filter("system = {:system}", { system: systemId }),
}
;(async () => {
try {
unsubscribe = await pb.collection<NetworkProbeStatsRecord>("network_probe_stats").subscribe(
"*",
(event) => {
if (!chartTime || event.action !== "create") {
return
}
// if (typeof event.record.created === "string") {
// event.record.created = new Date(event.record.created).getTime()
// }
// return if not current chart time
// we could append to other chart times, but we would need to check the timestamps
// to make sure they fit in correctly, so for simplicity just ignore non-chart-time updates
// and fetch them via API when the user switches to that chart time
const chartTimeRecordType = chartTimeData[chartTime].type as ChartTimes
if (event.record.type !== chartTimeRecordType) {
// const lastCreated = getCacheValue(systemId, chartTime)?.at(-1)?.created ?? 0
// if (lastCreated) {
// // if the new record is close enough to the last cached record, append it to the cache so it's available immediately if the user switches to that chart time
// const { expectedInterval } = chartTimeData[chartTime]
// if (event.record.created - lastCreated < expectedInterval * 1.5) {
// console.log(
// `Caching out-of-chart-time probe stats record for chart time ${chartTime} (record type: ${event.record.type})`
// )
// const newStats = appendCacheValue(systemId, chartTime, [event.record])
// cache.set(`${systemId}${chartTime}`, newStats)
// }
// }
// console.log(`Received probe stats for non-current chart time (${event.record.type}), ignoring for now`)
return
}
// console.log("Appending new probe stats to chart:", event.record)
const newStats = appendCacheValue(systemId, chartTime, [event.record])
setProbeStats(newStats)
},
pbOptions
)
} catch (error) {
console.error("Failed to subscribe to probe stats:", error)
}
})()
return () => unsubscribe?.()
}, [systemId])
// fetch missing probe stats on load and when chart time changes
useEffect(() => {
if (!loadStats || !systemId || !chartTime || chartTime === "1m") {
return
}
const { expectedInterval } = chartTimeData[chartTime]
const cache_key = `${systemId}${chartTime}`
const requestId = ++statsRequestId.current
const cachedProbeStats = cache.get(cache_key) as NetworkProbeStatsRecord[] | undefined
const cachedProbeStats = getCacheValue(systemId, chartTime)
// Render from cache immediately if available
if (cachedProbeStats?.length) {
if (cachedProbeStats.length) {
setProbeStats(cachedProbeStats)
// Skip the fetch if the latest cached point is recent enough that no new point is expected yet
const lastCreated = cachedProbeStats.at(-1)?.created
if (lastCreated && Date.now() - lastCreated < expectedInterval * 0.9) {
console.log("Using cached probe stats, skipping fetch")
return
}
}
@@ -120,17 +201,42 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) {
if (requestId !== statsRequestId.current) {
return
}
// make new system stats
let probeStatsData = (cache.get(cache_key) || []) as NetworkProbeStatsRecord[]
if (probeStats.length) {
probeStatsData = appendData(probeStatsData, probeStats, expectedInterval, 100)
cache.set(cache_key, probeStatsData)
}
setProbeStats(probeStatsData)
const newStats = appendCacheValue(systemId, chartTime, probeStats)
setProbeStats(newStats)
}
)
}, [chartTime, probes])
}, [chartTime])
// subscribe to realtime metrics if chart time is 1m
useEffect(() => {
if (!loadStats || !systemId || chartTime !== "1m") {
return
}
let unsubscribe: (() => void) | undefined
const cache_key = `${systemId}rt`
pb.realtime
.subscribe(
`rt_metrics`,
(data: { Probes: NetworkProbeStatsRecord["stats"] }) => {
let prev = getCacheValue(systemId, "rt")
const now = Date.now()
// if no previous data or the last data point is older than 1min,
// create a new data set starting with a point 1 second ago to seed the chart data
if (!prev || (prev.at(-1)?.created ?? 0) < now - 60_000) {
prev = [{ created: now - 1000, stats: probesToStats(probes) }]
}
const stats = { created: now, stats: data.Probes } as NetworkProbeStatsRecord
const newStats = appendData(prev, [stats], 1000, 120)
setProbeStats(() => newStats)
cache.set(cache_key, newStats)
},
{ query: { system: systemId } }
)
.then((us) => {
unsubscribe = us
})
return () => unsubscribe?.()
}, [chartTime, systemId])
return {
probes,
@@ -138,6 +244,20 @@ export function useNetworkProbesData(props: UseNetworkProbesProps) {
}
}
export function probeKey(p: NetworkProbeRecord) {
if (p.protocol === "tcp") return `${p.protocol}:${p.target}:${p.port}`
return `${p.protocol}:${p.target}`
}
function probesToStats(probes: NetworkProbeRecord[]): NetworkProbeStatsRecord["stats"] {
const stats: NetworkProbeStatsRecord["stats"] = {}
for (const probe of probes) {
const key = probeKey(probe)
stats[key] = [probe.latency, 0, 0, probe.loss]
}
return stats
}
async function fetchProbes(systemId?: string) {
try {
const res = await pb.collection<NetworkProbeRecord>("network_probes").getList(0, 2000, {

View File

@@ -316,8 +316,6 @@ export interface ChartData {
systemStats: SystemStatsRecord[]
containerData: ChartDataContainer[]
orientation: "right" | "left"
ticks: number[]
domain: number[]
chartTime: ChartTimes
}
@@ -573,6 +571,8 @@ export interface NetworkProbeRecord {
type ProbeResult = number[]
export interface NetworkProbeStatsRecord {
id?: string
type?: string
stats: Record<string, ProbeResult>
created: number // unix timestamp (ms) for Recharts xAxis
}