Only log distributed queries ingestion errors (except for one case) (#1535)

* Don't return errors in distributed query ingestion, just log them

* Allow for multiple errors in the logging context

* Update check when loading host

* Log multiple errors and add tests for other changes

* Add missing host func

* Add another missing host func

* Add changes file
This commit is contained in:
Tomas Touceda 2021-08-04 10:11:51 -03:00 committed by GitHub
parent df47fcb808
commit 52ae04ecf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 128 additions and 27 deletions

View File

@ -0,0 +1 @@
* Only log errors and try to process all distributed query results from hosts instead of erroring out.

View File

@ -35,9 +35,9 @@ func WithStartTime(ctx context.Context) context.Context {
}
// WithErr appends the provided errors to the logging context's Errs (if one is present in ctx) and returns ctx
func WithErr(ctx context.Context, err error) context.Context {
func WithErr(ctx context.Context, err ...error) context.Context {
if logCtx, ok := FromContext(ctx); ok {
logCtx.Err = err
logCtx.Errs = append(logCtx.Errs, err...)
}
return ctx
}
@ -68,7 +68,7 @@ func WithLevel(ctx context.Context, level func(kitlog.Logger) kitlog.Logger) con
// LoggingContext contains the context information for logging the current request
type LoggingContext struct {
StartTime time.Time
Err error
Errs []error
Extras []interface{}
SkipUser bool
ForceLevel func(kitlog.Logger) kitlog.Logger
@ -76,13 +76,9 @@ type LoggingContext struct {
// Log logs the data within the context
func (l *LoggingContext) Log(ctx context.Context, logger kitlog.Logger) {
if e, ok := l.Err.(fleet.ErrWithInternal); ok {
logger = kitlog.With(logger, "internal", e.Internal())
}
if l.ForceLevel != nil {
logger = l.ForceLevel(logger)
} else if l.Err != nil || len(l.Extras) > 0 {
} else if l.Errs != nil || len(l.Extras) > 0 {
logger = level.Info(logger)
} else {
logger = level.Debug(logger)
@ -99,19 +95,37 @@ func (l *LoggingContext) Log(ctx context.Context, logger kitlog.Logger) {
keyvals = append(keyvals, "user", loggedInUser)
}
keyvals = append(keyvals,
"method", ctx.Value(kithttp.ContextKeyRequestMethod).(string),
"uri", ctx.Value(kithttp.ContextKeyRequestURI).(string),
"took", time.Since(l.StartTime),
)
if l.Err != nil {
keyvals = append(keyvals, "err", l.Err)
requestMethod, ok := ctx.Value(kithttp.ContextKeyRequestMethod).(string)
if !ok {
requestMethod = ""
}
requestURI, ok := ctx.Value(kithttp.ContextKeyRequestURI).(string)
if !ok {
requestURI = ""
}
keyvals = append(keyvals, "method", requestMethod, "uri", requestURI, "took", time.Since(l.StartTime))
if len(l.Extras) > 0 {
keyvals = append(keyvals, l.Extras...)
}
if len(l.Errs) > 0 {
var errs []string
var internalErrs []string
for _, err := range l.Errs {
if e, ok := err.(fleet.ErrWithInternal); ok {
internalErrs = append(internalErrs, e.Internal())
} else {
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
keyvals = append(keyvals, "err", errs)
}
if len(internalErrs) > 0 {
keyvals = append(keyvals, "internal", internalErrs)
}
}
_ = logger.Log(keyvals...)
}

View File

@ -1114,13 +1114,15 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f
return osqueryError{message: "internal error: missing host from request context"}
}
// Check for label queries and if so, load host additional. If we don't do
// this, we will end up unintentionally dropping any existing host
// additional info.
// Check for host details queries and if so, load host additional.
// If we don't do this, we will end up unintentionally dropping
// any existing host additional info.
for query := range results {
if strings.HasPrefix(query, hostLabelQueryPrefix) {
if strings.HasPrefix(query, hostDetailQueryPrefix) {
fullHost, err := svc.ds.Host(host.ID)
if err != nil {
// leave this error return here, we don't want to drop host additionals
// if we can't get a host, everything is lost
return osqueryError{message: "internal error: load host additional: " + err.Error()}
}
host = *fullHost
@ -1147,14 +1149,14 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f
// osquery docs say any nonzero (string) value for
// status indicates a query error
status, ok := statuses[query]
failed := (ok && status != fleet.StatusOK)
failed := ok && status != fleet.StatusOK
err = svc.ingestDistributedQuery(host, query, rows, failed, messages[query])
default:
err = osqueryError{message: "unknown query prefix: " + query}
}
if err != nil {
return osqueryError{message: "failed to ingest result: " + err.Error()}
logging.WithExtras(ctx, "ingestion-err", err)
}
}
@ -1163,7 +1165,7 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f
host.LabelUpdatedAt = svc.clock.Now()
err = svc.ds.RecordLabelQueryExecutions(&host, labelResults, svc.clock.Now())
if err != nil {
return osqueryError{message: "failed to save labels: " + err.Error()}
logging.WithErr(ctx, err)
}
}
@ -1172,16 +1174,17 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f
host.DetailUpdatedAt = svc.clock.Now()
additionalJSON, err := json.Marshal(additionalResults)
if err != nil {
return osqueryError{message: "failed to marshal additional: " + err.Error()}
logging.WithErr(ctx, err)
} else {
additional := json.RawMessage(additionalJSON)
host.Additional = &additional
}
additional := json.RawMessage(additionalJSON)
host.Additional = &additional
}
if host.Modified {
err = svc.ds.SaveHost(&host)
if err != nil {
return osqueryError{message: "failed to update host details: " + err.Error()}
logging.WithErr(ctx, err)
}
}

View File

@ -1,6 +1,7 @@
package service
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -13,8 +14,10 @@ import (
"time"
"github.com/WatchBeam/clock"
"github.com/fleetdm/fleet/v4/server/authz"
"github.com/fleetdm/fleet/v4/server/config"
hostctx "github.com/fleetdm/fleet/v4/server/contexts/host"
fleetLogging "github.com/fleetdm/fleet/v4/server/contexts/logging"
"github.com/fleetdm/fleet/v4/server/contexts/viewer"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/live_query"
@ -23,6 +26,7 @@ import (
"github.com/fleetdm/fleet/v4/server/ptr"
"github.com/fleetdm/fleet/v4/server/pubsub"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -641,6 +645,10 @@ func TestDetailQueriesWithEmptyStrings(t *testing.T) {
return nil
}
ds.HostFunc = func(id uint) (*fleet.Host, error) {
return &host, nil
}
// Verify that results are ingested properly
svc.SubmitDistributedQueryResults(ctx, results, map[string]fleet.OsqueryStatus{}, map[string]string{})
@ -824,6 +832,10 @@ func TestDetailQueries(t *testing.T) {
return nil
}
ds.HostFunc = func(id uint) (*fleet.Host, error) {
return &host, nil
}
// Verify that results are ingested properly
svc.SubmitDistributedQueryResults(ctx, results, map[string]fleet.OsqueryStatus{}, map[string]string{})
@ -1821,3 +1833,74 @@ func TestGetHostIdentifier(t *testing.T) {
})
}
}
func TestDistributedQueriesLogsManyErrors(t *testing.T) {
buf := new(bytes.Buffer)
logger := log.NewJSONLogger(buf)
logger = level.NewFilter(logger, level.AllowDebug())
ds := new(mock.Store)
svc := newTestService(ds, nil, nil)
host := &fleet.Host{Platform: "darwin"}
ds.SaveHostFunc = func(host *fleet.Host) error {
return authz.CheckMissingWithResponse(nil)
}
ds.RecordLabelQueryExecutionsFunc = func(host *fleet.Host, results map[uint]bool, t time.Time) error {
return errors.New("something went wrong")
}
lCtx := &fleetLogging.LoggingContext{}
ctx := fleetLogging.NewContext(context.Background(), lCtx)
ctx = hostctx.NewContext(ctx, *host)
err := svc.SubmitDistributedQueryResults(
ctx,
map[string][]map[string]string{
hostLabelQueryPrefix + "1": {{"col1": "val1"}},
},
map[string]fleet.OsqueryStatus{},
map[string]string{},
)
assert.Nil(t, err)
lCtx.Log(ctx, logger)
logs := buf.String()
parts := strings.Split(strings.TrimSpace(logs), "\n")
require.Len(t, parts, 1)
logData := make(map[string]json.RawMessage)
require.NoError(t, json.Unmarshal([]byte(parts[0]), &logData))
assert.Equal(t, json.RawMessage(`["something went wrong"]`), logData["err"])
assert.Equal(t, json.RawMessage(`["Missing authorization check"]`), logData["internal"])
}
// TestDistributedQueriesReloadsHostIfDetailsAreIn submits a detail query
// result for a host whose context copy has no primary IP. It expects the
// service to call the datastore's Host function for that host ID, and any
// subsequent SaveHost call to receive the host carrying the reloaded IP.
func TestDistributedQueriesReloadsHostIfDetailsAreIn(t *testing.T) {
	const (
		hostID    uint = 42
		primaryIP      = "1.1.1.1"
	)

	store := new(mock.Store)
	svc := newTestService(store, nil, nil)

	// If the host is saved, it must be the reloaded one (with the IP set),
	// not the bare copy taken from the request context.
	store.SaveHostFunc = func(saved *fleet.Host) error {
		assert.Equal(t, primaryIP, saved.PrimaryIP)
		return nil
	}
	store.HostFunc = func(id uint) (*fleet.Host, error) {
		require.Equal(t, hostID, id)
		return &fleet.Host{ID: hostID, Platform: "darwin", PrimaryIP: primaryIP}, nil
	}

	ctx := hostctx.NewContext(context.Background(), fleet.Host{ID: hostID, Platform: "darwin"})

	queryResults := map[string][]map[string]string{
		hostDetailQueryPrefix + "1": {{"col1": "val1"}},
	}
	err := svc.SubmitDistributedQueryResults(
		ctx,
		queryResults,
		map[string]fleet.OsqueryStatus{},
		map[string]string{},
	)
	assert.Nil(t, err)
	assert.True(t, store.HostFuncInvoked)
}