2016-11-29 18:20:06 +00:00
|
|
|
package service
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/kolide/kolide-ose/server/contexts/viewer"
|
|
|
|
"github.com/kolide/kolide-ose/server/kolide"
|
|
|
|
"github.com/kolide/kolide-ose/server/websocket"
|
2016-12-01 21:21:27 +00:00
|
|
|
"github.com/pkg/errors"
|
2016-11-29 18:20:06 +00:00
|
|
|
"golang.org/x/net/context"
|
|
|
|
)
|
|
|
|
|
|
|
|
func (svc service) NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*kolide.DistributedQueryCampaign, error) {
|
|
|
|
vc, ok := viewer.FromContext(ctx)
|
|
|
|
if !ok {
|
|
|
|
return nil, errNoContext
|
|
|
|
}
|
|
|
|
|
2016-12-01 21:21:27 +00:00
|
|
|
query, err := svc.ds.NewQuery(&kolide.Query{
|
2016-12-07 20:22:31 +00:00
|
|
|
Name: fmt.Sprintf("distributed_%s_%d", vc.Username(), time.Now().Unix()),
|
|
|
|
Query: queryString,
|
|
|
|
Saved: false,
|
|
|
|
AuthorID: vc.UserID(),
|
2016-11-29 18:20:06 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
2016-12-01 21:21:27 +00:00
|
|
|
return nil, errors.Wrap(err, "new query")
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
campaign, err := svc.ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{
|
|
|
|
QueryID: query.ID,
|
2016-12-01 18:31:16 +00:00
|
|
|
Status: kolide.QueryWaiting,
|
2016-11-29 18:20:06 +00:00
|
|
|
UserID: vc.UserID(),
|
|
|
|
})
|
|
|
|
if err != nil {
|
2016-12-01 21:21:27 +00:00
|
|
|
return nil, errors.Wrap(err, "new campaign")
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Add host targets
|
|
|
|
for _, hid := range hosts {
|
|
|
|
_, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{
|
|
|
|
Type: kolide.TargetHost,
|
|
|
|
DistributedQueryCampaignID: campaign.ID,
|
|
|
|
TargetID: hid,
|
|
|
|
})
|
|
|
|
if err != nil {
|
2016-12-01 21:21:27 +00:00
|
|
|
return nil, errors.Wrap(err, "adding host target")
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add label targets
|
|
|
|
for _, lid := range labels {
|
|
|
|
_, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{
|
|
|
|
Type: kolide.TargetLabel,
|
|
|
|
DistributedQueryCampaignID: campaign.ID,
|
|
|
|
TargetID: lid,
|
|
|
|
})
|
|
|
|
if err != nil {
|
2016-12-01 21:21:27 +00:00
|
|
|
return nil, errors.Wrap(err, "adding label target")
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return campaign, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// targetTotals is the payload StreamCampaignResults sends to the client under
// the "totals" message type. Each field is copied from the matching host count
// returned by CountHostsInTargets for the campaign's targets.
type targetTotals struct {
	// Total is serialized as "count".
	Total uint `json:"count"`
	// Online is serialized as "online".
	Online uint `json:"online"`
	// Offline is serialized as "offline".
	Offline uint `json:"offline"`
	// MissingInAction is serialized as "missing_in_action".
	MissingInAction uint `json:"missing_in_action"`
}
|
|
|
|
|
|
|
|
func (svc service) StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) {
|
|
|
|
// Find the campaign and ensure it is active
|
|
|
|
campaign, err := svc.ds.DistributedQueryCampaign(campaignID)
|
|
|
|
if err != nil {
|
|
|
|
conn.WriteJSONError(fmt.Sprintf("cannot find campaign for ID %d", campaignID))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2016-12-01 18:31:16 +00:00
|
|
|
if campaign.Status != kolide.QueryWaiting {
|
2016-11-29 18:20:06 +00:00
|
|
|
conn.WriteJSONError(fmt.Sprintf("campaign %d not running", campaignID))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2016-12-01 18:31:16 +00:00
|
|
|
// Setting status to running will cause the query to be returned to the
|
|
|
|
// targets when they check in for their queries
|
|
|
|
campaign.Status = kolide.QueryRunning
|
|
|
|
if err := svc.ds.SaveDistributedQueryCampaign(campaign); err != nil {
|
|
|
|
conn.WriteJSONError("error saving campaign state")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Setting the status to completed stops the query from being sent to
|
|
|
|
// targets. If this fails, there is a background job that will clean up
|
|
|
|
// this campaign.
|
|
|
|
defer func() {
|
|
|
|
campaign.Status = kolide.QueryComplete
|
|
|
|
svc.ds.SaveDistributedQueryCampaign(campaign)
|
|
|
|
}()
|
|
|
|
|
2016-11-29 18:20:06 +00:00
|
|
|
// Open the channel from which we will receive incoming query results
|
|
|
|
// (probably from the redis pubsub implementation)
|
|
|
|
readChan, err := svc.resultStore.ReadChannel(context.Background(), *campaign)
|
|
|
|
if err != nil {
|
|
|
|
conn.WriteJSONError(fmt.Sprintf("cannot open read channel for campaign %d ", campaignID))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Loop, pushing updates to results and expected totals
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case res := <-readChan:
|
|
|
|
// Receive a result and push it over the websocket
|
|
|
|
switch res := res.(type) {
|
|
|
|
case kolide.DistributedQueryResult:
|
|
|
|
err = conn.WriteJSONMessage("result", res)
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println("error writing to channel")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
case <-time.After(1 * time.Second):
|
|
|
|
// Update the expected hosts total
|
|
|
|
hostIDs, labelIDs, err := svc.ds.DistributedQueryCampaignTargetIDs(campaign.ID)
|
|
|
|
if err != nil {
|
|
|
|
if err = conn.WriteJSONError("error retrieving campaign targets"); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-06 17:37:22 +00:00
|
|
|
metrics, err := svc.CountHostsInTargets(context.Background(), hostIDs, labelIDs)
|
2016-11-29 18:20:06 +00:00
|
|
|
if err != nil {
|
|
|
|
if err = conn.WriteJSONError("error retrieving target counts"); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-06 17:37:22 +00:00
|
|
|
totals := targetTotals{
|
|
|
|
Total: metrics.TotalHosts,
|
|
|
|
Online: metrics.OnlineHosts,
|
|
|
|
Offline: metrics.OfflineHosts,
|
|
|
|
MissingInAction: metrics.MissingInActionHosts,
|
|
|
|
}
|
|
|
|
|
2016-11-29 18:20:06 +00:00
|
|
|
if err = conn.WriteJSONMessage("totals", totals); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|