Fix duplicate schedules and platform matching on scheduled queries (#2977)

* Fix duplicate schedules and platform matching on scheduled queries

* scheduled_queries.platform can be NULL

* Add unit tests

* Add rhel host and check zero stats
This commit is contained in:
Lucas Manuel Rodriguez 2021-11-17 19:03:30 -03:00 committed by GitHub
parent b61e34ea90
commit cb54d9a8dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 496 additions and 18 deletions

View File

@ -264,9 +264,20 @@ func saveHostPackStatsDB(ctx context.Context, db sqlx.ExecerContext, host *fleet
return nil
}
// schQueriesPlatformFromHost converts the platform from a Host.Platform
// string to a scheduled query platform string.
func schQueryPlatformFromHost(hostPlatform string) string {
switch hostPlatform {
case "ubuntu", "rhel", "debian":
return "linux"
default: // darwin, windows
return hostPlatform
}
}
// loadhostPacksStatsDB will load all the pack stats for the given host. The scheduled
// queries that haven't run yet are returned with zero values.
func loadHostPackStatsDB(ctx context.Context, db sqlx.QueryerContext, hid uint) ([]fleet.PackStats, error) {
func loadHostPackStatsDB(ctx context.Context, db sqlx.QueryerContext, hid uint, hostPlatform string) ([]fleet.PackStats, error) {
packs, err := listPacksForHost(ctx, db, hid)
if err != nil {
return nil, ctxerr.Wrapf(ctx, err, "list packs for host: %d", hid)
@ -306,8 +317,20 @@ func loadHostPackStatsDB(ctx context.Context, db sqlx.QueryerContext, hid uint)
goqu.I("queries").As("q"),
goqu.On(goqu.I("sq.query_name").Eq(goqu.I("q.name"))),
).LeftJoin(
goqu.I("scheduled_query_stats").As("sqs"),
dialect.From("scheduled_query_stats").As("sqs").Where(
goqu.I("host_id").Eq(hid),
),
goqu.On(goqu.I("sqs.scheduled_query_id").Eq(goqu.I("sq.id"))),
).Where(
goqu.Or(
// sq.platform empty or NULL means the scheduled query is set to
// run on all hosts.
goqu.I("sq.platform").Eq(""),
goqu.I("sq.platform").IsNull(),
// scheduled_queries.platform can be a comma-separated list of
// platforms, e.g. "darwin,windows".
goqu.L("FIND_IN_SET(?, sq.platform)", schQueryPlatformFromHost(hostPlatform)).Neq(0),
),
)
sql, args, err := ds.ToSQL()
if err != nil {
@ -392,7 +415,7 @@ func (d *Datastore) Host(ctx context.Context, id uint) (*fleet.Host, error) {
return nil, ctxerr.Wrap(ctx, err, "get host by id")
}
packStats, err := loadHostPackStatsDB(ctx, d.reader, host.ID)
packStats, err := loadHostPackStatsDB(ctx, d.reader, host.ID, host.Platform)
if err != nil {
return nil, err
}
@ -823,7 +846,7 @@ func (d *Datastore) HostByIdentifier(ctx context.Context, identifier string) (*f
return nil, ctxerr.Wrap(ctx, err, "get host by identifier")
}
packStats, err := loadHostPackStatsDB(ctx, d.reader, host.ID)
packStats, err := loadHostPackStatsDB(ctx, d.reader, host.ID, host.Platform)
if err != nil {
return nil, err
}

View File

@ -14,6 +14,7 @@ import (
"time"
"github.com/WatchBeam/clock"
"github.com/fleetdm/fleet/v4/server"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/ptr"
"github.com/fleetdm/fleet/v4/server/test"
@ -90,6 +91,8 @@ func TestHosts(t *testing.T) {
{"HostsListFailingPolicies", testHostsListFailingPolicies},
{"HostsExpiration", testHostsExpiration},
{"HostsAllPackStats", testHostsAllPackStats},
{"HostsPackStatsMultipleHosts", testHostsPackStatsMultipleHosts},
{"HostsPackStatsForPlatform", testHostsPackStatsForPlatform},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@ -204,6 +207,7 @@ func testHostsSavePackStats(t *testing.T, ds *Datastore) {
Hostname: "foo.local",
PrimaryIP: "192.168.1.1",
PrimaryMac: "30-65-EC-6F-C4-58",
Platform: "darwin",
})
require.NoError(t, err)
require.NotNil(t, host)
@ -325,6 +329,7 @@ func testHostsSavePackStatsOverwrites(t *testing.T, ds *Datastore) {
Hostname: "foo.local",
PrimaryIP: "192.168.1.1",
PrimaryMac: "30-65-EC-6F-C4-58",
Platform: "darwin",
})
require.NoError(t, err)
require.NotNil(t, host)
@ -477,6 +482,7 @@ func testHostsWithTeamPackStats(t *testing.T, ds *Datastore) {
Hostname: "foo.local",
PrimaryIP: "192.168.1.1",
PrimaryMac: "30-65-EC-6F-C4-58",
Platform: "darwin",
})
require.NoError(t, err)
require.NotNil(t, host)
@ -2249,6 +2255,7 @@ func testHostsAllPackStats(t *testing.T, ds *Datastore) {
Hostname: "foo.local",
PrimaryIP: "192.168.1.1",
PrimaryMac: "30-65-EC-6F-C4-58",
Platform: "darwin",
})
require.NoError(t, err)
require.NotNil(t, host)
@ -2404,3 +2411,424 @@ func testHostsAllPackStats(t *testing.T, ds *Datastore) {
require.ElementsMatch(t, packStats[1].QueryStats, teamPackSQueryStats)
require.ElementsMatch(t, packStats[2].QueryStats, userPackSQueryStats)
}
// See #2965.
func testHostsPackStatsMultipleHosts(t *testing.T, ds *Datastore) {
osqueryHostID1, _ := server.GenerateRandomText(10)
host1, err := ds.NewHost(context.Background(), &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now(),
NodeKey: "1",
UUID: "1",
Hostname: "foo.local",
PrimaryIP: "192.168.1.1",
PrimaryMac: "30-65-EC-6F-C4-58",
Platform: "darwin",
OsqueryHostID: osqueryHostID1,
})
require.NoError(t, err)
require.NotNil(t, host1)
osqueryHostID2, _ := server.GenerateRandomText(10)
host2, err := ds.NewHost(context.Background(), &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now(),
NodeKey: "2",
UUID: "2",
Hostname: "bar.local",
PrimaryIP: "192.168.1.2",
PrimaryMac: "30-65-EC-6F-C4-59",
Platform: "darwin",
OsqueryHostID: osqueryHostID2,
})
require.NoError(t, err)
require.NotNil(t, host2)
// Create global pack (and one scheduled query in it).
test.AddAllHostsLabel(t, ds) // the global pack needs the "All Hosts" label.
labels, err := ds.ListLabels(context.Background(), fleet.TeamFilter{}, fleet.ListOptions{})
require.NoError(t, err)
require.Len(t, labels, 1)
globalPack, err := ds.EnsureGlobalPack(context.Background())
require.NoError(t, err)
globalQuery := test.NewQuery(t, ds, "global-time", "select * from time", 0, true)
globalSQuery := test.NewScheduledQuery(t, ds, globalPack.ID, globalQuery.ID, 30, true, true, "time-scheduled-global")
err = ds.AsyncBatchInsertLabelMembership(context.Background(), [][2]uint{
{labels[0].ID, host1.ID},
{labels[0].ID, host2.ID},
})
require.NoError(t, err)
globalStatsHost1 := []fleet.ScheduledQueryStats{{
ScheduledQueryName: globalSQuery.Name,
ScheduledQueryID: globalSQuery.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 8000,
Denylisted: false,
Executions: 164,
Interval: 30,
LastExecuted: time.Unix(1620325191, 0).UTC(),
OutputSize: 1337,
SystemTime: 150,
UserTime: 180,
WallTime: 0,
}}
globalStatsHost2 := []fleet.ScheduledQueryStats{{
ScheduledQueryName: globalSQuery.Name,
ScheduledQueryID: globalSQuery.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 9000,
Denylisted: false,
Executions: 165,
Interval: 30,
LastExecuted: time.Unix(1620325192, 0).UTC(),
OutputSize: 1338,
SystemTime: 151,
UserTime: 181,
WallTime: 1,
}}
// Reload the hosts and set the scheduled queries stats.
for _, tc := range []struct {
hostID uint
globalStats []fleet.ScheduledQueryStats
}{
{
hostID: host1.ID,
globalStats: globalStatsHost1,
},
{
hostID: host2.ID,
globalStats: globalStatsHost2,
},
} {
host, err := ds.Host(context.Background(), tc.hostID)
require.NoError(t, err)
host.PackStats = []fleet.PackStats{
{PackID: globalPack.ID, PackName: globalPack.Name, QueryStats: tc.globalStats},
}
require.NoError(t, ds.SaveHost(context.Background(), host))
}
// Both hosts should see just one stats entry on the one pack.
for _, tc := range []struct {
host *fleet.Host
expectedStats []fleet.ScheduledQueryStats
}{
{
host: host1,
expectedStats: globalStatsHost1,
},
{
host: host2,
expectedStats: globalStatsHost2,
},
} {
host, err := ds.Host(context.Background(), tc.host.ID)
require.NoError(t, err)
packStats := host.PackStats
require.Len(t, packStats, 1)
require.Len(t, packStats[0].QueryStats, 1)
require.ElementsMatch(t, packStats[0].QueryStats, tc.expectedStats)
}
}
// See #2964.
func testHostsPackStatsForPlatform(t *testing.T, ds *Datastore) {
osqueryHostID1, _ := server.GenerateRandomText(10)
host1, err := ds.NewHost(context.Background(), &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now(),
NodeKey: "1",
UUID: "1",
Hostname: "foo.local",
PrimaryIP: "192.168.1.1",
PrimaryMac: "30-65-EC-6F-C4-58",
Platform: "darwin",
OsqueryHostID: osqueryHostID1,
})
require.NoError(t, err)
require.NotNil(t, host1)
osqueryHostID2, _ := server.GenerateRandomText(10)
host2, err := ds.NewHost(context.Background(), &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now(),
NodeKey: "2",
UUID: "2",
Hostname: "foo.local.2",
PrimaryIP: "192.168.1.2",
PrimaryMac: "30-65-EC-6F-C4-59",
Platform: "rhel",
OsqueryHostID: osqueryHostID2,
})
require.NoError(t, err)
require.NotNil(t, host2)
// Create global pack (and one scheduled query in it).
test.AddAllHostsLabel(t, ds) // the global pack needs the "All Hosts" label.
labels, err := ds.ListLabels(context.Background(), fleet.TeamFilter{}, fleet.ListOptions{})
require.NoError(t, err)
require.Len(t, labels, 1)
globalPack, err := ds.EnsureGlobalPack(context.Background())
require.NoError(t, err)
globalQuery := test.NewQuery(t, ds, "global-time", "select * from time", 0, true)
globalSQuery1, err := ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{
Name: "Scheduled Query For Linux only",
PackID: globalPack.ID,
QueryID: globalQuery.ID,
Interval: 30,
Snapshot: ptr.Bool(true),
Removed: ptr.Bool(true),
Platform: ptr.String("linux"),
})
require.NoError(t, err)
require.NotZero(t, globalSQuery1.ID)
globalSQuery2, err := ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{
Name: "Scheduled Query For Darwin only",
PackID: globalPack.ID,
QueryID: globalQuery.ID,
Interval: 30,
Snapshot: ptr.Bool(true),
Removed: ptr.Bool(true),
Platform: ptr.String("darwin"),
})
require.NoError(t, err)
require.NotZero(t, globalSQuery2.ID)
globalSQuery3, err := ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{
Name: "Scheduled Query For Darwin and Linux",
PackID: globalPack.ID,
QueryID: globalQuery.ID,
Interval: 30,
Snapshot: ptr.Bool(true),
Removed: ptr.Bool(true),
Platform: ptr.String("darwin,linux"),
})
require.NoError(t, err)
require.NotZero(t, globalSQuery3.ID)
globalSQuery4, err := ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{
Name: "Scheduled Query For All Platforms",
PackID: globalPack.ID,
QueryID: globalQuery.ID,
Interval: 30,
Snapshot: ptr.Bool(true),
Removed: ptr.Bool(true),
Platform: ptr.String(""),
})
require.NoError(t, err)
require.NotZero(t, globalSQuery4.ID)
globalSQuery5, err := ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{
Name: "Scheduled Query For All Platforms v2",
PackID: globalPack.ID,
QueryID: globalQuery.ID,
Interval: 30,
Snapshot: ptr.Bool(true),
Removed: ptr.Bool(true),
Platform: nil,
})
require.NoError(t, err)
require.NotZero(t, globalSQuery5.ID)
err = ds.AsyncBatchInsertLabelMembership(context.Background(), [][2]uint{
{labels[0].ID, host1.ID},
{labels[0].ID, host2.ID},
})
require.NoError(t, err)
globalStats := []fleet.ScheduledQueryStats{
{
ScheduledQueryName: globalSQuery2.Name,
ScheduledQueryID: globalSQuery2.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 8001,
Denylisted: false,
Executions: 165,
Interval: 30,
LastExecuted: time.Unix(1620325192, 0).UTC(),
OutputSize: 1338,
SystemTime: 151,
UserTime: 181,
WallTime: 1,
},
{
ScheduledQueryName: globalSQuery3.Name,
ScheduledQueryID: globalSQuery3.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 8002,
Denylisted: false,
Executions: 166,
Interval: 30,
LastExecuted: time.Unix(1620325193, 0).UTC(),
OutputSize: 1339,
SystemTime: 152,
UserTime: 182,
WallTime: 2,
},
{
ScheduledQueryName: globalSQuery4.Name,
ScheduledQueryID: globalSQuery4.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 8003,
Denylisted: false,
Executions: 167,
Interval: 30,
LastExecuted: time.Unix(1620325194, 0).UTC(),
OutputSize: 1340,
SystemTime: 153,
UserTime: 183,
WallTime: 3,
},
{
ScheduledQueryName: globalSQuery5.Name,
ScheduledQueryID: globalSQuery5.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 8003,
Denylisted: false,
Executions: 167,
Interval: 30,
LastExecuted: time.Unix(1620325194, 0).UTC(),
OutputSize: 1340,
SystemTime: 153,
UserTime: 183,
WallTime: 3,
},
}
// Reload the host and set the scheduled queries stats for the scheduled queries that apply.
// Plus we set schedule query stats for a query that does not apply (globalSQuery1)
// (This could happen if the target platform of a schedule query is changed after creation.)
stats := make([]fleet.ScheduledQueryStats, len(globalStats))
for i := range globalStats {
stats[i] = globalStats[i]
}
stats = append(stats, fleet.ScheduledQueryStats{
ScheduledQueryName: globalSQuery1.Name,
ScheduledQueryID: globalSQuery1.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 8003,
Denylisted: false,
Executions: 167,
Interval: 30,
LastExecuted: time.Unix(1620325194, 0).UTC(),
OutputSize: 1340,
SystemTime: 153,
UserTime: 183,
WallTime: 3,
})
host, err := ds.Host(context.Background(), host1.ID)
require.NoError(t, err)
host.PackStats = []fleet.PackStats{
{PackID: globalPack.ID, PackName: globalPack.Name, QueryStats: stats},
}
require.NoError(t, ds.SaveHost(context.Background(), host))
// host should only return scheduled query stats only for the scheduled queries
// scheduled to run on "darwin".
host, err = ds.Host(context.Background(), host.ID)
require.NoError(t, err)
packStats := host.PackStats
require.Len(t, packStats, 1)
require.Len(t, packStats[0].QueryStats, 4)
sort.Slice(packStats[0].QueryStats, func(i, j int) bool {
return packStats[0].QueryStats[i].ScheduledQueryID < packStats[0].QueryStats[j].ScheduledQueryID
})
sort.Slice(globalStats, func(i, j int) bool {
return globalStats[i].ScheduledQueryID < globalStats[j].ScheduledQueryID
})
require.ElementsMatch(t, packStats[0].QueryStats, globalStats)
// host2 should only return scheduled query stats only for the scheduled queries
// scheduled to run on "linux"
host2, err = ds.Host(context.Background(), host2.ID)
require.NoError(t, err)
packStats2 := host2.PackStats
require.Len(t, packStats2, 1)
require.Len(t, packStats2[0].QueryStats, 4)
zeroStats := []fleet.ScheduledQueryStats{
{
ScheduledQueryName: globalSQuery1.Name,
ScheduledQueryID: globalSQuery1.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 0,
Denylisted: false,
Executions: 0,
Interval: 30,
LastExecuted: time.Time{},
OutputSize: 0,
SystemTime: 0,
UserTime: 0,
WallTime: 0,
},
{
ScheduledQueryName: globalSQuery3.Name,
ScheduledQueryID: globalSQuery3.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 0,
Denylisted: false,
Executions: 0,
Interval: 30,
LastExecuted: time.Time{},
OutputSize: 0,
SystemTime: 0,
UserTime: 0,
WallTime: 0,
},
{
ScheduledQueryName: globalSQuery4.Name,
ScheduledQueryID: globalSQuery4.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 0,
Denylisted: false,
Executions: 0,
Interval: 30,
LastExecuted: time.Time{},
OutputSize: 0,
SystemTime: 0,
UserTime: 0,
WallTime: 0,
},
{
ScheduledQueryName: globalSQuery5.Name,
ScheduledQueryID: globalSQuery5.ID,
QueryName: globalQuery.Name,
PackName: globalPack.Name,
PackID: globalPack.ID,
AverageMemory: 0,
Denylisted: false,
Executions: 0,
Interval: 30,
LastExecuted: time.Time{},
OutputSize: 0,
SystemTime: 0,
UserTime: 0,
WallTime: 0,
},
}
require.ElementsMatch(t, packStats2[0].QueryStats, zeroStats)
}

