add nvtop integration and introduce GPU_COLLECTOR env var

This commit is contained in:
henrygd
2026-02-13 17:10:16 -05:00
parent 1f1a448aef
commit 14ecb1b069
6 changed files with 834 additions and 164 deletions

View File

@@ -21,13 +21,10 @@ const (
// Commands
nvidiaSmiCmd string = "nvidia-smi"
rocmSmiCmd string = "rocm-smi"
amdgpuCmd string = "amdgpu" // internal cmd for sysfs collection
tegraStatsCmd string = "tegrastats"
nvtopCmd string = "nvtop"
noGPUFoundMsg string = "no GPU found - see https://beszel.dev/guide/gpu"
// Polling intervals
nvidiaSmiInterval string = "4" // in seconds
tegraStatsInterval string = "3700" // in milliseconds
rocmSmiInterval time.Duration = 4300 * time.Millisecond
// Command retry and timeout constants
retryWaitTime time.Duration = 5 * time.Second
maxFailureRetries int = 5
@@ -40,13 +37,7 @@ const (
// GPUManager manages data collection for GPUs (either Nvidia or AMD)
type GPUManager struct {
sync.Mutex
nvidiaSmi bool
rocmSmi bool
amdgpu bool
tegrastats bool
intelGpuStats bool
nvml bool
GpuDataMap map[string]*system.GPUData
GpuDataMap map[string]*system.GPUData
// lastAvgData stores the last calculated averages for each GPU
// Used when a collection happens before new data arrives (Count == 0)
lastAvgData map[string]system.GPUData
@@ -87,6 +78,51 @@ type gpuCollector struct {
var errNoValidData = fmt.Errorf("no valid GPU data found") // Error for missing data
// collectorSource identifies a selectable GPU collector in GPU_COLLECTOR.
type collectorSource string
const (
collectorSourceNVTop collectorSource = collectorSource(nvtopCmd)
collectorSourceNVML collectorSource = "nvml"
collectorSourceNvidiaSMI collectorSource = collectorSource(nvidiaSmiCmd)
collectorSourceIntelGpuTop collectorSource = collectorSource(intelGpuStatsCmd)
collectorSourceAmdSysfs collectorSource = "amd_sysfs"
collectorSourceRocmSMI collectorSource = collectorSource(rocmSmiCmd)
collectorGroupNvidia string = "nvidia"
collectorGroupIntel string = "intel"
collectorGroupAmd string = "amd"
)
func isValidCollectorSource(source collectorSource) bool {
switch source {
case collectorSourceNVTop,
collectorSourceNVML,
collectorSourceNvidiaSMI,
collectorSourceIntelGpuTop,
collectorSourceAmdSysfs,
collectorSourceRocmSMI:
return true
}
return false
}
// gpuCapabilities describes detected GPU tooling and sysfs support on the host.
type gpuCapabilities struct {
hasNvidiaSmi bool
hasRocmSmi bool
hasAmdSysfs bool
hasTegrastats bool
hasIntelGpuTop bool
hasNvtop bool
}
type collectorDefinition struct {
group string
available bool
start func(onFailure func()) bool
deprecationWarning string
}
// starts and manages the ongoing collection of GPU data for the specified GPU management utility
func (c *gpuCollector) start() {
for {
@@ -392,93 +428,257 @@ func (gm *GPUManager) storeSnapshot(id string, gpu *system.GPUData, cacheKey uin
gm.lastSnapshots[cacheKey][id] = snapshot
}
// detectGPUs checks for the presence of GPU management tools (nvidia-smi, rocm-smi, tegrastats)
// in the system path. It sets the corresponding flags in the GPUManager struct if any of these
// tools are found. If none of the tools are found, it returns an error indicating that no GPU
// management tools are available.
func (gm *GPUManager) detectGPUs() error {
// discoverGpuCapabilities checks for available GPU tooling and sysfs support.
// It only reports capability presence and does not apply policy decisions.
func (gm *GPUManager) discoverGpuCapabilities() gpuCapabilities {
caps := gpuCapabilities{
hasAmdSysfs: gm.hasAmdSysfs(),
}
if _, err := exec.LookPath(nvidiaSmiCmd); err == nil {
gm.nvidiaSmi = true
caps.hasNvidiaSmi = true
}
if _, err := exec.LookPath(rocmSmiCmd); err == nil {
if val, _ := GetEnv("AMD_SYSFS"); val == "true" {
gm.amdgpu = true
} else {
gm.rocmSmi = true
}
} else if gm.hasAmdSysfs() {
gm.amdgpu = true
caps.hasRocmSmi = true
}
if _, err := exec.LookPath(tegraStatsCmd); err == nil {
gm.tegrastats = true
gm.nvidiaSmi = false
caps.hasTegrastats = true
}
if _, err := exec.LookPath(intelGpuStatsCmd); err == nil {
gm.intelGpuStats = true
caps.hasIntelGpuTop = true
}
if gm.nvidiaSmi || gm.rocmSmi || gm.amdgpu || gm.tegrastats || gm.intelGpuStats || gm.nvml {
return nil
if _, err := exec.LookPath(nvtopCmd); err == nil {
caps.hasNvtop = true
}
return fmt.Errorf("no GPU found - install nvidia-smi, rocm-smi, or intel_gpu_top")
return caps
}
// startCollector starts the appropriate GPU data collector based on the command
func (gm *GPUManager) startCollector(command string) {
collector := gpuCollector{
name: command,
bufSize: 10 * 1024,
}
switch command {
case intelGpuStatsCmd:
go func() {
failures := 0
for {
if err := gm.collectIntelStats(); err != nil {
failures++
if failures > maxFailureRetries {
break
}
slog.Warn("Error collecting Intel GPU data; see https://beszel.dev/guide/gpu", "err", err)
time.Sleep(retryWaitTime)
continue
func hasAnyGpuCollector(caps gpuCapabilities) bool {
return caps.hasNvidiaSmi || caps.hasRocmSmi || caps.hasAmdSysfs || caps.hasTegrastats || caps.hasIntelGpuTop || caps.hasNvtop
}
func (gm *GPUManager) startIntelCollector() {
go func() {
failures := 0
for {
if err := gm.collectIntelStats(); err != nil {
failures++
if failures > maxFailureRetries {
break
}
slog.Warn("Error collecting Intel GPU data; see https://beszel.dev/guide/gpu", "err", err)
time.Sleep(retryWaitTime)
continue
}
}()
case nvidiaSmiCmd:
collector.cmdArgs = []string{
"-l", nvidiaSmiInterval,
}
}()
}
func (gm *GPUManager) startNvidiaSmiCollector(intervalSeconds string) {
collector := gpuCollector{
name: nvidiaSmiCmd,
bufSize: 10 * 1024,
cmdArgs: []string{
"-l", intervalSeconds,
"--query-gpu=index,name,temperature.gpu,memory.used,memory.total,utilization.gpu,power.draw",
"--format=csv,noheader,nounits",
}
collector.parse = gm.parseNvidiaData
go collector.start()
case tegraStatsCmd:
collector.cmdArgs = []string{"--interval", tegraStatsInterval}
collector.parse = gm.getJetsonParser()
go collector.start()
case amdgpuCmd:
go func() {
if err := gm.collectAmdStats(); err != nil {
slog.Warn("Error collecting AMD GPU data via sysfs", "err", err)
}
}()
case rocmSmiCmd:
collector.cmdArgs = []string{"--showid", "--showtemp", "--showuse", "--showpower", "--showproductname", "--showmeminfo", "vram", "--json"}
collector.parse = gm.parseAmdData
go func() {
failures := 0
for {
if err := collector.collect(); err != nil {
failures++
if failures > maxFailureRetries {
break
}
slog.Warn("Error collecting AMD GPU data via rocm-smi", "err", err)
}
time.Sleep(rocmSmiInterval)
}
}()
},
parse: gm.parseNvidiaData,
}
go collector.start()
}
func (gm *GPUManager) startTegraStatsCollector(intervalMilliseconds string) {
collector := gpuCollector{
name: tegraStatsCmd,
bufSize: 10 * 1024,
cmdArgs: []string{"--interval", intervalMilliseconds},
parse: gm.getJetsonParser(),
}
go collector.start()
}
func (gm *GPUManager) startRocmSmiCollector(pollInterval time.Duration) {
collector := gpuCollector{
name: rocmSmiCmd,
bufSize: 10 * 1024,
cmdArgs: []string{"--showid", "--showtemp", "--showuse", "--showpower", "--showproductname", "--showmeminfo", "vram", "--json"},
parse: gm.parseAmdData,
}
go func() {
failures := 0
for {
if err := collector.collect(); err != nil {
failures++
if failures > maxFailureRetries {
break
}
slog.Warn("Error collecting AMD GPU data via rocm-smi", "err", err)
}
time.Sleep(pollInterval)
}
}()
}
func (gm *GPUManager) collectorDefinitions(caps gpuCapabilities) map[collectorSource]collectorDefinition {
return map[collectorSource]collectorDefinition{
collectorSourceNVML: {
group: collectorGroupNvidia,
available: caps.hasNvidiaSmi,
start: func(_ func()) bool {
return gm.startNvmlCollector()
},
},
collectorSourceNvidiaSMI: {
group: collectorGroupNvidia,
available: caps.hasNvidiaSmi,
start: func(_ func()) bool {
gm.startNvidiaSmiCollector("4") // seconds
return true
},
},
collectorSourceIntelGpuTop: {
group: collectorGroupIntel,
available: caps.hasIntelGpuTop,
start: func(_ func()) bool {
gm.startIntelCollector()
return true
},
},
collectorSourceAmdSysfs: {
group: collectorGroupAmd,
available: caps.hasAmdSysfs,
start: func(_ func()) bool {
return gm.startAmdSysfsCollector()
},
},
collectorSourceRocmSMI: {
group: collectorGroupAmd,
available: caps.hasRocmSmi,
deprecationWarning: "rocm-smi is deprecated and may be removed in a future release",
start: func(_ func()) bool {
gm.startRocmSmiCollector(4300 * time.Millisecond)
return true
},
},
collectorSourceNVTop: {
available: caps.hasNvtop,
start: func(onFailure func()) bool {
gm.startNvtopCollector("30", onFailure) // tens of milliseconds
return true
},
},
}
}
// parseCollectorPriority parses GPU_COLLECTOR and returns valid ordered entries.
func parseCollectorPriority(value string) []collectorSource {
parts := strings.Split(value, ",")
priorities := make([]collectorSource, 0, len(parts))
for _, raw := range parts {
name := collectorSource(strings.TrimSpace(strings.ToLower(raw)))
if !isValidCollectorSource(name) {
if name != "" {
slog.Warn("Ignoring unknown GPU collector", "collector", name)
}
continue
}
priorities = append(priorities, name)
}
return priorities
}
// startNvmlCollector initializes NVML and starts its polling loop.
func (gm *GPUManager) startNvmlCollector() bool {
collector := &nvmlCollector{gm: gm}
if err := collector.init(); err != nil {
slog.Warn("Failed to initialize NVML", "err", err)
return false
}
go collector.start()
return true
}
// startAmdSysfsCollector starts AMD GPU collection via sysfs.
func (gm *GPUManager) startAmdSysfsCollector() bool {
go func() {
if err := gm.collectAmdStats(); err != nil {
slog.Warn("Error collecting AMD GPU data via sysfs", "err", err)
}
}()
return true
}
// startCollectorsByPriority starts collectors in order with one source per vendor group.
func (gm *GPUManager) startCollectorsByPriority(priorities []collectorSource, caps gpuCapabilities) int {
definitions := gm.collectorDefinitions(caps)
selectedGroups := make(map[string]bool, 3)
started := 0
for i, source := range priorities {
definition, ok := definitions[source]
if !ok || !definition.available {
continue
}
// nvtop is not a vendor-specific collector, so should only be used if no other collectors are selected or it is first in GPU_COLLECTOR.
if source == collectorSourceNVTop {
if len(selectedGroups) > 0 {
slog.Warn("Skipping nvtop because other collectors are selected")
continue
}
// if nvtop fails, fall back to remaining collectors.
remaining := append([]collectorSource(nil), priorities[i+1:]...)
if definition.start(func() {
gm.startCollectorsByPriority(remaining, caps)
}) {
started++
return started
}
}
group := definition.group
if group == "" || selectedGroups[group] {
continue
}
if definition.deprecationWarning != "" {
slog.Warn(definition.deprecationWarning)
}
if definition.start(nil) {
selectedGroups[group] = true
started++
}
}
return started
}
// resolveLegacyCollectorPriority builds the default collector order when GPU_COLLECTOR is unset.
func (gm *GPUManager) resolveLegacyCollectorPriority(caps gpuCapabilities) []collectorSource {
priorities := make([]collectorSource, 0, 4)
if caps.hasNvidiaSmi && !caps.hasTegrastats {
if nvml, _ := GetEnv("NVML"); nvml == "true" {
priorities = append(priorities, collectorSourceNVML, collectorSourceNvidiaSMI)
} else {
priorities = append(priorities, collectorSourceNvidiaSMI)
}
}
if caps.hasRocmSmi {
if val, _ := GetEnv("AMD_SYSFS"); val == "true" {
priorities = append(priorities, collectorSourceAmdSysfs)
} else {
priorities = append(priorities, collectorSourceRocmSMI)
}
} else if caps.hasAmdSysfs {
priorities = append(priorities, collectorSourceAmdSysfs)
}
if caps.hasIntelGpuTop {
priorities = append(priorities, collectorSourceIntelGpuTop)
}
// Keep nvtop as a legacy last resort only when no vendor collector exists.
if len(priorities) == 0 && caps.hasNvtop {
priorities = append(priorities, collectorSourceNVTop)
}
return priorities
}
// NewGPUManager creates and initializes a new GPUManager
@@ -487,38 +687,30 @@ func NewGPUManager() (*GPUManager, error) {
return nil, nil
}
var gm GPUManager
if err := gm.detectGPUs(); err != nil {
return nil, err
caps := gm.discoverGpuCapabilities()
if !hasAnyGpuCollector(caps) {
return nil, fmt.Errorf(noGPUFoundMsg)
}
gm.GpuDataMap = make(map[string]*system.GPUData)
if gm.nvidiaSmi {
if nvml, _ := GetEnv("NVML"); nvml == "true" {
gm.nvml = true
gm.nvidiaSmi = false
collector := &nvmlCollector{gm: &gm}
if err := collector.init(); err == nil {
go collector.start()
} else {
slog.Warn("Failed to initialize NVML, falling back to nvidia-smi", "err", err)
gm.nvidiaSmi = true
gm.startCollector(nvidiaSmiCmd)
}
} else {
gm.startCollector(nvidiaSmiCmd)
// Jetson devices should always use tegrastats (ignore GPU_COLLECTOR).
if caps.hasTegrastats {
gm.startTegraStatsCollector("3700")
return &gm, nil
}
// if GPU_COLLECTOR is set, start user-defined collectors.
if collectorConfig, ok := GetEnv("GPU_COLLECTOR"); ok && strings.TrimSpace(collectorConfig) != "" {
priorities := parseCollectorPriority(collectorConfig)
if gm.startCollectorsByPriority(priorities, caps) == 0 {
return nil, fmt.Errorf("no configured GPU collectors are available")
}
return &gm, nil
}
if gm.rocmSmi {
gm.startCollector(rocmSmiCmd)
}
if gm.amdgpu {
gm.startCollector(amdgpuCmd)
}
if gm.tegrastats {
gm.startCollector(tegraStatsCmd)
}
if gm.intelGpuStats {
gm.startCollector(intelGpuStatsCmd)
// auto-detect and start collectors when GPU_COLLECTOR is unset.
if gm.startCollectorsByPriority(gm.resolveLegacyCollectorPriority(caps), caps) == 0 {
return nil, fmt.Errorf(noGPUFoundMsg)
}
return &gm, nil