From 2ab2cc83de516d21112d8e50f890ff1d9bd397a5 Mon Sep 17 00:00:00 2001 From: henrygd Date: Tue, 11 Feb 2025 19:17:20 -0500 Subject: [PATCH] refactor(hub): embed pocketbase fields in hub struct --- beszel/cmd/hub/hub.go | 21 +---- beszel/internal/alerts/alerts.go | 6 +- beszel/internal/hub/config.go | 18 ++-- beszel/internal/hub/hub.go | 134 ++++++++++++++++------------- beszel/internal/records/records.go | 5 +- beszel/internal/users/users.go | 5 +- 6 files changed, 90 insertions(+), 99 deletions(-) diff --git a/beszel/cmd/hub/hub.go b/beszel/cmd/hub/hub.go index 1565ffd1..10ba6261 100644 --- a/beszel/cmd/hub/hub.go +++ b/beszel/cmd/hub/hub.go @@ -1,29 +1,10 @@ package main import ( - "beszel" "beszel/internal/hub" - _ "beszel/migrations" - - "github.com/pocketbase/pocketbase" - "github.com/spf13/cobra" ) func main() { - app := pocketbase.NewWithConfig(pocketbase.Config{ - DefaultDataDir: beszel.AppName + "_data", - }) - app.RootCmd.Version = beszel.Version - app.RootCmd.Use = beszel.AppName - app.RootCmd.Short = "" - - // add update command - app.RootCmd.AddCommand(&cobra.Command{ - Use: "update", - Short: "Update " + beszel.AppName + " to the latest version", - Run: hub.Update, - }) - - hub.NewHub(app).Run() + hub.NewHub().Run() } diff --git a/beszel/internal/alerts/alerts.go b/beszel/internal/alerts/alerts.go index 13d99624..f5975bdf 100644 --- a/beszel/internal/alerts/alerts.go +++ b/beszel/internal/alerts/alerts.go @@ -12,7 +12,6 @@ import ( "github.com/containrrr/shoutrrr" "github.com/goccy/go-json" "github.com/pocketbase/dbx" - "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/apis" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/tools/mailer" @@ -21,7 +20,7 @@ import ( ) type AlertManager struct { - app *pocketbase.PocketBase + app core.App } type AlertMessageData struct { @@ -61,7 +60,7 @@ type SystemAlertData struct { descriptor string // override descriptor in notification body (for temp sensor, disk partition, etc) } -func NewAlertManager(app *pocketbase.PocketBase) *AlertManager { +func NewAlertManager(app core.App) *AlertManager { return &AlertManager{ app: app, } @@ -167,7 +166,6 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, systemInfo )). OrderBy("created"). All(&systemStats) - if err != nil { return err } diff --git a/beszel/internal/hub/config.go b/beszel/internal/hub/config.go index 7e7dfbd7..c59ff5ff 100644 --- a/beszel/internal/hub/config.go +++ b/beszel/internal/hub/config.go @@ -28,7 +28,7 @@ type SystemConfig struct { // Syncs systems with the config.yml file func (h *Hub) syncSystemsWithConfig() error { - configPath := filepath.Join(h.app.DataDir(), "config.yml") + configPath := filepath.Join(h.DataDir(), "config.yml") configData, err := os.ReadFile(configPath) if err != nil { return nil @@ -49,7 +49,7 @@ func (h *Hub) syncSystemsWithConfig() error { // Create a map of email to user ID userEmailToID := make(map[string]string) - users, err := h.app.FindAllRecords("users", dbx.NewExp("id != ''")) + users, err := h.FindAllRecords("users", dbx.NewExp("id != ''")) if err != nil { return err } @@ -84,7 +84,7 @@ func (h *Hub) syncSystemsWithConfig() error { } // Get existing systems - existingSystems, err := h.app.FindAllRecords("systems", dbx.NewExp("id != ''")) + existingSystems, err := h.FindAllRecords("systems", dbx.NewExp("id != ''")) if err != nil { return err } @@ -104,13 +104,13 @@ func (h *Hub) syncSystemsWithConfig() error { existingSystem.Set("name", sysConfig.Name) existingSystem.Set("users", sysConfig.Users) existingSystem.Set("port", sysConfig.Port) - if err := h.app.Save(existingSystem); err != nil { + if err := h.Save(existingSystem); err != nil { return err } delete(existingSystemsMap, key) } else { // Create new system - systemsCollection, err := h.app.FindCollectionByNameOrId("systems") + systemsCollection, err := h.FindCollectionByNameOrId("systems") if err != nil { return fmt.Errorf("failed to find systems collection: %v", err) } @@ -121,7 +121,7 @@ func (h *Hub) syncSystemsWithConfig() error { newSystem.Set("users", sysConfig.Users) newSystem.Set("info", system.Info{}) newSystem.Set("status", "pending") - if err := h.app.Save(newSystem); err != nil { + if err := h.Save(newSystem); err != nil { return fmt.Errorf("failed to create new system: %v", err) } } @@ -129,7 +129,7 @@ func (h *Hub) syncSystemsWithConfig() error { // Delete systems not in config for _, system := range existingSystemsMap { - if err := h.app.Delete(system); err != nil { + if err := h.Delete(system); err != nil { return err } } @@ -141,7 +141,7 @@ func (h *Hub) syncSystemsWithConfig() error { // Generates content for the config.yml file as a YAML string func (h *Hub) generateConfigYAML() (string, error) { // Fetch all systems from the database - systems, err := h.app.FindRecordsByFilter("systems", "id != ''", "name", -1, 0) + systems, err := h.FindRecordsByFilter("systems", "id != ''", "name", -1, 0) if err != nil { return "", err } @@ -194,7 +194,7 @@ func (h *Hub) generateConfigYAML() (string, error) { // New helper function to get a map of user IDs to emails func (h *Hub) getUserEmailMap(userIDs []string) (map[string]string, error) { - users, err := h.app.FindRecordsByIds("users", userIDs) + users, err := h.FindRecordsByIds("users", userIDs) if err != nil { return nil, err } diff --git a/beszel/internal/hub/hub.go b/beszel/internal/hub/hub.go index 8b8bff4f..b8d062f5 100644 --- a/beszel/internal/hub/hub.go +++ b/beszel/internal/hub/hub.go @@ -27,11 +27,12 @@ import ( "github.com/pocketbase/pocketbase/apis" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/plugins/migratecmd" + "github.com/spf13/cobra" "golang.org/x/crypto/ssh" ) type Hub struct { - app *pocketbase.PocketBase + *pocketbase.PocketBase sshClientConfig *ssh.ClientConfig pubKey string am *alerts.AlertManager @@ -42,15 +43,28 @@ type Hub struct { appURL string } -func NewHub(app *pocketbase.PocketBase) *Hub { - hub := &Hub{ - app: app, - am: alerts.NewAlertManager(app), - um: users.NewUserManager(app), - rm: records.NewRecordManager(app), - } +// NewHub creates a new Hub instance with default configuration +func NewHub() *Hub { + var hub Hub + hub.PocketBase = pocketbase.NewWithConfig(pocketbase.Config{ + DefaultDataDir: beszel.AppName + "_data", + }) + + hub.RootCmd.Version = beszel.Version + hub.RootCmd.Use = beszel.AppName + hub.RootCmd.Short = "" + // add update command + hub.RootCmd.AddCommand(&cobra.Command{ + Use: "update", + Short: "Update " + beszel.AppName + " to the latest version", + Run: Update, + }) + + hub.am = alerts.NewAlertManager(hub) + hub.um = users.NewUserManager(hub) + hub.rm = records.NewRecordManager(hub) hub.appURL, _ = GetEnv("APP_URL") - return hub + return &hub } // GetEnv retrieves an environment variable with a "BESZEL_HUB_" prefix, or falls back to the unprefixed key. @@ -67,21 +81,21 @@ func (h *Hub) Run() { isGoRun := strings.HasPrefix(os.Args[0], os.TempDir()) // enable auto creation of migration files when making collection changes in the Admin UI - migratecmd.MustRegister(h.app, h.app.RootCmd, migratecmd.Config{ + migratecmd.MustRegister(h, h.RootCmd, migratecmd.Config{ // (the isGoRun check is to enable it only during development) Automigrate: isGoRun, Dir: "../../migrations", }) // initial setup - h.app.OnServe().BindFunc(func(se *core.ServeEvent) error { + h.OnServe().BindFunc(func(se *core.ServeEvent) error { // create ssh client config err := h.createSSHClientConfig() if err != nil { log.Fatal(err) } // set general settings - settings := h.app.Settings() + settings := h.Settings() // batch requests (for global alerts) settings.Batch.Enabled = true // set URL if BASE_URL env is set @@ -89,7 +103,7 @@ func (h *Hub) Run() { settings.Meta.AppURL = h.appURL } // set auth settings - usersCollection, err := h.app.FindCollectionByNameOrId("users") + usersCollection, err := h.FindCollectionByNameOrId("users") if err != nil { return err } @@ -108,7 +122,7 @@ func (h *Hub) Run() { } else { usersCollection.CreateRule = nil } - if err := h.app.Save(usersCollection); err != nil { + if err := h.Save(usersCollection); err != nil { return err } // sync systems with config @@ -117,7 +131,7 @@ func (h *Hub) Run() { }) // serve web ui - h.app.OnServe().BindFunc(func(se *core.ServeEvent) error { + h.OnServe().BindFunc(func(se *core.ServeEvent) error { switch isGoRun { case true: proxy := httputil.NewSingleHostReverseProxy(&url.URL{ @@ -163,14 +177,14 @@ func (h *Hub) Run() { }) // set up scheduled jobs / ticker for system updates - h.app.OnServe().BindFunc(func(se *core.ServeEvent) error { + h.OnServe().BindFunc(func(se *core.ServeEvent) error { // 15 second ticker for system updates go h.startSystemUpdateTicker() // set up cron jobs // delete old records once every hour - h.app.Cron().MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords) + h.Cron().MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords) // create longer records every 10 minutes - h.app.Cron().MustAdd("create longer records", "*/10 * * * *", func() { + h.Cron().MustAdd("create longer records", "*/10 * * * *", func() { if systemStats, containerStats, err := h.getCollections(); err == nil { h.rm.CreateLongerRecords([]*core.Collection{systemStats, containerStats}) } @@ -179,7 +193,7 @@ func (h *Hub) Run() { }) // custom api routes - h.app.OnServe().BindFunc(func(se *core.ServeEvent) error { + h.OnServe().BindFunc(func(se *core.ServeEvent) error { // returns public key se.Router.GET("/api/beszel/getkey", func(e *core.RequestEvent) error { info, _ := e.RequestInfo() @@ -190,7 +204,7 @@ func (h *Hub) Run() { }) // check if first time setup on login page se.Router.GET("/api/beszel/first-run", func(e *core.RequestEvent) error { - total, err := h.app.CountRecords("users") + total, err := h.CountRecords("users") return e.JSON(http.StatusOK, map[string]bool{"firstRun": err == nil && total == 0}) }) // send test notification @@ -198,31 +212,31 @@ func (h *Hub) Run() { // API endpoint to get config.yml content se.Router.GET("/api/beszel/config-yaml", h.getYamlConfig) // create first user endpoint only needed if no users exist - if totalUsers, _ := h.app.CountRecords("users"); totalUsers == 0 { + if totalUsers, _ := h.CountRecords("users"); totalUsers == 0 { se.Router.POST("/api/beszel/create-user", h.um.CreateFirstUser) } return se.Next() }) // system creation defaults - h.app.OnRecordCreate("systems").BindFunc(func(e *core.RecordEvent) error { + h.OnRecordCreate("systems").BindFunc(func(e *core.RecordEvent) error { e.Record.Set("info", system.Info{}) e.Record.Set("status", "pending") return e.Next() }) // immediately create connection for new systems - h.app.OnRecordAfterCreateSuccess("systems").BindFunc(func(e *core.RecordEvent) error { + h.OnRecordAfterCreateSuccess("systems").BindFunc(func(e *core.RecordEvent) error { go h.updateSystem(e.Record) return e.Next() }) // handle default values for user / user_settings creation - h.app.OnRecordCreate("users").BindFunc(h.um.InitializeUserRole) - h.app.OnRecordCreate("user_settings").BindFunc(h.um.InitializeUserSettings) + h.OnRecordCreate("users").BindFunc(h.um.InitializeUserRole) + h.OnRecordCreate("user_settings").BindFunc(h.um.InitializeUserSettings) // empty info for systems that are paused - h.app.OnRecordUpdate("systems").BindFunc(func(e *core.RecordEvent) error { + h.OnRecordUpdate("systems").BindFunc(func(e *core.RecordEvent) error { if e.Record.GetString("status") == "paused" { e.Record.Set("info", system.Info{}) } @@ -230,7 +244,7 @@ func (h *Hub) Run() { }) // do things after a systems record is updated - h.app.OnRecordAfterUpdateSuccess("systems").BindFunc(func(e *core.RecordEvent) error { + h.OnRecordAfterUpdateSuccess("systems").BindFunc(func(e *core.RecordEvent) error { newRecord := e.Record.Fresh() oldRecord := newRecord.Original() newStatus := newRecord.GetString("status") @@ -250,12 +264,12 @@ func (h *Hub) Run() { }) // if system is deleted, close connection - h.app.OnRecordAfterDeleteSuccess("systems").BindFunc(func(e *core.RecordEvent) error { + h.OnRecordAfterDeleteSuccess("systems").BindFunc(func(e *core.RecordEvent) error { h.deleteSystemConnection(e.Record) return e.Next() }) - if err := h.app.Start(); err != nil { + if err := h.Start(); err != nil { log.Fatal(err) } } @@ -268,7 +282,7 @@ func (h *Hub) startSystemUpdateTicker() { } func (h *Hub) updateSystems() { - records, err := h.app.FindRecordsByFilter( + records, err := h.FindRecordsByFilter( "2hz5ncl8tizk5nx", // systems collection "status != 'paused'", // filter "updated", // sort @@ -277,7 +291,7 @@ func (h *Hub) updateSystems() { ) // log.Println("records", len(records)) if err != nil || len(records) == 0 { - // h.app.Logger().Error("Failed to query systems") + // h.Logger().Error("Failed to query systems") return } fiftySecondsAgo := time.Now().UTC().Add(-50 * time.Second) @@ -302,52 +316,52 @@ func (h *Hub) updateSystem(record *core.Record) { var err error // check if system connection exists - if existingClient, ok := h.app.Store().GetOk(record.Id); ok { + if existingClient, ok := h.Store().GetOk(record.Id); ok { client = existingClient.(*ssh.Client) } else { // create system connection client, err = h.createSystemConnection(record) if err != nil { if record.GetString("status") != "down" { - h.app.Logger().Error("Failed to connect:", "err", err.Error(), "system", record.GetString("host"), "port", record.GetString("port")) + h.Logger().Error("Failed to connect:", "err", err.Error(), "system", record.GetString("host"), "port", record.GetString("port")) h.updateSystemStatus(record, "down") } return } - h.app.Store().Set(record.Id, client) + h.Store().Set(record.Id, client) } // get system stats from agent var systemData system.CombinedData if err := h.requestJsonFromAgent(client, &systemData); err != nil { if err.Error() == "bad client" { // if previous connection was closed, try again - h.app.Logger().Error("Existing SSH connection closed. Retrying...", "host", record.GetString("host"), "port", record.GetString("port")) + h.Logger().Error("Existing SSH connection closed. Retrying...", "host", record.GetString("host"), "port", record.GetString("port")) h.deleteSystemConnection(record) time.Sleep(time.Millisecond * 100) h.updateSystem(record) return } - h.app.Logger().Error("Failed to get system stats: ", "err", err.Error()) + h.Logger().Error("Failed to get system stats: ", "err", err.Error()) h.updateSystemStatus(record, "down") return } // update system record record.Set("status", "up") record.Set("info", systemData.Info) - if err := h.app.SaveNoValidate(record); err != nil { - h.app.Logger().Error("Failed to update record: ", "err", err.Error()) + if err := h.SaveNoValidate(record); err != nil { + h.Logger().Error("Failed to update record: ", "err", err.Error()) } // add system_stats and container_stats records if systemStats, containerStats, err := h.getCollections(); err != nil { - h.app.Logger().Error("Failed to get collections: ", "err", err.Error()) + h.Logger().Error("Failed to get collections: ", "err", err.Error()) } else { // add new system_stats record systemStatsRecord := core.NewRecord(systemStats) systemStatsRecord.Set("system", record.Id) systemStatsRecord.Set("stats", systemData.Stats) systemStatsRecord.Set("type", "1m") - if err := h.app.SaveNoValidate(systemStatsRecord); err != nil { - h.app.Logger().Error("Failed to save record: ", "err", err.Error()) + if err := h.SaveNoValidate(systemStatsRecord); err != nil { + h.Logger().Error("Failed to save record: ", "err", err.Error()) } // add new container_stats record if len(systemData.Containers) > 0 { @@ -355,29 +369,29 @@ func (h *Hub) updateSystem(record *core.Record) { containerStatsRecord.Set("system", record.Id) containerStatsRecord.Set("stats", systemData.Containers) containerStatsRecord.Set("type", "1m") - if err := h.app.SaveNoValidate(containerStatsRecord); err != nil { - h.app.Logger().Error("Failed to save record: ", "err", err.Error()) + if err := h.SaveNoValidate(containerStatsRecord); err != nil { + h.Logger().Error("Failed to save record: ", "err", err.Error()) } } } // system info alerts if err := h.am.HandleSystemAlerts(record, systemData.Info, systemData.Stats.Temperatures, systemData.Stats.ExtraFs); err != nil { - h.app.Logger().Error("System alerts error", "err", err.Error()) + h.Logger().Error("System alerts error", "err", err.Error()) } } // return system_stats and container_stats collections func (h *Hub) getCollections() (*core.Collection, *core.Collection, error) { if h.systemStats == nil { - systemStats, err := h.app.FindCollectionByNameOrId("system_stats") + systemStats, err := h.FindCollectionByNameOrId("system_stats") if err != nil { return nil, nil, err } h.systemStats = systemStats } if h.containerStats == nil { - containerStats, err := h.app.FindCollectionByNameOrId("container_stats") + containerStats, err := h.FindCollectionByNameOrId("container_stats") if err != nil { return nil, nil, err } @@ -390,19 +404,19 @@ func (h *Hub) getCollections() (*core.Collection, *core.Collection, error) { func (h *Hub) updateSystemStatus(record *core.Record, status string) { if record.Fresh().GetString("status") != status { record.Set("status", status) - if err := h.app.SaveNoValidate(record); err != nil { - h.app.Logger().Error("Failed to update record: ", "err", err.Error()) + if err := h.SaveNoValidate(record); err != nil { + h.Logger().Error("Failed to update record: ", "err", err.Error()) } } } // delete system connection from map and close connection func (h *Hub) deleteSystemConnection(record *core.Record) { - if client, ok := h.app.Store().GetOk(record.Id); ok { + if client, ok := h.Store().GetOk(record.Id); ok { if sshClient := client.(*ssh.Client); sshClient != nil { sshClient.Close() } - h.app.Store().Remove(record.Id) + h.Store().Remove(record.Id) } } @@ -417,7 +431,7 @@ func (h *Hub) createSystemConnection(record *core.Record) (*ssh.Client, error) { func (h *Hub) createSSHClientConfig() error { key, err := h.getSSHKey() if err != nil { - h.app.Logger().Error("Failed to get SSH key: ", "err", err.Error()) + h.Logger().Error("Failed to get SSH key: ", "err", err.Error()) return err } @@ -494,11 +508,11 @@ func newSessionWithTimeout(client *ssh.Client, timeout time.Duration) (*ssh.Sess } func (h *Hub) getSSHKey() ([]byte, error) { - dataDir := h.app.DataDir() + dataDir := h.DataDir() // check if the key pair already exists existingKey, err := os.ReadFile(dataDir + "/id_ed25519") if err == nil { - if pubKey, err := os.ReadFile(h.app.DataDir() + "/id_ed25519.pub"); err == nil { + if pubKey, err := os.ReadFile(h.DataDir() + "/id_ed25519.pub"); err == nil { h.pubKey = strings.TrimSuffix(string(pubKey), "\n") } // return existing private key @@ -508,27 +522,27 @@ func (h *Hub) getSSHKey() ([]byte, error) { // Generate the Ed25519 key pair pubKey, privKey, err := ed25519.GenerateKey(nil) if err != nil { - // h.app.Logger().Error("Error generating key pair:", "err", err.Error()) + // h.Logger().Error("Error generating key pair:", "err", err.Error()) return nil, err } // Get the private key in OpenSSH format privKeyBytes, err := ssh.MarshalPrivateKey(privKey, "") if err != nil { - // h.app.Logger().Error("Error marshaling private key:", "err", err.Error()) + // h.Logger().Error("Error marshaling private key:", "err", err.Error()) return nil, err } // Save the private key to a file privateFile, err := os.Create(dataDir + "/id_ed25519") if err != nil { - // h.app.Logger().Error("Error creating private key file:", "err", err.Error()) + // h.Logger().Error("Error creating private key file:", "err", err.Error()) return nil, err } defer privateFile.Close() if err := pem.Encode(privateFile, privKeyBytes); err != nil { - // h.app.Logger().Error("Error writing private key to file:", "err", err.Error()) + // h.Logger().Error("Error writing private key to file:", "err", err.Error()) return nil, err } @@ -552,9 +566,9 @@ func (h *Hub) getSSHKey() ([]byte, error) { return nil, err } - h.app.Logger().Info("ed25519 SSH key pair generated successfully.") - h.app.Logger().Info("Private key saved to: " + dataDir + "/id_ed25519") - h.app.Logger().Info("Public key saved to: " + dataDir + "/id_ed25519.pub") + h.Logger().Info("ed25519 SSH key pair generated successfully.") + h.Logger().Info("Private key saved to: " + dataDir + "/id_ed25519") + h.Logger().Info("Public key saved to: " + dataDir + "/id_ed25519.pub") existingKey, err = os.ReadFile(dataDir + "/id_ed25519") if err == nil { diff --git a/beszel/internal/records/records.go b/beszel/internal/records/records.go index 82e8ebf3..9a99adc7 100644 --- a/beszel/internal/records/records.go +++ b/beszel/internal/records/records.go @@ -10,13 +10,12 @@ import ( "github.com/goccy/go-json" "github.com/pocketbase/dbx" - "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/tools/types" ) type RecordManager struct { - app *pocketbase.PocketBase + app core.App } type LongerRecordData struct { @@ -35,7 +34,7 @@ type RecordStats []struct { Stats []byte `db:"stats"` } -func NewRecordManager(app *pocketbase.PocketBase) *RecordManager { +func NewRecordManager(app core.App) *RecordManager { return &RecordManager{app} } diff --git a/beszel/internal/users/users.go b/beszel/internal/users/users.go index c2cfacb9..e200664f 100644 --- a/beszel/internal/users/users.go +++ b/beszel/internal/users/users.go @@ -6,12 +6,11 @@ import ( "log" "net/http" - "github.com/pocketbase/pocketbase" "github.com/pocketbase/pocketbase/core" ) type UserManager struct { - app *pocketbase.PocketBase + app core.App } type UserSettings struct { @@ -21,7 +20,7 @@ type UserSettings struct { // Language string `json:"lang"` } -func NewUserManager(app *pocketbase.PocketBase) *UserManager { +func NewUserManager(app core.App) *UserManager { return &UserManager{ app: app, }