feat: add network probe data to realtime mode

Include probe results in the 1-second realtime WebSocket broadcast so
the frontend can update probe latency/loss every second, matching the
behavior of system and container metrics.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
xiaomiku01
2026-04-11 11:54:22 +08:00
parent fab5e8a656
commit bc0581ea61
4 changed files with 74 additions and 7 deletions

View File

@@ -7,10 +7,21 @@ import (
"time" "time"
"github.com/henrygd/beszel/internal/common" "github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/probe"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/subscriptions" "github.com/pocketbase/pocketbase/tools/subscriptions"
) )
// realtimePayload wraps system data with optional network probe results for realtime broadcast.
type realtimePayload struct {
Stats system.Stats `json:"stats"`
Info system.Info `json:"info"`
Containers []*container.Stats `json:"container"`
Probes map[string]probe.Result `json:"probes,omitempty"`
}
type subscriptionInfo struct { type subscriptionInfo struct {
subscription string subscription string
connectedClients uint8 connectedClients uint8
@@ -142,16 +153,25 @@ func (sm *SystemManager) startRealtimeWorker() {
// fetchRealtimeDataAndNotify fetches realtime data for all active subscriptions and notifies the clients. // fetchRealtimeDataAndNotify fetches realtime data for all active subscriptions and notifies the clients.
func (sm *SystemManager) fetchRealtimeDataAndNotify() { func (sm *SystemManager) fetchRealtimeDataAndNotify() {
for systemId, info := range activeSubscriptions { for systemId, info := range activeSubscriptions {
system, err := sm.GetSystem(systemId) sys, err := sm.GetSystem(systemId)
if err != nil { if err != nil {
continue continue
} }
go func() { go func() {
data, err := system.fetchDataFromAgent(common.DataRequestOptions{CacheTimeMs: 1000}) data, err := sys.fetchDataFromAgent(common.DataRequestOptions{CacheTimeMs: 1000})
if err != nil { if err != nil {
return return
} }
bytes, err := json.Marshal(data) payload := realtimePayload{
Stats: data.Stats,
Info: data.Info,
Containers: data.Containers,
}
// Fetch network probe results (lightweight in-memory read on agent)
if probes, err := sys.FetchNetworkProbeResults(); err == nil && len(probes) > 0 {
payload.Probes = probes
}
bytes, err := json.Marshal(payload)
if err == nil { if err == nil {
notify(sm.hub, info.subscription, bytes) notify(sm.hub, info.subscription, bytes)
} }

View File

@@ -29,6 +29,7 @@ export default memo(function SystemDetail({ id }: { id: string }) {
system, system,
systemStats, systemStats,
containerData, containerData,
probeStats,
chartData, chartData,
containerChartConfigs, containerChartConfigs,
details, details,
@@ -148,7 +149,7 @@ export default memo(function SystemDetail({ id }: { id: string }) {
{hasSystemd && <LazySystemdTable systemId={system.id} />} {hasSystemd && <LazySystemdTable systemId={system.id} />}
<Suspense> <Suspense>
<NetworkProbes systemId={system.id} chartData={chartData} grid={grid} /> <NetworkProbes systemId={system.id} chartData={chartData} grid={grid} realtimeProbeStats={probeStats} />
</Suspense> </Suspense>
</> </>
) )
@@ -198,7 +199,7 @@ export default memo(function SystemDetail({ id }: { id: string }) {
{pageBottomExtraMargin > 0 && <div style={{ marginBottom: pageBottomExtraMargin }}></div>} {pageBottomExtraMargin > 0 && <div style={{ marginBottom: pageBottomExtraMargin }}></div>}
</div> </div>
<Suspense> <Suspense>
<NetworkProbes systemId={system.id} chartData={chartData} grid={grid} /> <NetworkProbes systemId={system.id} chartData={chartData} grid={grid} realtimeProbeStats={probeStats} />
</Suspense> </Suspense>
</TabsContent> </TabsContent>

View File

@@ -23,10 +23,12 @@ export default function NetworkProbes({
systemId, systemId,
chartData, chartData,
grid, grid,
realtimeProbeStats,
}: { }: {
systemId: string systemId: string
chartData: ChartData chartData: ChartData
grid: boolean grid: boolean
realtimeProbeStats?: NetworkProbeStatsRecord[]
}) { }) {
const [probes, setProbes] = useState<NetworkProbeRecord[]>([]) const [probes, setProbes] = useState<NetworkProbeRecord[]>([])
const [stats, setStats] = useState<NetworkProbeStatsRecord[]>([]) const [stats, setStats] = useState<NetworkProbeStatsRecord[]>([])
@@ -50,13 +52,42 @@ export default function NetworkProbes({
// Build set of current probe keys to filter out deleted probes from stats // Build set of current probe keys to filter out deleted probes from stats
const activeProbeKeys = useMemo(() => new Set(probes.map(probeKey)), [probes]) const activeProbeKeys = useMemo(() => new Set(probes.map(probeKey)), [probes])
// Fetch probe stats based on chart time // Use realtime probe stats when in 1m mode
useEffect(() => {
if (chartTime !== "1m" || !realtimeProbeStats) {
return
}
// Filter stats to only include currently active probes
const data: NetworkProbeStatsRecord[] = realtimeProbeStats.map((r) => {
const filtered: NetworkProbeStatsRecord["stats"] = {}
for (const [key, val] of Object.entries(r.stats)) {
if (activeProbeKeys.has(key)) {
filtered[key] = val
}
}
return { stats: filtered, created: r.created }
})
setStats(data)
if (data.length > 0) {
const last = data[data.length - 1].stats
const latest: Record<string, { avg: number; loss: number }> = {}
for (const [key, val] of Object.entries(last)) {
latest[key] = { avg: val.avg, loss: val.loss }
}
setLatestResults(latest)
}
}, [chartTime, realtimeProbeStats, activeProbeKeys])
// Fetch probe stats based on chart time (skip in realtime mode)
useEffect(() => { useEffect(() => {
if (probes.length === 0) { if (probes.length === 0) {
setStats([]) setStats([])
setLatestResults({}) setLatestResults({})
return return
} }
if (chartTime === "1m") {
return
}
const controller = new AbortController() const controller = new AbortController()
const statsType = chartTimeData[chartTime]?.type ?? "1m" const statsType = chartTimeData[chartTime]?.type ?? "1m"

View File

@@ -19,6 +19,7 @@ import { chartTimeData, listen, parseSemVer, useBrowserStorage } from "@/lib/uti
import type { import type {
ChartData, ChartData,
ContainerStatsRecord, ContainerStatsRecord,
NetworkProbeStatsRecord,
SystemDetailsRecord, SystemDetailsRecord,
SystemInfo, SystemInfo,
SystemRecord, SystemRecord,
@@ -48,6 +49,7 @@ export function useSystemData(id: string) {
const [system, setSystem] = useState({} as SystemRecord) const [system, setSystem] = useState({} as SystemRecord)
const [systemStats, setSystemStats] = useState([] as SystemStatsRecord[]) const [systemStats, setSystemStats] = useState([] as SystemStatsRecord[])
const [containerData, setContainerData] = useState([] as ChartData["containerData"]) const [containerData, setContainerData] = useState([] as ChartData["containerData"])
const [probeStats, setProbeStats] = useState([] as NetworkProbeStatsRecord[])
const persistChartTime = useRef(false) const persistChartTime = useRef(false)
const statsRequestId = useRef(0) const statsRequestId = useRef(0)
const [chartLoading, setChartLoading] = useState(true) const [chartLoading, setChartLoading] = useState(true)
@@ -119,24 +121,36 @@ export function useSystemData(id: string) {
pb.realtime pb.realtime
.subscribe( .subscribe(
`rt_metrics`, `rt_metrics`,
(data: { container: ContainerStatsRecord[]; info: SystemInfo; stats: SystemStats }) => { (data: {
container: ContainerStatsRecord[]
info: SystemInfo
stats: SystemStats
probes?: NetworkProbeStatsRecord["stats"]
}) => {
const now = Date.now() const now = Date.now()
const statsPoint = { created: now, stats: data.stats } as SystemStatsRecord const statsPoint = { created: now, stats: data.stats } as SystemStatsRecord
const containerPoint = const containerPoint =
data.container?.length > 0 data.container?.length > 0
? makeContainerPoint(now, data.container as unknown as ContainerStatsRecord["stats"]) ? makeContainerPoint(now, data.container as unknown as ContainerStatsRecord["stats"])
: null : null
const probePoint: NetworkProbeStatsRecord | null = data.probes
? { stats: data.probes, created: now }
: null
// on first message, make sure we clear out data from other time periods // on first message, make sure we clear out data from other time periods
if (isFirst) { if (isFirst) {
isFirst = false isFirst = false
setSystemStats([statsPoint]) setSystemStats([statsPoint])
setContainerData(containerPoint ? [containerPoint] : []) setContainerData(containerPoint ? [containerPoint] : [])
setProbeStats(probePoint ? [probePoint] : [])
return return
} }
setSystemStats((prev) => appendData(prev, [statsPoint], 1000, 60)) setSystemStats((prev) => appendData(prev, [statsPoint], 1000, 60))
if (containerPoint) { if (containerPoint) {
setContainerData((prev) => appendData(prev, [containerPoint], 1000, 60)) setContainerData((prev) => appendData(prev, [containerPoint], 1000, 60))
} }
if (probePoint) {
setProbeStats((prev) => appendData(prev, [probePoint], 1000, 60))
}
}, },
{ query: { system: system.id } } { query: { system: system.id } }
) )
@@ -322,6 +336,7 @@ export function useSystemData(id: string) {
system, system,
systemStats, systemStats,
containerData, containerData,
probeStats,
chartData, chartData,
containerChartConfigs, containerChartConfigs,
details, details,