refactor: unify agent communication with Transport interface

- Introduce `Transport` interface to abstract WebSocket and SSH
communication
- Add generic `Data` field to `AgentResponse` for streamlined future
endpoints
- Maintain backward compatibility with legacy hubs and agents using
typed fields
- Unify fetch operations (SMART, systemd, containers) under a single
`request` method
- Improve `RequestManager` with deadline awareness and legacy response
support
- Refactor agent response routing into dedicated `agent/response.go`
- Update version to 0.18.0-beta.2
This commit is contained in:
henrygd
2026-01-05 13:13:55 -05:00
parent e7b4be3dc5
commit 4547ff7b5d
19 changed files with 623 additions and 457 deletions

View File

@@ -13,9 +13,11 @@ import (
"time"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/hub/transport"
"github.com/henrygd/beszel/internal/hub/ws"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/henrygd/beszel/internal/entities/smart"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/entities/systemd"
@@ -23,28 +25,30 @@ import (
"github.com/blang/semver"
"github.com/fxamacker/cbor/v2"
"github.com/lxzan/gws"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"golang.org/x/crypto/ssh"
)
type System struct {
Id string `db:"id"`
Host string `db:"host"`
Port string `db:"port"`
Status string `db:"status"`
manager *SystemManager // Manager that this system belongs to
client *ssh.Client // SSH client for fetching data
data *system.CombinedData // system data from agent
ctx context.Context // Context for stopping the updater
cancel context.CancelFunc // Stops and removes system from updater
WsConn *ws.WsConn // Handler for agent WebSocket connection
agentVersion semver.Version // Agent version
updateTicker *time.Ticker // Ticker for updating the system
detailsFetched atomic.Bool // True if static system details have been fetched and saved
smartFetching atomic.Bool // True if SMART devices are currently being fetched
smartInterval time.Duration // Interval for periodic SMART data updates
lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch
Id string `db:"id"`
Host string `db:"host"`
Port string `db:"port"`
Status string `db:"status"`
manager *SystemManager // Manager that this system belongs to
client *ssh.Client // SSH client for fetching data
sshTransport *transport.SSHTransport // SSH transport for requests
data *system.CombinedData // system data from agent
ctx context.Context // Context for stopping the updater
cancel context.CancelFunc // Stops and removes system from updater
WsConn *ws.WsConn // Handler for agent WebSocket connection
agentVersion semver.Version // Agent version
updateTicker *time.Ticker // Ticker for updating the system
detailsFetched atomic.Bool // True if static system details have been fetched and saved
smartFetching atomic.Bool // True if SMART devices are currently being fetched
smartInterval time.Duration // Interval for periodic SMART data updates
lastSmartFetch atomic.Int64 // Unix milliseconds of last SMART data fetch
}
func (sm *SystemManager) NewSystem(systemId string) *System {
@@ -359,8 +363,78 @@ func (sys *System) getContext() (context.Context, context.CancelFunc) {
return sys.ctx, sys.cancel
}
// fetchDataFromAgent attempts to fetch data from the agent,
// prioritizing WebSocket if available.
// request sends a request to the agent, trying WebSocket first, then SSH.
// This is the unified request method that uses the transport abstraction.
func (sys *System) request(ctx context.Context, action common.WebSocketAction, req any, dest any) error {
// Try WebSocket first
if sys.WsConn != nil && sys.WsConn.IsConnected() {
wsTransport := transport.NewWebSocketTransport(sys.WsConn)
if err := wsTransport.Request(ctx, action, req, dest); err == nil {
return nil
} else if !shouldFallbackToSSH(err) {
return err
} else if shouldCloseWebSocket(err) {
sys.closeWebSocketConnection()
}
}
// Fall back to SSH if WebSocket fails
if err := sys.ensureSSHTransport(); err != nil {
return err
}
err := sys.sshTransport.RequestWithRetry(ctx, action, req, dest, 1)
// Keep legacy SSH client/version fields in sync for other code paths.
if sys.sshTransport != nil {
sys.client = sys.sshTransport.GetClient()
sys.agentVersion = sys.sshTransport.GetAgentVersion()
}
return err
}
func shouldFallbackToSSH(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
if errors.Is(err, gws.ErrConnClosed) {
return true
}
return errors.Is(err, transport.ErrWebSocketNotConnected)
}
func shouldCloseWebSocket(err error) bool {
if err == nil {
return false
}
return errors.Is(err, gws.ErrConnClosed) || errors.Is(err, transport.ErrWebSocketNotConnected)
}
// ensureSSHTransport ensures the SSH transport is initialized and connected.
func (sys *System) ensureSSHTransport() error {
if sys.sshTransport == nil {
if sys.manager.sshConfig == nil {
if err := sys.manager.createSSHClientConfig(); err != nil {
return err
}
}
sys.sshTransport = transport.NewSSHTransport(transport.SSHTransportConfig{
Host: sys.Host,
Port: sys.Port,
Config: sys.manager.sshConfig,
Timeout: 4 * time.Second,
})
}
// Sync client state with transport
if sys.client != nil {
sys.sshTransport.SetClient(sys.client)
sys.sshTransport.SetAgentVersion(sys.agentVersion)
}
return nil
}
// fetchDataFromAgent attempts to fetch data from the agent, prioritizing WebSocket if available.
func (sys *System) fetchDataFromAgent(options common.DataRequestOptions) (*system.CombinedData, error) {
if sys.data == nil {
sys.data = &system.CombinedData{}
@@ -386,112 +460,47 @@ func (sys *System) fetchDataViaWebSocket(options common.DataRequestOptions) (*sy
if sys.WsConn == nil || !sys.WsConn.IsConnected() {
return nil, errors.New("no websocket connection")
}
err := sys.WsConn.RequestSystemData(context.Background(), sys.data, options)
wsTransport := transport.NewWebSocketTransport(sys.WsConn)
err := wsTransport.Request(context.Background(), common.GetData, options, sys.data)
if err != nil {
return nil, err
}
return sys.data, nil
}
// fetchStringFromAgentViaSSH is a generic function to fetch strings via SSH
func (sys *System) fetchStringFromAgentViaSSH(action common.WebSocketAction, requestData any, errorMsg string) (string, error) {
var result string
err := sys.runSSHOperation(4*time.Second, 1, func(session *ssh.Session) (bool, error) {
stdout, err := session.StdoutPipe()
if err != nil {
return false, err
}
stdin, stdinErr := session.StdinPipe()
if stdinErr != nil {
return false, stdinErr
}
if err := session.Shell(); err != nil {
return false, err
}
req := common.HubRequest[any]{Action: action, Data: requestData}
_ = cbor.NewEncoder(stdin).Encode(req)
_ = stdin.Close()
var resp common.AgentResponse
err = cbor.NewDecoder(stdout).Decode(&resp)
if err != nil {
return false, err
}
if resp.String == nil {
return false, errors.New(errorMsg)
}
result = *resp.String
return false, nil
})
return result, err
}
// FetchContainerInfoFromAgent fetches container info from the agent
func (sys *System) FetchContainerInfoFromAgent(containerID string) (string, error) {
// fetch via websocket
if sys.WsConn != nil && sys.WsConn.IsConnected() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return sys.WsConn.RequestContainerInfo(ctx, containerID)
}
// fetch via SSH
return sys.fetchStringFromAgentViaSSH(common.GetContainerInfo, common.ContainerInfoRequest{ContainerID: containerID}, "no info in response")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var result string
err := sys.request(ctx, common.GetContainerInfo, common.ContainerInfoRequest{ContainerID: containerID}, &result)
return result, err
}
// FetchContainerLogsFromAgent fetches container logs from the agent
func (sys *System) FetchContainerLogsFromAgent(containerID string) (string, error) {
// fetch via websocket
if sys.WsConn != nil && sys.WsConn.IsConnected() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return sys.WsConn.RequestContainerLogs(ctx, containerID)
}
// fetch via SSH
return sys.fetchStringFromAgentViaSSH(common.GetContainerLogs, common.ContainerLogsRequest{ContainerID: containerID}, "no logs in response")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var result string
err := sys.request(ctx, common.GetContainerLogs, common.ContainerLogsRequest{ContainerID: containerID}, &result)
return result, err
}
// FetchSystemdInfoFromAgent fetches detailed systemd service information from the agent
func (sys *System) FetchSystemdInfoFromAgent(serviceName string) (systemd.ServiceDetails, error) {
// fetch via websocket
if sys.WsConn != nil && sys.WsConn.IsConnected() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return sys.WsConn.RequestSystemdInfo(ctx, serviceName)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var result systemd.ServiceDetails
err := sys.runSSHOperation(5*time.Second, 1, func(session *ssh.Session) (bool, error) {
stdout, err := session.StdoutPipe()
if err != nil {
return false, err
}
stdin, stdinErr := session.StdinPipe()
if stdinErr != nil {
return false, stdinErr
}
if err := session.Shell(); err != nil {
return false, err
}
req := common.HubRequest[any]{Action: common.GetSystemdInfo, Data: common.SystemdInfoRequest{ServiceName: serviceName}}
if err := cbor.NewEncoder(stdin).Encode(req); err != nil {
return false, err
}
_ = stdin.Close()
var resp common.AgentResponse
if err := cbor.NewDecoder(stdout).Decode(&resp); err != nil {
return false, err
}
if resp.ServiceInfo == nil {
if resp.Error != "" {
return false, errors.New(resp.Error)
}
return false, errors.New("no systemd info in response")
}
result = resp.ServiceInfo
return false, nil
})
err := sys.request(ctx, common.GetSystemdInfo, common.SystemdInfoRequest{ServiceName: serviceName}, &result)
return result, err
}
// FetchSmartDataFromAgent fetches SMART data from the agent
func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var result map[string]smart.SmartData
err := sys.request(ctx, common.GetSmartData, nil, &result)
return result, err
}
@@ -656,6 +665,9 @@ func (sys *System) createSessionWithTimeout(timeout time.Duration) (*ssh.Session
// closeSSHConnection closes the SSH connection but keeps the system in the manager
func (sys *System) closeSSHConnection() {
if sys.sshTransport != nil {
sys.sshTransport.Close()
}
if sys.client != nil {
sys.client.Close()
sys.client = nil

View File

@@ -1,54 +1,14 @@
package systems
import (
"context"
"database/sql"
"errors"
"strings"
"time"
"github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/smart"
"github.com/pocketbase/pocketbase/core"
"golang.org/x/crypto/ssh"
)
// FetchSmartDataFromAgent fetches SMART data from the agent
func (sys *System) FetchSmartDataFromAgent() (map[string]smart.SmartData, error) {
// fetch via websocket
if sys.WsConn != nil && sys.WsConn.IsConnected() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return sys.WsConn.RequestSmartData(ctx)
}
// fetch via SSH
var result map[string]smart.SmartData
err := sys.runSSHOperation(5*time.Second, 1, func(session *ssh.Session) (bool, error) {
stdout, err := session.StdoutPipe()
if err != nil {
return false, err
}
stdin, stdinErr := session.StdinPipe()
if stdinErr != nil {
return false, stdinErr
}
if err := session.Shell(); err != nil {
return false, err
}
req := common.HubRequest[any]{Action: common.GetSmartData}
_ = cbor.NewEncoder(stdin).Encode(req)
_ = stdin.Close()
var resp common.AgentResponse
if err := cbor.NewDecoder(stdout).Decode(&resp); err != nil {
return false, err
}
result = resp.SmartData
return false, nil
})
return result, err
}
// FetchAndSaveSmartDevices fetches SMART data from the agent and saves it to the database
func (sys *System) FetchAndSaveSmartDevices() error {
smartData, err := sys.FetchSmartDataFromAgent()

View File

@@ -0,0 +1,227 @@
package transport
import (
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/blang/semver"
"github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel/internal/common"
"golang.org/x/crypto/ssh"
)
// SSHTransport implements Transport over SSH connections.
type SSHTransport struct {
client *ssh.Client
config *ssh.ClientConfig
host string
port string
agentVersion semver.Version
timeout time.Duration
}
// SSHTransportConfig holds configuration for creating an SSH transport.
type SSHTransportConfig struct {
Host string
Port string
Config *ssh.ClientConfig
AgentVersion semver.Version
Timeout time.Duration
}
// NewSSHTransport creates a new SSH transport with the given configuration.
func NewSSHTransport(cfg SSHTransportConfig) *SSHTransport {
timeout := cfg.Timeout
if timeout == 0 {
timeout = 4 * time.Second
}
return &SSHTransport{
config: cfg.Config,
host: cfg.Host,
port: cfg.Port,
agentVersion: cfg.AgentVersion,
timeout: timeout,
}
}
// SetClient sets the SSH client for reuse across requests.
func (t *SSHTransport) SetClient(client *ssh.Client) {
t.client = client
}
// SetAgentVersion sets the agent version (extracted from SSH handshake).
func (t *SSHTransport) SetAgentVersion(version semver.Version) {
t.agentVersion = version
}
// GetClient returns the current SSH client (for connection management).
func (t *SSHTransport) GetClient() *ssh.Client {
return t.client
}
// GetAgentVersion returns the agent version.
func (t *SSHTransport) GetAgentVersion() semver.Version {
return t.agentVersion
}
// Request sends a request to the agent via SSH and unmarshals the response.
func (t *SSHTransport) Request(ctx context.Context, action common.WebSocketAction, req any, dest any) error {
if t.client == nil {
if err := t.connect(); err != nil {
return err
}
}
session, err := t.createSessionWithTimeout(ctx)
if err != nil {
return err
}
defer session.Close()
stdout, err := session.StdoutPipe()
if err != nil {
return err
}
stdin, err := session.StdinPipe()
if err != nil {
return err
}
if err := session.Shell(); err != nil {
return err
}
// Send request
hubReq := common.HubRequest[any]{Action: action, Data: req}
if err := cbor.NewEncoder(stdin).Encode(hubReq); err != nil {
return fmt.Errorf("failed to encode request: %w", err)
}
stdin.Close()
// Read response
var resp common.AgentResponse
if err := cbor.NewDecoder(stdout).Decode(&resp); err != nil {
return fmt.Errorf("failed to decode response: %w", err)
}
if resp.Error != "" {
return errors.New(resp.Error)
}
if err := session.Wait(); err != nil {
return err
}
return UnmarshalResponse(resp, action, dest)
}
// IsConnected returns true if the SSH connection is active.
func (t *SSHTransport) IsConnected() bool {
return t.client != nil
}
// Close terminates the SSH connection.
func (t *SSHTransport) Close() {
if t.client != nil {
t.client.Close()
t.client = nil
}
}
// connect establishes a new SSH connection.
func (t *SSHTransport) connect() error {
if t.config == nil {
return errors.New("SSH config not set")
}
network := "tcp"
host := t.host
if strings.HasPrefix(host, "/") {
network = "unix"
} else {
host = net.JoinHostPort(host, t.port)
}
client, err := ssh.Dial(network, host, t.config)
if err != nil {
return err
}
t.client = client
// Extract agent version from server version string
t.agentVersion, _ = extractAgentVersion(string(client.Conn.ServerVersion()))
return nil
}
// createSessionWithTimeout creates a new SSH session with a timeout.
func (t *SSHTransport) createSessionWithTimeout(ctx context.Context) (*ssh.Session, error) {
if t.client == nil {
return nil, errors.New("client not initialized")
}
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
sessionChan := make(chan *ssh.Session, 1)
errChan := make(chan error, 1)
go func() {
session, err := t.client.NewSession()
if err != nil {
errChan <- err
} else {
sessionChan <- session
}
}()
select {
case session := <-sessionChan:
return session, nil
case err := <-errChan:
return nil, err
case <-ctx.Done():
return nil, errors.New("timeout creating session")
}
}
// extractAgentVersion extracts the beszel version from SSH server version string.
func extractAgentVersion(versionString string) (semver.Version, error) {
_, after, _ := strings.Cut(versionString, "_")
return semver.Parse(after)
}
// RequestWithRetry sends a request with automatic retry on connection failures.
func (t *SSHTransport) RequestWithRetry(ctx context.Context, action common.WebSocketAction, req any, dest any, retries int) error {
var lastErr error
for attempt := 0; attempt <= retries; attempt++ {
err := t.Request(ctx, action, req, dest)
if err == nil {
return nil
}
lastErr = err
// Check if it's a connection error that warrants a retry
if isConnectionError(err) && attempt < retries {
t.Close()
continue
}
return err
}
return lastErr
}
// isConnectionError checks if an error indicates a connection problem.
func isConnectionError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "connection") ||
strings.Contains(errStr, "EOF") ||
strings.Contains(errStr, "closed") ||
errors.Is(err, io.EOF)
}

View File

@@ -0,0 +1,112 @@
// Package transport provides a unified abstraction for hub-agent communication
// over different transports (WebSocket, SSH).
package transport
import (
"context"
"errors"
"fmt"
"github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/smart"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/entities/systemd"
)
// Transport defines the interface for hub-agent communication.
// Both WebSocket and SSH transports implement this interface.
type Transport interface {
// Request sends a request to the agent and unmarshals the response into dest.
// The dest parameter should be a pointer to the expected response type.
Request(ctx context.Context, action common.WebSocketAction, req any, dest any) error
// IsConnected returns true if the transport connection is active.
IsConnected() bool
// Close terminates the transport connection.
Close()
}
// UnmarshalResponse unmarshals an AgentResponse into the destination type.
// It first checks the generic Data field (0.19+ agents), then falls back
// to legacy typed fields for backward compatibility with 0.18.0 agents.
func UnmarshalResponse(resp common.AgentResponse, action common.WebSocketAction, dest any) error {
if dest == nil {
return errors.New("nil destination")
}
// Try generic Data field first (0.19+)
if len(resp.Data) > 0 {
if err := cbor.Unmarshal(resp.Data, dest); err != nil {
return fmt.Errorf("failed to unmarshal generic response data: %w", err)
}
return nil
}
// Fall back to legacy typed fields for older agents/hubs.
return unmarshalLegacyResponse(resp, action, dest)
}
// unmarshalLegacyResponse handles legacy responses that use typed fields.
func unmarshalLegacyResponse(resp common.AgentResponse, action common.WebSocketAction, dest any) error {
switch action {
case common.GetData:
d, ok := dest.(*system.CombinedData)
if !ok {
return fmt.Errorf("unexpected dest type for GetData: %T", dest)
}
if resp.SystemData == nil {
return errors.New("no system data in response")
}
*d = *resp.SystemData
return nil
case common.CheckFingerprint:
d, ok := dest.(*common.FingerprintResponse)
if !ok {
return fmt.Errorf("unexpected dest type for CheckFingerprint: %T", dest)
}
if resp.Fingerprint == nil {
return errors.New("no fingerprint in response")
}
*d = *resp.Fingerprint
return nil
case common.GetContainerLogs:
d, ok := dest.(*string)
if !ok {
return fmt.Errorf("unexpected dest type for GetContainerLogs: %T", dest)
}
if resp.String == nil {
return errors.New("no logs in response")
}
*d = *resp.String
return nil
case common.GetContainerInfo:
d, ok := dest.(*string)
if !ok {
return fmt.Errorf("unexpected dest type for GetContainerInfo: %T", dest)
}
if resp.String == nil {
return errors.New("no info in response")
}
*d = *resp.String
return nil
case common.GetSmartData:
d, ok := dest.(*map[string]smart.SmartData)
if !ok {
return fmt.Errorf("unexpected dest type for GetSmartData: %T", dest)
}
if resp.SmartData == nil {
return errors.New("no SMART data in response")
}
*d = resp.SmartData
return nil
case common.GetSystemdInfo:
d, ok := dest.(*systemd.ServiceDetails)
if !ok {
return fmt.Errorf("unexpected dest type for GetSystemdInfo: %T", dest)
}
if resp.ServiceInfo == nil {
return errors.New("no systemd info in response")
}
*d = resp.ServiceInfo
return nil
}
return fmt.Errorf("unsupported action: %d", action)
}

View File

@@ -0,0 +1,74 @@
package transport
import (
"context"
"errors"
"github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/hub/ws"
)
// ErrWebSocketNotConnected indicates a WebSocket transport is not currently connected.
var ErrWebSocketNotConnected = errors.New("websocket not connected")
// WebSocketTransport implements Transport over WebSocket connections.
type WebSocketTransport struct {
wsConn *ws.WsConn
}
// NewWebSocketTransport creates a new WebSocket transport wrapper.
func NewWebSocketTransport(wsConn *ws.WsConn) *WebSocketTransport {
return &WebSocketTransport{wsConn: wsConn}
}
// Request sends a request to the agent via WebSocket and unmarshals the response.
func (t *WebSocketTransport) Request(ctx context.Context, action common.WebSocketAction, req any, dest any) error {
if !t.IsConnected() {
return ErrWebSocketNotConnected
}
pendingReq, err := t.wsConn.SendRequest(ctx, action, req)
if err != nil {
return err
}
// Wait for response
select {
case message := <-pendingReq.ResponseCh:
defer message.Close()
defer pendingReq.Cancel()
// Legacy agents (< MinVersionAgentResponse) respond with a raw payload instead of an AgentResponse wrapper.
if t.wsConn.AgentVersion().LT(beszel.MinVersionAgentResponse) {
return cbor.Unmarshal(message.Data.Bytes(), dest)
}
var agentResponse common.AgentResponse
if err := cbor.Unmarshal(message.Data.Bytes(), &agentResponse); err != nil {
return err
}
if agentResponse.Error != "" {
return errors.New(agentResponse.Error)
}
return UnmarshalResponse(agentResponse, action, dest)
case <-pendingReq.Context.Done():
return pendingReq.Context.Err()
}
}
// IsConnected returns true if the WebSocket connection is active.
func (t *WebSocketTransport) IsConnected() bool {
return t.wsConn != nil && t.wsConn.IsConnected()
}
// Close terminates the WebSocket connection.
func (t *WebSocketTransport) Close() {
if t.wsConn != nil {
t.wsConn.Close(nil)
}
}

View File

@@ -6,14 +6,12 @@ import (
"github.com/fxamacker/cbor/v2"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/smart"
"github.com/henrygd/beszel/internal/entities/system"
"github.com/henrygd/beszel/internal/entities/systemd"
"github.com/lxzan/gws"
"golang.org/x/crypto/ssh"
)
// ResponseHandler defines interface for handling agent responses
// ResponseHandler defines interface for handling agent responses.
// This is used by handleAgentRequest for legacy response handling.
type ResponseHandler interface {
Handle(agentResponse common.AgentResponse) error
HandleLegacy(rawData []byte) error
@@ -27,167 +25,7 @@ func (h *BaseHandler) HandleLegacy(rawData []byte) error {
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// systemDataHandler implements ResponseHandler for system data requests
type systemDataHandler struct {
data *system.CombinedData
}
func (h *systemDataHandler) HandleLegacy(rawData []byte) error {
return cbor.Unmarshal(rawData, h.data)
}
func (h *systemDataHandler) Handle(agentResponse common.AgentResponse) error {
if agentResponse.SystemData != nil {
*h.data = *agentResponse.SystemData
}
return nil
}
// RequestSystemData requests system metrics from the agent and unmarshals the response.
func (ws *WsConn) RequestSystemData(ctx context.Context, data *system.CombinedData, options common.DataRequestOptions) error {
if !ws.IsConnected() {
return gws.ErrConnClosed
}
req, err := ws.requestManager.SendRequest(ctx, common.GetData, options)
if err != nil {
return err
}
handler := &systemDataHandler{data: data}
return ws.handleAgentRequest(req, handler)
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// stringResponseHandler is a generic handler for string responses from agents
type stringResponseHandler struct {
BaseHandler
value string
errorMsg string
}
func (h *stringResponseHandler) Handle(agentResponse common.AgentResponse) error {
if agentResponse.String == nil {
return errors.New(h.errorMsg)
}
h.value = *agentResponse.String
return nil
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// requestContainerStringViaWS is a generic function to request container-related strings via WebSocket
func (ws *WsConn) requestContainerStringViaWS(ctx context.Context, action common.WebSocketAction, requestData any, errorMsg string) (string, error) {
if !ws.IsConnected() {
return "", gws.ErrConnClosed
}
req, err := ws.requestManager.SendRequest(ctx, action, requestData)
if err != nil {
return "", err
}
handler := &stringResponseHandler{errorMsg: errorMsg}
if err := ws.handleAgentRequest(req, handler); err != nil {
return "", err
}
return handler.value, nil
}
// RequestContainerLogs requests logs for a specific container via WebSocket.
func (ws *WsConn) RequestContainerLogs(ctx context.Context, containerID string) (string, error) {
return ws.requestContainerStringViaWS(ctx, common.GetContainerLogs, common.ContainerLogsRequest{ContainerID: containerID}, "no logs in response")
}
// RequestContainerInfo requests information about a specific container via WebSocket.
func (ws *WsConn) RequestContainerInfo(ctx context.Context, containerID string) (string, error) {
return ws.requestContainerStringViaWS(ctx, common.GetContainerInfo, common.ContainerInfoRequest{ContainerID: containerID}, "no info in response")
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// RequestSystemdInfo requests detailed information about a systemd service via WebSocket.
func (ws *WsConn) RequestSystemdInfo(ctx context.Context, serviceName string) (systemd.ServiceDetails, error) {
if !ws.IsConnected() {
return nil, gws.ErrConnClosed
}
req, err := ws.requestManager.SendRequest(ctx, common.GetSystemdInfo, common.SystemdInfoRequest{ServiceName: serviceName})
if err != nil {
return nil, err
}
var result systemd.ServiceDetails
handler := &systemdInfoHandler{result: &result}
if err := ws.handleAgentRequest(req, handler); err != nil {
return nil, err
}
return result, nil
}
// systemdInfoHandler parses ServiceDetails from AgentResponse
type systemdInfoHandler struct {
BaseHandler
result *systemd.ServiceDetails
}
func (h *systemdInfoHandler) Handle(agentResponse common.AgentResponse) error {
if agentResponse.ServiceInfo == nil {
return errors.New("no systemd info in response")
}
*h.result = agentResponse.ServiceInfo
return nil
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// RequestSmartData requests SMART data via WebSocket.
func (ws *WsConn) RequestSmartData(ctx context.Context) (map[string]smart.SmartData, error) {
if !ws.IsConnected() {
return nil, gws.ErrConnClosed
}
req, err := ws.requestManager.SendRequest(ctx, common.GetSmartData, nil)
if err != nil {
return nil, err
}
var result map[string]smart.SmartData
handler := ResponseHandler(&smartDataHandler{result: &result})
if err := ws.handleAgentRequest(req, handler); err != nil {
return nil, err
}
return result, nil
}
// smartDataHandler parses SMART data map from AgentResponse
type smartDataHandler struct {
BaseHandler
result *map[string]smart.SmartData
}
func (h *smartDataHandler) Handle(agentResponse common.AgentResponse) error {
if agentResponse.SmartData == nil {
return errors.New("no SMART data in response")
}
*h.result = agentResponse.SmartData
return nil
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// Fingerprint handling (used for WebSocket authentication)
////////////////////////////////////////////////////////////////////////////
// fingerprintHandler implements ResponseHandler for fingerprint requests

View File

@@ -1,75 +0,0 @@
//go:build testing
package ws
import (
"testing"
"github.com/henrygd/beszel/internal/common"
"github.com/henrygd/beszel/internal/entities/systemd"
"github.com/stretchr/testify/assert"
)
func TestSystemdInfoHandlerSuccess(t *testing.T) {
handler := &systemdInfoHandler{
result: &systemd.ServiceDetails{},
}
// Test successful handling with valid ServiceInfo
testDetails := systemd.ServiceDetails{
"Id": "nginx.service",
"ActiveState": "active",
"SubState": "running",
"Description": "A high performance web server",
"ExecMainPID": 1234,
"MemoryCurrent": 1024000,
}
response := common.AgentResponse{
ServiceInfo: testDetails,
}
err := handler.Handle(response)
assert.NoError(t, err)
assert.Equal(t, testDetails, *handler.result)
}
func TestSystemdInfoHandlerError(t *testing.T) {
handler := &systemdInfoHandler{
result: &systemd.ServiceDetails{},
}
// Test error handling when ServiceInfo is nil
response := common.AgentResponse{
ServiceInfo: nil,
Error: "service not found",
}
err := handler.Handle(response)
assert.Error(t, err)
assert.Equal(t, "no systemd info in response", err.Error())
}
func TestSystemdInfoHandlerEmptyResponse(t *testing.T) {
handler := &systemdInfoHandler{
result: &systemd.ServiceDetails{},
}
// Test with completely empty response
response := common.AgentResponse{}
err := handler.Handle(response)
assert.Error(t, err)
assert.Equal(t, "no systemd info in response", err.Error())
}
func TestSystemdInfoHandlerLegacyNotSupported(t *testing.T) {
handler := &systemdInfoHandler{
result: &systemd.ServiceDetails{},
}
// Test that legacy format is not supported
err := handler.HandleLegacy([]byte("some data"))
assert.Error(t, err)
assert.Equal(t, "legacy format not supported", err.Error())
}

View File

@@ -45,7 +45,15 @@ func NewRequestManager(conn *gws.Conn) *RequestManager {
func (rm *RequestManager) SendRequest(ctx context.Context, action common.WebSocketAction, data any) (*PendingRequest, error) {
reqID := RequestID(rm.nextID.Add(1))
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// Respect any caller-provided deadline. If none is set, apply a reasonable default
// so pending requests don't live forever if the agent never responds.
reqCtx := ctx
var cancel context.CancelFunc
if _, hasDeadline := ctx.Deadline(); hasDeadline {
reqCtx, cancel = context.WithCancel(ctx)
} else {
reqCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
}
req := &PendingRequest{
ID: reqID,
@@ -100,6 +108,11 @@ func (rm *RequestManager) handleResponse(message *gws.Message) {
return
}
if response.Id == nil {
rm.routeLegacyResponse(message)
return
}
reqID := RequestID(*response.Id)
rm.RLock()

View File

@@ -1,6 +1,7 @@
package ws
import (
"context"
"errors"
"time"
"weak"
@@ -161,3 +162,14 @@ func (ws *WsConn) handleAgentRequest(req *PendingRequest, handler ResponseHandle
func (ws *WsConn) IsConnected() bool {
return ws.conn != nil
}
// AgentVersion returns the connected agent's version (as reported during handshake).
func (ws *WsConn) AgentVersion() semver.Version {
return ws.agentVersion
}
// SendRequest sends a request to the agent and returns a pending request handle.
// This is used by the transport layer to send requests.
func (ws *WsConn) SendRequest(ctx context.Context, action common.WebSocketAction, data any) (*PendingRequest, error) {
return ws.requestManager.SendRequest(ctx, action, data)
}

View File

@@ -184,14 +184,18 @@ func TestCommonActions(t *testing.T) {
assert.Equal(t, common.WebSocketAction(2), common.GetContainerLogs, "GetLogs should be action 2")
}
func TestLogsHandler(t *testing.T) {
h := &stringResponseHandler{errorMsg: "no logs in response"}
func TestFingerprintHandler(t *testing.T) {
var result common.FingerprintResponse
h := &fingerprintHandler{result: &result}
logValue := "test logs"
resp := common.AgentResponse{String: &logValue}
resp := common.AgentResponse{Fingerprint: &common.FingerprintResponse{
Fingerprint: "test-fingerprint",
Hostname: "test-host",
}}
err := h.Handle(resp)
assert.NoError(t, err)
assert.Equal(t, logValue, h.value)
assert.Equal(t, "test-fingerprint", result.Fingerprint)
assert.Equal(t, "test-host", result.Hostname)
}
// TestHandler tests that we can create a Handler