fleet/server/pubsub/query_results_test.go
Lucas Manuel Rodriguez 6415f062c6
Reduce size of DistributedQueryResult to improve live query performance (#11882)
This was found while working on #10957.

When running a live query, a lot of unused host data is stored in Redis
and sent on every live query result message via websockets. The frontend
and fleetctl just need `id`, `hostname` and `display_name`. (This
becomes worse every time we add new fields to the `Host` struct.)
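
As a rough sketch, the host portion of each result now boils down to a tiny struct along these lines (the `Hostname` and `DisplayName` field names are inferred from the JSON keys in the samples below; the actual `fleet.ResultHostData` definition may differ slightly):

```go
// Minimal host data carried with each live query result
// (instead of the full Host struct).
type ResultHostData struct {
	ID          uint   `json:"id"`
	Hostname    string `json:"hostname"`
	DisplayName string `json:"display_name"`
}
```

Keeping this struct separate from `Host` also means that new `Host` fields no longer inflate every live query result message.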

Sample of one websocket message result when running `SELECT * from
osquery_info;`:

size in `main`: 2234 bytes
```
a["{\"type\":\"result\",\"data\":{\"distributed_query_execution_id\":57,\"host\":
{\"created_at\":\"2023-05-22T12:14:11Z\",\"updated_at\":\"2023-05-23T12:31:51Z\",
\"software_updated_at\":\"0001-01-01T00:00:00Z\",\"id\":106,\"detail_updated_at\":\"2023-05-23T11:50:04Z\",
\"label_updated_at\":\"2023-05-23T11:50:04Z\",\"policy_updated_at\":\"1970-01-02T00:00:00Z\",
\"last_enrolled_at\":\"2023-05-22T12:14:12Z\",
\"seen_time\":\"2023-05-23T09:52:23.876311-03:00\",\"refetch_requested\":false,
\"hostname\":\"lucass-macbook-pro.local\",\"uuid\":\"BD4DFA10-E334-41D9-8136-D2163A8FE588\",\"platform\":\"darwin\",\"osquery_version\":\"5.8.2\",\"os_version\":\"macOS 13.3.1\",\"build\":\"22E261\",\"platform_like\":\"darwin\",\"code_name\":\"\",
\"uptime\":91125000000000,\"memory\":34359738368,\"cpu_type\":\"x86_64h\",\"cpu_subtype\":\"Intel x86-64h Haswell\",\"cpu_brand\":\"Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz\",\"cpu_physical_cores\":4,\"cpu_logical_cores\":8,\"hardware_vendor\":\"Apple Inc.\",\"hardware_model\":\"MacBookPro16,2\",\"hardware_version\":\"1.0\",
\"hardware_serial\":\"0DPQR4HMD1FZ\",
\"computer_name\":\"Lucas’s MacBook Pro\",\"public_ip\":\"\",
\"primary_ip\":\"192.168.0.230\",\"primary_mac\":\"68:2f:67:8e:b6:1f\",
\"distributed_interval\":1,\"config_tls_refresh\":60,\"logger_tls_period\":10,\"team_id\":null,
\"pack_stats\":null,\"team_name\":null,
\"gigs_disk_space_available\":386.23,\"percent_disk_space_available\":40,
\"issues\":{\"total_issues_count\":0,\"failing_policies_count\":0},
\"mdm\":{\"enrollment_status\":null,\"server_url\":null,\"name\":\"\",\"encryption_key_available\":false},
\"status\":\"online\",\"display_text\":\"lucass-macbook-pro.local\",\"display_name\":\"Lucas’s MacBook Pro\"},
\"rows\":[{\"build_distro\":\"10.14\",\"build_platform\":\"darwin\",
\"config_hash\":\"b7ee9363a7c686e76e99ffb122e9c5241a791e69\",\"config_valid\":\"1\",
\"extensions\":\"active\",\"host_display_name\":\"Lucas’s MacBook Pro\",
\"host_hostname\":\"lucass-macbook-pro.local\",\"instance_id\":\"cde5de81-344b-4c76-b1c5-dae964fdd4f2\",\"pid\":\"8370\",\"platform_mask\":\"21\",\"start_time\":\"1684757652\",
\"uuid\":\"BD4DFA10-E334-41D9-8136-D2163A8FE588\",
\"version\":\"5.8.2\",\"watcher\":\"8364\"}],\"error\":null}}"]
```

vs. size of the message result on this branch: 675 bytes
```
a["{\"type\":\"result\",\"data\":{\"distributed_query_execution_id\":59,
\"host\":{\"id\":106,\"hostname\":\"lucass-macbook-pro.local\",
\"display_name\":\"Lucas’s MacBook Pro\"},
\"rows\":[{\"build_distro\":\"10.14\",\"build_platform\":\"darwin\",
\"config_hash\":\"f80dee827635db39077a458243379b3ad63311fd\",
\"config_valid\":\"1\",\"extensions\":\"active\",\"host_display_name\":\"Lucas’s MacBook Pro\",
\"host_hostname\":\"lucass-macbook-pro.local\",
\"instance_id\":\"cde5de81-344b-4c76-b1c5-dae964fdd4f2\",\"pid\":\"8370\",\"platform_mask\":\"21\",
\"start_time\":\"1684757652\",\"uuid\":\"BD4DFA10-E334-41D9-8136-D2163A8FE588\",\"version\":\"5.8.2\",
\"watcher\":\"8364\"}]}}"]
```

Manual tests included running an old fleetctl against a new fleet
server and, vice-versa, a new fleetctl against an old fleet server.

- [X] Changes file added for user-visible changes in `changes/` or
`orbit/changes/`.
See [Changes
files](https://fleetdm.com/docs/contributing/committing-changes#changes-files)
for more information.
- [X] Documented any API changes (docs/Using-Fleet/REST-API.md or
docs/Contributing/API-for-contributors.md)
- ~[ ] Documented any permissions changes~
- ~[ ] Input data is properly validated, `SELECT *` is avoided, SQL
injection is prevented (using placeholders for values in statements)~
- ~[ ] Added support on fleet's osquery simulator `cmd/osquery-perf` for
new osquery data ingestion features.~
- [X] Added/updated tests
- [X] Manual QA for all new/changed functionality
  - ~For Orbit and Fleet Desktop changes:~
    - ~[ ] Manual QA must be performed in the three main OSs, macOS, Windows and Linux.~
    - ~[ ] Auto-update manual QA, from released version of component to new version (see [tools/tuf/test](../tools/tuf/test/README.md)).~
2023-05-25 08:11:53 -03:00

package pubsub

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/fleetdm/fleet/v4/server/datastore/redis"
	"github.com/fleetdm/fleet/v4/server/fleet"
	redigo "github.com/gomodule/redigo/redis"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out. http://stackoverflow.com/a/32843750/491710
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false // completed normally
	case <-time.After(timeout):
		return true // timed out
	}
}

func TestQueryResultsStoreErrors(t *testing.T) {
	runTest := func(t *testing.T, store *redisQueryResults) {
		result := fleet.DistributedQueryResult{
			DistributedQueryCampaignID: 9999,
			Rows:                       []map[string]string{{"bing": "fds"}},
			Host: fleet.ResultHostData{
				ID: 4,
			},
		}

		// Write with no subscriber
		err := store.WriteResult(result)
		require.Error(t, err)
		castErr, ok := err.(Error)
		if assert.True(t, ok, "err should be pubsub.Error") {
			assert.True(t, castErr.NoSubscriber(), "NoSubscriber() should be true")
		}

		// Write with one subscriber, force it to bind to a different node if
		// this is a cluster, so we don't rely on publishing/subscribing on the
		// same nodes.
		conn := redis.ReadOnlyConn(store.pool, store.pool.Get())
		defer conn.Close()
		err = redis.BindConn(store.pool, conn, "ZZZ")
		require.NoError(t, err)
		psc := &redigo.PubSubConn{Conn: conn}
		pubSubName := pubSubForID(9999)
		require.NoError(t, psc.Subscribe(pubSubName))

		// wait for subscribed confirmation
		start := time.Now()
		var loopOk bool
	loop:
		for time.Since(start) < 2*time.Second {
			msg := psc.Receive()
			switch msg := msg.(type) {
			case redigo.Subscription:
				require.Equal(t, msg.Count, 1)
				loopOk = true
				break loop
			}
		}
		require.True(t, loopOk, "timed out")

		err = store.WriteResult(result)
		require.NoError(t, err)
	}

	t.Run("standalone", func(t *testing.T) {
		store := SetupRedisForTest(t, false, false)
		runTest(t, store)
	})

	t.Run("cluster", func(t *testing.T) {
		store := SetupRedisForTest(t, true, true)
		runTest(t, store)
	})
}

func TestQueryResultsStore(t *testing.T) {
	runTest := func(t *testing.T, store *redisQueryResults) {
		// Test handling results for two campaigns in parallel
		campaign1 := fleet.DistributedQueryCampaign{ID: 1}

		ctx1, cancel1 := context.WithCancel(context.Background())
		channel1, err := store.ReadChannel(ctx1, campaign1)
		assert.Nil(t, err)

		expected1 := []fleet.DistributedQueryResult{
			{
				DistributedQueryCampaignID: 1,
				Rows:                       []map[string]string{{"foo": "bar"}},
				Host: fleet.ResultHostData{
					ID: 1,
				},
			},
			{
				DistributedQueryCampaignID: 1,
				Rows:                       []map[string]string{{"whoo": "wahh"}},
				Host: fleet.ResultHostData{
					ID: 3,
				},
			},
			{
				DistributedQueryCampaignID: 1,
				Rows:                       []map[string]string{{"bing": "fds"}},
				Host: fleet.ResultHostData{
					ID: 4,
				},
			},
		}

		campaign2 := fleet.DistributedQueryCampaign{ID: 2}

		ctx2, cancel2 := context.WithCancel(context.Background())
		channel2, err := store.ReadChannel(ctx2, campaign2)
		assert.Nil(t, err)

		expected2 := []fleet.DistributedQueryResult{
			{
				DistributedQueryCampaignID: 2,
				Rows:                       []map[string]string{{"tim": "tom"}},
				Host: fleet.ResultHostData{
					ID: 1,
				},
			},
			{
				DistributedQueryCampaignID: 2,
				Rows:                       []map[string]string{{"slim": "slam"}},
				Host: fleet.ResultHostData{
					ID: 3,
				},
			},
		}

		var results1, results2 []fleet.DistributedQueryResult

		var readerWg, writerWg sync.WaitGroup
		readerWg.Add(1)
		go func() {
			defer readerWg.Done()
			for res := range channel1 {
				switch res := res.(type) {
				case fleet.DistributedQueryResult:
					results1 = append(results1, res)
				}
			}
		}()
		readerWg.Add(1)
		go func() {
			defer readerWg.Done()
			for res := range channel2 {
				switch res := res.(type) {
				case fleet.DistributedQueryResult:
					results2 = append(results2, res)
				}
			}
		}()

		// Wait to ensure subscriptions are activated before writing
		time.Sleep(100 * time.Millisecond)

		writerWg.Add(1)
		go func() {
			defer writerWg.Done()
			for _, res := range expected1 {
				assert.Nil(t, store.WriteResult(res))
			}
			time.Sleep(300 * time.Millisecond)
			cancel1()
		}()
		writerWg.Add(1)
		go func() {
			defer writerWg.Done()
			for _, res := range expected2 {
				assert.Nil(t, store.WriteResult(res))
			}
			time.Sleep(300 * time.Millisecond)
			cancel2()
		}()

		// wait with a timeout to ensure that the test can't hang
		if waitTimeout(&writerWg, 5*time.Second) {
			t.Error("Timed out waiting for writers to join")
		}
		if waitTimeout(&readerWg, 5*time.Second) {
			t.Error("Timed out waiting for readers to join")
		}

		assert.EqualValues(t, expected1, results1)
		assert.EqualValues(t, expected2, results2)
	}

	t.Run("standalone", func(t *testing.T) {
		store := SetupRedisForTest(t, false, false)
		runTest(t, store)
	})

	t.Run("cluster", func(t *testing.T) {
		store := SetupRedisForTest(t, true, true)
		runTest(t, store)
	})
}