Implement storage of scheduled query statistics (#735)

Track all data from the osquery_schedule table on a per-host basis. This
data is now returned when retrieving host details in the API.
This commit is contained in:
Zach Wasserman 2021-05-06 21:05:09 -07:00 committed by GitHub
parent b0af428794
commit f788254e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 578 additions and 7 deletions

View File

@ -44,6 +44,7 @@ var TestFunctions = [...]func(*testing.T, kolide.Datastore){
testListHostsInLabel,
testListUniqueHostsInLabels,
testSaveHosts,
testSaveHostPackStats,
testDeleteHost,
testListHosts,
testListHostsFilterAdditional,

View File

@ -92,6 +92,125 @@ func testSaveHosts(t *testing.T, ds kolide.Datastore) {
assert.Nil(t, host)
}
// testSaveHostPackStats checks that scheduled query stats attached to a host
// are persisted by SaveHost and returned by Host, that an entry naming an
// unknown pack/query is dropped without failing the save, and that a nil
// PackStats leaves the stored stats untouched while an empty slice clears them.
func testSaveHostPackStats(t *testing.T, ds kolide.Datastore) {
	host, err := ds.NewHost(&kolide.Host{
		DetailUpdateTime: time.Now(),
		LabelUpdateTime:  time.Now(),
		SeenTime:         time.Now(),
		NodeKey:          "1",
		UUID:             "1",
		HostName:         "foo.local",
		PrimaryIP:        "192.168.1.1",
		PrimaryMac:       "30-65-EC-6F-C4-58",
	})
	require.NoError(t, err)
	require.NotNil(t, host)

	// Pack and query must exist for stats to save successfully
	pack1 := test.NewPack(t, ds, "test1")
	query1 := test.NewQuery(t, ds, "time", "select * from time", 0, true)
	squery1 := test.NewScheduledQuery(t, ds, pack1.ID, query1.ID, 30, true, true, "time-scheduled")
	stats1 := []kolide.ScheduledQueryStats{
		{
			ScheduledQueryName: squery1.Name,
			ScheduledQueryID:   squery1.ID,
			QueryName:          query1.Name,
			PackName:           pack1.Name,
			PackID:             pack1.ID,
			AverageMemory:      8000,
			Denylisted:         false,
			Executions:         164,
			Interval:           30,
			LastExecuted:       time.Unix(1620325191, 0).UTC(),
			OutputSize:         1337,
			SystemTime:         150,
			UserTime:           180,
			WallTime:           0,
		},
	}

	// The second pack reuses the scheduled query name "time-scheduled" so that
	// the same name appears in two different packs.
	pack2 := test.NewPack(t, ds, "test2")
	squery2 := test.NewScheduledQuery(t, ds, pack2.ID, query1.ID, 30, true, true, "time-scheduled")
	query2 := test.NewQuery(t, ds, "processes", "select * from processes", 0, true)
	squery3 := test.NewScheduledQuery(t, ds, pack2.ID, query2.ID, 30, true, true, "processes")
	stats2 := []kolide.ScheduledQueryStats{
		{
			ScheduledQueryName: squery2.Name,
			ScheduledQueryID:   squery2.ID,
			QueryName:          query1.Name,
			PackName:           pack2.Name,
			PackID:             pack2.ID,
			AverageMemory:      431,
			Denylisted:         true,
			Executions:         1,
			Interval:           30,
			LastExecuted:       time.Unix(980943843, 0).UTC(),
			OutputSize:         134,
			SystemTime:         1656,
			UserTime:           18453,
			WallTime:           10,
		},
		{
			ScheduledQueryName: squery3.Name,
			ScheduledQueryID:   squery3.ID,
			QueryName:          query2.Name,
			PackName:           pack2.Name,
			PackID:             pack2.ID,
			AverageMemory:      8000,
			Denylisted:         false,
			Executions:         164,
			Interval:           30,
			LastExecuted:       time.Unix(1620325191, 0).UTC(),
			OutputSize:         1337,
			SystemTime:         150,
			UserTime:           180,
			WallTime:           0,
		},
	}

	host.PackStats = []kolide.PackStats{
		{
			PackName: "test1",
			// Append an additional entry to be sure that receiving stats for a
			// now-deleted query doesn't break saving. This extra entry should
			// not be returned on loading the host.
			QueryStats: append(stats1, kolide.ScheduledQueryStats{PackName: "foo", ScheduledQueryName: "bar"}),
		},
		{
			PackName:   "test2",
			QueryStats: stats2,
		},
	}
	require.NoError(t, ds.SaveHost(host))

	host, err = ds.Host(host.ID)
	require.NoError(t, err)
	require.Len(t, host.PackStats, 2)
	// Sort by pack name so the indexed assertions below do not depend on the
	// order in which the datastore returned the packs.
	sort.Slice(host.PackStats, func(i, j int) bool {
		return host.PackStats[i].PackName < host.PackStats[j].PackName
	})
	// Expected value goes first so failure output labels the values correctly.
	assert.Equal(t, "test1", host.PackStats[0].PackName)
	assert.ElementsMatch(t, host.PackStats[0].QueryStats, stats1)
	assert.Equal(t, "test2", host.PackStats[1].PackName)
	assert.ElementsMatch(t, host.PackStats[1].QueryStats, stats2)

	// Set to nil should not overwrite
	host.PackStats = nil
	require.NoError(t, ds.SaveHost(host))
	host, err = ds.Host(host.ID)
	require.NoError(t, err)
	require.Len(t, host.PackStats, 2)

	// Set to empty should make it empty
	host.PackStats = []kolide.PackStats{}
	require.NoError(t, ds.SaveHost(host))
	host, err = ds.Host(host.ID)
	require.NoError(t, err)
	require.Len(t, host.PackStats, 0)
}
func testDeleteHost(t *testing.T, ds kolide.Datastore) {
host, err := ds.NewHost(&kolide.Host{
DetailUpdateTime: time.Now(),

View File

@ -84,9 +84,11 @@ func testNewScheduledQuery(t *testing.T, ds kolide.Datastore) {
query, err := ds.NewScheduledQuery(&kolide.ScheduledQuery{
PackID: p1.ID,
QueryID: q1.ID,
Name: "foo-scheduled",
})
require.Nil(t, err)
assert.Equal(t, "foo", query.Name)
assert.Equal(t, "foo", query.QueryName)
assert.Equal(t, "foo-scheduled", query.Name)
assert.Equal(t, "select * from time;", query.Query)
}
@ -94,7 +96,7 @@ func testScheduledQuery(t *testing.T, ds kolide.Datastore) {
u1 := test.NewUser(t, ds, "Admin", "admin", "admin@kolide.co", true)
q1 := test.NewQuery(t, ds, "foo", "select * from time;", u1.ID, true)
p1 := test.NewPack(t, ds, "baz")
sq1 := test.NewScheduledQuery(t, ds, p1.ID, q1.ID, 60, false, false)
sq1 := test.NewScheduledQuery(t, ds, p1.ID, q1.ID, 60, false, false, "")
query, err := ds.ScheduledQuery(sq1.ID)
require.Nil(t, err)
@ -118,7 +120,7 @@ func testDeleteScheduledQuery(t *testing.T, ds kolide.Datastore) {
u1 := test.NewUser(t, ds, "Admin", "admin", "admin@kolide.co", true)
q1 := test.NewQuery(t, ds, "foo", "select * from time;", u1.ID, true)
p1 := test.NewPack(t, ds, "baz")
sq1 := test.NewScheduledQuery(t, ds, p1.ID, q1.ID, 60, false, false)
sq1 := test.NewScheduledQuery(t, ds, p1.ID, q1.ID, 60, false, false, "")
query, err := ds.ScheduledQuery(sq1.ID)
require.Nil(t, err)

View File

@ -3,6 +3,7 @@ package mysql
import (
"database/sql"
"fmt"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
@ -129,6 +130,124 @@ func (d *Datastore) SaveHost(host *kolide.Host) error {
return errors.Wrapf(err, "save host with id %d", host.ID)
}
// Save host pack stats only if it is non-nil. Empty stats should be
// represented by an empty slice.
if host.PackStats != nil {
if err := d.saveHostPackStats(host); err != nil {
return err
}
}
return nil
}
// saveHostPackStats replaces every scheduled_query_stats row belonging to
// host with the contents of host.PackStats. The delete and the bulk insert
// both run inside the function passed to withRetryTxx.
//
// The scheduled_query_id of each row is resolved in SQL from the pack name and
// scheduled query name carried by the stats entry. The insert uses
// INSERT IGNORE; presumably this is what lets entries naming a pack/query that
// no longer exists be skipped rather than failing the save (the datastore test
// relies on that) — confirm against MySQL semantics before changing it.
func (d *Datastore) saveHostPackStats(host *kolide.Host) error {
	// Placeholder tuple for a single stats row: a subselect taking
	// (pack name, scheduled query name) followed by the eleven plain columns.
	const rowPlaceholder = "((SELECT sq.id FROM scheduled_queries sq JOIN packs p ON (sq.pack_id = p.id) WHERE p.name = ? AND sq.name = ?),?,?,?,?,?,?,?,?,?,?)"

	replaceStats := func(tx *sqlx.Tx) error {
		deleteStmt := `
DELETE FROM scheduled_query_stats
WHERE host_id = ?
`
		if _, err := tx.Exec(deleteStmt, host.ID); err != nil {
			return errors.Wrap(err, "delete old stats")
		}

		// Bulk insert pack stats entries: collect one placeholder tuple and
		// its bind parameters per reported scheduled query.
		var (
			tuples []string
			params []interface{}
		)
		for _, pack := range host.PackStats {
			for _, stat := range pack.QueryStats {
				tuples = append(tuples, rowPlaceholder)
				params = append(params,
					stat.PackName,
					stat.ScheduledQueryName,
					host.ID,
					stat.AverageMemory,
					stat.Denylisted,
					stat.Executions,
					stat.Interval,
					stat.LastExecuted,
					stat.OutputSize,
					stat.SystemTime,
					stat.UserTime,
					stat.WallTime,
				)
			}
		}
		// Nothing to insert; the delete above has already cleared the host.
		if len(tuples) == 0 {
			return nil
		}

		insertStmt := fmt.Sprintf(`
INSERT IGNORE INTO scheduled_query_stats (
scheduled_query_id,
host_id,
average_memory,
denylisted,
executions,
schedule_interval,
last_executed,
output_size,
system_time,
user_time,
wall_time
)
VALUES %s
`, strings.Join(tuples, ","))
		if _, err := tx.Exec(insertStmt, params...); err != nil {
			return errors.Wrap(err, "insert pack stats")
		}
		return nil
	}

	if err := d.withRetryTxx(replaceStats); err != nil {
		return errors.Wrap(err, "save pack stats")
	}
	return nil
}
// loadHostPackStats reads the scheduled query stats stored for host.ID and
// appends them, grouped by pack, to host.PackStats.
//
// Packs are appended in the order they first appear in the result set, and the
// query is ordered by pack ID then scheduled query ID, so repeated loads of the
// same data yield the same ordering. When the host has no stats rows,
// host.PackStats is left exactly as it was (nil stays nil).
//
// NOTE(review): the select list names scheduled_query_id twice
// (sqs.scheduled_query_id and sq.id AS scheduled_query_id); the join condition
// makes both values equal, so one of them looks redundant — confirm before
// removing.
func (d *Datastore) loadHostPackStats(host *kolide.Host) error {
	stmt := `
SELECT
sqs.scheduled_query_id,
sqs.average_memory,
sqs.denylisted,
sqs.executions,
sqs.schedule_interval,
sqs.last_executed,
sqs.output_size,
sqs.system_time,
sqs.user_time,
sqs.wall_time,
sq.name AS scheduled_query_name,
sq.id AS scheduled_query_id,
sq.query_name AS query_name,
p.name AS pack_name,
p.id as pack_id
FROM scheduled_query_stats sqs
JOIN scheduled_queries sq ON (sqs.scheduled_query_id = sq.id)
JOIN packs p ON (sq.pack_id = p.id)
WHERE host_id = ?
ORDER BY p.id, sq.id
`
	var stats []kolide.ScheduledQueryStats
	if err := d.db.Select(&stats, stmt, host.ID); err != nil {
		return errors.Wrap(err, "load pack stats")
	}

	// Group rows by pack. The map only records each pack's position in the
	// packs slice; ranging over a map directly would randomize the output
	// order on every call.
	packIndex := make(map[uint]int)
	var packs []kolide.PackStats
	for _, query := range stats {
		i, seen := packIndex[query.PackID]
		if !seen {
			i = len(packs)
			packIndex[query.PackID] = i
			packs = append(packs, kolide.PackStats{
				PackID:   query.PackID,
				PackName: query.PackName,
			})
		}
		packs[i].QueryStats = append(packs[i].QueryStats, query)
	}

	host.PackStats = append(host.PackStats, packs...)
	return nil
}
@ -148,7 +267,10 @@ func (d *Datastore) Host(id uint) (*kolide.Host, error) {
host := &kolide.Host{}
err := d.db.Get(host, sqlStatement, id)
if err != nil {
return nil, errors.Wrap(err, "getting host by id")
return nil, errors.Wrap(err, "get host by id")
}
if err := d.loadHostPackStats(host); err != nil {
return nil, err
}
return host, nil
@ -605,5 +727,9 @@ func (d *Datastore) HostByIdentifier(identifier string) (*kolide.Host, error) {
return nil, errors.Wrap(err, "get host by identifier")
}
if err := d.loadHostPackStats(host); err != nil {
return nil, err
}
return host, nil
}

View File

@ -0,0 +1,40 @@
package tables
import (
"database/sql"
"github.com/pkg/errors"
)
// init registers this migration's up and down functions with MigrationClient.
func init() {
MigrationClient.AddMigration(Up_20210506095025, Down_20210506095025)
}
// Up_20210506095025 creates the scheduled_query_stats table. Rows are keyed by
// (host_id, scheduled_query_id), and both columns carry foreign keys to the
// hosts and scheduled_queries tables respectively. Any error from executing
// the statement is returned wrapped with "create scheduled_query_stats".
func Up_20210506095025(tx *sql.Tx) error {
	const createTable = `
CREATE TABLE scheduled_query_stats (
host_id int unsigned NOT NULL,
scheduled_query_id int unsigned NOT NULL,
average_memory int,
denylisted tinyint(1),
executions int,
schedule_interval int,
last_executed timestamp,
output_size int,
system_time int,
user_time int,
wall_time int,
PRIMARY KEY (host_id, scheduled_query_id),
FOREIGN KEY (host_id) REFERENCES hosts (id),
FOREIGN KEY (scheduled_query_id) REFERENCES scheduled_queries (id)
)
`
	_, err := tx.Exec(createTable)
	if err != nil {
		return errors.Wrap(err, "create scheduled_query_stats")
	}
	return nil
}
// Down_20210506095025 is a no-op: it returns nil without executing anything on
// tx, so the scheduled_query_stats table created by the up migration is not
// dropped.
func Down_20210506095025(tx *sql.Tx) error {
return nil
}

View File

@ -87,7 +87,7 @@ func (d *Datastore) NewScheduledQuery(sq *kolide.ScheduledQuery, opts ...kolide.
}
sq.Query = metadata[0].Query
sq.Name = metadata[0].Name
sq.QueryName = metadata[0].Name
return sq, nil
}

View File

@ -145,6 +145,9 @@ type Host struct {
LoggerTLSPeriod uint `json:"logger_tls_period" db:"logger_tls_period"`
Additional *json.RawMessage `json:"additional,omitempty" db:"additional"`
EnrollSecretName string `json:"enroll_secret_name" db:"enroll_secret_name"`
// Loaded via JOIN in DB
PackStats []PackStats `json:"pack_stats"`
}
// HostDetail provides the full host metadata along with associated labels and

View File

@ -176,3 +176,9 @@ type PackTarget struct {
PackID uint
Target
}
// PackStats groups the scheduled query statistics reported for a single pack.
type PackStats struct {
// PackID and PackName identify the pack; both are omitted from JSON when
// they hold their zero value.
PackID uint `json:"pack_id,omitempty"`
PackName string `json:"pack_name,omitempty"`
// QueryStats holds one entry per scheduled query in the pack. It has no
// omitempty tag, so the key is always present in the JSON output.
QueryStats []ScheduledQueryStats `json:"query_stats"`
}

View File

@ -2,6 +2,7 @@ package kolide
import (
"context"
"time"
"gopkg.in/guregu/null.v3"
)
@ -51,3 +52,25 @@ type ScheduledQueryPayload struct {
Shard *null.Int `json:"shard"`
Denylist *bool `json:"denylist"`
}
// ScheduledQueryStats holds the execution statistics of one scheduled query,
// together with the names and IDs that identify the scheduled query, its
// underlying query and its pack. The db tags give the column names used when
// scanning rows into this struct.
type ScheduledQueryStats struct {
// Identifying fields. All of them are omitted from JSON when zero-valued.
ScheduledQueryName string `json:"scheduled_query_name,omitempty" db:"scheduled_query_name"`
ScheduledQueryID uint `json:"scheduled_query_id,omitempty" db:"scheduled_query_id"`
QueryName string `json:"query_name,omitempty" db:"query_name"`
PackName string `json:"pack_name,omitempty" db:"pack_name"`
PackID uint `json:"pack_id,omitempty" db:"pack_id"`
// From osquery directly
AverageMemory int `json:"average_memory" db:"average_memory"`
Denylisted bool `json:"denylisted" db:"denylisted"`
Executions int `json:"executions" db:"executions"`
// Note schedule_interval is used for DB since "interval" is a reserved word in MySQL
Interval int `json:"interval" db:"schedule_interval"`
LastExecuted time.Time `json:"last_executed" db:"last_executed"`
OutputSize int `json:"output_size" db:"output_size"`
SystemTime int `json:"system_time" db:"system_time"`
UserTime int `json:"user_time" db:"user_time"`
WallTime int `json:"wall_time" db:"wall_time"`
}

View File

@ -747,6 +747,80 @@ FROM python_packages;
Platforms: []string{"windows"},
IngestFunc: ingestSoftware,
},
// scheduled_query_stats collects every row of osquery_schedule along with the
// host's pack_delimiter flag, and converts the rows into host.PackStats.
"scheduled_query_stats": {
Query: `
SELECT *,
(SELECT value from osquery_flags where name = 'pack_delimiter') AS delimiter
FROM osquery_schedule
`,
// IngestFunc groups the reported rows by pack name and replaces
// host.PackStats with the result. Rows that have an empty name, an empty
// delimiter, or a name that cannot be split into pack and query parts are
// logged at debug level and skipped. It always returns nil.
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
packs := map[string][]kolide.ScheduledQueryStats{}
for _, row := range rows {
providedName := row["name"]
if providedName == "" {
level.Debug(logger).Log(
"msg", "host reported scheduled query with empty name",
"host", host.HostName,
)
continue
}
delimiter := row["delimiter"]
if delimiter == "" {
level.Debug(logger).Log(
"msg", "host reported scheduled query with empty delimiter",
"host", host.HostName,
)
continue
}
// Split with a limit of 2 in case query name includes the
// delimiter. Not much we can do if pack name includes the
// delimiter.
// The leading "pack"+delimiter is stripped first when present, so a
// name such as "pack/test/time" with delimiter "/" yields pack "test"
// and scheduled query "time".
trimmedName := strings.TrimPrefix(providedName, "pack"+delimiter)
parts := strings.SplitN(trimmedName, delimiter, 2)
if len(parts) != 2 {
level.Debug(logger).Log(
"msg", "could not split pack and query names",
"host", host.HostName,
"name", providedName,
"delimiter", delimiter,
)
continue
}
packName, scheduledName := parts[0], parts[1]
stats := kolide.ScheduledQueryStats{
ScheduledQueryName: scheduledName,
PackName: packName,
AverageMemory: cast.ToInt(row["average_memory"]),
Denylisted: cast.ToBool(row["denylisted"]),
Executions: cast.ToInt(row["executions"]),
Interval: cast.ToInt(row["interval"]),
// last_executed is parsed as an int64 and treated as unix seconds;
// the resulting time is normalized to UTC.
LastExecuted: time.Unix(cast.ToInt64(row["last_executed"]), 0).UTC(),
OutputSize: cast.ToInt(row["output_size"]),
SystemTime: cast.ToInt(row["system_time"]),
UserTime: cast.ToInt(row["user_time"]),
WallTime: cast.ToInt(row["wall_time"]),
}
packs[packName] = append(packs[packName], stats)
}
// Assign a non-nil slice even when no rows were usable, so the field is
// empty rather than nil after ingestion.
host.PackStats = []kolide.PackStats{}
// Map iteration order is unspecified, so the order of packs in
// host.PackStats is not stable between calls.
for packName, stats := range packs {
host.PackStats = append(
host.PackStats,
kolide.PackStats{
PackName: packName,
QueryStats: stats,
},
)
}
return nil
},
},
}
func ingestSoftware(logger log.Logger, host *kolide.Host, rows []map[string]string) error {

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
@ -971,6 +972,177 @@ func TestDetailQueryNetworkInterfaces(t *testing.T) {
assert.Equal(t, "00:00:00:00:00:00", host.PrimaryMac)
}
// TestDetailQueryScheduledQueryStats exercises the ingest function of the
// "scheduled_query_stats" detail query: nil rows must produce empty stats,
// a set of osquery_schedule rows must be grouped by pack with every numeric
// field parsed (empty strings becoming zero), and a later ingest of nil rows
// must clear the previously ingested stats.
func TestDetailQueryScheduledQueryStats(t *testing.T) {
	host := kolide.Host{}

	ingest := detailQueries["scheduled_query_stats"].IngestFunc

	assert.NoError(t, ingest(log.NewNopLogger(), &host, nil))
	assert.Len(t, host.PackStats, 0)

	resJSON := `
[
{
"average_memory":"33",
"delimiter":"/",
"denylisted":"0",
"executions":"1",
"interval":"33",
"last_executed":"1620325191",
"name":"pack/pack-2/time",
"output_size":"",
"query":"SELECT * FROM time",
"system_time":"100",
"user_time":"60",
"wall_time":"180"
},
{
"average_memory":"8000",
"delimiter":"/",
"denylisted":"0",
"executions":"164",
"interval":"30",
"last_executed":"1620325191",
"name":"pack/test/osquery info",
"output_size":"1337",
"query":"SELECT * FROM osquery_info",
"system_time":"150",
"user_time":"180",
"wall_time":"0"
},
{
"average_memory":"50400",
"delimiter":"/",
"denylisted":"1",
"executions":"188",
"interval":"30",
"last_executed":"1620325203",
"name":"pack/test/processes?",
"output_size":"",
"query":"SELECT * FROM processes",
"system_time":"140",
"user_time":"190",
"wall_time":"1"
},
{
"average_memory":"0",
"delimiter":"/",
"denylisted":"0",
"executions":"1",
"interval":"3600",
"last_executed":"1620323381",
"name":"pack/test/processes?-1",
"output_size":"",
"query":"SELECT * FROM processes",
"system_time":"0",
"user_time":"0",
"wall_time":"0"
},
{
"average_memory":"0",
"delimiter":"/",
"denylisted":"0",
"executions":"105",
"interval":"47",
"last_executed":"1620325190",
"name":"pack/test/time",
"output_size":"",
"query":"SELECT * FROM time",
"system_time":"70",
"user_time":"50",
"wall_time":"1"
}
]
`

	var rows []map[string]string
	require.NoError(t, json.Unmarshal([]byte(resJSON), &rows))

	assert.NoError(t, ingest(log.NewNopLogger(), &host, rows))
	// require (not assert) so that a wrong length stops the test here instead
	// of panicking on the index expressions below.
	require.Len(t, host.PackStats, 2)
	// The ingest function builds PackStats from a map, so sort by pack name to
	// get a stable order before indexing.
	sort.Slice(host.PackStats, func(i, j int) bool {
		return host.PackStats[i].PackName < host.PackStats[j].PackName
	})
	// Expected value goes first so failure output labels the values correctly.
	assert.Equal(t, "pack-2", host.PackStats[0].PackName)
	assert.ElementsMatch(t, host.PackStats[0].QueryStats,
		[]kolide.ScheduledQueryStats{
			{
				ScheduledQueryName: "time",
				PackName:           "pack-2",
				AverageMemory:      33,
				Denylisted:         false,
				Executions:         1,
				Interval:           33,
				LastExecuted:       time.Unix(1620325191, 0).UTC(),
				OutputSize:         0,
				SystemTime:         100,
				UserTime:           60,
				WallTime:           180,
			},
		},
	)
	assert.Equal(t, "test", host.PackStats[1].PackName)
	assert.ElementsMatch(t, host.PackStats[1].QueryStats,
		[]kolide.ScheduledQueryStats{
			{
				ScheduledQueryName: "osquery info",
				PackName:           "test",
				AverageMemory:      8000,
				Denylisted:         false,
				Executions:         164,
				Interval:           30,
				LastExecuted:       time.Unix(1620325191, 0).UTC(),
				OutputSize:         1337,
				SystemTime:         150,
				UserTime:           180,
				WallTime:           0,
			},
			{
				ScheduledQueryName: "processes?",
				PackName:           "test",
				AverageMemory:      50400,
				Denylisted:         true,
				Executions:         188,
				Interval:           30,
				LastExecuted:       time.Unix(1620325203, 0).UTC(),
				OutputSize:         0,
				SystemTime:         140,
				UserTime:           190,
				WallTime:           1,
			},
			{
				ScheduledQueryName: "processes?-1",
				PackName:           "test",
				AverageMemory:      0,
				Denylisted:         false,
				Executions:         1,
				Interval:           3600,
				LastExecuted:       time.Unix(1620323381, 0).UTC(),
				OutputSize:         0,
				SystemTime:         0,
				UserTime:           0,
				WallTime:           0,
			},
			{
				ScheduledQueryName: "time",
				PackName:           "test",
				AverageMemory:      0,
				Denylisted:         false,
				Executions:         105,
				Interval:           47,
				LastExecuted:       time.Unix(1620325190, 0).UTC(),
				OutputSize:         0,
				SystemTime:         70,
				UserTime:           50,
				WallTime:           1,
			},
		},
	)

	// Ingesting no rows afterwards must reset the stats to empty.
	assert.NoError(t, ingest(log.NewNopLogger(), &host, nil))
	assert.Len(t, host.PackStats, 0)
}
func TestNewDistributedQueryCampaign(t *testing.T) {
ds := &mock.Store{
AppConfigStore: mock.AppConfigStore{

View File

@ -9,10 +9,14 @@ import (
)
func NewQuery(t *testing.T, ds kolide.Datastore, name, q string, authorID uint, saved bool) *kolide.Query {
authorPtr := &authorID
if authorID == 0 {
authorPtr = nil
}
query, err := ds.NewQuery(&kolide.Query{
Name: name,
Query: q,
AuthorID: &authorID,
AuthorID: authorPtr,
Saved: saved,
})
require.Nil(t, err)
@ -122,8 +126,9 @@ func NewUser(t *testing.T, ds kolide.Datastore, name, username, email string, ad
return u
}
func NewScheduledQuery(t *testing.T, ds kolide.Datastore, pid, qid, interval uint, snapshot, removed bool) *kolide.ScheduledQuery {
func NewScheduledQuery(t *testing.T, ds kolide.Datastore, pid, qid, interval uint, snapshot, removed bool, name string) *kolide.ScheduledQuery {
sq, err := ds.NewScheduledQuery(&kolide.ScheduledQuery{
Name: name,
PackID: pid,
QueryID: qid,
Interval: interval,