fleet/server/service/async/collect.go

194 lines
4.6 KiB
Go

package async
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"sync"
"time"
"github.com/fleetdm/fleet/v4/server/datastore/redis"
"github.com/fleetdm/fleet/v4/server/fleet"
redigo "github.com/gomodule/redigo/redis"
)
type collectorHandlerFunc func(context.Context, fleet.Datastore, fleet.RedisPool, *collectorExecStats) error
type collector struct {
// immutable after creation
name string
pool fleet.RedisPool
ds fleet.Datastore
execInterval time.Duration
jitterPct int
lockTimeout time.Duration
handler collectorHandlerFunc
errHandler func(string, error)
// mutable, must be protected by mutex
mu sync.RWMutex
stats collectorStats
}
type collectorStats struct {
SkipCount int
ExecCount int
FailuresCount int
MinExecDuration time.Duration
MaxExecDuration time.Duration
LastExecDuration time.Duration
MinExecKeys int
MaxExecKeys int
LastExecKeys int
MinExecItems int
MaxExecItems int
LastExecItems int
MinExecInserts int
MaxExecInserts int
LastExecInserts int
MinExecUpdates int
MaxExecUpdates int
LastExecUpdates int
MinExecDeletes int
MaxExecDeletes int
LastExecDeletes int
MinExecRedisCmds int
MaxExecRedisCmds int
LastExecRedisCmds int
}
type collectorExecStats struct {
Duration time.Duration
Keys int
Items int
Inserts int
Updates int
Deletes int
RedisCmds int // script counts as 1, does not include ZSCAN of active hosts iteration commands
Failed bool
}
func (c *collector) Start(ctx context.Context) {
for {
wait := c.nextRunAfter()
select {
case <-time.After(wait):
c.exec(ctx)
case <-ctx.Done():
return
}
}
}
func (c *collector) exec(ctx context.Context) {
keyLock := fmt.Sprintf(collectorLockKey, c.name)
conn := redis.ConfigureDoer(c.pool, c.pool.Get())
defer conn.Close()
if _, err := redigo.String(conn.Do("SET", keyLock, 1, "NX", "EX", int(c.lockTimeout.Seconds()))); err != nil {
var failed bool
// either redis failure or this collector didn't acquire the lock
if !errors.Is(err, redigo.ErrNil) {
failed = true
if c.errHandler != nil {
c.errHandler(c.name, err)
}
}
c.addSkipStats(failed)
return
}
defer conn.Do("DEL", keyLock)
// at this point, the lock has been acquired, execute the collector handler
ctx, cancel := context.WithTimeout(ctx, time.Duration(c.lockTimeout.Seconds())*time.Second)
defer cancel()
var stats collectorExecStats
start := time.Now()
if err := c.handler(ctx, c.ds, c.pool, &stats); err != nil {
stats.Failed = true
if c.errHandler != nil {
c.errHandler(c.name, err)
}
}
stats.Duration = time.Since(start)
c.addStats(&stats)
}
func (c *collector) ReadStats() collectorStats {
c.mu.RLock()
defer c.mu.RUnlock()
return c.stats
}
func (c *collector) addStats(stats *collectorExecStats) {
minMaxLastInt := func(min, max, last *int, val int) {
*last = val
if val > *max {
*max = val
}
if val < *min || (val > 0 && (*min == 0)) {
*min = val
}
}
c.mu.Lock()
defer c.mu.Unlock()
c.stats.ExecCount++
if stats.Failed {
c.stats.FailuresCount++
}
minMaxLastInt(&c.stats.MinExecItems, &c.stats.MaxExecItems, &c.stats.LastExecItems, stats.Items)
minMaxLastInt(&c.stats.MinExecKeys, &c.stats.MaxExecKeys, &c.stats.LastExecKeys, stats.Keys)
minMaxLastInt(&c.stats.MinExecInserts, &c.stats.MaxExecInserts, &c.stats.LastExecInserts, stats.Inserts)
minMaxLastInt(&c.stats.MinExecUpdates, &c.stats.MaxExecUpdates, &c.stats.LastExecUpdates, stats.Updates)
minMaxLastInt(&c.stats.MinExecDeletes, &c.stats.MaxExecDeletes, &c.stats.LastExecDeletes, stats.Deletes)
minMaxLastInt(&c.stats.MinExecRedisCmds, &c.stats.MaxExecRedisCmds, &c.stats.LastExecRedisCmds, stats.RedisCmds)
c.stats.LastExecDuration = stats.Duration
if stats.Duration > c.stats.MaxExecDuration {
c.stats.MaxExecDuration = stats.Duration
}
if stats.Duration < c.stats.MinExecDuration || (stats.Duration > 0 && c.stats.MinExecDuration == 0) {
c.stats.MinExecDuration = stats.Duration
}
}
func (c *collector) addSkipStats(failed bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.stats.SkipCount++
if failed {
c.stats.FailuresCount++
}
}
func (c *collector) nextRunAfter() time.Duration {
var jitter time.Duration
if c.jitterPct > 0 {
maxJitter := time.Duration(c.jitterPct) * c.execInterval / time.Duration(100.0)
randDuration, err := rand.Int(rand.Reader, big.NewInt(int64(maxJitter)))
if err == nil {
jitter = time.Duration(randDuration.Int64())
}
}
// randomize running after or before the jitter based on even/odd
if jitter%2 == 0 {
return c.execInterval - jitter
}
return c.execInterval + jitter
}