mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 17:05:18 +00:00
Push distributed query errors over results websocket (#878)
As of recently, osquery will report when a distributed query fails. We now expose errors over the results websocket. When a query errored on the host, the `error` key in the result will be non-null. Note that osquery currently doesn't provide any details so the error string will always be "failed". I anticipate that we will fix this and the string is included for future-proofing. Successful result: ``` { "type": "result", "data": { "distributed_query_execution_id": 15, "host": { ... omitted ... }, "rows": [ { "hour": "1" } ], "error": null } } ``` Failed result: ``` { "type": "result", "data": { "distributed_query_execution_id": 14, "host": { ... omitted ... }, "rows": [ ], "error": "failed" } } ```
This commit is contained in:
parent
2371f58705
commit
971eca9b2b
@ -103,6 +103,11 @@ type DistributedQueryResult struct {
|
||||
DistributedQueryCampaignID uint `json:"distributed_query_execution_id"`
|
||||
Host Host `json:"host"`
|
||||
Rows []map[string]string `json:"rows"`
|
||||
// osquery currently doesn't return any helpful error information,
|
||||
// but we use string here instead of bool for future-proofing. Note also
|
||||
// that we can't use the error interface here because something
|
||||
// implementing that interface may not (un)marshal properly
|
||||
Error *string `json:"error"`
|
||||
}
|
||||
|
||||
// DistributedQueryExecution is the metadata associated with a distributed
|
||||
|
@ -7,7 +7,7 @@ type OsqueryService interface {
|
||||
AuthenticateHost(ctx context.Context, nodeKey string) (host *Host, err error)
|
||||
GetClientConfig(ctx context.Context) (config *OsqueryConfig, err error)
|
||||
GetDistributedQueries(ctx context.Context) (queries map[string]string, err error)
|
||||
SubmitDistributedQueryResults(ctx context.Context, results OsqueryDistributedQueryResults) (err error)
|
||||
SubmitDistributedQueryResults(ctx context.Context, results OsqueryDistributedQueryResults, statuses map[string]string) (err error)
|
||||
SubmitStatusLogs(ctx context.Context, logs []OsqueryStatusLog) (err error)
|
||||
SubmitResultLogs(ctx context.Context, logs []OsqueryResultLog) (err error)
|
||||
}
|
||||
|
@ -90,8 +90,9 @@ func makeGetDistributedQueriesEndpoint(svc kolide.Service) endpoint.Endpoint {
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
type submitDistributedQueryResultsRequest struct {
|
||||
NodeKey string `json:"node_key"`
|
||||
Results kolide.OsqueryDistributedQueryResults `json:"queries"`
|
||||
NodeKey string `json:"node_key"`
|
||||
Results kolide.OsqueryDistributedQueryResults `json:"queries"`
|
||||
Statuses map[string]string `json:"statuses"`
|
||||
}
|
||||
|
||||
type submitDistributedQueryResultsResponse struct {
|
||||
@ -103,7 +104,7 @@ func (r submitDistributedQueryResultsResponse) error() error { return r.Err }
|
||||
func makeSubmitDistributedQueryResultsEndpoint(svc kolide.Service) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
req := request.(submitDistributedQueryResultsRequest)
|
||||
err := svc.SubmitDistributedQueryResults(ctx, req.Results)
|
||||
err := svc.SubmitDistributedQueryResults(ctx, req.Results, req.Statuses)
|
||||
if err != nil {
|
||||
return submitDistributedQueryResultsResponse{Err: err}, nil
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ func (mw loggingMiddleware) GetDistributedQueries(ctx context.Context) (map[stri
|
||||
return queries, err
|
||||
}
|
||||
|
||||
func (mw loggingMiddleware) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults) error {
|
||||
func (mw loggingMiddleware) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults, statuses map[string]string) error {
|
||||
var (
|
||||
err error
|
||||
)
|
||||
@ -92,7 +92,7 @@ func (mw loggingMiddleware) SubmitDistributedQueryResults(ctx context.Context, r
|
||||
)
|
||||
}(time.Now())
|
||||
|
||||
err = mw.Service.SubmitDistributedQueryResults(ctx, results)
|
||||
err = mw.Service.SubmitDistributedQueryResults(ctx, results, statuses)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -3,13 +3,11 @@ package service
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
hostctx "github.com/kolide/kolide-ose/server/contexts/host"
|
||||
"github.com/kolide/kolide-ose/server/errors"
|
||||
"github.com/kolide/kolide-ose/server/kolide"
|
||||
"github.com/kolide/kolide-ose/server/pubsub"
|
||||
"golang.org/x/net/context"
|
||||
@ -132,7 +130,7 @@ func (svc service) SubmitStatusLogs(ctx context.Context, logs []kolide.OsquerySt
|
||||
for _, log := range logs {
|
||||
err := json.NewEncoder(svc.osqueryStatusLogWriter).Encode(log)
|
||||
if err != nil {
|
||||
return errors.NewFromError(err, http.StatusInternalServerError, "error writing status log")
|
||||
return osqueryError{message: "error writing status log: " + err.Error()}
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,7 +151,7 @@ func (svc service) SubmitResultLogs(ctx context.Context, logs []kolide.OsqueryRe
|
||||
for _, log := range logs {
|
||||
err := json.NewEncoder(svc.osqueryResultLogWriter).Encode(log)
|
||||
if err != nil {
|
||||
return errors.NewFromError(err, http.StatusInternalServerError, "error writing result log")
|
||||
return osqueryError{message: "error writing result log: " + err.Error()}
|
||||
}
|
||||
}
|
||||
|
||||
@ -419,7 +417,7 @@ func (svc service) ingestLabelQuery(host kolide.Host, query string, rows []map[s
|
||||
|
||||
// ingestDistributedQuery takes the results of a distributed query and modifies the
|
||||
// provided kolide.Host appropriately.
|
||||
func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []map[string]string) error {
|
||||
func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []map[string]string, failed bool) error {
|
||||
trimmedQuery := strings.TrimPrefix(name, hostDistributedQueryPrefix)
|
||||
|
||||
campaignID, err := strconv.Atoi(trimmedQuery)
|
||||
@ -433,6 +431,12 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
|
||||
Host: host,
|
||||
Rows: rows,
|
||||
}
|
||||
if failed {
|
||||
// osquery errors are not currently helpful, but we should fix
|
||||
// them to be better in the future
|
||||
errString := "failed"
|
||||
res.Error = &errString
|
||||
}
|
||||
|
||||
err = svc.resultStore.WriteResult(res)
|
||||
if err != nil {
|
||||
@ -456,10 +460,14 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
|
||||
}
|
||||
|
||||
// Record execution of the query
|
||||
status := kolide.ExecutionSucceeded
|
||||
if failed {
|
||||
status = kolide.ExecutionFailed
|
||||
}
|
||||
exec := &kolide.DistributedQueryExecution{
|
||||
HostID: host.ID,
|
||||
DistributedQueryCampaignID: uint(campaignID),
|
||||
Status: kolide.ExecutionSucceeded,
|
||||
Status: status,
|
||||
}
|
||||
|
||||
_, err = svc.ds.NewDistributedQueryExecution(exec)
|
||||
@ -470,7 +478,7 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc service) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults) error {
|
||||
func (svc service) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults, statuses map[string]string) error {
|
||||
host, ok := hostctx.FromContext(ctx)
|
||||
|
||||
if !ok {
|
||||
@ -490,7 +498,11 @@ func (svc service) SubmitDistributedQueryResults(ctx context.Context, results ko
|
||||
case strings.HasPrefix(query, hostLabelQueryPrefix):
|
||||
err = svc.ingestLabelQuery(host, query, rows, labelResults)
|
||||
case strings.HasPrefix(query, hostDistributedQueryPrefix):
|
||||
err = svc.ingestDistributedQuery(host, query, rows)
|
||||
// osquery docs say any nonzero (string) value for
|
||||
// status indicates a query error
|
||||
status, ok := statuses[query]
|
||||
failed := ok && status != "0"
|
||||
err = svc.ingestDistributedQuery(host, query, rows, failed)
|
||||
default:
|
||||
err = osqueryError{message: "unknown query prefix: " + query}
|
||||
}
|
||||
|
@ -324,6 +324,7 @@ func TestLabelQueries(t *testing.T) {
|
||||
map[string][]map[string]string{
|
||||
hostLabelQueryPrefix + "1": {{"col1": "val1"}},
|
||||
},
|
||||
map[string]string{},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
|
||||
@ -358,6 +359,7 @@ func TestLabelQueries(t *testing.T) {
|
||||
hostLabelQueryPrefix + "2": {{"col1": "val1"}},
|
||||
hostLabelQueryPrefix + "3": {},
|
||||
},
|
||||
map[string]string{},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
|
||||
@ -567,7 +569,7 @@ func TestDetailQueries(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
// Verify that results are ingested properly
|
||||
svc.SubmitDistributedQueryResults(ctx, results)
|
||||
svc.SubmitDistributedQueryResults(ctx, results, map[string]string{})
|
||||
|
||||
// Make sure the result saved to the datastore
|
||||
host, err = ds.AuthenticateHost(nodeKey)
|
||||
@ -715,7 +717,7 @@ func TestDistributedQueries(t *testing.T) {
|
||||
// this test.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
err = svc.SubmitDistributedQueryResults(ctx, results)
|
||||
err = svc.SubmitDistributedQueryResults(ctx, results, map[string]string{})
|
||||
require.Nil(t, err)
|
||||
|
||||
// Now the distributed query should be completed and not returned
|
||||
@ -775,7 +777,7 @@ func TestOrphanedQueryCampaign(t *testing.T) {
|
||||
|
||||
// Submit results
|
||||
ctx = hostctx.NewContext(context.Background(), *host)
|
||||
err = svc.SubmitDistributedQueryResults(ctx, results)
|
||||
err = svc.SubmitDistributedQueryResults(ctx, results, map[string]string{})
|
||||
require.Nil(t, err)
|
||||
|
||||
// The campaign should be set to completed because it is orphaned
|
||||
|
@ -48,8 +48,9 @@ func decodeSubmitDistributedQueryResultsRequest(ctx context.Context, r *http.Req
|
||||
// "node_key":"IGXCXknWQ1baTa8TZ6rF3kAPZ4\/aTsui"
|
||||
// }
|
||||
type distributedQueryResultsShim struct {
|
||||
NodeKey string `json:"node_key"`
|
||||
Results map[string]json.RawMessage `json:"queries"`
|
||||
NodeKey string `json:"node_key"`
|
||||
Results map[string]json.RawMessage `json:"queries"`
|
||||
Statuses map[string]string `json:"statuses"`
|
||||
}
|
||||
|
||||
var shim distributedQueryResultsShim
|
||||
@ -68,8 +69,9 @@ func decodeSubmitDistributedQueryResultsRequest(ctx context.Context, r *http.Req
|
||||
}
|
||||
|
||||
req := submitDistributedQueryResultsRequest{
|
||||
NodeKey: shim.NodeKey,
|
||||
Results: results,
|
||||
NodeKey: shim.NodeKey,
|
||||
Results: results,
|
||||
Statuses: shim.Statuses,
|
||||
}
|
||||
|
||||
return req, nil
|
||||
|
@ -2,11 +2,12 @@ package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"golang.org/x/net/context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/kolide/kolide-ose/server/kolide"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -95,6 +96,7 @@ func TestDecodeSubmitDistributedQueryResultsRequest(t *testing.T) {
|
||||
},
|
||||
"id3": {},
|
||||
}, params.Results)
|
||||
assert.Equal(t, map[string]string{"id1": "0", "id3": "1"}, params.Statuses)
|
||||
}).Methods("POST")
|
||||
|
||||
// Note we explicitly test the case that requires using the shim
|
||||
@ -111,7 +113,8 @@ func TestDecodeSubmitDistributedQueryResultsRequest(t *testing.T) {
|
||||
{"col3": "val5", "col4": "val6"}
|
||||
],
|
||||
"id3": ""
|
||||
}
|
||||
},
|
||||
"statuses": {"id1": "0", "id3": "1"}
|
||||
}`))
|
||||
|
||||
router.ServeHTTP(
|
||||
|
Loading…
Reference in New Issue
Block a user