This commit is contained in:
henrygd
2026-04-28 17:46:56 -04:00
parent b182b699d7
commit 891b03426f
12 changed files with 359 additions and 228 deletions

View File

@@ -36,28 +36,16 @@ func bindNetworkProbesEvents(hub *Hub) {
return nil
}
// if system connected, run the probe immediately
// if not, return and wait for the system to connect and sync probes then
// if not, return and wait for the system to connect and sync probes on reg schedule
system, err := hub.sm.GetSystem(e.Record.GetString("system"))
if err != nil || system.Status != "up" {
return nil
if err == nil && system.Status == "up" {
go hub.upsertNetworkProbe(e.Record, true)
}
result, err := hub.upsertNetworkProbe(e.Record, true)
if err != nil {
hub.Logger().Warn("failed to sync probe to agent", "system", e.Record.GetString("system"), "probe", e.Record.Id, "err", err)
return nil
}
if result == nil {
return nil
}
setProbeResultFields(e.Record, *result)
if err := e.App.SaveNoValidate(e.Record); err != nil {
hub.Logger().Warn("failed to save initial probe result", "system", e.Record.GetString("system"), "probe", e.Record.Id, "err", err)
}
return e.Next()
return err
})
// On API update requests, if the probe config changed in a way that requires a new ID, we will create a new
// record with the new ID and delete the old one. Otherwise, we will just update the existing probe on the agent.
// On API update requests, if the probe config changed in a way that requires a new ID, create a new
// record with the new ID and delete the old one. Otherwise, just update the existing probe on the agent.
hub.OnRecordUpdateRequest("network_probes").BindFunc(func(e *core.RecordRequestEvent) error {
systemID := e.Record.GetString("system")
ID := generateProbeID(systemID, *probeConfigFromRecord(e.Record))
@@ -73,18 +61,15 @@ func bindNetworkProbesEvents(hub *Hub) {
}
err := e.Next()
if e.Record.GetBool("enabled") {
var result *probe.Result
// if the probe is enabled, sync the updated config to the agent now
runNow := !e.Record.Original().GetBool("enabled")
result, err = hub.upsertNetworkProbe(e.Record, runNow)
if result != nil {
setProbeResultFields(e.Record, *result)
_ = e.App.SaveNoValidate(e.Record)
}
err = hub.upsertNetworkProbe(e.Record, runNow)
} else {
// if the probe is paused, remove it from the agent
err = hub.deleteNetworkProbe(e.Record)
}
if err != nil {
hub.Logger().Warn("failed to sync updated probe", "system", e.Record.GetString("system"), "probe", e.Record.Id, "err", err)
hub.Logger().Warn("failed to sync updated probe", "system", systemID, "probe", e.Record.Id, "err", err)
}
return nil
})
@@ -115,10 +100,10 @@ func setProbeResultFields(record *core.Record, result probe.Result) {
nowString := now.Format(types.DefaultDateLayout)
record.Set("res", result.Get(0))
record.Set("resAvg1h", result.Get(1))
record.Set("resMin1h", result.Get(2))
record.Set("resMax1h", result.Get(3))
record.Set("loss", result.Get(4))
record.Set("loss1h", result.Get(5))
record.Set("resMin1h", result.Get(3))
record.Set("resMax1h", result.Get(5))
// record.Set("loss", result.Get(4))
record.Set("loss1h", result.Get(7))
record.Set("updated", nowString)
}
@@ -133,14 +118,20 @@ func copyProbeToNewRecord(oldRecord *core.Record, newID string) *core.Record {
return newRecord
}
// upsertNetworkProbe applies the record's probe config to the target system.
func (h *Hub) upsertNetworkProbe(record *core.Record, runNow bool) (*probe.Result, error) {
// upsertNetworkProbe creates or updates the record's probe on the target system. If runNow
// is true, it will also trigger an immediate probe run and update the record with the result.
func (h *Hub) upsertNetworkProbe(record *core.Record, runNow bool) error {
systemID := record.GetString("system")
system, err := h.sm.GetSystem(systemID)
if err != nil {
return nil, err
return err
}
return system.UpsertNetworkProbe(*probeConfigFromRecord(record), runNow)
result, err := system.UpsertNetworkProbe(*probeConfigFromRecord(record), runNow)
if err != nil || result == nil {
return err
}
setProbeResultFields(record, *result)
return h.App.SaveNoValidate(record)
}
// deleteNetworkProbe removes the record's probe from the target system.

View File

@@ -30,6 +30,7 @@ import (
"github.com/lxzan/gws"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/security"
"github.com/pocketbase/pocketbase/tools/types"
"golang.org/x/crypto/ssh"
)
@@ -314,16 +315,16 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s
return err
}
func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, systemId string) error {
if len(data) == 0 {
func updateNetworkProbesRecords(app core.App, probeResults map[string]probe.Result, systemId string) error {
if len(probeResults) == 0 {
return nil
}
var err error
collectionName := "network_probes"
probeCollectionName := "network_probes"
// If realtime updates are active, we save via PocketBase records to trigger realtime events.
// Otherwise we can do a more efficient direct update via SQL
realtimeActive := utils.RealtimeActiveForCollection(app, collectionName, func(filterQuery string) bool {
realtimeActive := utils.RealtimeActiveForCollection(app, probeCollectionName, func(filterQuery string) bool {
return !strings.Contains(filterQuery, "system") || strings.Contains(filterQuery, systemId)
})
@@ -334,63 +335,68 @@ func updateNetworkProbesRecords(app core.App, data map[string]probe.Result, syst
var updateQuery *dbx.Query
if !realtimeActive {
db = app.DB()
sql := fmt.Sprintf("UPDATE %s SET res={:res}, resMin1h={:resMin1h}, resMax1h={:resMax1h}, resAvg1h={:resAvg1h}, loss={:loss}, loss1h={:loss1h}, updated={:updated} WHERE id={:id}", collectionName)
updateQuery = db.NewQuery(sql)
probeFields := []string{"res", "resMin1h", "resMax1h", "resAvg1h", "loss1h", "updated"}
setClauses := make([]string, len(probeFields))
for i, f := range probeFields {
setClauses[i] = fmt.Sprintf("%s={:%s}", f, f)
}
queryString := fmt.Sprintf("UPDATE %s SET %s WHERE id={:id}", probeCollectionName, strings.Join(setClauses, ", "))
updateQuery = db.NewQuery(queryString)
}
// update network_probes records
for id, values := range data {
for id, values := range probeResults {
probeData := map[string]any{
"id": id,
"res": values.Get(0),
"resAvg1h": values.Get(1),
"resMin1h": values.Get(3),
"resMax1h": values.Get(5),
"loss1h": values.Get(7),
"updated": nowString,
}
switch realtimeActive {
case true:
var record *core.Record
record, err = app.FindRecordById(collectionName, id)
record, err = app.FindRecordById(probeCollectionName, id)
if err == nil {
record.Set("res", values.Get(0))
record.Set("resAvg1h", values.Get(1))
record.Set("resMin1h", values.Get(2))
record.Set("resMax1h", values.Get(3))
record.Set("loss", values.Get(4))
record.Set("loss1h", values.Get(5))
record.Set("updated", nowString)
record.Load(probeData)
err = app.SaveNoValidate(record)
}
default:
_, err = updateQuery.Bind(dbx.Params{
"id": id,
"res": values.Get(0),
"resAvg1h": values.Get(1),
"resMin1h": values.Get(2),
"resMax1h": values.Get(3),
"loss": values.Get(4),
"loss1h": values.Get(5),
"updated": nowString,
}).Execute()
_, err = updateQuery.Bind(dbx.Params(probeData)).Execute()
}
if err != nil {
app.Logger().Warn("Failed to update probe", "system", systemId, "probe", id, "err", err)
}
}
// insert network probe stats records
switch realtimeActive {
case true:
collection, _ := app.FindCachedCollectionByNameOrId("network_probe_stats")
record := core.NewRecord(collection)
record.Set("system", systemId)
record.Set("stats", data)
record.Set("type", "1m")
record.Set("created", nowMilli)
err = app.SaveNoValidate(record)
default:
var statsJson types.JSONRaw
if err := statsJson.Scan(data); err == nil {
insertQuery := db.NewQuery("INSERT INTO network_probe_stats (system, stats, type, created) VALUES ({:system}, {:stats}, {:type}, {:created})")
_, err = insertQuery.Bind(dbx.Params{
"system": systemId,
"stats": statsJson,
"type": "1m",
"created": nowMilli,
}).Execute()
// handle stats collection as well
statsCollectionName := "network_probe_stats"
// we don't need the hour values for the stats collection
stats := make(map[string]probe.Stats, len(probeResults))
for key, values := range probeResults {
stats[key] = probe.Stats{}.FromResult(values)
}
statsRecordData := map[string]any{
"system": systemId,
"type": "1m",
"created": nowMilli,
}
var statsJson types.JSONRaw
if err = statsJson.Scan(stats); err == nil {
statsRecordData["stats"] = statsJson
switch realtimeActive {
case true:
collection, _ := app.FindCachedCollectionByNameOrId(statsCollectionName)
record := core.NewRecord(collection)
record.Load(statsRecordData)
err = app.SaveNoValidate(record)
default:
statsRecordData["id"] = security.PseudorandomStringWithAlphabet(10, core.DefaultIdAlphabet)
_, err = db.Insert(statsCollectionName, dbx.Params(statsRecordData)).Execute()
}
}
if err != nil {