View File

@ -6,22 +6,46 @@ import (
"gopkg.in/guregu/null.v3"
)
// ScheduledQuery is a query that runs on a schedule.
//
// Source of documentation for the fields:
// https://osquery.readthedocs.io/en/stable/deployment/configuration/
type ScheduledQuery struct {
UpdateCreateTimestamps
ID uint `json:"id"`
PackID uint `json:"pack_id" db:"pack_id"`
Name string `json:"name"`
QueryID uint `json:"query_id" db:"query_id"`
QueryName string `json:"query_name" db:"query_name"`
Query string `json:"query"` // populated via a join on queries
Description string `json:"description,omitempty"`
Interval uint `json:"interval"`
Snapshot *bool `json:"snapshot"`
Removed *bool `json:"removed"`
Platform *string `json:"platform,omitempty"`
Version *string `json:"version,omitempty"`
Shard *uint `json:"shard"`
Denylist *bool `json:"denylist"`
ID uint `json:"id"`
PackID uint `json:"pack_id" db:"pack_id"`
Name string `json:"name"`
QueryID uint `json:"query_id" db:"query_id"`
QueryName string `json:"query_name" db:"query_name"`
Query string `json:"query"` // populated via a join on queries
Description string `json:"description,omitempty"`
// Interval specifies query frequency, in seconds.
Interval uint `json:"interval"`
Snapshot *bool `json:"snapshot"`
// Removed is a boolean to determine if "removed" actions
// should be logged default is true.
//
// When the results from a table differ from the results when the
// query was last executed, logs are emitted with {"action": "removed"}
// or {"action": "added"} for the appropriate action.
// References:
// https://osquery.readthedocs.io/en/stable/deployment/logging/#differential-logs
Removed *bool `json:"removed"`
// Platform is a comma-separated string that indicates the target platforms
// for this scheduled query.
//
// Possible values are: "darwin", "linux" and "windows".
// An empty string or nil means the scheduled query will run on all platforms.
Platform *string `json:"platform,omitempty"`
// Version can be set to only run on osquery versions greater
// than or equal-to this version string.
Version *string `json:"version,omitempty"`
// Shard restricts this query to a percentage (1-100) of target hosts.
Shard *uint `json:"shard"`
// Denylist is a boolean to determine if this query may be denylisted
// (when stopped by the Watchdog for excessive resource consumption),
// default is true.
Denylist *bool `json:"denylist"`
AggregatedStats `json:"stats,omitempty"`
}

View File

@ -8,6 +8,7 @@ import (
"github.com/fleetdm/fleet/v4/server"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/ptr"
"github.com/stretchr/testify/require"
)
@ -108,6 +109,7 @@ func NewHost(t *testing.T, ds fleet.Datastore, name, ip, key, uuid string, now t
PolicyUpdatedAt: now,
SeenTime: now,
OsqueryHostID: osqueryHostID,
Platform: "darwin",
})
require.NoError(t, err)
@ -144,6 +146,7 @@ func NewScheduledQuery(t *testing.T, ds fleet.Datastore, pid, qid, interval uint
Interval: interval,
Snapshot: &snapshot,
Removed: &removed,
Platform: ptr.String("darwin"),
})
require.NoError(t, err)
require.NotZero(t, sq.ID)