diff --git a/agent/client.go b/agent/client.go index 3523117a..08984696 100644 --- a/agent/client.go +++ b/agent/client.go @@ -15,9 +15,6 @@ import ( "github.com/henrygd/beszel" "github.com/henrygd/beszel/internal/common" - "github.com/henrygd/beszel/internal/entities/smart" - "github.com/henrygd/beszel/internal/entities/system" - "github.com/henrygd/beszel/internal/entities/systemd" "github.com/fxamacker/cbor/v2" "github.com/lxzan/gws" @@ -259,40 +256,16 @@ func (client *WebSocketClient) sendMessage(data any) error { return err } -// sendResponse sends a response with optional request ID for the new protocol +// sendResponse sends a response with optional request ID. +// For ID-based requests, we must populate legacy typed fields for backward +// compatibility with older hubs (<= 0.17) that don't read the generic Data field. func (client *WebSocketClient) sendResponse(data any, requestID *uint32) error { if requestID != nil { - // New format with ID - use typed fields - response := common.AgentResponse{ - Id: requestID, - } - - // Set the appropriate typed field based on data type - switch v := data.(type) { - case *system.CombinedData: - response.SystemData = v - case *common.FingerprintResponse: - response.Fingerprint = v - case string: - response.String = &v - case map[string]smart.SmartData: - response.SmartData = v - case systemd.ServiceDetails: - response.ServiceInfo = v - // case []byte: - // response.RawBytes = v - // case string: - // response.RawBytes = []byte(v) - default: - // For any other type, convert to error - response.Error = fmt.Sprintf("unsupported response type: %T", data) - } - + response := newAgentResponse(data, requestID) return client.sendMessage(response) - } else { - // Legacy format - send data directly - return client.sendMessage(data) } + // Legacy format - send data directly + return client.sendMessage(data) } // getUserAgent returns one of two User-Agent strings based on current time. diff --git a/agent/response.go b/agent/response.go new file mode 100644 index 00000000..5bede047 --- /dev/null +++ b/agent/response.go @@ -0,0 +1,31 @@ +package agent + +import ( + "github.com/fxamacker/cbor/v2" + "github.com/henrygd/beszel/internal/common" + "github.com/henrygd/beszel/internal/entities/smart" + "github.com/henrygd/beszel/internal/entities/system" + "github.com/henrygd/beszel/internal/entities/systemd" +) + +// newAgentResponse creates an AgentResponse using legacy typed fields. +// This maintains backward compatibility with <= 0.17 hubs that expect specific fields. +func newAgentResponse(data any, requestID *uint32) common.AgentResponse { + response := common.AgentResponse{Id: requestID} + switch v := data.(type) { + case *system.CombinedData: + response.SystemData = v + case *common.FingerprintResponse: + response.Fingerprint = v + case string: + response.String = &v + case map[string]smart.SmartData: + response.SmartData = v + case systemd.ServiceDetails: + response.ServiceInfo = v + default: + // For unknown types, use the generic Data field + response.Data, _ = cbor.Marshal(data) + } + return response +} diff --git a/agent/server.go b/agent/server.go index b1c15abf..c700bbf3 100644 --- a/agent/server.go +++ b/agent/server.go @@ -13,9 +13,7 @@ import ( "github.com/henrygd/beszel" "github.com/henrygd/beszel/internal/common" - "github.com/henrygd/beszel/internal/entities/smart" "github.com/henrygd/beszel/internal/entities/system" - "github.com/henrygd/beszel/internal/entities/systemd" "github.com/blang/semver" "github.com/fxamacker/cbor/v2" @@ -165,20 +163,9 @@ func (a *Agent) handleSSHRequest(w io.Writer, req *common.HubRequest[cbor.RawMes } // responder that writes AgentResponse to stdout + // Uses legacy typed fields for backward compatibility with <= 0.17 sshResponder := func(data any, requestID *uint32) error { - response := common.AgentResponse{Id: requestID} - switch v := data.(type) { - case *system.CombinedData: - response.SystemData = v - case string: - response.String = &v - case map[string]smart.SmartData: - response.SmartData = v - case systemd.ServiceDetails: - response.ServiceInfo = v - default: - response.Error = fmt.Sprintf("unsupported response type: %T", data) - } + response := newAgentResponse(data, requestID) return cbor.NewEncoder(w).Encode(response) } diff --git a/beszel.go b/beszel.go index d71abac4..f3d4d9b7 100644 --- a/beszel.go +++ b/beszel.go @@ -6,7 +6,7 @@ import "github.com/blang/semver" const ( // Version is the current version of the application. - Version = "0.18.0-beta.1" + Version = "0.18.0-beta.2" // AppName is the name of the application. AppName = "beszel" ) diff --git a/internal/common/common-ws.go b/internal/common/common-ws.go index 08237362..4f2f5d22 100644 --- a/internal/common/common-ws.go +++ b/internal/common/common-ws.go @@ -1,6 +1,7 @@ package common import ( + "github.com/fxamacker/cbor/v2" "github.com/henrygd/beszel/internal/entities/smart" "github.com/henrygd/beszel/internal/entities/system" "github.com/henrygd/beszel/internal/entities/systemd" @@ -34,14 +35,14 @@ type HubRequest[T any] struct { // AgentResponse defines the structure for responses sent from agent to hub. type AgentResponse struct { Id *uint32 `cbor:"0,keyasint,omitempty"` - SystemData *system.CombinedData `cbor:"1,keyasint,omitempty,omitzero"` - Fingerprint *FingerprintResponse `cbor:"2,keyasint,omitempty,omitzero"` + SystemData *system.CombinedData `cbor:"1,keyasint,omitempty,omitzero"` // Legacy (<= 0.17) + Fingerprint *FingerprintResponse `cbor:"2,keyasint,omitempty,omitzero"` // Legacy (<= 0.17) Error string `cbor:"3,keyasint,omitempty,omitzero"` - String *string `cbor:"4,keyasint,omitempty,omitzero"` - SmartData map[string]smart.SmartData `cbor:"5,keyasint,omitempty,omitzero"` - ServiceInfo systemd.ServiceDetails `cbor:"6,keyasint,omitempty,omitzero"` - // Logs *LogsPayload `cbor:"4,keyasint,omitempty,omitzero"` - // RawBytes []byte `cbor:"4,keyasint,omitempty,omitzero"` + String *string `cbor:"4,keyasint,omitempty,omitzero"` // Legacy (<= 0.17) + SmartData map[string]smart.SmartData `cbor:"5,keyasint,omitempty,omitzero"` // Legacy (<= 0.17) + ServiceInfo systemd.ServiceDetails `cbor:"6,keyasint,omitempty,omitzero"` // Legacy (<= 0.17) + // Data is the generic response payload for new endpoints (0.18+) + Data cbor.RawMessage `cbor:"7,keyasint,omitempty,omitzero"` } type FingerprintRequest struct { diff --git a/internal/hub/systems/system.go b/internal/hub/systems/system.go index 89632515..0c02e61d 100644 --- a/internal/hub/systems/system.go +++ b/internal/hub/systems/system.go @@ -13,9 +13,11 @@ import ( "time" "github.com/henrygd/beszel/internal/common" + "github.com/henrygd/beszel/internal/hub/transport" "github.com/henrygd/beszel/internal/hub/ws" "github.com/henrygd/beszel/internal/entities/container" + "github.com/henrygd/beszel/internal/entities/smart" "github.com/henrygd/beszel/internal/entities/system" "github.com/henrygd/beszel/internal/entities/systemd" @@ -23,28 +25,30 @@ import ( "github.com/blang/semver" "github.com/fxamacker/cbor/v2" + "github.com/lxzan/gws" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/core" "golang.org/x/crypto/ssh" ) type System struct { - Id string `db:"id"` - Host string `db:"host"` - Port string `db:"port"` - Status string `db:"status"` - manager *SystemManager // Manager that this system belongs to - client *ssh.Client // SSH client for fetching data - data *system.CombinedData // system data from agent - ctx context.Context // Context for stopping the updater - cancel context.CancelFunc // Stops and removes system from updater - WsConn *ws.WsConn // Handler for agent WebSocket connection - agentVersion semver.Version // Agent version - updateTicker *time.Ticker // Ticker for updating the system - detailsFetched atomic.Bool // True if static system details have been fetched and saved - smartFetching atomic.Bool // True if SMART devices are currently being fetched - smartInterval time.Duration // Interval for periodic SMART data updates - lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch + Id string `db:"id"` + Host string `db:"host"` + Port string `db:"port"` + Status string `db:"status"` + manager *SystemManager // Manager that this system belongs to + client *ssh.Client // SSH client for fetching data + sshTransport *transport.SSHTransport // SSH transport for requests + data *system.CombinedData // system data from agent + ctx context.Context // Context for stopping the updater + cancel context.CancelFunc // Stops and removes system from updater + WsConn *ws.WsConn // Handler for agent WebSocket connection + agentVersion semver.Version // Agent version + updateTicker *time.Ticker // Ticker for updating the system + detailsFetched atomic.Bool // True if static system details have been fetched and saved + smartFetching atomic.Bool // True if SMART devices are currently being fetched + smartInterval time.Duration // Interval for periodic SMART data updates + lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch } func (sm *SystemManager) NewSystem(systemId string) *System { @@ -359,8 +363,78 @@ func (sys *System) getContext() (context.Context, context.CancelFunc) { return sys.ctx, sys.cancel } -// fetchDataFromAgent attempts to fetch data from the agent, -// prioritizing WebSocket if available. +// request sends a request to the agent, trying WebSocket first, then SSH. +// This is the unified request method that uses the transport abstraction. +func (sys *System) request(ctx context.Context, action common.WebSocketAction, req any, dest any) error { + // Try WebSocket first + if sys.WsConn != nil && sys.WsConn.IsConnected() { + wsTransport := transport.NewWebSocketTransport(sys.WsConn) + if err := wsTransport.Request(ctx, action, req, dest); err == nil { + return nil + } else if !shouldFallbackToSSH(err) { + return err + } else if shouldCloseWebSocket(err) { + sys.closeWebSocketConnection() + } + } + + // Fall back to SSH if WebSocket fails + if err := sys.ensureSSHTransport(); err != nil { + return err + } + err := sys.sshTransport.RequestWithRetry(ctx, action, req, dest, 1) + // Keep legacy SSH client/version fields in sync for other code paths. + if sys.sshTransport != nil { + sys.client = sys.sshTransport.GetClient() + sys.agentVersion = sys.sshTransport.GetAgentVersion() + } + return err +} + +func shouldFallbackToSSH(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return true + } + if errors.Is(err, gws.ErrConnClosed) { + return true + } + return errors.Is(err, transport.ErrWebSocketNotConnected) +} + +func shouldCloseWebSocket(err error) bool { + if err == nil { + return false + } + return errors.Is(err, gws.ErrConnClosed) || errors.Is(err, transport.ErrWebSocketNotConnected) +} + +// ensureSSHTransport ensures the SSH transport is initialized and connected. +func (sys *System) ensureSSHTransport() error { + if sys.sshTransport == nil { + if sys.manager.sshConfig == nil { + if err := sys.manager.createSSHClientConfig(); err != nil { + return err + } + } + sys.sshTransport = transport.NewSSHTransport(transport.SSHTransportConfig{ + Host: sys.Host, + Port: sys.Port, + Config: sys.manager.sshConfig, + Timeout: 4 * time.Second, + }) + } + // Sync client state with transport + if sys.client != nil { + sys.sshTransport.SetClient(sys.client) + sys.sshTransport.SetAgentVersion(sys.agentVersion) + } + return nil +} + +// fetchDataFromAgent attempts to fetch data from the agent, prioritizing WebSocket if available. func (sys *System) fetchDataFromAgent(options common.DataRequestOptions) (*system.CombinedData, error) { if sys.data == nil { sys.data = &system.CombinedData{} @@ -386,112 +460,47 @@ func (sys *System) fetchDataViaWebSocket(options common.DataRequestOptions) (*sy if sys.WsConn == nil || !sys.WsConn.IsConnected() { return nil, errors.New("no websocket connection") } - err := sys.WsConn.RequestSystemData(context.Background(), sys.data, options) + wsTransport := transport.NewWebSocketTransport(sys.WsConn) + err := wsTransport.Request(context.Background(), common.GetData, options, sys.data) if err != nil { return nil, err } return sys.data, nil } -// fetchStringFromAgentViaSSH is a generic function to fetch strings via SSH -func (sys *System) fetchStringFromAgentViaSSH(action common.WebSocketAction, requestData any, errorMsg string) (string, error) { - var result string - err := sys.runSSHOperation(4*time.Second, 1, func(session *ssh.Session) (bool, error) { - stdout, err := session.StdoutPipe() - if err != nil { - return false, err - } - stdin, stdinErr := session.StdinPipe() - if stdinErr != nil { - return false, stdinErr - } - if err := session.Shell(); err != nil { - return false, err - } - req := common.HubRequest[any]{Action: action, Data: requestData} - _ = cbor.NewEncoder(stdin).Encode(req) - _ = stdin.Close() - var resp common.AgentResponse - err = cbor.NewDecoder(stdout).Decode(&resp) - if err != nil { - return false, err - } - if resp.String == nil { - return false, errors.New(errorMsg) - } - result = *resp.String - return false, nil - }) - return result, err -} - // FetchContainerInfoFromAgent fetches container info from the agent func (sys *System) FetchContainerInfoFromAgent(containerID string) (string, error) { - // fetch via websocket - if sys.WsConn != nil && sys.WsConn.IsConnected() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return sys.WsConn.RequestContainerInfo(ctx, containerID) - } - // fetch via SSH - return sys.fetchStringFromAgentViaSSH(common.GetContainerInfo, common.ContainerInfoRequest{ContainerID: containerID}, "no info in response") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var result string + err := sys.request(ctx, common.GetContainerInfo, common.ContainerInfoRequest{ContainerID: containerID}, &result) + return result, err } // FetchContainerLogsFromAgent fetches container logs from the agent func (sys *System) FetchContainerLogsFromAgent(containerID string) (string, error) { - // fetch via websocket - if sys.WsConn != nil && sys.WsConn.IsConnected() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return sys.WsConn.RequestContainerLogs(ctx, containerID) - } - // fetch via SSH - return sys.fetchStringFromAgentViaSSH(common.GetContainerLogs, common.ContainerLogsRequest{ContainerID: containerID}, "no logs in response") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var result string + err := sys.request(ctx, common.GetContainerLogs, common.ContainerLogsRequest{ContainerID: containerID}, &result) + return result, err } // FetchSystemdInfoFromAgent fetches detailed systemd service information from the agent func (sys *System) FetchSystemdInfoFromAgent(serviceName string) (systemd.ServiceDetails, error) { - // fetch via websocket - if sys.WsConn != nil && sys.WsConn.IsConnected() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return sys.WsConn.RequestSystemdInfo(ctx, serviceName) - } - + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() var result systemd.ServiceDetails - err := sys.runSSHOperation(5*time.Second, 1, func(session *ssh.Session) (bool, error) { - stdout, err := session.StdoutPipe() - if err != nil { - return false, err - } - stdin, stdinErr := session.StdinPipe() - if stdinErr != nil { - return false, stdinErr - } - if err := session.Shell(); err != nil { - return false, err - } - - req := common.HubRequest[any]{Action: common.GetSystemdInfo, Data: common.SystemdInfoRequest{ServiceName: serviceName}} - if err := cbor.NewEncoder(stdin).Encode(req); err != nil { - return false, err - } - _ = stdin.Close() - - var resp common.AgentResponse - if err := cbor.NewDecoder(stdout).Decode(&resp); err != nil { - return false, err - } - if resp.ServiceInfo == nil { - if resp.Error != "" { - return false, errors.New(resp.Error) - } - return false, errors.New("no systemd info in response") - } - result = resp.ServiceInfo - return false, nil - }) + err := sys.request(ctx, common.GetSystemdInfo, common.SystemdInfoRequest{ServiceName: serviceName}, &result) + return result, err +} +// FetchSmartDataFromAgent fetches SMART data from the agent +func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + var result map[string]smart.SmartData + err := sys.request(ctx, common.GetSmartData, nil, &result) return result, err } @@ -656,6 +665,9 @@ func (sys *System) createSessionWithTimeout(timeout time.Duration) (*ssh.Session // closeSSHConnection closes the SSH connection but keeps the system in the manager func (sys *System) closeSSHConnection() { + if sys.sshTransport != nil { + sys.sshTransport.Close() + } if sys.client != nil { sys.client.Close() sys.client = nil diff --git a/internal/hub/systems/system_smart.go b/internal/hub/systems/system_smart.go index c3393464..ca38f5b9 100644 --- a/internal/hub/systems/system_smart.go +++ b/internal/hub/systems/system_smart.go @@ -1,54 +1,14 @@ package systems import ( - "context" "database/sql" "errors" "strings" - "time" - "github.com/fxamacker/cbor/v2" - "github.com/henrygd/beszel/internal/common" "github.com/henrygd/beszel/internal/entities/smart" "github.com/pocketbase/pocketbase/core" - "golang.org/x/crypto/ssh" ) -// FetchSmartDataFromAgent fetches SMART data from the agent -func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error) { - // fetch via websocket - if sys.WsConn != nil && sys.WsConn.IsConnected() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return sys.WsConn.RequestSmartData(ctx) - } - // fetch via SSH - var result map[string]smart.SmartData - err := sys.runSSHOperation(5*time.Second, 1, func(session *ssh.Session) (bool, error) { - stdout, err := session.StdoutPipe() - if err != nil { - return false, err - } - stdin, stdinErr := session.StdinPipe() - if stdinErr != nil { - return false, stdinErr - } - if err := session.Shell(); err != nil { - return false, err - } - req := common.HubRequest[any]{Action: common.GetSmartData} - _ = cbor.NewEncoder(stdin).Encode(req) - _ = stdin.Close() - var resp common.AgentResponse - if err := cbor.NewDecoder(stdout).Decode(&resp); err != nil { - return false, err - } - result = resp.SmartData - return false, nil - }) - return result, err -} - // FetchAndSaveSmartDevices fetches SMART data from the agent and saves it to the database func (sys *System) FetchAndSaveSmartDevices() error { smartData, err := sys.FetchSmartDataFromAgent() diff --git a/internal/hub/transport/ssh.go b/internal/hub/transport/ssh.go new file mode 100644 index 00000000..840c5564 --- /dev/null +++ b/internal/hub/transport/ssh.go @@ -0,0 +1,227 @@ +package transport + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "strings" + "time" + + "github.com/blang/semver" + "github.com/fxamacker/cbor/v2" + "github.com/henrygd/beszel/internal/common" + "golang.org/x/crypto/ssh" +) + +// SSHTransport implements Transport over SSH connections. +type SSHTransport struct { + client *ssh.Client + config *ssh.ClientConfig + host string + port string + agentVersion semver.Version + timeout time.Duration +} + +// SSHTransportConfig holds configuration for creating an SSH transport. +type SSHTransportConfig struct { + Host string + Port string + Config *ssh.ClientConfig + AgentVersion semver.Version + Timeout time.Duration +} + +// NewSSHTransport creates a new SSH transport with the given configuration. +func NewSSHTransport(cfg SSHTransportConfig) *SSHTransport { + timeout := cfg.Timeout + if timeout == 0 { + timeout = 4 * time.Second + } + return &SSHTransport{ + config: cfg.Config, + host: cfg.Host, + port: cfg.Port, + agentVersion: cfg.AgentVersion, + timeout: timeout, + } +} + +// SetClient sets the SSH client for reuse across requests. +func (t *SSHTransport) SetClient(client *ssh.Client) { + t.client = client +} + +// SetAgentVersion sets the agent version (extracted from SSH handshake). +func (t *SSHTransport) SetAgentVersion(version semver.Version) { + t.agentVersion = version +} + +// GetClient returns the current SSH client (for connection management). +func (t *SSHTransport) GetClient() *ssh.Client { + return t.client +} + +// GetAgentVersion returns the agent version. +func (t *SSHTransport) GetAgentVersion() semver.Version { + return t.agentVersion +} + +// Request sends a request to the agent via SSH and unmarshals the response. +func (t *SSHTransport) Request(ctx context.Context, action common.WebSocketAction, req any, dest any) error { + if t.client == nil { + if err := t.connect(); err != nil { + return err + } + } + + session, err := t.createSessionWithTimeout(ctx) + if err != nil { + return err + } + defer session.Close() + + stdout, err := session.StdoutPipe() + if err != nil { + return err + } + stdin, err := session.StdinPipe() + if err != nil { + return err + } + if err := session.Shell(); err != nil { + return err + } + + // Send request + hubReq := common.HubRequest[any]{Action: action, Data: req} + if err := cbor.NewEncoder(stdin).Encode(hubReq); err != nil { + return fmt.Errorf("failed to encode request: %w", err) + } + stdin.Close() + + // Read response + var resp common.AgentResponse + if err := cbor.NewDecoder(stdout).Decode(&resp); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + if resp.Error != "" { + return errors.New(resp.Error) + } + + if err := session.Wait(); err != nil { + return err + } + + return UnmarshalResponse(resp, action, dest) +} + +// IsConnected returns true if the SSH connection is active. +func (t *SSHTransport) IsConnected() bool { + return t.client != nil +} + +// Close terminates the SSH connection. +func (t *SSHTransport) Close() { + if t.client != nil { + t.client.Close() + t.client = nil + } +} + +// connect establishes a new SSH connection. +func (t *SSHTransport) connect() error { + if t.config == nil { + return errors.New("SSH config not set") + } + + network := "tcp" + host := t.host + if strings.HasPrefix(host, "/") { + network = "unix" + } else { + host = net.JoinHostPort(host, t.port) + } + + client, err := ssh.Dial(network, host, t.config) + if err != nil { + return err + } + t.client = client + + // Extract agent version from server version string + t.agentVersion, _ = extractAgentVersion(string(client.Conn.ServerVersion())) + return nil +} + +// createSessionWithTimeout creates a new SSH session with a timeout. +func (t *SSHTransport) createSessionWithTimeout(ctx context.Context) (*ssh.Session, error) { + if t.client == nil { + return nil, errors.New("client not initialized") + } + + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + + sessionChan := make(chan *ssh.Session, 1) + errChan := make(chan error, 1) + + go func() { + session, err := t.client.NewSession() + if err != nil { + errChan <- err + } else { + sessionChan <- session + } + }() + + select { + case session := <-sessionChan: + return session, nil + case err := <-errChan: + return nil, err + case <-ctx.Done(): + return nil, errors.New("timeout creating session") + } +} + +// extractAgentVersion extracts the beszel version from SSH server version string. +func extractAgentVersion(versionString string) (semver.Version, error) { + _, after, _ := strings.Cut(versionString, "_") + return semver.Parse(after) +} + +// RequestWithRetry sends a request with automatic retry on connection failures. +func (t *SSHTransport) RequestWithRetry(ctx context.Context, action common.WebSocketAction, req any, dest any, retries int) error { + var lastErr error + for attempt := 0; attempt <= retries; attempt++ { + err := t.Request(ctx, action, req, dest) + if err == nil { + return nil + } + lastErr = err + + // Check if it's a connection error that warrants a retry + if isConnectionError(err) && attempt < retries { + t.Close() + continue + } + return err + } + return lastErr +} + +// isConnectionError checks if an error indicates a connection problem. +func isConnectionError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "connection") || + strings.Contains(errStr, "EOF") || + strings.Contains(errStr, "closed") || + errors.Is(err, io.EOF) +} diff --git a/internal/hub/transport/transport.go b/internal/hub/transport/transport.go new file mode 100644 index 00000000..3b92e1f1 --- /dev/null +++ b/internal/hub/transport/transport.go @@ -0,0 +1,112 @@ +// Package transport provides a unified abstraction for hub-agent communication +// over different transports (WebSocket, SSH). +package transport + +import ( + "context" + "errors" + "fmt" + + "github.com/fxamacker/cbor/v2" + "github.com/henrygd/beszel/internal/common" + "github.com/henrygd/beszel/internal/entities/smart" + "github.com/henrygd/beszel/internal/entities/system" + "github.com/henrygd/beszel/internal/entities/systemd" +) + +// Transport defines the interface for hub-agent communication. +// Both WebSocket and SSH transports implement this interface. +type Transport interface { + // Request sends a request to the agent and unmarshals the response into dest. + // The dest parameter should be a pointer to the expected response type. + Request(ctx context.Context, action common.WebSocketAction, req any, dest any) error + // IsConnected returns true if the transport connection is active. + IsConnected() bool + // Close terminates the transport connection. + Close() +} + +// UnmarshalResponse unmarshals an AgentResponse into the destination type. +// It first checks the generic Data field (0.19+ agents), then falls back +// to legacy typed fields for backward compatibility with 0.18.0 agents. +func UnmarshalResponse(resp common.AgentResponse, action common.WebSocketAction, dest any) error { + if dest == nil { + return errors.New("nil destination") + } + // Try generic Data field first (0.19+) + if len(resp.Data) > 0 { + if err := cbor.Unmarshal(resp.Data, dest); err != nil { + return fmt.Errorf("failed to unmarshal generic response data: %w", err) + } + return nil + } + // Fall back to legacy typed fields for older agents/hubs. + return unmarshalLegacyResponse(resp, action, dest) +} + +// unmarshalLegacyResponse handles legacy responses that use typed fields. +func unmarshalLegacyResponse(resp common.AgentResponse, action common.WebSocketAction, dest any) error { + switch action { + case common.GetData: + d, ok := dest.(*system.CombinedData) + if !ok { + return fmt.Errorf("unexpected dest type for GetData: %T", dest) + } + if resp.SystemData == nil { + return errors.New("no system data in response") + } + *d = *resp.SystemData + return nil + case common.CheckFingerprint: + d, ok := dest.(*common.FingerprintResponse) + if !ok { + return fmt.Errorf("unexpected dest type for CheckFingerprint: %T", dest) + } + if resp.Fingerprint == nil { + return errors.New("no fingerprint in response") + } + *d = *resp.Fingerprint + return nil + case common.GetContainerLogs: + d, ok := dest.(*string) + if !ok { + return fmt.Errorf("unexpected dest type for GetContainerLogs: %T", dest) + } + if resp.String == nil { + return errors.New("no logs in response") + } + *d = *resp.String + return nil + case common.GetContainerInfo: + d, ok := dest.(*string) + if !ok { + return fmt.Errorf("unexpected dest type for GetContainerInfo: %T", dest) + } + if resp.String == nil { + return errors.New("no info in response") + } + *d = *resp.String + return nil + case common.GetSmartData: + d, ok := dest.(*map[string]smart.SmartData) + if !ok { + return fmt.Errorf("unexpected dest type for GetSmartData: %T", dest) + } + if resp.SmartData == nil { + return errors.New("no SMART data in response") + } + *d = resp.SmartData + return nil + case common.GetSystemdInfo: + d, ok := dest.(*systemd.ServiceDetails) + if !ok { + return fmt.Errorf("unexpected dest type for GetSystemdInfo: %T", dest) + } + if resp.ServiceInfo == nil { + return errors.New("no systemd info in response") + } + *d = resp.ServiceInfo + return nil + } + return fmt.Errorf("unsupported action: %d", action) +} diff --git a/internal/hub/transport/websocket.go b/internal/hub/transport/websocket.go new file mode 100644 index 00000000..05aae84e --- /dev/null +++ b/internal/hub/transport/websocket.go @@ -0,0 +1,74 @@ +package transport + +import ( + "context" + "errors" + + "github.com/fxamacker/cbor/v2" + "github.com/henrygd/beszel" + "github.com/henrygd/beszel/internal/common" + "github.com/henrygd/beszel/internal/hub/ws" +) + +// ErrWebSocketNotConnected indicates a WebSocket transport is not currently connected. +var ErrWebSocketNotConnected = errors.New("websocket not connected") + +// WebSocketTransport implements Transport over WebSocket connections. +type WebSocketTransport struct { + wsConn *ws.WsConn +} + +// NewWebSocketTransport creates a new WebSocket transport wrapper. +func NewWebSocketTransport(wsConn *ws.WsConn) *WebSocketTransport { + return &WebSocketTransport{wsConn: wsConn} +} + +// Request sends a request to the agent via WebSocket and unmarshals the response. +func (t *WebSocketTransport) Request(ctx context.Context, action common.WebSocketAction, req any, dest any) error { + if !t.IsConnected() { + return ErrWebSocketNotConnected + } + + pendingReq, err := t.wsConn.SendRequest(ctx, action, req) + if err != nil { + return err + } + + // Wait for response + select { + case message := <-pendingReq.ResponseCh: + defer message.Close() + defer pendingReq.Cancel() + + // Legacy agents (< MinVersionAgentResponse) respond with a raw payload instead of an AgentResponse wrapper. + if t.wsConn.AgentVersion().LT(beszel.MinVersionAgentResponse) { + return cbor.Unmarshal(message.Data.Bytes(), dest) + } + + var agentResponse common.AgentResponse + if err := cbor.Unmarshal(message.Data.Bytes(), &agentResponse); err != nil { + return err + } + + if agentResponse.Error != "" { + return errors.New(agentResponse.Error) + } + + return UnmarshalResponse(agentResponse, action, dest) + + case <-pendingReq.Context.Done(): + return pendingReq.Context.Err() + } +} + +// IsConnected returns true if the WebSocket connection is active. +func (t *WebSocketTransport) IsConnected() bool { + return t.wsConn != nil && t.wsConn.IsConnected() +} + +// Close terminates the WebSocket connection. +func (t *WebSocketTransport) Close() { + if t.wsConn != nil { + t.wsConn.Close(nil) + } +} diff --git a/internal/hub/ws/handlers.go b/internal/hub/ws/handlers.go index d9472279..b67c728f 100644 --- a/internal/hub/ws/handlers.go +++ b/internal/hub/ws/handlers.go @@ -6,14 +6,12 @@ import ( "github.com/fxamacker/cbor/v2" "github.com/henrygd/beszel/internal/common" - "github.com/henrygd/beszel/internal/entities/smart" - "github.com/henrygd/beszel/internal/entities/system" - "github.com/henrygd/beszel/internal/entities/systemd" "github.com/lxzan/gws" "golang.org/x/crypto/ssh" ) -// ResponseHandler defines interface for handling agent responses +// ResponseHandler defines interface for handling agent responses. +// This is used by handleAgentRequest for legacy response handling. type ResponseHandler interface { Handle(agentResponse common.AgentResponse) error HandleLegacy(rawData []byte) error @@ -27,167 +25,7 @@ func (h *BaseHandler) HandleLegacy(rawData []byte) error { } //////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// - -// systemDataHandler implements ResponseHandler for system data requests -type systemDataHandler struct { - data *system.CombinedData -} - -func (h *systemDataHandler) HandleLegacy(rawData []byte) error { - return cbor.Unmarshal(rawData, h.data) -} - -func (h *systemDataHandler) Handle(agentResponse common.AgentResponse) error { - if agentResponse.SystemData != nil { - *h.data = *agentResponse.SystemData - } - return nil -} - -// RequestSystemData requests system metrics from the agent and unmarshals the response. -func (ws *WsConn) RequestSystemData(ctx context.Context, data *system.CombinedData, options common.DataRequestOptions) error { - if !ws.IsConnected() { - return gws.ErrConnClosed - } - - req, err := ws.requestManager.SendRequest(ctx, common.GetData, options) - if err != nil { - return err - } - - handler := &systemDataHandler{data: data} - return ws.handleAgentRequest(req, handler) -} - -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// - -// stringResponseHandler is a generic handler for string responses from agents -type stringResponseHandler struct { - BaseHandler - value string - errorMsg string -} - -func (h *stringResponseHandler) Handle(agentResponse common.AgentResponse) error { - if agentResponse.String == nil { - return errors.New(h.errorMsg) - } - h.value = *agentResponse.String - return nil -} - -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// - -// requestContainerStringViaWS is a generic function to request container-related strings via WebSocket -func (ws *WsConn) requestContainerStringViaWS(ctx context.Context, action common.WebSocketAction, requestData any, errorMsg string) (string, error) { - if !ws.IsConnected() { - return "", gws.ErrConnClosed - } - - req, err := ws.requestManager.SendRequest(ctx, action, requestData) - if err != nil { - return "", err - } - - handler := &stringResponseHandler{errorMsg: errorMsg} - if err := ws.handleAgentRequest(req, handler); err != nil { - return "", err - } - - return handler.value, nil -} - -// RequestContainerLogs requests logs for a specific container via WebSocket. -func (ws *WsConn) RequestContainerLogs(ctx context.Context, containerID string) (string, error) { - return ws.requestContainerStringViaWS(ctx, common.GetContainerLogs, common.ContainerLogsRequest{ContainerID: containerID}, "no logs in response") -} - -// RequestContainerInfo requests information about a specific container via WebSocket. -func (ws *WsConn) RequestContainerInfo(ctx context.Context, containerID string) (string, error) { - return ws.requestContainerStringViaWS(ctx, common.GetContainerInfo, common.ContainerInfoRequest{ContainerID: containerID}, "no info in response") -} - -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// - -// RequestSystemdInfo requests detailed information about a systemd service via WebSocket. -func (ws *WsConn) RequestSystemdInfo(ctx context.Context, serviceName string) (systemd.ServiceDetails, error) { - if !ws.IsConnected() { - return nil, gws.ErrConnClosed - } - - req, err := ws.requestManager.SendRequest(ctx, common.GetSystemdInfo, common.SystemdInfoRequest{ServiceName: serviceName}) - if err != nil { - return nil, err - } - - var result systemd.ServiceDetails - handler := &systemdInfoHandler{result: &result} - if err := ws.handleAgentRequest(req, handler); err != nil { - return nil, err - } - - return result, nil -} - -// systemdInfoHandler parses ServiceDetails from AgentResponse -type systemdInfoHandler struct { - BaseHandler - result *systemd.ServiceDetails -} - -func (h *systemdInfoHandler) Handle(agentResponse common.AgentResponse) error { - if agentResponse.ServiceInfo == nil { - return errors.New("no systemd info in response") - } - *h.result = agentResponse.ServiceInfo - return nil -} - -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// - -// RequestSmartData requests SMART data via WebSocket. -func (ws *WsConn) RequestSmartData(ctx context.Context) (map[string]smart.SmartData, error) { - if !ws.IsConnected() { - return nil, gws.ErrConnClosed - } - req, err := ws.requestManager.SendRequest(ctx, common.GetSmartData, nil) - if err != nil { - return nil, err - } - var result map[string]smart.SmartData - handler := ResponseHandler(&smartDataHandler{result: &result}) - if err := ws.handleAgentRequest(req, handler); err != nil { - return nil, err - } - return result, nil -} - -// smartDataHandler parses SMART data map from AgentResponse -type smartDataHandler struct { - BaseHandler - result *map[string]smart.SmartData -} - -func (h *smartDataHandler) Handle(agentResponse common.AgentResponse) error { - if agentResponse.SmartData == nil { - return errors.New("no SMART data in response") - } - *h.result = agentResponse.SmartData - return nil -} - -//////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////// +// Fingerprint handling (used for WebSocket authentication) //////////////////////////////////////////////////////////////////////////// // fingerprintHandler implements ResponseHandler for fingerprint requests diff --git a/internal/hub/ws/handlers_test.go b/internal/hub/ws/handlers_test.go deleted file mode 100644 index 0ca31d94..00000000 --- a/internal/hub/ws/handlers_test.go +++ /dev/null @@ -1,75 +0,0 @@ -//go:build testing - -package ws - -import ( - "testing" - - "github.com/henrygd/beszel/internal/common" - "github.com/henrygd/beszel/internal/entities/systemd" - "github.com/stretchr/testify/assert" -) - -func TestSystemdInfoHandlerSuccess(t *testing.T) { - handler := &systemdInfoHandler{ - result: &systemd.ServiceDetails{}, - } - - // Test successful handling with valid ServiceInfo - testDetails := systemd.ServiceDetails{ - "Id": "nginx.service", - "ActiveState": "active", - "SubState": "running", - "Description": "A high performance web server", - "ExecMainPID": 1234, - "MemoryCurrent": 1024000, - } - - response := common.AgentResponse{ - ServiceInfo: testDetails, - } - - err := handler.Handle(response) - assert.NoError(t, err) - assert.Equal(t, testDetails, *handler.result) -} - -func TestSystemdInfoHandlerError(t *testing.T) { - handler := &systemdInfoHandler{ - result: &systemd.ServiceDetails{}, - } - - // Test error handling when ServiceInfo is nil - response := common.AgentResponse{ - ServiceInfo: nil, - Error: "service not found", - } - - err := handler.Handle(response) - assert.Error(t, err) - assert.Equal(t, "no systemd info in response", err.Error()) -} - -func TestSystemdInfoHandlerEmptyResponse(t *testing.T) { - handler := &systemdInfoHandler{ - result: &systemd.ServiceDetails{}, - } - - // Test with completely empty response - response := common.AgentResponse{} - - err := handler.Handle(response) - assert.Error(t, err) - assert.Equal(t, "no systemd info in response", err.Error()) -} - -func TestSystemdInfoHandlerLegacyNotSupported(t *testing.T) { - handler := &systemdInfoHandler{ - result: &systemd.ServiceDetails{}, - } - - // Test that legacy format is not supported - err := handler.HandleLegacy([]byte("some data")) - assert.Error(t, err) - assert.Equal(t, "legacy format not supported", err.Error()) -} diff --git a/internal/hub/ws/request_manager.go b/internal/hub/ws/request_manager.go index 28dab40d..41fe7711 100644 --- a/internal/hub/ws/request_manager.go +++ b/internal/hub/ws/request_manager.go @@ -45,7 +45,15 @@ func NewRequestManager(conn *gws.Conn) *RequestManager { func (rm *RequestManager) SendRequest(ctx context.Context, action common.WebSocketAction, data any) (*PendingRequest, error) { reqID := RequestID(rm.nextID.Add(1)) - reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + // Respect any caller-provided deadline. If none is set, apply a reasonable default + // so pending requests don't live forever if the agent never responds. + reqCtx := ctx + var cancel context.CancelFunc + if _, hasDeadline := ctx.Deadline(); hasDeadline { + reqCtx, cancel = context.WithCancel(ctx) + } else { + reqCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + } req := &PendingRequest{ ID: reqID, @@ -100,6 +108,11 @@ func (rm *RequestManager) handleResponse(message *gws.Message) { return } + if response.Id == nil { + rm.routeLegacyResponse(message) + return + } + reqID := RequestID(*response.Id) rm.RLock() diff --git a/internal/hub/ws/ws.go b/internal/hub/ws/ws.go index 0539ec7d..d80e8e67 100644 --- a/internal/hub/ws/ws.go +++ b/internal/hub/ws/ws.go @@ -1,6 +1,7 @@ package ws import ( + "context" "errors" "time" "weak" @@ -161,3 +162,14 @@ func (ws *WsConn) handleAgentRequest(req *PendingRequest, handler ResponseHandle func (ws *WsConn) IsConnected() bool { return ws.conn != nil } + +// AgentVersion returns the connected agent's version (as reported during handshake). +func (ws *WsConn) AgentVersion() semver.Version { + return ws.agentVersion +} + +// SendRequest sends a request to the agent and returns a pending request handle. +// This is used by the transport layer to send requests. +func (ws *WsConn) SendRequest(ctx context.Context, action common.WebSocketAction, data any) (*PendingRequest, error) { + return ws.requestManager.SendRequest(ctx, action, data) +} diff --git a/internal/hub/ws/ws_test.go b/internal/hub/ws/ws_test.go index 3094152b..5efcfe8c 100644 --- a/internal/hub/ws/ws_test.go +++ b/internal/hub/ws/ws_test.go @@ -184,14 +184,18 @@ func TestCommonActions(t *testing.T) { assert.Equal(t, common.WebSocketAction(2), common.GetContainerLogs, "GetLogs should be action 2") } -func TestLogsHandler(t *testing.T) { - h := &stringResponseHandler{errorMsg: "no logs in response"} +func TestFingerprintHandler(t *testing.T) { + var result common.FingerprintResponse + h := &fingerprintHandler{result: &result} - logValue := "test logs" - resp := common.AgentResponse{String: &logValue} + resp := common.AgentResponse{Fingerprint: &common.FingerprintResponse{ + Fingerprint: "test-fingerprint", + Hostname: "test-host", + }} err := h.Handle(resp) assert.NoError(t, err) - assert.Equal(t, logValue, h.value) + assert.Equal(t, "test-fingerprint", result.Fingerprint) + assert.Equal(t, "test-host", result.Hostname) } // TestHandler tests that we can create a Handler diff --git a/internal/site/package.json b/internal/site/package.json index 4a0f5aae..66dc7c29 100644 --- a/internal/site/package.json +++ b/internal/site/package.json @@ -1,7 +1,7 @@ { "name": "beszel", "private": true, - "version": "0.18.0-beta.1", + "version": "0.18.0-beta.2", "type": "module", "scripts": { "dev": "vite --host", diff --git a/internal/site/src/components/routes/system.tsx b/internal/site/src/components/routes/system.tsx index 7c12ad9f..d20237b3 100644 --- a/internal/site/src/components/routes/system.tsx +++ b/internal/site/src/components/routes/system.tsx @@ -16,7 +16,7 @@ import MemChart from "@/components/charts/mem-chart" import SwapChart from "@/components/charts/swap-chart" import TemperatureChart from "@/components/charts/temperature-chart" import { getPbTimestamp, pb } from "@/lib/api" -import { ChartType, Os, SystemStatus, Unit } from "@/lib/enums" +import { ChartType, SystemStatus, Unit } from "@/lib/enums" import { batteryStateTranslations } from "@/lib/i18n" import { $allSystemsById, @@ -222,7 +222,6 @@ export default memo(function SystemDetail({ id }: { id: string }) { }, [system.id]) // subscribe to realtime metrics if chart time is 1m - // biome-ignore lint/correctness/useExhaustiveDependencies: not necessary useEffect(() => { let unsub = () => {} if (!system.id || chartTime !== "1m") { @@ -260,7 +259,6 @@ export default memo(function SystemDetail({ id }: { id: string }) { } }, [chartTime, system.id]) - // biome-ignore lint/correctness/useExhaustiveDependencies: not necessary const chartData: ChartData = useMemo(() => { const lastCreated = Math.max( (systemStats.at(-1)?.created as number) ?? 0, @@ -300,7 +298,6 @@ export default memo(function SystemDetail({ id }: { id: string }) { }, []) // get stats - // biome-ignore lint/correctness/useExhaustiveDependencies: not necessary useEffect(() => { if (!system.id || !chartTime || chartTime === "1m") { return @@ -407,7 +404,7 @@ export default memo(function SystemDetail({ id }: { id: string }) { const hasGpuData = lastGpuVals.length > 0 const hasGpuPowerData = lastGpuVals.some((gpu) => gpu.p !== undefined || gpu.pp !== undefined) const hasGpuEnginesData = lastGpuVals.some((gpu) => gpu.e !== undefined) - const isLinux = (details?.os ?? system.info?.os) === Os.Linux + const isLinux = !(details?.os ?? system.info?.os) const isPodman = details?.podman ?? system.info?.p ?? false return ( diff --git a/internal/site/src/components/systems-table/systems-table-columns.tsx b/internal/site/src/components/systems-table/systems-table-columns.tsx index a714a824..d3e28c44 100644 --- a/internal/site/src/components/systems-table/systems-table-columns.tsx +++ b/internal/site/src/components/systems-table/systems-table-columns.tsx @@ -287,12 +287,12 @@ export function SystemsTableColumns(viewMode: "table" | "grid"): ColumnDef) { export function BatteryLowIcon(props: SVGProps) { return ( - + ) }