mirror of
https://github.com/henrygd/beszel.git
synced 2026-05-06 19:01:48 +02:00
updates
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/pocketbase/dbx"
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
"github.com/pocketbase/pocketbase/tools/types"
|
||||
)
|
||||
|
||||
type RecordManager struct {
|
||||
@@ -40,7 +41,7 @@ type StatsRecord struct {
|
||||
|
||||
// Create longer records by averaging shorter records
|
||||
func (rm *RecordManager) CreateLongerRecords() {
|
||||
// start := time.Now()
|
||||
now := time.Now().UTC()
|
||||
longerRecordData := []LongerRecordData{
|
||||
{
|
||||
shorterType: "1m",
|
||||
@@ -71,6 +72,7 @@ func (rm *RecordManager) CreateLongerRecords() {
|
||||
// wrap the operations in a transaction
|
||||
rm.app.RunInTransaction(func(txApp core.App) error {
|
||||
var err error
|
||||
|
||||
collections := [3]*core.Collection{}
|
||||
collections[0], err = txApp.FindCachedCollectionByNameOrId("system_stats")
|
||||
if err != nil {
|
||||
@@ -96,49 +98,64 @@ func (rm *RecordManager) CreateLongerRecords() {
|
||||
recordData := longerRecordData[i]
|
||||
// log.Println("processing longer record type", recordData.longerType)
|
||||
// add one minute padding for longer records because they are created slightly later than the job start time
|
||||
longerRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration + time.Minute)
|
||||
longerRecordPeriod := now.Add(recordData.longerTimeDuration + time.Minute)
|
||||
// shorter records are created independently of longer records, so we shouldn't need to add padding
|
||||
shorterRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration)
|
||||
shorterRecordPeriod := now.Add(recordData.longerTimeDuration)
|
||||
// loop through both collections
|
||||
for _, collection := range collections {
|
||||
// check creation time of last longer record if not 10m, since 10m is created every run
|
||||
if recordData.longerType != "10m" {
|
||||
count, err := txApp.CountRecords(
|
||||
collection.Id,
|
||||
dbx.NewExp(
|
||||
"system = {:system} AND type = {:type} AND created > {:created}",
|
||||
dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod},
|
||||
),
|
||||
)
|
||||
var existingRecord struct {
|
||||
Id string
|
||||
}
|
||||
|
||||
params := dbx.Params{
|
||||
"type": recordData.longerType,
|
||||
"system": system.Id,
|
||||
"created": getCreatedTimeField(collection.Name, longerRecordPeriod),
|
||||
}
|
||||
|
||||
_ = db.Select("id").
|
||||
From(collection.Name).
|
||||
Where(dbx.NewExp("system = {:system} AND type = {:type} AND created > {:created}", params)).
|
||||
Limit(1).
|
||||
One(&existingRecord)
|
||||
|
||||
// continue if longer record exists
|
||||
if err != nil || count > 0 {
|
||||
if existingRecord.Id != "" {
|
||||
continue
|
||||
}
|
||||
}
|
||||
// get shorter records from the past x minutes
|
||||
var recordIds RecordIds
|
||||
|
||||
err := txApp.DB().
|
||||
params := dbx.Params{
|
||||
"type": recordData.shorterType,
|
||||
"system": system.Id,
|
||||
"created": getCreatedTimeField(collection.Name, shorterRecordPeriod),
|
||||
}
|
||||
|
||||
_ = txApp.DB().
|
||||
Select("id").
|
||||
From(collection.Name).
|
||||
AndWhere(dbx.NewExp(
|
||||
Where(dbx.NewExp(
|
||||
"system={:system} AND type={:type} AND created > {:created}",
|
||||
dbx.Params{
|
||||
"type": recordData.shorterType,
|
||||
"system": system.Id,
|
||||
"created": shorterRecordPeriod,
|
||||
},
|
||||
params,
|
||||
)).
|
||||
All(&recordIds)
|
||||
|
||||
// continue if not enough shorter records
|
||||
if err != nil || len(recordIds) < recordData.minShorterRecords {
|
||||
if len(recordIds) < recordData.minShorterRecords {
|
||||
continue
|
||||
}
|
||||
// average the shorter records and create longer record
|
||||
longerRecord := core.NewRecord(collection)
|
||||
longerRecord.Set("system", system.Id)
|
||||
longerRecord.Set("type", recordData.longerType)
|
||||
// network_probe_stats uses created as unix timestamp in milliseconds, so we need to set it manually here instead of relying on the default created field
|
||||
if collection.Name == "network_probe_stats" {
|
||||
longerRecord.Set("created", now.UnixMilli())
|
||||
}
|
||||
switch collection.Name {
|
||||
case "system_stats":
|
||||
longerRecord.Set("stats", rm.AverageSystemStats(db, recordIds))
|
||||
@@ -160,6 +177,13 @@ func (rm *RecordManager) CreateLongerRecords() {
|
||||
// log.Println("finished creating longer records", "time (ms)", time.Since(start).Milliseconds())
|
||||
}
|
||||
|
||||
func getCreatedTimeField(collectionName string, period time.Time) any {
|
||||
if collectionName == "network_probe_stats" {
|
||||
return period.UnixMilli()
|
||||
}
|
||||
return period.Format(types.DefaultDateLayout)
|
||||
}
|
||||
|
||||
// Calculate the average stats of a list of system_stats records without reflect
|
||||
func (rm *RecordManager) AverageSystemStats(db dbx.Builder, records RecordIds) *system.Stats {
|
||||
stats := make([]system.Stats, 0, len(records))
|
||||
|
||||
@@ -3,7 +3,6 @@ package records
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pocketbase/dbx"
|
||||
@@ -75,24 +74,17 @@ func deleteOldSystemStats(app core.App) error {
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
db := app.DB()
|
||||
|
||||
for _, collection := range collections {
|
||||
// Build the WHERE clause
|
||||
var conditionParts []string
|
||||
var params dbx.Params = make(map[string]any)
|
||||
for i := range recordData {
|
||||
rd := recordData[i]
|
||||
// Create parameterized condition for this record type
|
||||
dateParam := fmt.Sprintf("date%d", i)
|
||||
conditionParts = append(conditionParts, fmt.Sprintf("(type = '%s' AND created < {:%s})", rd.recordType, dateParam))
|
||||
params[dateParam] = now.Add(-rd.retention)
|
||||
}
|
||||
// Combine conditions with OR
|
||||
conditionStr := strings.Join(conditionParts, " OR ")
|
||||
// Construct and execute the full raw query
|
||||
rawQuery := fmt.Sprintf("DELETE FROM %s WHERE %s", collection, conditionStr)
|
||||
if _, err := app.DB().NewQuery(rawQuery).Bind(params).Execute(); err != nil {
|
||||
return fmt.Errorf("failed to delete from %s: %v", collection, err)
|
||||
query := db.Delete(collection, dbx.NewExp("type={:type} AND created<{:created}"))
|
||||
for _, rd := range recordData {
|
||||
if _, err := query.Bind(dbx.Params{
|
||||
"type": rd.recordType,
|
||||
"created": getCreatedTimeField(collection, now.Add(-rd.retention)),
|
||||
}).Execute(); err != nil {
|
||||
return fmt.Errorf("failed to delete from %s: %v", collection, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user