This commit is contained in:
henrygd
2026-03-03 14:23:42 -05:00
parent 48503f9f99
commit bdbd135fdd
10 changed files with 132 additions and 132 deletions

View File

@@ -101,7 +101,7 @@ func NewAgent(dataDir ...string) (agent *Agent, err error) {
agent.dockerManager = newDockerManager()
// initialize pve manager
agent.pveManager = newPVEManager(agent)
agent.pveManager = newPVEManager()
// initialize system info
agent.refreshSystemDetails()

View File

@@ -14,6 +14,10 @@ import (
)
func createTestCacheData() *system.CombinedData {
var stats = container.Stats{}
stats.Name = "test-container"
stats.Cpu = 10.5
stats.Mem = 1073741824 // 1GB
return &system.CombinedData{
Stats: system.Stats{
Cpu: 50.5,
@@ -24,10 +28,7 @@ func createTestCacheData() *system.CombinedData {
AgentVersion: "0.12.0",
},
Containers: []*container.Stats{
{
Name: "test-container",
Cpu: 25.0,
},
&stats,
},
}
}

View File

@@ -397,11 +397,12 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
// add empty values if they doesn't exist in map
stats, initialized := dm.containerStatsMap[ctr.IdShort]
if !initialized {
stats = &container.Stats{Name: name, Id: ctr.IdShort, Image: ctr.Image}
stats = &container.Stats{Image: ctr.Image}
dm.containerStatsMap[ctr.IdShort] = stats
}
stats.Id = ctr.IdShort
stats.Name = name
statusText, health := parseDockerStatus(ctr.Status)
stats.Status = statusText

View File

@@ -269,17 +269,16 @@ func TestValidateCpuPercentage(t *testing.T) {
}
func TestUpdateContainerStatsValues(t *testing.T) {
stats := &container.Stats{
Name: "test-container",
Cpu: 0.0,
Mem: 0.0,
NetworkSent: 0.0,
NetworkRecv: 0.0,
PrevReadTime: time.Time{},
}
var stats = container.Stats{}
stats.Name = "test-container"
stats.Cpu = 0.0
stats.Mem = 0.0
stats.NetworkSent = 0.0
stats.NetworkRecv = 0.0
stats.PrevReadTime = time.Time{}
testTime := time.Now()
updateContainerStatsValues(stats, 75.5, 1048576, 524288, 262144, testTime)
updateContainerStatsValues(&stats, 75.5, 1048576, 524288, 262144, testTime)
// Check CPU percentage (should be rounded to 2 decimals)
assert.Equal(t, 75.5, stats.Cpu)
@@ -446,12 +445,11 @@ func TestCalculateNetworkStats(t *testing.T) {
},
}
stats := &container.Stats{
PrevReadTime: time.Now().Add(-time.Second), // 1 second ago
}
var stats = container.Stats{}
stats.PrevReadTime = time.Now().Add(-time.Second) // 1 second ago
// Test with initialized container
sent, recv := dm.calculateNetworkStats(ctr, apiStats, stats, true, "test-container", cacheTimeMs)
sent, recv := dm.calculateNetworkStats(ctr, apiStats, &stats, true, "test-container", cacheTimeMs)
// Should return calculated byte rates per second
assert.GreaterOrEqual(t, sent, uint64(0))
@@ -460,7 +458,7 @@ func TestCalculateNetworkStats(t *testing.T) {
// Cycle and test one-direction change (Tx only) is reflected independently
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
apiStats.Networks["eth0"] = container.NetworkStats{TxBytes: 2500, RxBytes: 1800} // +500 Tx only
sent, recv = dm.calculateNetworkStats(ctr, apiStats, stats, true, "test-container", cacheTimeMs)
sent, recv = dm.calculateNetworkStats(ctr, apiStats, &stats, true, "test-container", cacheTimeMs)
assert.Greater(t, sent, uint64(0))
assert.Equal(t, uint64(0), recv)
}
@@ -726,7 +724,8 @@ func TestMemoryStatsEdgeCases(t *testing.T) {
}
func TestContainerStatsInitialization(t *testing.T) {
stats := &container.Stats{Name: "test-container"}
var stats = container.Stats{}
stats.Name = "test-container"
// Verify initial values
assert.Equal(t, "test-container", stats.Name)
@@ -738,7 +737,7 @@ func TestContainerStatsInitialization(t *testing.T) {
// Test updating values
testTime := time.Now()
updateContainerStatsValues(stats, 45.67, 2097152, 1048576, 524288, testTime)
updateContainerStatsValues(&stats, 45.67, 2097152, 1048576, 524288, testTime)
assert.Equal(t, 45.67, stats.Cpu)
assert.Equal(t, 2.0, stats.Mem)
@@ -816,12 +815,11 @@ func TestNetworkStatsCalculationWithRealData(t *testing.T) {
// Use exact timing for deterministic results
exactly1000msAgo := time.Now().Add(-1000 * time.Millisecond)
stats := &container.Stats{
PrevReadTime: exactly1000msAgo,
}
var stats = container.Stats{}
stats.PrevReadTime = exactly1000msAgo
// First call sets baseline
sent1, recv1 := dm.calculateNetworkStats(ctr, apiStats1, stats, true, "test", cacheTimeMs)
sent1, recv1 := dm.calculateNetworkStats(ctr, apiStats1, &stats, true, "test", cacheTimeMs)
assert.Equal(t, uint64(0), sent1)
assert.Equal(t, uint64(0), recv1)
@@ -836,7 +834,7 @@ func TestNetworkStatsCalculationWithRealData(t *testing.T) {
expectedRecvRate := deltaRecv * 1000 / expectedElapsedMs // Should be exactly 1000000
// Second call with changed data
sent2, recv2 := dm.calculateNetworkStats(ctr, apiStats2, stats, true, "test", cacheTimeMs)
sent2, recv2 := dm.calculateNetworkStats(ctr, apiStats2, &stats, true, "test", cacheTimeMs)
// Should be exactly the expected rates (no tolerance needed)
assert.Equal(t, expectedSentRate, sent2)
@@ -847,9 +845,9 @@ func TestNetworkStatsCalculationWithRealData(t *testing.T) {
stats.PrevReadTime = time.Now().Add(-1 * time.Millisecond)
apiStats1.Networks["eth0"] = container.NetworkStats{TxBytes: 0, RxBytes: 0}
apiStats2.Networks["eth0"] = container.NetworkStats{TxBytes: 10 * 1024 * 1024 * 1024, RxBytes: 0} // 10GB delta
_, _ = dm.calculateNetworkStats(ctr, apiStats1, stats, true, "test", cacheTimeMs) // baseline
_, _ = dm.calculateNetworkStats(ctr, apiStats1, &stats, true, "test", cacheTimeMs) // baseline
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
sent3, recv3 := dm.calculateNetworkStats(ctr, apiStats2, stats, true, "test", cacheTimeMs)
sent3, recv3 := dm.calculateNetworkStats(ctr, apiStats2, &stats, true, "test", cacheTimeMs)
assert.Equal(t, uint64(0), sent3)
assert.Equal(t, uint64(0), recv3)
}
@@ -883,8 +881,9 @@ func TestContainerStatsEndToEndWithRealData(t *testing.T) {
}
// Initialize container stats
stats := &container.Stats{Name: "jellyfin"}
dm.containerStatsMap[ctr.IdShort] = stats
var stats = container.Stats{}
stats.Name = "jellyfin"
dm.containerStatsMap[ctr.IdShort] = &stats
// Test individual components that we can verify
usedMemory, memErr := calculateMemoryUsage(&apiStats, false)

View File

@@ -14,16 +14,15 @@ import (
)
type pveManager struct {
client *proxmox.Client // Client to query PVE API
nodeName string // Cluster node name
cpuCount int // CPU count on node
containerStatsMap map[string]*container.Stats // Keeps track of container stats
client *proxmox.Client // Client to query PVE API
nodeName string // Cluster node name
cpuCount int // CPU count on node
nodeStatsMap map[string]*container.PveNodeStats // Keeps track of pve node stats
}
// Returns stats for all running VMs/LXCs
func (pm *pveManager) getPVEStats() ([]*container.Stats, error) {
func (pm *pveManager) getPVEStats() ([]*container.PveNodeStats, error) {
if pm.client == nil {
slog.Info("PVE client not configured")
return nil, errors.New("PVE client not configured")
}
cluster, err := pm.client.Cluster(context.Background())
@@ -31,42 +30,37 @@ func (pm *pveManager) getPVEStats() ([]*container.Stats, error) {
slog.Error("Error getting cluster", "err", err)
return nil, err
}
slog.Info("PVE cluster", "cluster", cluster)
resources, err := cluster.Resources(context.Background(), "vm")
if err != nil {
slog.Error("Error getting resources", "err", err, "resources", resources)
return nil, err
}
slog.Info("PVE resources", "resources", resources)
containersLength := len(resources)
slog.Info("PVE containers length", "containersLength", containersLength)
containerIds := make(map[string]struct{}, containersLength)
// only include running vms and lxcs on selected node
for _, resource := range resources {
if resource.Node == pm.nodeName && resource.Status == "running" {
slog.Info("PVE resource", "resource", resource)
containerIds[resource.ID] = struct{}{}
}
}
// remove invalid container stats
for id := range pm.containerStatsMap {
for id := range pm.nodeStatsMap {
if _, exists := containerIds[id]; !exists {
delete(pm.containerStatsMap, id)
delete(pm.nodeStatsMap, id)
}
}
// populate stats
stats := make([]*container.Stats, 0, len(containerIds))
stats := make([]*container.PveNodeStats, 0, len(containerIds))
for _, resource := range resources {
// slog.Info("PVE resource", "resource", resource)
if _, exists := containerIds[resource.ID]; !exists {
continue
}
resourceStats, initialized := pm.containerStatsMap[resource.ID]
resourceStats, initialized := pm.nodeStatsMap[resource.ID]
if !initialized {
resourceStats = &container.Stats{}
pm.containerStatsMap[resource.ID] = resourceStats
resourceStats = &container.PveNodeStats{}
pm.nodeStatsMap[resource.ID] = resourceStats
}
// reset current stats
resourceStats.Cpu = 0
@@ -77,7 +71,7 @@ func (pm *pveManager) getPVEStats() ([]*container.Stats, error) {
// Store resource ID (e.g. "qemu/100") in .Id (cbor key 7, json:"-")
resourceStats.Id = resource.ID
// Store type (e.g. "qemu" or "lxc") in .Image (cbor key 8, json:"-")
resourceStats.Image = resource.Type
resourceStats.Type = resource.Type
// PVE limits (cbor-only, for pve_vms table)
resourceStats.MaxCPU = resource.MaxCPU
resourceStats.MaxMem = resource.MaxMem
@@ -98,30 +92,30 @@ func (pm *pveManager) getPVEStats() ([]*container.Stats, error) {
resourceStats.Mem = float64(resource.Mem)
resourceStats.Bandwidth = [2]uint64{uint64(sent_delta), uint64(recv_delta)}
slog.Info("PVE resource stats", "resourceStats", resourceStats)
stats = append(stats, resourceStats)
}
slog.Info("PVE stats", "stats", stats)
return stats, nil
}
// Creates a new PVE manager
func newPVEManager(_ *Agent) *pveManager {
slog.Info("Creating new PVE manager")
// Creates a new PVE manager - may return nil if required environment variables are not set or if there is an error connecting to the API
func newPVEManager() *pveManager {
url, exists := GetEnv("PROXMOX_URL")
if !exists {
url = "https://localhost:8006/api2/json"
}
nodeName, nodeNameExists := GetEnv("PROXMOX_NODE")
tokenID, tokenIDExists := GetEnv("PROXMOX_TOKENID")
secret, secretExists := GetEnv("PROXMOX_SECRET")
const nodeEnvVar = "PROXMOX_NODE"
const tokenIDEnvVar = "PROXMOX_TOKENID"
const secretEnvVar = "PROXMOX_SECRET"
slog.Info("PROXMOX_URL", "url", url)
slog.Info("PROXMOX_NODE", "nodeName", nodeName)
slog.Info("PROXMOX_TOKENID", "tokenID", tokenID)
slog.Info("PROXMOX_SECRET", "secret", secret)
nodeName, nodeNameExists := GetEnv(nodeEnvVar)
tokenID, tokenIDExists := GetEnv(tokenIDEnvVar)
secret, secretExists := GetEnv(secretEnvVar)
if !nodeNameExists || !tokenIDExists || !secretExists {
slog.Debug("Proxmox env vars unset", nodeEnvVar, nodeNameExists, tokenIDEnvVar, tokenIDExists, secretEnvVar, secretExists)
return nil
}
// PROXMOX_INSECURE_TLS defaults to true; set to "false" to enable TLS verification
insecureTLS := true
@@ -129,40 +123,32 @@ func newPVEManager(_ *Agent) *pveManager {
insecureTLS = val != "false"
}
var client *proxmox.Client
if nodeNameExists && tokenIDExists && secretExists {
httpClient := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: insecureTLS,
},
httpClient := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: insecureTLS,
},
}
client = proxmox.NewClient(url,
proxmox.WithHTTPClient(&httpClient),
proxmox.WithAPIToken(tokenID, secret),
)
} else {
slog.Error("Env variables not set")
client = nil
},
}
client := proxmox.NewClient(url,
proxmox.WithHTTPClient(&httpClient),
proxmox.WithAPIToken(tokenID, secret),
)
pveManager := pveManager{
client: client,
nodeName: nodeName,
nodeStatsMap: make(map[string]*container.PveNodeStats),
}
pveManager := &pveManager{
client: client,
nodeName: nodeName,
containerStatsMap: make(map[string]*container.Stats),
}
// Retrieve node cpu count
if client != nil {
slog.Info("Getting node CPU count", "nodeName", nodeName)
node, err := client.Node(context.Background(), nodeName)
if err != nil {
pveManager.client = nil
slog.Error("Error getting node", "err", err)
} else {
pveManager.cpuCount = node.CPUInfo.CPUs
slog.Info("Node CPU count", "cpuCount", pveManager.cpuCount)
}
node, err := client.Node(context.Background(), nodeName)
if err != nil {
slog.Error("Error connecting to Proxmox", "err", err)
return nil
} else {
pveManager.cpuCount = node.CPUInfo.CPUs
}
return pveManager
return &pveManager
}

View File

@@ -559,6 +559,10 @@ func TestWriteToSessionEncoding(t *testing.T) {
// Helper function to create test data for encoding tests
func createTestCombinedData() *system.CombinedData {
var stats = container.Stats{}
stats.Name = "test-container"
stats.Cpu = 10.5
stats.Mem = 1073741824 // 1GB
return &system.CombinedData{
Stats: system.Stats{
Cpu: 25.5,
@@ -577,11 +581,7 @@ func createTestCombinedData() *system.CombinedData {
AgentVersion: "0.12.0",
},
Containers: []*container.Stats{
{
Name: "test-container",
Cpu: 10.5,
Mem: 1073741824, // 1GB
},
&stats,
},
}
}

View File

@@ -127,25 +127,38 @@ var DockerHealthStrings = map[string]DockerHealth{
"unhealthy": DockerHealthUnhealthy,
}
// Docker container stats
type Stats struct {
Name string `json:"n" cbor:"0,keyasint"`
Cpu float64 `json:"c" cbor:"1,keyasint"`
Mem float64 `json:"m" cbor:"2,keyasint"`
NetworkSent float64 `json:"ns,omitzero" cbor:"3,keyasint,omitzero"` // deprecated 0.18.3 (MB) - keep field for old agents/records
NetworkRecv float64 `json:"nr,omitzero" cbor:"4,keyasint,omitzero"` // deprecated 0.18.3 (MB) - keep field for old agents/records
Bandwidth [2]uint64 `json:"b,omitzero" cbor:"9,keyasint,omitzero"` // [sent bytes, recv bytes]
Health DockerHealth `json:"-" cbor:"5,keyasint"`
Status string `json:"-" cbor:"6,keyasint"`
Id string `json:"-" cbor:"7,keyasint"`
Image string `json:"-" cbor:"8,keyasint"`
MaxCPU uint64 `json:"-" cbor:"10,keyasint,omitzero"` // PVE: max vCPU count
MaxMem uint64 `json:"-" cbor:"11,keyasint,omitzero"` // PVE: max memory bytes
Uptime uint64 `json:"-" cbor:"12,keyasint,omitzero"` // PVE: uptime in seconds
// PrevCpu [2]uint64 `json:"-"`
CpuSystem uint64 `json:"-"`
CpuContainer uint64 `json:"-"`
// SharedCoreMetrics contains fields that are common to both container Stats and PveNodeStats
type SharedCoreMetrics struct {
Name string `json:"n" cbor:"0,keyasint"`
Cpu float64 `json:"c" cbor:"1,keyasint"`
Mem float64 `json:"m" cbor:"2,keyasint"`
NetworkSent float64 `json:"ns,omitzero" cbor:"3,keyasint,omitzero"` // deprecated 0.18.3 (MB) - keep field for old agents/records
NetworkRecv float64 `json:"nr,omitzero" cbor:"4,keyasint,omitzero"` // deprecated 0.18.3 (MB) - keep field for old agents/records
Id string `json:"-" cbor:"7,keyasint"`
Bandwidth [2]uint64 `json:"b,omitzero" cbor:"9,keyasint,omitzero"` // [sent bytes, recv bytes]
PrevNet prevNetStats `json:"-"`
PrevReadTime time.Time `json:"-"`
}
// Stats holds data specific to docker containers for the containers table
type Stats struct {
SharedCoreMetrics // used to populate stats field in container_stats
// fields used for containers table
Health DockerHealth `json:"-" cbor:"5,keyasint"`
Status string `json:"-" cbor:"6,keyasint"`
Image string `json:"-" cbor:"8,keyasint"`
}
// PveNodeStats holds data specific to PVE nodes for the pve_vms table
type PveNodeStats struct {
SharedCoreMetrics // used to populate stats field in pve_stats
// fields used for pve_vms table
MaxCPU uint64 `json:"-" cbor:"10,keyasint,omitzero"` // PVE: max vCPU count
MaxMem uint64 `json:"-" cbor:"11,keyasint,omitzero"` // PVE: max memory bytes
Uptime uint64 `json:"-" cbor:"12,keyasint,omitzero"` // PVE: uptime in seconds
Type string `json:"-" cbor:"13,keyasint,omitzero"` // PVE: resource type (e.g. "qemu" or "lxc")
}

View File

@@ -170,10 +170,10 @@ type Details struct {
// Final data structure to return to the hub
type CombinedData struct {
Stats Stats `json:"stats" cbor:"0,keyasint"`
Info Info `json:"info" cbor:"1,keyasint"`
Containers []*container.Stats `json:"container" cbor:"2,keyasint"`
SystemdServices []*systemd.Service `json:"systemd,omitempty" cbor:"3,keyasint,omitempty"`
Details *Details `cbor:"4,keyasint,omitempty"`
PVEStats []*container.Stats `json:"pve,omitempty" cbor:"5,keyasint,omitempty"`
Stats Stats `json:"stats" cbor:"0,keyasint"`
Info Info `json:"info" cbor:"1,keyasint"`
Containers []*container.Stats `json:"container" cbor:"2,keyasint"`
SystemdServices []*systemd.Service `json:"systemd,omitempty" cbor:"3,keyasint,omitempty"`
Details *Details `cbor:"4,keyasint,omitempty"`
PVEStats []*container.PveNodeStats `json:"pve,omitempty" cbor:"5,keyasint,omitempty"`
}

View File

@@ -355,7 +355,7 @@ func createContainerRecords(app core.App, data []*container.Stats, systemId stri
}
// createPVEVMRecords creates or updates pve_vms records
func createPVEVMRecords(app core.App, data []*container.Stats, systemId string) error {
func createPVEVMRecords(app core.App, data []*container.PveNodeStats, systemId string) error {
if len(data) == 0 {
return nil
}
@@ -370,7 +370,7 @@ func createPVEVMRecords(app core.App, data []*container.Stats, systemId string)
valueStrings = append(valueStrings, fmt.Sprintf("({:id%[1]s}, {:system}, {:name%[1]s}, {:type%[1]s}, {:cpu%[1]s}, {:mem%[1]s}, {:net%[1]s}, {:maxcpu%[1]s}, {:maxmem%[1]s}, {:uptime%[1]s}, {:updated})", suffix))
params["id"+suffix] = makeStableHashId(systemId, vm.Id)
params["name"+suffix] = vm.Name
params["type"+suffix] = vm.Image // "qemu" or "lxc"
params["type"+suffix] = vm.Type // "qemu" or "lxc"
params["cpu"+suffix] = vm.Cpu
params["mem"+suffix] = vm.Mem
params["maxcpu"+suffix] = vm.MaxCPU

View File

@@ -42,11 +42,11 @@ type StatsRecord struct {
// global variables for reusing allocations
var (
statsRecord StatsRecord
containerStats []container.Stats
containerStats []container.SharedCoreMetrics
sumStats system.Stats
tempStats system.Stats
queryParams = make(dbx.Params, 1)
containerSums = make(map[string]*container.Stats)
containerSums = make(map[string]*container.SharedCoreMetrics)
)
// Create longer records by averaging shorter records
@@ -441,7 +441,7 @@ func (rm *RecordManager) AverageSystemStats(db dbx.Builder, records RecordIds) *
}
// Calculate the average stats of a list of container_stats or pve_stats records
func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds, collectionName string) []container.Stats {
func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds, collectionName string) []container.SharedCoreMetrics {
// Clear global map for reuse
for k := range containerSums {
delete(containerSums, k)
@@ -461,12 +461,12 @@ func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds
db.NewQuery(fmt.Sprintf("SELECT stats FROM %s WHERE id = {:id}", collectionName)).Bind(queryParams).One(&statsRecord)
if err := json.Unmarshal(statsRecord.Stats, &containerStats); err != nil {
return []container.Stats{}
return []container.SharedCoreMetrics{}
}
for i := range containerStats {
stat := containerStats[i]
if _, ok := sums[stat.Name]; !ok {
sums[stat.Name] = &container.Stats{Name: stat.Name}
sums[stat.Name] = &container.SharedCoreMetrics{Name: stat.Name}
}
sums[stat.Name].Cpu += stat.Cpu
sums[stat.Name].Mem += stat.Mem
@@ -481,9 +481,9 @@ func (rm *RecordManager) AverageContainerStats(db dbx.Builder, records RecordIds
}
}
result := make([]container.Stats, 0, len(sums))
result := make([]container.SharedCoreMetrics, 0, len(sums))
for _, value := range sums {
result = append(result, container.Stats{
result = append(result, container.SharedCoreMetrics{
Name: value.Name,
Cpu: twoDecimals(value.Cpu / count),
Mem: twoDecimals(value.Mem / count),