refactor go code for pocketbase 0.23.0 (#300)

This commit is contained in:
Henry Dollman
2024-11-24 18:15:24 -05:00
parent b4bc8a31aa
commit 14716d36a6
11 changed files with 1592 additions and 779 deletions

View File

@@ -11,11 +11,10 @@ import (
"github.com/containrrr/shoutrrr"
"github.com/goccy/go-json"
"github.com/labstack/echo/v5"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/apis"
"github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/mailer"
"github.com/pocketbase/pocketbase/tools/types"
"github.com/spf13/cast"
@@ -48,8 +47,8 @@ type SystemAlertStats struct {
}
type SystemAlertData struct {
systemRecord *models.Record
alertRecord *models.Record
systemRecord *core.Record
alertRecord *core.Record
name string
unit string
val float64
@@ -68,12 +67,12 @@ func NewAlertManager(app *pocketbase.PocketBase) *AlertManager {
}
}
func (am *AlertManager) HandleSystemAlerts(systemRecord *models.Record, systemInfo system.Info, temperatures map[string]float64, extraFs map[string]*system.FsStats) error {
func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, systemInfo system.Info, temperatures map[string]float64, extraFs map[string]*system.FsStats) error {
// start := time.Now()
// defer func() {
// log.Println("alert stats took", time.Since(start))
// }()
alertRecords, err := am.app.Dao().FindRecordsByExpr("alerts",
alertRecords, err := am.app.FindAllRecords("alerts",
dbx.NewExp("system={:system}", dbx.Params{"system": systemRecord.Id}),
)
if err != nil || len(alertRecords) == 0 {
@@ -82,7 +81,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *models.Record, systemIn
}
var validAlerts []SystemAlertData
now := systemRecord.Updated.Time().UTC()
now := systemRecord.GetDateTime("updated").Time().UTC()
oldestTime := now
for _, alertRecord := range alertRecords {
@@ -155,7 +154,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *models.Record, systemIn
Created types.DateTime `db:"created"`
}{}
err = am.app.Dao().DB().
err = am.app.DB().
Select("stats", "created").
From("system_stats").
Where(dbx.NewExp(
@@ -325,28 +324,28 @@ func (am *AlertManager) sendSystemAlert(alert SystemAlertData) {
body := fmt.Sprintf("%s averaged %.2f%s for the previous %v %s.", alert.descriptor, alert.val, alert.unit, alert.min, minutesLabel)
alert.alertRecord.Set("triggered", alert.triggered)
if err := am.app.Dao().SaveRecord(alert.alertRecord); err != nil {
if err := am.app.Save(alert.alertRecord); err != nil {
// app.Logger().Error("failed to save alert record", "err", err.Error())
return
}
// expand the user relation and send the alert
if errs := am.app.Dao().ExpandRecord(alert.alertRecord, []string{"user"}, nil); len(errs) > 0 {
if errs := am.app.ExpandRecord(alert.alertRecord, []string{"user"}, nil); len(errs) > 0 {
// app.Logger().Error("failed to expand user relation", "errs", errs)
return
}
if user := alert.alertRecord.ExpandedOne("user"); user != nil {
am.sendAlert(AlertMessageData{
UserID: user.GetId(),
UserID: user.Id,
Title: subject,
Message: body,
Link: am.app.Settings().Meta.AppUrl + "/system/" + url.PathEscape(systemName),
Link: am.app.Settings().Meta.AppURL + "/system/" + url.PathEscape(systemName),
LinkText: "View " + systemName,
})
}
}
// todo: allow x minutes downtime before sending alert
func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *models.Record) error {
func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *core.Record) error {
var alertStatus string
switch newStatus {
case "up":
@@ -362,9 +361,9 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *mo
return nil
}
// check if use
alertRecords, err := am.app.Dao().FindRecordsByExpr("alerts",
alertRecords, err := am.app.FindAllRecords("alerts",
dbx.HashExp{
"system": oldSystemRecord.GetId(),
"system": oldSystemRecord.Id,
"name": "Status",
},
)
@@ -374,7 +373,7 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *mo
}
for _, alertRecord := range alertRecords {
// expand the user relation
if errs := am.app.Dao().ExpandRecord(alertRecord, []string{"user"}, nil); len(errs) > 0 {
if errs := am.app.ExpandRecord(alertRecord, []string{"user"}, nil); len(errs) > 0 {
return fmt.Errorf("failed to expand: %v", errs)
}
user := alertRecord.ExpandedOne("user")
@@ -388,10 +387,10 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *mo
// send alert
systemName := oldSystemRecord.GetString("name")
am.sendAlert(AlertMessageData{
UserID: user.GetId(),
UserID: user.Id,
Title: fmt.Sprintf("Connection to %s is %s %v", systemName, alertStatus, emoji),
Message: fmt.Sprintf("Connection to %s is %s", systemName, alertStatus),
Link: am.app.Settings().Meta.AppUrl + "/system/" + url.PathEscape(systemName),
Link: am.app.Settings().Meta.AppURL + "/system/" + url.PathEscape(systemName),
LinkText: "View " + systemName,
})
}
@@ -400,7 +399,7 @@ func (am *AlertManager) HandleStatusAlerts(newStatus string, oldSystemRecord *mo
func (am *AlertManager) sendAlert(data AlertMessageData) {
// get user settings
record, err := am.app.Dao().FindFirstRecordByFilter(
record, err := am.app.FindFirstRecordByFilter(
"user_settings", "user={:user}",
dbx.Params{"user": data.UserID},
)
@@ -512,19 +511,20 @@ func sliceContains(slice []string, item string) bool {
return false
}
func (am *AlertManager) SendTestNotification(c echo.Context) error {
requestData := apis.RequestInfo(c)
if requestData.AuthRecord == nil {
// todo: test
func (am *AlertManager) SendTestNotification(e *core.RequestEvent) error {
info, _ := e.RequestInfo()
if info.Auth == nil {
return apis.NewForbiddenError("Forbidden", nil)
}
url := c.QueryParam("url")
url := e.Request.URL.Query().Get("url")
// log.Println("url", url)
if url == "" {
return c.JSON(200, map[string]string{"err": "URL is required"})
return e.JSON(200, map[string]string{"err": "URL is required"})
}
err := am.SendShoutrrrAlert(url, "Test Alert", "This is a notification from Beszel.", am.app.Settings().Meta.AppUrl, "View Beszel")
err := am.SendShoutrrrAlert(url, "Test Alert", "This is a notification from Beszel.", am.app.Settings().Meta.AppURL, "View Beszel")
if err != nil {
return c.JSON(200, map[string]string{"err": err.Error()})
return e.JSON(200, map[string]string{"err": err.Error()})
}
return c.JSON(200, map[string]bool{"err": false})
return e.JSON(200, map[string]bool{"err": false})
}

View File

@@ -8,10 +8,9 @@ import (
"path/filepath"
"strconv"
"github.com/labstack/echo/v5"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/apis"
"github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/core"
"github.com/spf13/cast"
"gopkg.in/yaml.v3"
)
@@ -46,11 +45,11 @@ func (h *Hub) syncSystemsWithConfig() error {
return nil
}
var firstUser *models.Record
var firstUser *core.Record
// Create a map of email to user ID
userEmailToID := make(map[string]string)
users, err := h.app.Dao().FindRecordsByExpr("users", dbx.NewExp("id != ''"))
users, err := h.app.FindAllRecords("users", dbx.NewExp("id != ''"))
if err != nil {
return err
}
@@ -85,13 +84,13 @@ func (h *Hub) syncSystemsWithConfig() error {
}
// Get existing systems
existingSystems, err := h.app.Dao().FindRecordsByExpr("systems", dbx.NewExp("id != ''"))
existingSystems, err := h.app.FindAllRecords("systems", dbx.NewExp("id != ''"))
if err != nil {
return err
}
// Create a map of existing systems for easy lookup
existingSystemsMap := make(map[string]*models.Record)
existingSystemsMap := make(map[string]*core.Record)
for _, system := range existingSystems {
key := system.GetString("host") + ":" + system.GetString("port")
existingSystemsMap[key] = system
@@ -105,24 +104,24 @@ func (h *Hub) syncSystemsWithConfig() error {
existingSystem.Set("name", sysConfig.Name)
existingSystem.Set("users", sysConfig.Users)
existingSystem.Set("port", sysConfig.Port)
if err := h.app.Dao().SaveRecord(existingSystem); err != nil {
if err := h.app.Save(existingSystem); err != nil {
return err
}
delete(existingSystemsMap, key)
} else {
// Create new system
systemsCollection, err := h.app.Dao().FindCollectionByNameOrId("systems")
systemsCollection, err := h.app.FindCollectionByNameOrId("systems")
if err != nil {
return fmt.Errorf("failed to find systems collection: %v", err)
}
newSystem := models.NewRecord(systemsCollection)
newSystem := core.NewRecord(systemsCollection)
newSystem.Set("name", sysConfig.Name)
newSystem.Set("host", sysConfig.Host)
newSystem.Set("port", sysConfig.Port)
newSystem.Set("users", sysConfig.Users)
newSystem.Set("info", system.Info{})
newSystem.Set("status", "pending")
if err := h.app.Dao().SaveRecord(newSystem); err != nil {
if err := h.app.Save(newSystem); err != nil {
return fmt.Errorf("failed to create new system: %v", err)
}
}
@@ -130,7 +129,7 @@ func (h *Hub) syncSystemsWithConfig() error {
// Delete systems not in config
for _, system := range existingSystemsMap {
if err := h.app.Dao().DeleteRecord(system); err != nil {
if err := h.app.Delete(system); err != nil {
return err
}
}
@@ -142,7 +141,7 @@ func (h *Hub) syncSystemsWithConfig() error {
// Generates content for the config.yml file as a YAML string
func (h *Hub) generateConfigYAML() (string, error) {
// Fetch all systems from the database
systems, err := h.app.Dao().FindRecordsByFilter("systems", "id != ''", "name", -1, 0)
systems, err := h.app.FindRecordsByFilter("systems", "id != ''", "name", -1, 0)
if err != nil {
return "", err
}
@@ -195,7 +194,7 @@ func (h *Hub) generateConfigYAML() (string, error) {
// New helper function to get a map of user IDs to emails
func (h *Hub) getUserEmailMap(userIDs []string) (map[string]string, error) {
users, err := h.app.Dao().FindRecordsByIds("users", userIDs)
users, err := h.app.FindRecordsByIds("users", userIDs)
if err != nil {
return nil, err
}
@@ -209,14 +208,15 @@ func (h *Hub) getUserEmailMap(userIDs []string) (map[string]string, error) {
}
// Returns the current config.yml file as a JSON object
func (h *Hub) getYamlConfig(c echo.Context) error {
requestData := apis.RequestInfo(c)
if requestData.AuthRecord == nil || requestData.AuthRecord.GetString("role") != "admin" {
func (h *Hub) getYamlConfig(e *core.RequestEvent) error {
info, _ := e.RequestInfo()
// todo: test
if info.Auth == nil || info.Auth.GetString("role") != "admin" {
return apis.NewForbiddenError("Forbidden", nil)
}
configContent, err := h.generateConfigYAML()
if err != nil {
return err
}
return c.JSON(200, map[string]string{"config": configContent})
return e.JSON(200, map[string]string{"config": configContent})
}

View File

@@ -24,13 +24,10 @@ import (
"time"
"github.com/goccy/go-json"
"github.com/labstack/echo/v5"
"github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/apis"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/plugins/migratecmd"
"github.com/pocketbase/pocketbase/tools/cron"
"golang.org/x/crypto/ssh"
)
@@ -42,8 +39,8 @@ type Hub struct {
am *alerts.AlertManager
um *users.UserManager
rm *records.RecordManager
systemStats *models.Collection
containerStats *models.Collection
systemStats *core.Collection
containerStats *core.Collection
}
func NewHub(app *pocketbase.PocketBase) *Hub {
@@ -67,128 +64,127 @@ func (h *Hub) Run() {
})
// initial setup
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
h.app.OnServe().BindFunc(func(se *core.ServeEvent) error {
// create ssh client config
err := h.createSSHClientConfig()
if err != nil {
log.Fatal(err)
}
// set auth settings
usersCollection, err := h.app.Dao().FindCollectionByNameOrId("users")
usersCollection, err := h.app.FindCollectionByNameOrId("users")
if err != nil {
return err
}
usersAuthOptions := usersCollection.AuthOptions()
usersAuthOptions.AllowUsernameAuth = false
// disable email auth if DISABLE_PASSWORD_AUTH env var is set
// todo: test email login and DISABLE_PASSWORD_AUTH env var (do we need to save usersCollection?)
if os.Getenv("DISABLE_PASSWORD_AUTH") == "true" {
usersAuthOptions.AllowEmailAuth = false
usersCollection.PasswordAuth.Enabled = false
} else {
usersAuthOptions.AllowEmailAuth = true
}
usersCollection.SetOptions(usersAuthOptions)
if err := h.app.Dao().SaveCollection(usersCollection); err != nil {
return err
usersCollection.PasswordAuth.Enabled = true
usersCollection.PasswordAuth.IdentityFields = []string{"email"}
}
// sync systems with config
return h.syncSystemsWithConfig()
h.syncSystemsWithConfig()
return se.Next()
})
// serve web ui
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
h.app.OnServe().BindFunc(func(se *core.ServeEvent) error {
switch isGoRun {
case true:
proxy := httputil.NewSingleHostReverseProxy(&url.URL{
Scheme: "http",
Host: "localhost:5173",
})
e.Router.Any("/*", echo.WrapHandler(proxy))
se.Router.Any("/", func(e *core.RequestEvent) error {
proxy.ServeHTTP(e.Response, e.Request)
return nil
})
default:
csp, cspExists := os.LookupEnv("CSP")
e.Router.Any("/*", func(c echo.Context) error {
se.Router.Any("/", func(e *core.RequestEvent) error {
if cspExists {
c.Response().Header().Del("X-Frame-Options")
c.Response().Header().Set("Content-Security-Policy", csp)
e.Response.Header().Del("X-Frame-Options")
e.Response.Header().Set("Content-Security-Policy", csp)
}
indexFallback := !strings.HasPrefix(c.Request().URL.Path, "/static/")
return apis.StaticDirectoryHandler(site.Dist, indexFallback)(c)
indexFallback := !strings.HasPrefix(e.Request.URL.Path, "/static/")
return apis.Static(site.Dist, indexFallback)(e)
})
}
return nil
return se.Next()
})
// set up scheduled jobs / ticker for system updates
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
h.app.OnServe().BindFunc(func(se *core.ServeEvent) error {
// 15 second ticker for system updates
go h.startSystemUpdateTicker()
// set up cron jobs
scheduler := cron.New()
// delete old records once every hour
scheduler.MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords)
h.app.Cron().MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords)
// create longer records every 10 minutes
scheduler.MustAdd("create longer records", "*/10 * * * *", func() {
h.app.Cron().MustAdd("create longer records", "*/10 * * * *", func() {
if systemStats, containerStats, err := h.getCollections(); err == nil {
h.rm.CreateLongerRecords([]*models.Collection{systemStats, containerStats})
h.rm.CreateLongerRecords([]*core.Collection{systemStats, containerStats})
}
})
scheduler.Start()
return nil
return se.Next()
})
// custom api routes
h.app.OnBeforeServe().Add(func(e *core.ServeEvent) error {
h.app.OnServe().BindFunc(func(se *core.ServeEvent) error {
// returns public key
e.Router.GET("/api/beszel/getkey", func(c echo.Context) error {
requestData := apis.RequestInfo(c)
if requestData.AuthRecord == nil {
se.Router.GET("/api/beszel/getkey", func(e *core.RequestEvent) error {
info, _ := e.RequestInfo()
if info.Auth == nil {
return apis.NewForbiddenError("Forbidden", nil)
}
return c.JSON(http.StatusOK, map[string]string{"key": h.pubKey, "v": beszel.Version})
return e.JSON(http.StatusOK, map[string]string{"key": h.pubKey, "v": beszel.Version})
})
// check if first time setup on login page
e.Router.GET("/api/beszel/first-run", func(c echo.Context) error {
adminNum, err := h.app.Dao().TotalAdmins()
se.Router.GET("/api/beszel/first-run", func(e *core.RequestEvent) error {
// todo: test that adminNum is correct
adminNum, err := h.app.CountRecords(core.CollectionNameSuperusers)
if err != nil {
return err
}
return c.JSON(http.StatusOK, map[string]bool{"firstRun": adminNum == 0})
return e.JSON(http.StatusOK, map[string]bool{"firstRun": adminNum == 0})
})
// send test notification
e.Router.GET("/api/beszel/send-test-notification", h.am.SendTestNotification)
se.Router.GET("/api/beszel/send-test-notification", h.am.SendTestNotification)
// API endpoint to get config.yml content
e.Router.GET("/api/beszel/config-yaml", h.getYamlConfig)
return nil
se.Router.GET("/api/beszel/config-yaml", h.getYamlConfig)
return se.Next()
})
// system creation defaults
h.app.OnModelBeforeCreate("systems").Add(func(e *core.ModelEvent) error {
record := e.Model.(*models.Record)
record.Set("info", system.Info{})
record.Set("status", "pending")
return nil
h.app.OnRecordCreate("systems").BindFunc(func(e *core.RecordEvent) error {
e.Record.Set("info", system.Info{})
e.Record.Set("status", "pending")
return e.Next()
})
// immediately create connection for new systems
h.app.OnModelAfterCreate("systems").Add(func(e *core.ModelEvent) error {
go h.updateSystem(e.Model.(*models.Record))
return nil
h.app.OnRecordAfterCreateSuccess("systems").BindFunc(func(e *core.RecordEvent) error {
go h.updateSystem(e.Record)
return e.Next()
})
// handle default values for user / user_settings creation
h.app.OnModelBeforeCreate("users").Add(h.um.InitializeUserRole)
h.app.OnModelBeforeCreate("user_settings").Add(h.um.InitializeUserSettings)
h.app.OnRecordCreate("users").BindFunc(h.um.InitializeUserRole)
h.app.OnRecordCreate("user_settings").BindFunc(h.um.InitializeUserSettings)
// empty info for systems that are paused
h.app.OnModelBeforeUpdate("systems").Add(func(e *core.ModelEvent) error {
if e.Model.(*models.Record).GetString("status") == "paused" {
e.Model.(*models.Record).Set("info", system.Info{})
h.app.OnRecordUpdate("systems").BindFunc(func(e *core.RecordEvent) error {
if e.Record.GetString("status") == "paused" {
e.Record.Set("info", system.Info{})
}
return nil
return e.Next()
})
// do things after a systems record is updated
h.app.OnModelAfterUpdate("systems").Add(func(e *core.ModelEvent) error {
newRecord := e.Model.(*models.Record)
oldRecord := newRecord.OriginalCopy()
h.app.OnRecordAfterUpdateSuccess("systems").BindFunc(func(e *core.RecordEvent) error {
newRecord := e.Record.Fresh()
oldRecord := newRecord.Original()
newStatus := newRecord.GetString("status")
// if system is disconnected and connection exists, remove it
@@ -203,15 +199,13 @@ func (h *Hub) Run() {
h.am.HandleStatusAlerts(newStatus, oldRecord)
}
return nil
return e.Next()
})
// do things after a systems record is deleted
h.app.OnModelAfterDelete("systems").Add(func(e *core.ModelEvent) error {
// if system connection exists, close it
h.deleteSystemConnection(e.Model.(*models.Record))
return nil
// if system is deleted, close connection
h.app.OnRecordAfterDeleteSuccess("systems").BindFunc(func(e *core.RecordEvent) error {
h.deleteSystemConnection(e.Record)
return e.Next()
})
if err := h.app.Start(); err != nil {
@@ -227,7 +221,7 @@ func (h *Hub) startSystemUpdateTicker() {
}
func (h *Hub) updateSystems() {
records, err := h.app.Dao().FindRecordsByFilter(
records, err := h.app.FindRecordsByFilter(
"2hz5ncl8tizk5nx", // systems collection
"status != 'paused'", // filter
"updated", // sort
@@ -256,7 +250,7 @@ func (h *Hub) updateSystems() {
}
}
func (h *Hub) updateSystem(record *models.Record) {
func (h *Hub) updateSystem(record *core.Record) {
var client *ssh.Client
var err error
@@ -291,10 +285,9 @@ func (h *Hub) updateSystem(record *models.Record) {
return
}
// update system record
dao := h.app.Dao()
record.Set("status", "up")
record.Set("info", systemData.Info)
if err := dao.SaveRecord(record); err != nil {
if err := h.app.SaveNoValidate(record); err != nil {
h.app.Logger().Error("Failed to update record: ", "err", err.Error())
}
// add system_stats and container_stats records
@@ -302,20 +295,20 @@ func (h *Hub) updateSystem(record *models.Record) {
h.app.Logger().Error("Failed to get collections: ", "err", err.Error())
} else {
// add new system_stats record
systemStatsRecord := models.NewRecord(systemStats)
systemStatsRecord := core.NewRecord(systemStats)
systemStatsRecord.Set("system", record.Id)
systemStatsRecord.Set("stats", systemData.Stats)
systemStatsRecord.Set("type", "1m")
if err := dao.SaveRecord(systemStatsRecord); err != nil {
if err := h.app.SaveNoValidate(systemStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error())
}
// add new container_stats record
if len(systemData.Containers) > 0 {
containerStatsRecord := models.NewRecord(containerStats)
containerStatsRecord := core.NewRecord(containerStats)
containerStatsRecord.Set("system", record.Id)
containerStatsRecord.Set("stats", systemData.Containers)
containerStatsRecord.Set("type", "1m")
if err := dao.SaveRecord(containerStatsRecord); err != nil {
if err := h.app.SaveNoValidate(containerStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error())
}
}
@@ -328,16 +321,16 @@ func (h *Hub) updateSystem(record *models.Record) {
}
// return system_stats and container_stats collections
func (h *Hub) getCollections() (*models.Collection, *models.Collection, error) {
func (h *Hub) getCollections() (*core.Collection, *core.Collection, error) {
if h.systemStats == nil {
systemStats, err := h.app.Dao().FindCollectionByNameOrId("system_stats")
systemStats, err := h.app.FindCollectionByNameOrId("system_stats")
if err != nil {
return nil, nil, err
}
h.systemStats = systemStats
}
if h.containerStats == nil {
containerStats, err := h.app.Dao().FindCollectionByNameOrId("container_stats")
containerStats, err := h.app.FindCollectionByNameOrId("container_stats")
if err != nil {
return nil, nil, err
}
@@ -347,17 +340,17 @@ func (h *Hub) getCollections() (*models.Collection, *models.Collection, error) {
}
// set system to specified status and save record
func (h *Hub) updateSystemStatus(record *models.Record, status string) {
if record.GetString("status") != status {
func (h *Hub) updateSystemStatus(record *core.Record, status string) {
if record.Fresh().GetString("status") != status {
record.Set("status", status)
if err := h.app.Dao().SaveRecord(record); err != nil {
if err := h.app.SaveNoValidate(record); err != nil {
h.app.Logger().Error("Failed to update record: ", "err", err.Error())
}
}
}
// delete system connection from map and close connection
func (h *Hub) deleteSystemConnection(record *models.Record) {
func (h *Hub) deleteSystemConnection(record *core.Record) {
if client, ok := h.systemConnections.Load(record.Id); ok {
if sshClient := client.(*ssh.Client); sshClient != nil {
sshClient.Close()
@@ -366,12 +359,12 @@ func (h *Hub) deleteSystemConnection(record *models.Record) {
}
}
func (h *Hub) createSystemConnection(record *models.Record) (*ssh.Client, error) {
client, err := ssh.Dial("tcp", net.JoinHostPort(record.GetString("host"), record.GetString("port")), h.sshClientConfig)
if err != nil {
return nil, err
}
return client, nil
func (h *Hub) createSystemConnection(record *core.Record) (*ssh.Client, error) {
client, err := ssh.Dial("tcp", net.JoinHostPort(record.GetString("host"), record.GetString("port")), h.sshClientConfig)
if err != nil {
return nil, err
}
return client, nil
}
func (h *Hub) createSSHClientConfig() error {

View File

@@ -11,8 +11,7 @@ import (
"github.com/goccy/go-json"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/daos"
"github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/tools/types"
)
@@ -41,7 +40,7 @@ func NewRecordManager(app *pocketbase.PocketBase) *RecordManager {
}
// Create longer records by averaging shorter records
func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
// start := time.Now()
longerRecordData := []LongerRecordData{
{
@@ -71,8 +70,8 @@ func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
},
}
// wrap the operations in a transaction
rm.app.Dao().RunInTransaction(func(txDao *daos.Dao) error {
activeSystems, err := txDao.FindRecordsByExpr("systems", dbx.NewExp("status = 'up'"))
rm.app.RunInTransaction(func(txApp core.App) error {
activeSystems, err := txApp.FindAllRecords("systems", dbx.NewExp("status = 'up'"))
if err != nil {
log.Println("failed to get active systems", "err", err.Error())
return err
@@ -92,7 +91,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
for _, collection := range collections {
// check creation time of last longer record if not 10m, since 10m is created every run
if recordData.longerType != "10m" {
lastLongerRecord, err := txDao.FindFirstRecordByFilter(
lastLongerRecord, err := txApp.FindFirstRecordByFilter(
collection.Id,
"type = {:type} && system = {:system} && created > {:created}",
dbx.Params{"type": recordData.longerType, "system": system.Id, "created": longerRecordPeriod},
@@ -106,7 +105,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
// get shorter records from the past x minutes
var stats RecordStats
err := txDao.DB().
err := txApp.DB().
Select("stats").
From(collection.Name).
AndWhere(dbx.NewExp(
@@ -125,7 +124,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
continue
}
// average the shorter records and create longer record
longerRecord := models.NewRecord(collection)
longerRecord := core.NewRecord(collection)
longerRecord.Set("system", system.Id)
longerRecord.Set("type", recordData.longerType)
switch collection.Name {
@@ -134,7 +133,7 @@ func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
case "container_stats":
longerRecord.Set("stats", rm.AverageContainerStats(stats))
}
if err := txDao.SaveRecord(longerRecord); err != nil {
if err := txApp.SaveNoValidate(longerRecord); err != nil {
log.Println("failed to save longer record", "err", err.Error())
}
}
@@ -354,7 +353,7 @@ func (rm *RecordManager) DeleteOldRecords() {
retention: 30 * 24 * time.Hour,
},
}
db := rm.app.Dao().NonconcurrentDB()
db := rm.app.NonconcurrentDB()
for _, recordData := range recordData {
for _, collectionSlug := range collections {
formattedDate := time.Now().UTC().Add(-recordData.retention).Format(types.DefaultDateLayout)

View File

@@ -6,7 +6,6 @@ import (
"github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/models"
)
type UserManager struct {
@@ -26,16 +25,17 @@ func NewUserManager(app *pocketbase.PocketBase) *UserManager {
}
}
func (um *UserManager) InitializeUserRole(e *core.ModelEvent) error {
user := e.Model.(*models.Record)
if user.GetString("role") == "" {
user.Set("role", "user")
// todo: test
func (um *UserManager) InitializeUserRole(e *core.RecordEvent) error {
if e.Record.GetString("role") == "" {
e.Record.Set("role", "user")
}
return nil
return e.Next()
}
func (um *UserManager) InitializeUserSettings(e *core.ModelEvent) error {
record := e.Model.(*models.Record)
// todo: test
func (um *UserManager) InitializeUserSettings(e *core.RecordEvent) error {
record := e.Record
// intialize settings with defaults
settings := UserSettings{
// Language: "en",
@@ -46,7 +46,7 @@ func (um *UserManager) InitializeUserSettings(e *core.ModelEvent) error {
record.UnmarshalJSONField("settings", &settings)
if len(settings.NotificationEmails) == 0 {
// get user email from auth record
if errs := um.app.Dao().ExpandRecord(record, []string{"user"}, nil); len(errs) == 0 {
if errs := um.app.ExpandRecord(record, []string{"user"}, nil); len(errs) == 0 {
// app.Logger().Error("failed to expand user relation", "errs", errs)
if user := record.ExpandedOne("user"); user != nil {
settings.NotificationEmails = []string{user.GetString("email")}
@@ -61,5 +61,5 @@ func (um *UserManager) InitializeUserSettings(e *core.ModelEvent) error {
// settings.NotificationWebhooks = []string{""}
// }
record.Set("settings", settings)
return nil
return e.Next()
}