Optim osquery-perf buffering (#16746)

#16423

- We need to consume less memory while Fleet is down (otherwise we might
bring the load test environment down, local tests showed that in 30m I
got to 600 MB of memory usage) so am generating the results when needed
only.
- Also fixing the release of fasthttp requests and responses (according
to their docs).
This commit is contained in:
Lucas Manuel Rodriguez 2024-02-12 19:06:58 -03:00 committed by GitHub
parent d6640a792d
commit 21e0515f78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,6 +15,7 @@ import (
"log"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
@ -377,7 +378,7 @@ type agent struct {
scheduledQueriesMu sync.Mutex // protects the below members
scheduledQueries []string
scheduledQueryData []scheduledQuery
bufferedResults []json.RawMessage
bufferedResults []resultLog
}
type entityCount struct {
@ -566,13 +567,17 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
defer logTicker.Stop()
for range logTicker.C {
// check if we have any scheduled queries that should be returning results
var results []json.RawMessage
var results []resultLog
now := time.Now().Unix()
a.scheduledQueriesMu.Lock()
prevCount := len(a.bufferedResults)
for i, query := range a.scheduledQueryData {
if query.nextRun == 0 || now >= int64(query.nextRun) {
results = append(results, a.scheduledQueryResults(query.packName, query.Name, int(query.numRows)))
results = append(results, resultLog{
packName: query.packName,
queryName: query.Name,
numRows: int(query.numRows),
})
a.scheduledQueryData[i].nextRun = float64(now + int64(query.ScheduleInterval))
}
}
@ -588,6 +593,16 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
}
}
type resultLog struct {
packName string
queryName string
numRows int
}
func (r resultLog) emit() json.RawMessage {
return scheduledQueryResults(r.packName, r.queryName, r.numRows)
}
// sendLogsBatch sends up to loggerTLSMaxLines logs and updates the buffer.
func (a *agent) sendLogsBatch() {
if len(a.bufferedResults) == 0 {
@ -599,7 +614,11 @@ func (a *agent) sendLogsBatch() {
batchSize = len(a.bufferedResults)
}
batch := a.bufferedResults[:batchSize]
if err := a.submitLogs(batch); err != nil {
batchLogs := make([]json.RawMessage, 0, len(batch))
for _, result := range batch {
batchLogs = append(batchLogs, result.emit())
}
if err := a.submitLogs(batchLogs); err != nil {
return
}
a.bufferedResults = a.bufferedResults[batchSize:]
@ -940,13 +959,14 @@ func (a *agent) config() error {
res := fasthttp.AcquireResponse()
err := a.fastClient.Do(req, res)
if err != nil {
return fmt.Errorf("config request failed to run: %w", err)
}
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
if err != nil {
return fmt.Errorf("config request failed to run: %w", err)
}
a.stats.IncrementConfigRequests()
statusCode := res.StatusCode()
@ -1128,13 +1148,14 @@ func (a *agent) DistributedRead() (*distributedReadResponse, error) {
res := fasthttp.AcquireResponse()
err := a.fastClient.Do(req, res)
if err != nil {
return nil, fmt.Errorf("distributed/read request failed to run: %w", err)
}
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
if err != nil {
return nil, fmt.Errorf("distributed/read request failed to run: %w", err)
}
a.stats.IncrementDistributedReads()
statusCode := res.StatusCode()
@ -1584,13 +1605,14 @@ func (a *agent) DistributedWrite(queries map[string]string) error {
res := fasthttp.AcquireResponse()
err = a.fastClient.Do(req, res)
if err != nil {
return fmt.Errorf("distributed/write request failed to run: %w", err)
}
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
if err != nil {
return fmt.Errorf("distributed/write request failed to run: %w", err)
}
a.stats.IncrementDistributedWrites()
statusCode := res.StatusCode()
@ -1603,13 +1625,13 @@ func (a *agent) DistributedWrite(queries map[string]string) error {
return nil
}
func (a *agent) scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage {
func scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage {
return json.RawMessage(`{
"snapshot": [` + rows(numResults, a.UUID) + `
"snapshot": [` + rows(numResults) + `
],
"action": "snapshot",
"name": "pack/` + packName + `/` + queryName + `",
"hostIdentifier": "` + a.UUID + `",
"hostIdentifier": "EF9595F0-CE81-493A-9B06-D8A9D2CCB952",
"calendarTime": "Fri Oct 6 18:13:04 2023 UTC",
"unixTime": 1696615984,
"epoch": 0,
@ -1617,7 +1639,7 @@ func (a *agent) scheduledQueryResults(packName, queryName string, numResults int
"numerics": false,
"decorations": {
"host_uuid": "187c4d56-8e45-1a9d-8513-ac17efd2f0fd",
"hostname": "` + a.CachedString("hostname") + `"
"hostname": "osquery-perf"
}
}`)
}
@ -1651,13 +1673,14 @@ func (a *agent) submitLogs(results []json.RawMessage) error {
res := fasthttp.AcquireResponse()
err = a.fastClient.Do(req, res)
if err != nil {
return fmt.Errorf("log request failed to run: %w", err)
}
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
if err != nil {
return fmt.Errorf("log request failed to run: %w", err)
}
a.stats.IncrementResultLogRequests()
statusCode := res.StatusCode()
@ -1670,7 +1693,7 @@ func (a *agent) submitLogs(results []json.RawMessage) error {
}
// rows returns a set of rows for use in tests for query results.
func rows(num int, hostUUID string) string {
func rows(num int) string {
b := strings.Builder{}
for i := 0; i < num; i++ {
b.WriteString(` {
@ -1683,7 +1706,7 @@ func rows(num int, hostUUID string) string {
"pid": "3574",
"platform_mask": "9",
"start_time": "1696502961",
"uuid": "` + hostUUID + `",
"uuid": "EF9595F0-CE81-493A-9B06-D8A9D2CCB95",
"version": "5.9.2",
"watcher": "3570"
}`)
@ -1696,6 +1719,11 @@ func rows(num int, hostUUID string) string {
}
func main() {
// Start HTTP server for pprof. See https://pkg.go.dev/net/http/pprof.
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
validTemplateNames := map[string]bool{
"macos_13.6.2.tmpl": true,
"macos_14.1.2.tmpl": true,