osquery-perf changes for Fleet downtime load test (#17310)

Includes all the osquery-perf changes needed to perform the load test in #16423.
Lucas Manuel Rodriguez 2024-03-04 15:10:10 -03:00 committed by GitHub
parent b2edd1f201
commit 4acb713bf1
3 changed files with 149 additions and 58 deletions


@ -14,7 +14,6 @@ import (
"io"
"log"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
@ -377,7 +376,13 @@ type agent struct {
scheduledQueriesMu sync.Mutex // protects the below members
scheduledQueries []string
scheduledQueryData []scheduledQuery
bufferedResults []resultLog
// bufferedResults contains result logs that are buffered when
// /api/v1/osquery/log requests to the Fleet server fail.
//
// NOTE: We use a map instead of a slice to prevent the data structure from
// growing indefinitely (we sacrifice accuracy of logs but that's
// a-ok for osquery-perf and load testing).
bufferedResults map[resultLog]int
}
type entityCount struct {
@ -462,6 +467,7 @@ func newAgent(
disableScriptExec: disableScriptExec,
disableFleetDesktop: disableFleetDesktop,
loggerTLSMaxLines: loggerTLSMaxLines,
bufferedResults: make(map[resultLog]int),
}
}
@ -562,7 +568,7 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
var results []resultLog
now := time.Now().Unix()
a.scheduledQueriesMu.Lock()
prevCount := len(a.bufferedResults)
prevCount := a.countBuffered()
for i, query := range a.scheduledQueryData {
if query.nextRun == 0 || now >= int64(query.nextRun) {
results = append(results, resultLog{
@ -573,25 +579,56 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
a.scheduledQueryData[i].nextRun = float64(now + int64(query.ScheduleInterval))
}
}
a.bufferedResults = append(a.bufferedResults, results...)
if len(a.bufferedResults) > 1_000_000 { // osquery buffered_log_max is 1M
extra := len(a.bufferedResults) - 1_000_000
a.bufferedResults = a.bufferedResults[extra:]
if prevCount+len(results) < 1_000_000 { // osquery buffered_log_max is 1M
a.addToBuffer(results)
}
a.sendLogsBatch()
newBufferedCount := len(a.bufferedResults) - prevCount
newBufferedCount := a.countBuffered() - prevCount
a.stats.UpdateBufferedLogs(newBufferedCount)
a.scheduledQueriesMu.Unlock()
}
}
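// countBuffered returns the total number of buffered result logs, counting duplicates.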
func (a *agent) countBuffered() int {
var total int
for _, count := range a.bufferedResults {
total += count
}
return total
}
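// addToBuffer adds the given result logs to the buffer, counting duplicates instead of storing each copy.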
func (a *agent) addToBuffer(results []resultLog) {
for _, result := range results {
a.bufferedResults[result] += 1
}
}
// getBatch returns a random set of logs from the buffered logs.
// NOTE: We sacrifice some accuracy in the name of CPU and memory efficiency.
func (a *agent) getBatch(batchSize int) []resultLog {
results := make([]resultLog, 0, batchSize)
for result, count := range a.bufferedResults {
left := batchSize - len(results)
if left <= 0 {
return results
}
if count > left {
count = left
}
for i := 0; i < count; i++ {
results = append(results, result)
}
}
return results
}
type resultLog struct {
packName string
queryName string
numRows int
}
func (r resultLog) emit() json.RawMessage {
func (r resultLog) emit() []byte {
return scheduledQueryResults(r.packName, r.queryName, r.numRows)
}
@ -602,18 +639,29 @@ func (a *agent) sendLogsBatch() {
}
batchSize := a.loggerTLSMaxLines
if len(a.bufferedResults) < batchSize {
batchSize = len(a.bufferedResults)
if count := a.countBuffered(); count < batchSize {
batchSize = count
}
batch := a.bufferedResults[:batchSize]
batchLogs := make([]json.RawMessage, 0, len(batch))
for _, result := range batch {
batchLogs = append(batchLogs, result.emit())
}
if err := a.submitLogs(batchLogs); err != nil {
batch := a.getBatch(batchSize)
if err := a.submitLogs(batch); err != nil {
return
}
a.bufferedResults = a.bufferedResults[batchSize:]
a.removeBuffered(batchSize)
}
// removeBuffered removes a random set of logs from the buffered logs.
// NOTE: We sacrifice some accuracy in the name of CPU and memory efficiency.
func (a *agent) removeBuffered(batchSize int) {
for b := batchSize; b > 0; {
for result, count := range a.bufferedResults {
if count > b {
a.bufferedResults[result] -= b
return
}
delete(a.bufferedResults, result)
b -= count
}
}
}
func (a *agent) runOrbitLoop() {
@ -833,17 +881,18 @@ func (a *agent) execScripts(execIDs []string, orbitClient *service.OrbitClient)
}
}
func (a *agent) waitingDo(request *http.Request) *http.Response {
response, err := http.DefaultClient.Do(request)
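// waitingDo executes the request returned by fn, retrying with a random sleep until it
// receives a 200 OK response. A request factory is used (instead of a fixed *http.Request)
// so that each retry gets a fresh request with an unconsumed body.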
func (a *agent) waitingDo(fn func() *http.Request) *http.Response {
response, err := http.DefaultClient.Do(fn())
for err != nil || response.StatusCode != http.StatusOK {
if err != nil {
log.Printf("failed to run request: %s", err)
} else { // response.StatusCode != http.StatusOK
response.Body.Close()
log.Printf("request failed: %d", response.StatusCode)
}
a.stats.IncrementErrors(1)
<-time.Tick(time.Duration(rand.Intn(120)+1) * time.Second)
response, err = http.DefaultClient.Do(request)
response, err = http.DefaultClient.Do(fn())
}
return response
}
@ -863,13 +912,14 @@ func (a *agent) orbitEnroll() error {
return err
}
request, err := http.NewRequest("POST", a.serverAddress+"/api/fleet/orbit/enroll", bytes.NewReader(jsonBytes))
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
response := a.waitingDo(request)
response := a.waitingDo(func() *http.Request {
request, err := http.NewRequest("POST", a.serverAddress+"/api/fleet/orbit/enroll", bytes.NewReader(jsonBytes))
if err != nil {
panic(err)
}
request.Header.Add("Content-type", "application/json")
return request
})
defer response.Body.Close()
var parsedResp service.EnrollOrbitResponse
@ -900,13 +950,14 @@ func (a *agent) enroll(i int, onlyAlreadyEnrolled bool) error {
return err
}
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/enroll", &body)
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
response := a.waitingDo(request)
response := a.waitingDo(func() *http.Request {
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/enroll", &body)
if err != nil {
panic(err)
}
request.Header.Add("Content-type", "application/json")
return request
})
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
@ -955,6 +1006,7 @@ func (a *agent) config() error {
} `json:"packs"`
}{}
if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil {
a.stats.IncrementConfigErrors()
return fmt.Errorf("json parse at config: %w", err)
}
@ -1135,6 +1187,7 @@ func (a *agent) DistributedRead() (*distributedReadResponse, error) {
var parsedResp distributedReadResponse
if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil {
a.stats.IncrementDistributedReadErrors()
log.Printf("json parse: %s", err)
return nil, err
}
@ -1589,8 +1642,8 @@ func (a *agent) DistributedWrite(queries map[string]string) error {
return nil
}
func scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage {
return json.RawMessage(`{
func scheduledQueryResults(packName, queryName string, numResults int) []byte {
return []byte(`{
"snapshot": [` + rows(numResults) + `
],
"action": "snapshot",
@ -1608,33 +1661,37 @@ func scheduledQueryResults(packName, queryName string, numResults int) json.RawM
}`)
}
func (a *agent) submitLogs(results []json.RawMessage) error {
// Connection check to prevent unnecessary JSON marshaling when the server is down.
conn, err := net.Dial("tcp", strings.TrimPrefix(a.serverAddress, "https://"))
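// connCheck issues a lightweight GET /version request to verify that the Fleet server is reachable.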
func (a *agent) connCheck() error {
request, err := http.NewRequest("GET", a.serverAddress+"/version", nil)
if err != nil {
panic(err)
}
response, err := http.DefaultClient.Do(request)
if err != nil {
return err
}
conn.Close()
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return errors.New(http.StatusText(response.StatusCode))
}
return nil
}
jsonResults, err := json.Marshal(results)
if err != nil {
panic(err)
}
type submitLogsRequest struct {
NodeKey string `json:"node_key"`
LogType string `json:"log_type"`
Data json.RawMessage `json:"data"`
}
slr := submitLogsRequest{
NodeKey: a.nodeKey,
LogType: "result",
Data: jsonResults,
}
body, err := json.Marshal(slr)
if err != nil {
panic(err)
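// submitLogs POSTs the given result logs to /api/osquery/log, building the JSON payload
// by hand instead of using json.Marshal.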
func (a *agent) submitLogs(results []resultLog) error {
// Connection check to prevent unnecessary JSON marshaling when the server is down.
if err := a.connCheck(); err != nil {
return fmt.Errorf("/version check failed: %w", err)
}
var resultLogs []byte
for i, result := range results {
if i > 0 {
resultLogs = append(resultLogs, ',')
}
resultLogs = append(resultLogs, result.emit()...)
}
body := []byte(`{"node_key": "` + a.nodeKey + `", "log_type": "result", "data": [` + string(resultLogs) + `]}`)
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/log", bytes.NewReader(body))
if err != nil {
return err


@ -0,0 +1,33 @@
#!/bin/bash
# Script for enrolling osquery-perf hosts by `terraform apply`ing in increments of 8 `loadtest` containers.
# NOTE(lucas): This is the currently known configuration that won't tip the loadtest environment,
# but maybe in the future we can be more aggressive (and reduce enroll time).
#
# Usage: ./enroll.sh <branch_name> <start_index> <end_index>, e.g.:
# ./enroll.sh my-branch 8 240
BRANCH_NAME=$1
START_INDEX=$2
END_INDEX=$3
INCREMENT=8
SLEEP_TIME_SECONDS=60
if [ -z "$BRANCH_NAME" ]; then
echo "Missing BRANCH_NAME"
fi
if [ -z "$START_INDEX" ]; then
echo "Missing START_INDEX"
fi
if [ -z "$END_INDEX" ]; then
echo "Missing END_INDEX"
fi
# We add this check to avoid terraform (error-prone) locking in case of typos.
read -p "You will use BRANCH_NAME=$BRANCH_NAME. Continue? "
set -x
for (( c=$START_INDEX; c<=$END_INDEX; c+=$INCREMENT )); do
terraform apply -var tag=$BRANCH_NAME -var loadtest_containers=$c -auto-approve
sleep $SLEEP_TIME_SECONDS
done


@ -1457,6 +1457,7 @@ func submitLogsEndpoint(ctx context.Context, request interface{}, svc fleet.Serv
err = newOsqueryError("unmarshalling result logs: " + err.Error())
break
}
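// Record the number of submitted result logs in the request's logging context.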
logging.WithExtras(ctx, "results", len(results))
// We currently return errors to osqueryd if there are any issues submitting results
// to the configured external destinations.