diff --git a/changes/log-errors-when-injesting-distributed-query-results b/changes/log-errors-when-injesting-distributed-query-results new file mode 100644 index 000000000..198bec0e3 --- /dev/null +++ b/changes/log-errors-when-injesting-distributed-query-results @@ -0,0 +1 @@ +* Log errors encountered while ingesting distributed query results and keep processing the remaining results from the host instead of returning an error on the first failure. diff --git a/server/contexts/logging/logging.go b/server/contexts/logging/logging.go index 1c3170f2f..7dc1c460e 100644 --- a/server/contexts/logging/logging.go +++ b/server/contexts/logging/logging.go @@ -35,9 +35,9 @@ func WithStartTime(ctx context.Context) context.Context { } // WithErr returns a context with logging.Err set as the error provided -func WithErr(ctx context.Context, err error) context.Context { +func WithErr(ctx context.Context, err ...error) context.Context { if logCtx, ok := FromContext(ctx); ok { - logCtx.Err = err + logCtx.Errs = append(logCtx.Errs, err...) } return ctx } @@ -68,7 +68,7 @@ func WithLevel(ctx context.Context, level func(kitlog.Logger) kitlog.Logger) con // LoggingContext contains the context information for logging the current request type LoggingContext struct { StartTime time.Time - Err error + Errs []error Extras []interface{} SkipUser bool ForceLevel func(kitlog.Logger) kitlog.Logger @@ -76,13 +76,9 @@ type LoggingContext struct { // Log logs the data within the context func (l *LoggingContext) Log(ctx context.Context, logger kitlog.Logger) { - if e, ok := l.Err.(fleet.ErrWithInternal); ok { - logger = kitlog.With(logger, "internal", e.Internal()) - } - if l.ForceLevel != nil { logger = l.ForceLevel(logger) - } else if l.Err != nil || len(l.Extras) > 0 { + } else if l.Errs != nil || len(l.Extras) > 0 { logger = level.Info(logger) } else { logger = level.Debug(logger) @@ -99,19 +95,37 @@ func (l *LoggingContext) Log(ctx context.Context, logger kitlog.Logger) { keyvals = append(keyvals, "user", loggedInUser) } - keyvals = append(keyvals, - "method", 
ctx.Value(kithttp.ContextKeyRequestMethod).(string), - "uri", ctx.Value(kithttp.ContextKeyRequestURI).(string), - "took", time.Since(l.StartTime), - ) - - if l.Err != nil { - keyvals = append(keyvals, "err", l.Err) + requestMethod, ok := ctx.Value(kithttp.ContextKeyRequestMethod).(string) + if !ok { + requestMethod = "" } + requestURI, ok := ctx.Value(kithttp.ContextKeyRequestURI).(string) + if !ok { + requestURI = "" + } + keyvals = append(keyvals, "method", requestMethod, "uri", requestURI, "took", time.Since(l.StartTime)) if len(l.Extras) > 0 { keyvals = append(keyvals, l.Extras...) } + if len(l.Errs) > 0 { + var errs []string + var internalErrs []string + for _, err := range l.Errs { + if e, ok := err.(fleet.ErrWithInternal); ok { + internalErrs = append(internalErrs, e.Internal()) + } else { + errs = append(errs, err.Error()) + } + } + if len(errs) > 0 { + keyvals = append(keyvals, "err", errs) + } + if len(internalErrs) > 0 { + keyvals = append(keyvals, "internal", internalErrs) + } + } + _ = logger.Log(keyvals...) } diff --git a/server/service/service_osquery.go b/server/service/service_osquery.go index 5de88b319..f35829daa 100644 --- a/server/service/service_osquery.go +++ b/server/service/service_osquery.go @@ -1114,13 +1114,15 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f return osqueryError{message: "internal error: missing host from request context"} } - // Check for label queries and if so, load host additional. If we don't do - // this, we will end up unintentionally dropping any existing host - // additional info. + // Check for host details queries and if so, load host additional. + // If we don't do this, we will end up unintentionally dropping + // any existing host additional info. 
for query := range results { - if strings.HasPrefix(query, hostLabelQueryPrefix) { + if strings.HasPrefix(query, hostDetailQueryPrefix) { fullHost, err := svc.ds.Host(host.ID) if err != nil { + // leave this error return here, we don't want to drop host additionals + // if we can't get a host, everything is lost return osqueryError{message: "internal error: load host additional: " + err.Error()} } host = *fullHost @@ -1147,14 +1149,14 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f // osquery docs say any nonzero (string) value for // status indicates a query error status, ok := statuses[query] - failed := (ok && status != fleet.StatusOK) + failed := ok && status != fleet.StatusOK err = svc.ingestDistributedQuery(host, query, rows, failed, messages[query]) default: err = osqueryError{message: "unknown query prefix: " + query} } if err != nil { - return osqueryError{message: "failed to ingest result: " + err.Error()} + logging.WithExtras(ctx, "ingestion-err", err) } } @@ -1163,7 +1165,7 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f host.LabelUpdatedAt = svc.clock.Now() err = svc.ds.RecordLabelQueryExecutions(&host, labelResults, svc.clock.Now()) if err != nil { - return osqueryError{message: "failed to save labels: " + err.Error()} + logging.WithErr(ctx, err) } } @@ -1172,16 +1174,17 @@ func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results f host.DetailUpdatedAt = svc.clock.Now() additionalJSON, err := json.Marshal(additionalResults) if err != nil { - return osqueryError{message: "failed to marshal additional: " + err.Error()} + logging.WithErr(ctx, err) + } else { + additional := json.RawMessage(additionalJSON) + host.Additional = &additional } - additional := json.RawMessage(additionalJSON) - host.Additional = &additional } if host.Modified { err = svc.ds.SaveHost(&host) if err != nil { - return osqueryError{message: "failed to update host details: " + err.Error()} + 
logging.WithErr(ctx, err) } } diff --git a/server/service/service_osquery_test.go b/server/service/service_osquery_test.go index 97d9617e8..592c780d0 100644 --- a/server/service/service_osquery_test.go +++ b/server/service/service_osquery_test.go @@ -1,6 +1,7 @@ package service import ( + "bytes" "context" "encoding/json" "errors" @@ -13,8 +14,10 @@ import ( "time" "github.com/WatchBeam/clock" + "github.com/fleetdm/fleet/v4/server/authz" "github.com/fleetdm/fleet/v4/server/config" hostctx "github.com/fleetdm/fleet/v4/server/contexts/host" + fleetLogging "github.com/fleetdm/fleet/v4/server/contexts/logging" "github.com/fleetdm/fleet/v4/server/contexts/viewer" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/live_query" @@ -23,6 +26,7 @@ import ( "github.com/fleetdm/fleet/v4/server/ptr" "github.com/fleetdm/fleet/v4/server/pubsub" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -641,6 +645,10 @@ func TestDetailQueriesWithEmptyStrings(t *testing.T) { return nil } + ds.HostFunc = func(id uint) (*fleet.Host, error) { + return &host, nil + } + // Verify that results are ingested properly svc.SubmitDistributedQueryResults(ctx, results, map[string]fleet.OsqueryStatus{}, map[string]string{}) @@ -824,6 +832,10 @@ func TestDetailQueries(t *testing.T) { return nil } + ds.HostFunc = func(id uint) (*fleet.Host, error) { + return &host, nil + } + // Verify that results are ingested properly svc.SubmitDistributedQueryResults(ctx, results, map[string]fleet.OsqueryStatus{}, map[string]string{}) @@ -1821,3 +1833,74 @@ func TestGetHostIdentifier(t *testing.T) { }) } } + +func TestDistributedQueriesLogsManyErrors(t *testing.T) { + buf := new(bytes.Buffer) + logger := log.NewJSONLogger(buf) + logger = level.NewFilter(logger, level.AllowDebug()) + ds := new(mock.Store) + svc := newTestService(ds, nil, nil) + + host := &fleet.Host{Platform: "darwin"} + + 
ds.SaveHostFunc = func(host *fleet.Host) error { + return authz.CheckMissingWithResponse(nil) + } + ds.RecordLabelQueryExecutionsFunc = func(host *fleet.Host, results map[uint]bool, t time.Time) error { + return errors.New("something went wrong") + } + + lCtx := &fleetLogging.LoggingContext{} + ctx := fleetLogging.NewContext(context.Background(), lCtx) + ctx = hostctx.NewContext(ctx, *host) + + err := svc.SubmitDistributedQueryResults( + ctx, + map[string][]map[string]string{ + hostLabelQueryPrefix + "1": {{"col1": "val1"}}, + }, + map[string]fleet.OsqueryStatus{}, + map[string]string{}, + ) + assert.Nil(t, err) + + lCtx.Log(ctx, logger) + + logs := buf.String() + parts := strings.Split(strings.TrimSpace(logs), "\n") + require.Len(t, parts, 1) + logData := make(map[string]json.RawMessage) + require.NoError(t, json.Unmarshal([]byte(parts[0]), &logData)) + assert.Equal(t, json.RawMessage(`["something went wrong"]`), logData["err"]) + assert.Equal(t, json.RawMessage(`["Missing authorization check"]`), logData["internal"]) +} + +func TestDistributedQueriesReloadsHostIfDetailsAreIn(t *testing.T) { + ds := new(mock.Store) + svc := newTestService(ds, nil, nil) + + host := &fleet.Host{ID: 42, Platform: "darwin"} + ip := "1.1.1.1" + + ds.SaveHostFunc = func(host *fleet.Host) error { + assert.Equal(t, ip, host.PrimaryIP) + return nil + } + ds.HostFunc = func(id uint) (*fleet.Host, error) { + require.Equal(t, uint(42), id) + return &fleet.Host{ID: 42, Platform: "darwin", PrimaryIP: ip}, nil + } + + ctx := hostctx.NewContext(context.Background(), *host) + + err := svc.SubmitDistributedQueryResults( + ctx, + map[string][]map[string]string{ + hostDetailQueryPrefix + "1": {{"col1": "val1"}}, + }, + map[string]fleet.OsqueryStatus{}, + map[string]string{}, + ) + assert.Nil(t, err) + assert.True(t, ds.HostFuncInvoked) +}