Compare commits

...

3 Commits

Author SHA1 Message Date
Henry Dollman
06e4dd10e0 0.0.1-alpha.9 2024-07-23 22:41:33 -04:00
Henry Dollman
af4d5137d6 lower 55 sec system update check to 50 sec 2024-07-23 22:41:05 -04:00
Henry Dollman
5e255f8f69 use semaphore to limit concurrency in agent
subtract mem cache from container stats
2024-07-23 22:40:39 -04:00
2 changed files with 52 additions and 27 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
sshServer "github.com/gliderlabs/ssh"
@@ -21,11 +22,20 @@ import (
psutilNet "github.com/shirou/gopsutil/v4/net"
)
var Version = "0.0.1-alpha.8"
var Version = "0.0.1-alpha.9"
var containerCpuMap = make(map[string][2]uint64)
var containerCpuMutex = &sync.Mutex{}
// var containerCpuMutex = &sync.Mutex{}
var sem = make(chan struct{}, 15)
func acquireSemaphore() {
sem <- struct{}{}
}
func releaseSemaphore() {
<-sem
}
var diskIoStats = DiskIoStats{
Read: 0,
@@ -48,11 +58,11 @@ var client = &http.Client{
Dial: func(proto, addr string) (net.Conn, error) {
return net.Dial("unix", "/var/run/docker.sock")
},
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxIdleConns: 10,
DisableKeepAlives: false,
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxIdleConnsPerHost: 50,
DisableKeepAlives: false,
},
}
@@ -156,27 +166,35 @@ func getDockerStats() ([]*ContainerStats, error) {
containerStats := make([]*ContainerStats, 0, len(containers))
// store valid ids to clean up old container ids from map
validIds := make(map[string]struct{}, len(containers))
var wg sync.WaitGroup
for _, ctr := range containers {
ctr.IdShort = ctr.ID[:12]
cstats, err := getContainerStats(ctr)
if err != nil {
// retry once
cstats, err = getContainerStats(ctr)
validIds[ctr.IdShort] = struct{}{}
wg.Add(1)
go func() {
defer wg.Done()
cstats, err := getContainerStats(ctr)
if err != nil {
log.Printf("Error getting container stats: %+v\n", err)
continue
// retry once
cstats, err = getContainerStats(ctr)
if err != nil {
log.Printf("Error getting container stats: %+v\n", err)
return
}
}
}
containerStats = append(containerStats, cstats)
containerStats = append(containerStats, cstats)
}()
}
// clean up old container ids from map
validIds := make(map[string]struct{}, len(containers))
for _, ctr := range containers {
validIds[ctr.IdShort] = struct{}{}
}
wg.Wait()
for id := range containerCpuMap {
if _, exists := validIds[id]; !exists {
// log.Printf("Removing container cpu map entry: %+v\n", id)
delete(containerCpuMap, id)
}
}
@@ -185,6 +203,9 @@ func getDockerStats() ([]*ContainerStats, error) {
}
func getContainerStats(ctr *Container) (*ContainerStats, error) {
// use semaphore to limit concurrency
acquireSemaphore()
defer releaseSemaphore()
resp, err := client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
if err != nil {
return &ContainerStats{}, err
@@ -198,14 +219,18 @@ func getContainerStats(ctr *Container) (*ContainerStats, error) {
name := ctr.Names[0][1:]
// memory
usedMemory := statsJson.MemoryStats.Usage - statsJson.MemoryStats.Cache
// memory (https://docs.docker.com/reference/cli/docker/container/stats/)
memCache := statsJson.MemoryStats.Stats["inactive_file"]
if memCache == 0 {
memCache = statsJson.MemoryStats.Stats["cache"]
}
usedMemory := statsJson.MemoryStats.Usage - memCache
// pctMemory := float64(usedMemory) / float64(statsJson.MemoryStats.Limit) * 100
// cpu
// add default values to containerCpu if it doesn't exist
// containerCpuMutex.Lock()
// defer containerCpuMutex.Unlock()
containerCpuMutex.Lock()
defer containerCpuMutex.Unlock()
if _, ok := containerCpuMap[ctr.IdShort]; !ok {
containerCpuMap[ctr.IdShort] = [2]uint64{0, 0}
}

View File

@@ -30,7 +30,7 @@ import (
"golang.org/x/crypto/ssh"
)
var Version = "0.0.1-alpha.8"
var Version = "0.0.1-alpha.9"
var app *pocketbase.PocketBase
var serverConnections = make(map[string]*Server)
@@ -221,10 +221,10 @@ func updateSystems() {
// app.Logger().Error("Failed to query systems")
return
}
fiftyFiveSecondsAgo := time.Now().UTC().Add(-55 * time.Second)
fiftySecondsAgo := time.Now().UTC().Add(-50 * time.Second)
batchSize := len(records)/4 + 1
for i := 0; i < batchSize; i++ {
if records[i].Get("updated").(types.DateTime).Time().After(fiftyFiveSecondsAgo) {
if records[i].Get("updated").(types.DateTime).Time().After(fiftySecondsAgo) {
break
}
// log.Println("updating", records[i].Get(("name")))