mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 00:45:19 +00:00
Add distributed discovery query support for detail queries, add orbit… (#4597)
* Add distributed discovery query support for detail queries, add orbit_info ingestion * Amend changes file
This commit is contained in:
parent
bb678b6b2e
commit
d2ba34c8fc
1
changes/issue-3914-fleet-desktop-orbit-table
Normal file
1
changes/issue-3914-fleet-desktop-orbit-table
Normal file
@ -0,0 +1 @@
|
||||
* Reduce osquery status log verbosity by only running detail queries when the relevant tables exist.
|
@ -28,7 +28,7 @@ func (o orbitInfoExtension) Columns() []table.ColumnDefinition {
|
||||
}
|
||||
|
||||
// GenerateFunc partially implements orbit_table.Extension.
|
||||
func (o orbitInfoExtension) GenerateFunc(ctx context.Context, _ table.QueryContext) ([]map[string]string, error) {
|
||||
func (o orbitInfoExtension) GenerateFunc(_ context.Context, _ table.QueryContext) ([]map[string]string, error) {
|
||||
v := version
|
||||
if v == "" {
|
||||
v = "unknown"
|
||||
|
@ -762,6 +762,16 @@ func (ds *Datastore) LoadHostByDeviceAuthToken(ctx context.Context, authToken st
|
||||
}
|
||||
}
|
||||
|
||||
// SetOrUpdateDeviceAuthToken inserts or updates the auth token for a host.
|
||||
func (ds *Datastore) SetOrUpdateDeviceAuthToken(ctx context.Context, hostID uint, authToken string) error {
|
||||
return ds.updateOrInsert(
|
||||
ctx,
|
||||
`UPDATE host_device_auth SET token=? WHERE host_id=?`,
|
||||
`INSERT INTO host_device_auth(token, host_id) VALUES (?,?)`,
|
||||
authToken, hostID,
|
||||
)
|
||||
}
|
||||
|
||||
func (ds *Datastore) MarkHostsSeen(ctx context.Context, hostIDs []uint, t time.Time) error {
|
||||
if len(hostIDs) == 0 {
|
||||
return nil
|
||||
|
@ -113,6 +113,7 @@ func TestHosts(t *testing.T) {
|
||||
{"UpdateOsqueryIntervals", testUpdateOsqueryIntervals},
|
||||
{"UpdateRefetchRequested", testUpdateRefetchRequested},
|
||||
{"LoadHostByDeviceAuthToken", testHostsLoadHostByDeviceAuthToken},
|
||||
{"SetOrUpdateDeviceAuthToken", testHostsSetOrUpdateDeviceAuthToken},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
@ -3706,7 +3707,7 @@ func testHostsLoadHostByDeviceAuthToken(t *testing.T, ds *Datastore) {
|
||||
require.NoError(t, err)
|
||||
|
||||
validToken := "abcd"
|
||||
_, err = ds.writer.ExecContext(context.Background(), `INSERT INTO host_device_auth (host_id, token) VALUES (?, ?)`, host.ID, validToken)
|
||||
err = ds.SetOrUpdateDeviceAuthToken(context.Background(), host.ID, validToken)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = ds.LoadHostByDeviceAuthToken(context.Background(), "nosuchtoken")
|
||||
@ -3717,3 +3718,64 @@ func testHostsLoadHostByDeviceAuthToken(t *testing.T, ds *Datastore) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, host.ID, h.ID)
|
||||
}
|
||||
|
||||
func testHostsSetOrUpdateDeviceAuthToken(t *testing.T, ds *Datastore) {
|
||||
host, err := ds.NewHost(context.Background(), &fleet.Host{
|
||||
DetailUpdatedAt: time.Now(),
|
||||
LabelUpdatedAt: time.Now(),
|
||||
PolicyUpdatedAt: time.Now(),
|
||||
SeenTime: time.Now(),
|
||||
NodeKey: "1",
|
||||
UUID: "1",
|
||||
OsqueryHostID: "1",
|
||||
Hostname: "foo.local",
|
||||
PrimaryIP: "192.168.1.1",
|
||||
PrimaryMac: "30-65-EC-6F-C4-58",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
host2, err := ds.NewHost(context.Background(), &fleet.Host{
|
||||
DetailUpdatedAt: time.Now(),
|
||||
LabelUpdatedAt: time.Now(),
|
||||
PolicyUpdatedAt: time.Now(),
|
||||
SeenTime: time.Now(),
|
||||
NodeKey: "2",
|
||||
UUID: "2",
|
||||
OsqueryHostID: "2",
|
||||
Hostname: "foo.local2",
|
||||
PrimaryIP: "192.168.1.2",
|
||||
PrimaryMac: "30-65-EC-6F-C4-59",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
token1 := "token1"
|
||||
err = ds.SetOrUpdateDeviceAuthToken(context.Background(), host.ID, token1)
|
||||
require.NoError(t, err)
|
||||
|
||||
token2 := "token2"
|
||||
err = ds.SetOrUpdateDeviceAuthToken(context.Background(), host2.ID, token2)
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err := ds.LoadHostByDeviceAuthToken(context.Background(), token1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, host.ID, h.ID)
|
||||
|
||||
h, err = ds.LoadHostByDeviceAuthToken(context.Background(), token2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, host2.ID, h.ID)
|
||||
|
||||
token2Updated := "token2_updated"
|
||||
err = ds.SetOrUpdateDeviceAuthToken(context.Background(), host2.ID, token2Updated)
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err = ds.LoadHostByDeviceAuthToken(context.Background(), token1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, host.ID, h.ID)
|
||||
|
||||
h, err = ds.LoadHostByDeviceAuthToken(context.Background(), token2Updated)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, host2.ID, h.ID)
|
||||
|
||||
h, err = ds.LoadHostByDeviceAuthToken(context.Background(), token2)
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, sql.ErrNoRows)
|
||||
}
|
||||
|
@ -205,6 +205,8 @@ type Datastore interface {
|
||||
// LoadHostByDeviceAuthToken loads the host identified by the device auth token.
|
||||
// If the token is invalid it returns a NotFoundError.
|
||||
LoadHostByDeviceAuthToken(ctx context.Context, authToken string) (*Host, error)
|
||||
// SetOrUpdateDeviceAuthToken inserts or updates the auth token for a host.
|
||||
SetOrUpdateDeviceAuthToken(ctx context.Context, hostID uint, authToken string) error
|
||||
|
||||
// ListPoliciesForHost lists the policies that a host will check and whether they are passing
|
||||
ListPoliciesForHost(ctx context.Context, host *Host) ([]*HostPolicy, error)
|
||||
|
@ -27,7 +27,7 @@ type OsqueryService interface {
|
||||
//
|
||||
// To enable the osquery "accelerated checkins" feature, a positive integer (number of seconds to activate for)
|
||||
// should be returned. Returning 0 for this will not activate the feature.
|
||||
GetDistributedQueries(ctx context.Context) (queries map[string]string, accelerate uint, err error)
|
||||
GetDistributedQueries(ctx context.Context) (queries map[string]string, discovery map[string]string, accelerate uint, err error)
|
||||
SubmitDistributedQueryResults(
|
||||
ctx context.Context,
|
||||
results OsqueryDistributedQueryResults,
|
||||
|
@ -73,13 +73,14 @@ func (svc *launcherWrapper) RequestQueries(ctx context.Context, nodeKey string)
|
||||
return nil, invalid, err
|
||||
}
|
||||
|
||||
queryMap, accelerate, err := svc.tls.GetDistributedQueries(newCtx)
|
||||
queryMap, discoveryMap, accelerate, err := svc.tls.GetDistributedQueries(newCtx)
|
||||
if err != nil {
|
||||
return nil, false, ctxerr.Wrap(ctx, err, "get queries for launcher")
|
||||
}
|
||||
|
||||
result := &distributed.GetQueriesResult{
|
||||
Queries: queryMap,
|
||||
Discovery: discoveryMap,
|
||||
AccelerateSeconds: int(accelerate),
|
||||
}
|
||||
|
||||
|
@ -136,10 +136,13 @@ func newTLSService(t *testing.T) *mock.TLSService {
|
||||
|
||||
GetDistributedQueriesFunc: func(
|
||||
ctx context.Context,
|
||||
) (queries map[string]string, accelerate uint, err error) {
|
||||
) (queries map[string]string, discovery map[string]string, accelerate uint, err error) {
|
||||
queries = map[string]string{
|
||||
"noop": `{"key": "value"}`,
|
||||
}
|
||||
discovery = map[string]string{
|
||||
"noop": `select 1`,
|
||||
}
|
||||
return
|
||||
},
|
||||
SubmitDistributedQueryResultsFunc: func(
|
||||
|
@ -172,6 +172,8 @@ type ListHostDeviceMappingFunc func(ctx context.Context, id uint) ([]*fleet.Host
|
||||
|
||||
type LoadHostByDeviceAuthTokenFunc func(ctx context.Context, authToken string) (*fleet.Host, error)
|
||||
|
||||
type SetOrUpdateDeviceAuthTokenFunc func(ctx context.Context, hostID uint, authToken string) error
|
||||
|
||||
type ListPoliciesForHostFunc func(ctx context.Context, host *fleet.Host) ([]*fleet.HostPolicy, error)
|
||||
|
||||
type GetMunkiVersionFunc func(ctx context.Context, hostID uint) (string, error)
|
||||
@ -625,6 +627,9 @@ type DataStore struct {
|
||||
LoadHostByDeviceAuthTokenFunc LoadHostByDeviceAuthTokenFunc
|
||||
LoadHostByDeviceAuthTokenFuncInvoked bool
|
||||
|
||||
SetOrUpdateDeviceAuthTokenFunc SetOrUpdateDeviceAuthTokenFunc
|
||||
SetOrUpdateDeviceAuthTokenFuncInvoked bool
|
||||
|
||||
ListPoliciesForHostFunc ListPoliciesForHostFunc
|
||||
ListPoliciesForHostFuncInvoked bool
|
||||
|
||||
@ -1344,6 +1349,11 @@ func (s *DataStore) LoadHostByDeviceAuthToken(ctx context.Context, authToken str
|
||||
return s.LoadHostByDeviceAuthTokenFunc(ctx, authToken)
|
||||
}
|
||||
|
||||
func (s *DataStore) SetOrUpdateDeviceAuthToken(ctx context.Context, hostID uint, authToken string) error {
|
||||
s.SetOrUpdateDeviceAuthTokenFuncInvoked = true
|
||||
return s.SetOrUpdateDeviceAuthTokenFunc(ctx, hostID, authToken)
|
||||
}
|
||||
|
||||
func (s *DataStore) ListPoliciesForHost(ctx context.Context, host *fleet.Host) ([]*fleet.HostPolicy, error) {
|
||||
s.ListPoliciesForHostFuncInvoked = true
|
||||
return s.ListPoliciesForHostFunc(ctx, host)
|
||||
|
@ -107,7 +107,7 @@ func authenticatedHost(svc fleet.Service, logger log.Logger, next endpoint.Endpo
|
||||
}
|
||||
|
||||
if debug {
|
||||
logJSON(hlogger, request, "response")
|
||||
logJSON(hlogger, resp, "response")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ type AuthenticateHostFunc func(ctx context.Context, nodeKey string) (host *fleet
|
||||
|
||||
type GetClientConfigFunc func(ctx context.Context) (config map[string]interface{}, err error)
|
||||
|
||||
type GetDistributedQueriesFunc func(ctx context.Context) (queries map[string]string, accelerate uint, err error)
|
||||
type GetDistributedQueriesFunc func(ctx context.Context) (queries map[string]string, discovery map[string]string, accelerate uint, err error)
|
||||
|
||||
type SubmitDistributedQueryResultsFunc func(ctx context.Context, results fleet.OsqueryDistributedQueryResults, statuses map[string]fleet.OsqueryStatus, messages map[string]string) (err error)
|
||||
|
||||
@ -63,7 +63,7 @@ func (s *TLSService) GetClientConfig(ctx context.Context) (config map[string]int
|
||||
return s.GetClientConfigFunc(ctx)
|
||||
}
|
||||
|
||||
func (s *TLSService) GetDistributedQueries(ctx context.Context) (queries map[string]string, accelerate uint, err error) {
|
||||
func (s *TLSService) GetDistributedQueries(ctx context.Context) (queries map[string]string, discovery map[string]string, accelerate uint, err error) {
|
||||
s.GetDistributedQueriesFuncInvoked = true
|
||||
return s.GetDistributedQueriesFunc(ctx)
|
||||
}
|
||||
|
@ -469,6 +469,7 @@ func (r *getDistributedQueriesRequest) hostNodeKey() string {
|
||||
|
||||
type getDistributedQueriesResponse struct {
|
||||
Queries map[string]string `json:"queries"`
|
||||
Discovery map[string]string `json:"discovery"`
|
||||
Accelerate uint `json:"accelerate,omitempty"`
|
||||
Err error `json:"error,omitempty"`
|
||||
}
|
||||
@ -476,35 +477,43 @@ type getDistributedQueriesResponse struct {
|
||||
func (r getDistributedQueriesResponse) error() error { return r.Err }
|
||||
|
||||
func getDistributedQueriesEndpoint(ctx context.Context, request interface{}, svc fleet.Service) (interface{}, error) {
|
||||
queries, accelerate, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, accelerate, err := svc.GetDistributedQueries(ctx)
|
||||
if err != nil {
|
||||
return getDistributedQueriesResponse{Err: err}, nil
|
||||
}
|
||||
return getDistributedQueriesResponse{Queries: queries, Accelerate: accelerate}, nil
|
||||
return getDistributedQueriesResponse{
|
||||
Queries: queries,
|
||||
Discovery: discovery,
|
||||
Accelerate: accelerate,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (svc *Service) GetDistributedQueries(ctx context.Context) (map[string]string, uint, error) {
|
||||
func (svc *Service) GetDistributedQueries(ctx context.Context) (queries map[string]string, discovery map[string]string, accelerate uint, err error) {
|
||||
// skipauth: Authorization is currently for user endpoints only.
|
||||
svc.authz.SkipAuthorization(ctx)
|
||||
|
||||
host, ok := hostctx.FromContext(ctx)
|
||||
if !ok {
|
||||
return nil, 0, osqueryError{message: "internal error: missing host from request context"}
|
||||
return nil, nil, 0, osqueryError{message: "internal error: missing host from request context"}
|
||||
}
|
||||
|
||||
queries := make(map[string]string)
|
||||
queries = make(map[string]string)
|
||||
discovery = make(map[string]string)
|
||||
|
||||
detailQueries, err := svc.detailQueriesForHost(ctx, host)
|
||||
detailQueries, detailDiscovery, err := svc.detailQueriesForHost(ctx, host)
|
||||
if err != nil {
|
||||
return nil, 0, osqueryError{message: err.Error()}
|
||||
return nil, nil, 0, osqueryError{message: err.Error()}
|
||||
}
|
||||
for name, query := range detailQueries {
|
||||
queries[name] = query
|
||||
}
|
||||
for name, query := range detailDiscovery {
|
||||
discovery[name] = query
|
||||
}
|
||||
|
||||
labelQueries, err := svc.labelQueriesForHost(ctx, host)
|
||||
if err != nil {
|
||||
return nil, 0, osqueryError{message: err.Error()}
|
||||
return nil, nil, 0, osqueryError{message: err.Error()}
|
||||
}
|
||||
for name, query := range labelQueries {
|
||||
queries[hostLabelQueryPrefix+name] = query
|
||||
@ -523,13 +532,13 @@ func (svc *Service) GetDistributedQueries(ctx context.Context) (map[string]strin
|
||||
|
||||
policyQueries, err := svc.policyQueriesForHost(ctx, host)
|
||||
if err != nil {
|
||||
return nil, 0, osqueryError{message: err.Error()}
|
||||
return nil, nil, 0, osqueryError{message: err.Error()}
|
||||
}
|
||||
for name, query := range policyQueries {
|
||||
queries[hostPolicyQueryPrefix+name] = query
|
||||
}
|
||||
|
||||
accelerate := uint(0)
|
||||
accelerate = uint(0)
|
||||
if host.Hostname == "" || host.Platform == "" {
|
||||
// Assume this host is just enrolling, and accelerate checkins
|
||||
// (to allow for platform restricted labels to run quickly
|
||||
@ -537,44 +546,70 @@ func (svc *Service) GetDistributedQueries(ctx context.Context) (map[string]strin
|
||||
accelerate = 10
|
||||
}
|
||||
|
||||
return queries, accelerate, nil
|
||||
// The way osquery's distributed "discovery" queries work is:
|
||||
// If len(discovery) > 0, then only those queries that have a "discovery"
|
||||
// query and return more than one row are executed on the host.
|
||||
//
|
||||
// Thus, we set the alwaysTrueQuery for all queries, except for those where we set
|
||||
// an explicit discovery query (e.g. orbit_info, google_chrome_profiles).
|
||||
for name := range queries {
|
||||
discoveryQuery := discovery[name]
|
||||
if discoveryQuery == "" {
|
||||
discoveryQuery = alwaysTrueQuery
|
||||
}
|
||||
discovery[name] = discoveryQuery
|
||||
}
|
||||
|
||||
return queries, discovery, accelerate, nil
|
||||
}
|
||||
|
||||
const alwaysTrueQuery = "SELECT 1"
|
||||
|
||||
// detailQueriesForHost returns the map of detail+additional queries that should be executed by
|
||||
// osqueryd to fill in the host details.
|
||||
func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) {
|
||||
func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host) (queries map[string]string, discovery map[string]string, err error) {
|
||||
if !svc.shouldUpdate(host.DetailUpdatedAt, svc.config.Osquery.DetailUpdateInterval, host.ID) && !host.RefetchRequested {
|
||||
return nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
config, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "read app config")
|
||||
return nil, nil, ctxerr.Wrap(ctx, err, "read app config")
|
||||
}
|
||||
|
||||
queries := make(map[string]string)
|
||||
queries = make(map[string]string)
|
||||
discovery = make(map[string]string)
|
||||
|
||||
detailQueries := osquery_utils.GetDetailQueries(config, svc.config)
|
||||
for name, query := range detailQueries {
|
||||
if query.RunsForPlatform(host.Platform) {
|
||||
queries[hostDetailQueryPrefix+name] = query.Query
|
||||
queryName := hostDetailQueryPrefix + name
|
||||
queries[queryName] = query.Query
|
||||
discoveryQuery := query.Discovery
|
||||
if discoveryQuery == "" {
|
||||
discoveryQuery = alwaysTrueQuery
|
||||
}
|
||||
discovery[queryName] = discoveryQuery
|
||||
}
|
||||
}
|
||||
|
||||
if config.HostSettings.AdditionalQueries == nil {
|
||||
// No additional queries set
|
||||
return queries, nil
|
||||
return queries, discovery, nil
|
||||
}
|
||||
|
||||
var additionalQueries map[string]string
|
||||
if err := json.Unmarshal(*config.HostSettings.AdditionalQueries, &additionalQueries); err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "unmarshal additional queries")
|
||||
return nil, nil, ctxerr.Wrap(ctx, err, "unmarshal additional queries")
|
||||
}
|
||||
|
||||
for name, query := range additionalQueries {
|
||||
queries[hostAdditionalQueryPrefix+name] = query
|
||||
queryName := hostAdditionalQueryPrefix + name
|
||||
queries[queryName] = query
|
||||
discovery[queryName] = alwaysTrueQuery
|
||||
}
|
||||
|
||||
return queries, nil
|
||||
return queries, discovery, nil
|
||||
}
|
||||
|
||||
func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration, hostID uint) bool {
|
||||
|
@ -404,6 +404,23 @@ func TestSubmitResultLogs(t *testing.T) {
|
||||
assert.Equal(t, results, testLogger.logs)
|
||||
}
|
||||
|
||||
func verifyDiscovery(t *testing.T, queries, discovery map[string]string) {
|
||||
assert.Equal(t, len(queries), len(discovery))
|
||||
// discoveryUsed holds the queries where we know use the distributed discovery feature.
|
||||
discoveryUsed := map[string]struct{}{
|
||||
hostDetailQueryPrefix + "google_chrome_profiles": {},
|
||||
hostDetailQueryPrefix + "orbit_info": {},
|
||||
}
|
||||
for name := range queries {
|
||||
require.NotEmpty(t, discovery[name])
|
||||
if _, ok := discoveryUsed[name]; ok {
|
||||
require.NotEqual(t, alwaysTrueQuery, discovery[name])
|
||||
} else {
|
||||
require.Equal(t, alwaysTrueQuery, discovery[name])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHostDetailQueries(t *testing.T) {
|
||||
ds := new(mock.Store)
|
||||
additional := json.RawMessage(`{"foobar": "select foo", "bim": "bam"}`)
|
||||
@ -439,23 +456,26 @@ func TestHostDetailQueries(t *testing.T) {
|
||||
jitterH: make(map[time.Duration]*jitterHashTable),
|
||||
}
|
||||
|
||||
queries, err := svc.detailQueriesForHost(context.Background(), &host)
|
||||
queries, discovery, err := svc.detailQueriesForHost(context.Background(), &host)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
|
||||
// With refetch requested detail queries should be returned
|
||||
host.RefetchRequested = true
|
||||
queries, err = svc.detailQueriesForHost(context.Background(), &host)
|
||||
queries, discovery, err = svc.detailQueriesForHost(context.Background(), &host)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
host.RefetchRequested = false
|
||||
|
||||
// Advance the time
|
||||
mockClock.AddTime(1*time.Hour + 1*time.Minute)
|
||||
|
||||
queries, err = svc.detailQueriesForHost(context.Background(), &host)
|
||||
queries, discovery, err = svc.detailQueriesForHost(context.Background(), &host)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+2)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
for name := range queries {
|
||||
assert.True(t,
|
||||
strings.HasPrefix(name, hostDetailQueryPrefix) || strings.HasPrefix(name, hostAdditionalQueryPrefix),
|
||||
@ -468,7 +488,7 @@ func TestHostDetailQueries(t *testing.T) {
|
||||
func TestGetDistributedQueriesMissingHost(t *testing.T) {
|
||||
svc := newTestService(t, &mock.Store{}, nil, nil)
|
||||
|
||||
_, _, err := svc.GetDistributedQueries(context.Background())
|
||||
_, _, _, err := svc.GetDistributedQueries(context.Background())
|
||||
require.NotNil(t, err)
|
||||
assert.Contains(t, err.Error(), "missing host")
|
||||
}
|
||||
@ -506,9 +526,10 @@ func TestLabelQueries(t *testing.T) {
|
||||
|
||||
// With a new host, we should get the detail queries (and accelerate
|
||||
// should be turned on so that we can quickly fill labels)
|
||||
queries, acc, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err := svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.NotZero(t, acc)
|
||||
|
||||
// Simulate the detail queries being added.
|
||||
@ -516,9 +537,10 @@ func TestLabelQueries(t *testing.T) {
|
||||
host.Hostname = "zwass.local"
|
||||
ctx = hostctx.NewContext(ctx, host)
|
||||
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 0)
|
||||
require.Empty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
|
||||
ds.LabelQueriesForHostFunc = func(ctx context.Context, host *fleet.Host) (map[string]string, error) {
|
||||
@ -530,9 +552,10 @@ func TestLabelQueries(t *testing.T) {
|
||||
}
|
||||
|
||||
// Now we should get the label queries
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 3)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
|
||||
var gotHost *fleet.Host
|
||||
@ -584,17 +607,19 @@ func TestLabelQueries(t *testing.T) {
|
||||
// We should get no labels now.
|
||||
host.LabelUpdatedAt = mockClock.Now()
|
||||
ctx = hostctx.NewContext(ctx, host)
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 0)
|
||||
require.Empty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
|
||||
// With refetch requested details+label queries should be returned.
|
||||
host.RefetchRequested = true
|
||||
ctx = hostctx.NewContext(ctx, host)
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+3)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
|
||||
// Record a query execution
|
||||
@ -620,9 +645,10 @@ func TestLabelQueries(t *testing.T) {
|
||||
|
||||
// There shouldn't be any labels now.
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 0)
|
||||
require.Empty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
}
|
||||
|
||||
@ -658,9 +684,10 @@ func TestDetailQueriesWithEmptyStrings(t *testing.T) {
|
||||
|
||||
// With a new host, we should get the detail queries (and accelerated
|
||||
// queries)
|
||||
queries, acc, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err := svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries-3)
|
||||
require.Len(t, queries, expectedDetailQueries-2)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.NotZero(t, acc)
|
||||
|
||||
resultJSON := `
|
||||
@ -747,6 +774,16 @@ func TestDetailQueriesWithEmptyStrings(t *testing.T) {
|
||||
"name":"logger_tls_period",
|
||||
"value":""
|
||||
}
|
||||
],
|
||||
"fleet_detail_query_orbit_info": [
|
||||
{
|
||||
"name":"version",
|
||||
"value":"42"
|
||||
},
|
||||
{
|
||||
"name":"device_auth_token",
|
||||
"value":"foo"
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
@ -790,17 +827,19 @@ func TestDetailQueriesWithEmptyStrings(t *testing.T) {
|
||||
|
||||
// Now no detail queries should be required
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 0)
|
||||
require.Empty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
|
||||
// Advance clock and queries should exist again
|
||||
mockClock.AddTime(1*time.Hour + 1*time.Minute)
|
||||
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
}
|
||||
|
||||
@ -837,6 +876,11 @@ func TestDetailQueries(t *testing.T) {
|
||||
require.Equal(t, "3.4.5", version)
|
||||
return nil
|
||||
}
|
||||
ds.SetOrUpdateDeviceAuthTokenFunc = func(ctx context.Context, hostID uint, authToken string) error {
|
||||
require.Equal(t, uint(1), hostID)
|
||||
require.Equal(t, "foo", authToken)
|
||||
return nil
|
||||
}
|
||||
ds.HostLiteFunc = func(ctx context.Context, id uint) (*fleet.Host, error) {
|
||||
if id != 1 {
|
||||
return nil, errors.New("not found")
|
||||
@ -846,9 +890,10 @@ func TestDetailQueries(t *testing.T) {
|
||||
|
||||
// With a new host, we should get the detail queries (and accelerated
|
||||
// queries)
|
||||
queries, acc, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err := svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries-2)
|
||||
require.Len(t, queries, expectedDetailQueries-1)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.NotZero(t, acc)
|
||||
|
||||
resultJSON := `
|
||||
@ -985,6 +1030,12 @@ func TestDetailQueries(t *testing.T) {
|
||||
{
|
||||
"version": "3.4.5"
|
||||
}
|
||||
],
|
||||
"fleet_detail_query_orbit_info": [
|
||||
{
|
||||
"version": "42",
|
||||
"device_auth_token": "foo"
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
@ -1021,6 +1072,7 @@ func TestDetailQueries(t *testing.T) {
|
||||
|
||||
require.True(t, ds.SetOrUpdateMDMDataFuncInvoked)
|
||||
require.True(t, ds.SetOrUpdateMunkiVersionFuncInvoked)
|
||||
require.True(t, ds.SetOrUpdateDeviceAuthTokenFuncInvoked)
|
||||
|
||||
// osquery_info
|
||||
assert.Equal(t, "darwin", gotHost.Platform)
|
||||
@ -1085,17 +1137,19 @@ func TestDetailQueries(t *testing.T) {
|
||||
|
||||
// Now no detail queries should be required
|
||||
ctx = hostctx.NewContext(ctx, host)
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 0)
|
||||
require.Empty(t, queries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
|
||||
// Advance clock and queries should exist again
|
||||
mockClock.AddTime(1*time.Hour + 1*time.Minute)
|
||||
|
||||
queries, acc, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, acc, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+1)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
assert.Zero(t, acc)
|
||||
}
|
||||
|
||||
@ -1219,9 +1273,10 @@ func TestDistributedQueryResults(t *testing.T) {
|
||||
lq.On("QueryCompletedByHost", strconv.Itoa(int(campaign.ID)), host.ID).Return(nil)
|
||||
|
||||
// Now we should get the active distributed query
|
||||
queries, acc, err := svc.GetDistributedQueries(hostCtx)
|
||||
queries, discovery, acc, err := svc.GetDistributedQueries(hostCtx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries-2)
|
||||
require.Len(t, queries, expectedDetailQueries-1)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
queryKey := fmt.Sprintf("%s%d", hostDistributedQueryPrefix, campaign.ID)
|
||||
assert.Equal(t, "select * from time", queries[queryKey])
|
||||
assert.NotZero(t, acc)
|
||||
@ -2063,9 +2118,10 @@ func TestPolicyQueries(t *testing.T) {
|
||||
|
||||
ctx := hostctx.NewContext(context.Background(), host)
|
||||
|
||||
queries, _, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err := svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+2)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
|
||||
checkPolicyResults := func(queries map[string]string) {
|
||||
hasPolicy1, hasPolicy2 := false, false
|
||||
@ -2118,17 +2174,19 @@ func TestPolicyQueries(t *testing.T) {
|
||||
|
||||
// After the first time we get policies and update the host, then there shouldn't be any policies.
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
noPolicyResults(queries)
|
||||
|
||||
// Let's move time forward, there should be policies now.
|
||||
mockClock.AddTime(2 * time.Hour)
|
||||
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+2)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
checkPolicyResults(queries)
|
||||
|
||||
// Record another query execution.
|
||||
@ -2153,17 +2211,19 @@ func TestPolicyQueries(t *testing.T) {
|
||||
|
||||
// There shouldn't be any policies now.
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
noPolicyResults(queries)
|
||||
|
||||
// With refetch requested policy queries should be returned.
|
||||
host.RefetchRequested = true
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+2)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
checkPolicyResults(queries)
|
||||
|
||||
// Record another query execution.
|
||||
@ -2190,9 +2250,10 @@ func TestPolicyQueries(t *testing.T) {
|
||||
|
||||
// There shouldn't be any policies now.
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
noPolicyResults(queries)
|
||||
}
|
||||
|
||||
@ -2254,9 +2315,10 @@ func TestPolicyWebhooks(t *testing.T) {
|
||||
}
|
||||
ctx := hostctx.NewContext(context.Background(), host)
|
||||
|
||||
queries, _, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err := svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+3)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
|
||||
checkPolicyResults := func(queries map[string]string) {
|
||||
hasPolicy1, hasPolicy2, hasPolicy3 := false, false, false
|
||||
@ -2366,17 +2428,19 @@ func TestPolicyWebhooks(t *testing.T) {
|
||||
|
||||
// After the first time we get policies and update the host, then there shouldn't be any policies.
|
||||
ctx = hostctx.NewContext(context.Background(), host)
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
noPolicyResults(queries)
|
||||
|
||||
// Let's move time forward, there should be policies now.
|
||||
mockClock.AddTime(2 * time.Hour)
|
||||
|
||||
queries, _, err = svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err = svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries+3)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
checkPolicyResults(queries)
|
||||
|
||||
ds.FlippingPoliciesForHostFunc = func(ctx context.Context, hostID uint, incomingResults map[uint]*bool) (newFailing []uint, newPassing []uint, err error) {
|
||||
@ -2496,9 +2560,10 @@ func TestLiveQueriesFailing(t *testing.T) {
|
||||
|
||||
ctx := hostctx.NewContext(context.Background(), host)
|
||||
|
||||
queries, _, err := svc.GetDistributedQueries(ctx)
|
||||
queries, discovery, _, err := svc.GetDistributedQueries(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, expectedDetailQueries)
|
||||
verifyDiscovery(t, queries, discovery)
|
||||
|
||||
logs, err := ioutil.ReadAll(buf)
|
||||
require.NoError(t, err)
|
||||
|
@ -17,7 +17,11 @@ import (
|
||||
)
|
||||
|
||||
type DetailQuery struct {
|
||||
// Query is the SQL query string.
|
||||
Query string
|
||||
// Discovery is the SQL query that defines whether the query will run or the host or not.
|
||||
// If not set, Fleet makes sure the query will always run.
|
||||
Discovery string
|
||||
// Platforms is a list of platforms to run the query on. If this value is
|
||||
// empty, run on all platforms.
|
||||
Platforms []string
|
||||
@ -305,11 +309,18 @@ FROM logical_drives WHERE file_system = 'NTFS' LIMIT 1;`,
|
||||
"google_chrome_profiles": {
|
||||
Query: `SELECT email FROM google_chrome_profiles WHERE NOT ephemeral`,
|
||||
DirectIngestFunc: directIngestChromeProfiles,
|
||||
// Technically this does work on Windows and Linux, but so far no one is
|
||||
// deploying the extension to those platforms and it's causing log spam
|
||||
// for customers. See https://github.com/fleetdm/fleet/issues/4123
|
||||
Platforms: []string{"darwin"},
|
||||
Discovery: discoveryTable("google_chrome_profiles"),
|
||||
},
|
||||
"orbit_info": {
|
||||
Query: `SELECT * FROM orbit_info`,
|
||||
DirectIngestFunc: directIngestOrbitInfo,
|
||||
Discovery: discoveryTable("orbit_info"),
|
||||
},
|
||||
}
|
||||
|
||||
// discoveryTable returns a query to determine whether a table exists or not.
|
||||
func discoveryTable(tableName string) string {
|
||||
return fmt.Sprintf("SELECT 1 FROM osquery_registry WHERE active = true AND registry = 'table' AND name = '%s';", tableName)
|
||||
}
|
||||
|
||||
var softwareMacOS = DetailQuery{
|
||||
@ -568,6 +579,20 @@ func directIngestChromeProfiles(ctx context.Context, logger log.Logger, host *fl
|
||||
return ds.ReplaceHostDeviceMapping(ctx, host.ID, mapping)
|
||||
}
|
||||
|
||||
func directIngestOrbitInfo(ctx context.Context, logger log.Logger, host *fleet.Host, ds fleet.Datastore, rows []map[string]string, failed bool) error {
|
||||
if len(rows) != 1 {
|
||||
return ctxerr.Errorf(ctx, "invalid number of orbit_info rows: %d", len(rows))
|
||||
}
|
||||
deviceAuthToken := rows[0]["device_auth_token"]
|
||||
if deviceAuthToken == "" {
|
||||
return ctxerr.New(ctx, "empty orbit_info.device_auth_token")
|
||||
}
|
||||
if err := ds.SetOrUpdateDeviceAuthToken(ctx, host.ID, deviceAuthToken); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "set or update device_auth_token")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func directIngestScheduledQueryStats(ctx context.Context, logger log.Logger, host *fleet.Host, ds fleet.Datastore, rows []map[string]string, failed bool) error {
|
||||
if failed {
|
||||
level.Error(logger).Log("op", "directIngestScheduledQueryStats", "err", "failed")
|
||||
|
@ -290,7 +290,7 @@ func sortedKeysCompare(t *testing.T, m map[string]DetailQuery, expectedKeys []st
|
||||
|
||||
func TestGetDetailQueries(t *testing.T) {
|
||||
queriesNoConfig := GetDetailQueries(nil, config.FleetConfig{})
|
||||
require.Len(t, queriesNoConfig, 11)
|
||||
require.Len(t, queriesNoConfig, 12)
|
||||
baseQueries := []string{
|
||||
"network_interface",
|
||||
"os_version",
|
||||
@ -303,15 +303,16 @@ func TestGetDetailQueries(t *testing.T) {
|
||||
"mdm",
|
||||
"munki_info",
|
||||
"google_chrome_profiles",
|
||||
"orbit_info",
|
||||
}
|
||||
sortedKeysCompare(t, queriesNoConfig, baseQueries)
|
||||
|
||||
queriesWithUsers := GetDetailQueries(&fleet.AppConfig{HostSettings: fleet.HostSettings{EnableHostUsers: true}}, config.FleetConfig{App: config.AppConfig{EnableScheduledQueryStats: true}})
|
||||
require.Len(t, queriesWithUsers, 13)
|
||||
require.Len(t, queriesWithUsers, 14)
|
||||
sortedKeysCompare(t, queriesWithUsers, append(baseQueries, "users", "scheduled_query_stats"))
|
||||
|
||||
queriesWithUsersAndSoftware := GetDetailQueries(&fleet.AppConfig{HostSettings: fleet.HostSettings{EnableHostUsers: true, EnableSoftwareInventory: true}}, config.FleetConfig{App: config.AppConfig{EnableScheduledQueryStats: true}})
|
||||
require.Len(t, queriesWithUsersAndSoftware, 16)
|
||||
require.Len(t, queriesWithUsersAndSoftware, 17)
|
||||
sortedKeysCompare(t, queriesWithUsersAndSoftware,
|
||||
append(baseQueries, "users", "software_macos", "software_linux", "software_windows", "scheduled_query_stats"))
|
||||
}
|
||||
@ -394,3 +395,23 @@ func TestDirectIngestMDM(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.True(t, ds.SetOrUpdateMDMDataFuncInvoked)
|
||||
}
|
||||
|
||||
func TestDirectIngestOrbitInfo(t *testing.T) {
|
||||
ds := new(mock.Store)
|
||||
ds.SetOrUpdateDeviceAuthTokenFunc = func(ctx context.Context, hostID uint, authToken string) error {
|
||||
require.Equal(t, hostID, uint(1))
|
||||
require.Equal(t, authToken, "foo")
|
||||
return nil
|
||||
}
|
||||
|
||||
host := fleet.Host{
|
||||
ID: 1,
|
||||
}
|
||||
|
||||
err := directIngestOrbitInfo(context.Background(), log.NewNopLogger(), &host, ds, []map[string]string{{
|
||||
"version": "42",
|
||||
"device_auth_token": "foo",
|
||||
}}, true)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ds.SetOrUpdateDeviceAuthTokenFuncInvoked)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user