2016-11-29 18:20:06 +00:00
|
|
|
package service
|
|
|
|
|
|
|
|
import (
|
2017-03-15 15:55:30 +00:00
|
|
|
"context"
|
2016-11-29 18:20:06 +00:00
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
2021-06-26 04:46:51 +00:00
|
|
|
"github.com/fleetdm/fleet/v4/server/authz"
|
2021-11-15 14:11:38 +00:00
|
|
|
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
2021-08-02 22:06:27 +00:00
|
|
|
"github.com/fleetdm/fleet/v4/server/contexts/logging"
|
2021-06-26 04:46:51 +00:00
|
|
|
"github.com/fleetdm/fleet/v4/server/contexts/viewer"
|
|
|
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
|
|
|
"github.com/fleetdm/fleet/v4/server/websocket"
|
2021-06-03 23:24:15 +00:00
|
|
|
"github.com/go-kit/kit/log/level"
|
2020-12-15 02:13:34 +00:00
|
|
|
"github.com/igm/sockjs-go/v3/sockjs"
|
2016-11-29 18:20:06 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// targetTotals holds host counts for the targets of a campaign. The struct
// contains only comparable fields, so two values can be compared with == / !=
// to detect whether the counts changed.
type targetTotals struct {
	// Total is serialized under the JSON key "count".
	Total uint `json:"count"`
	// Online is serialized under the JSON key "online".
	Online uint `json:"online"`
	// Offline is serialized under the JSON key "offline".
	Offline uint `json:"offline"`
	// MissingInAction is serialized under the JSON key "missing_in_action".
	MissingInAction uint `json:"missing_in_action"`
}
|
|
|
|
|
2017-01-20 18:57:41 +00:00
|
|
|
// Values used for the Status field of campaignStatus.
const (
	// campaignStatusPending is the status while results are still expected.
	campaignStatusPending = "pending"
	// campaignStatusFinished is the status once the number of received
	// results has reached the number of expected results.
	campaignStatusFinished = "finished"
)
|
|
|
|
|
|
|
|
// campaignStatus describes the progress of a campaign: how many results are
// expected, how many have been received, and a status string. All fields are
// comparable, so two values can be compared with == / != to detect changes.
type campaignStatus struct {
	// ExpectedResults is serialized under the JSON key "expected_results".
	ExpectedResults uint `json:"expected_results"`
	// ActualResults is serialized under the JSON key "actual_results".
	ActualResults uint `json:"actual_results"`
	// Status is serialized under the JSON key "status".
	Status string `json:"status"`
}
|
|
|
|
|
2021-06-01 00:07:51 +00:00
|
|
|
func (svc Service) StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) {
|
2021-08-02 22:06:27 +00:00
|
|
|
logging.WithExtras(ctx, "campaign_id", campaignID)
|
|
|
|
|
2021-08-10 02:30:17 +00:00
|
|
|
// Explicitly set ObserverCanRun: true in this check because we check that the user trying to
|
|
|
|
// read results is the same user that initiated the query. This means the observer check already
|
|
|
|
// happened with the actual value for this query.
|
2022-02-15 18:41:48 +00:00
|
|
|
if err := svc.authz.Authorize(ctx, &fleet.TargetedQuery{Query: &fleet.Query{ObserverCanRun: true}}, fleet.ActionRun); err != nil {
|
2021-06-03 23:24:15 +00:00
|
|
|
level.Info(svc.logger).Log("err", "stream results authorization failed")
|
2021-06-16 17:55:41 +00:00
|
|
|
conn.WriteJSONError(authz.ForbiddenErrorMessage)
|
2021-06-03 23:24:15 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
vc, ok := viewer.FromContext(ctx)
|
|
|
|
if !ok {
|
|
|
|
level.Info(svc.logger).Log("err", "stream results viewer missing")
|
2021-06-16 17:55:41 +00:00
|
|
|
conn.WriteJSONError(authz.ForbiddenErrorMessage)
|
2021-06-03 23:24:15 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2016-11-29 18:20:06 +00:00
|
|
|
// Find the campaign and ensure it is active
|
2021-09-14 12:11:07 +00:00
|
|
|
campaign, err := svc.ds.DistributedQueryCampaign(ctx, campaignID)
|
2016-11-29 18:20:06 +00:00
|
|
|
if err != nil {
|
|
|
|
conn.WriteJSONError(fmt.Sprintf("cannot find campaign for ID %d", campaignID))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-06-03 23:24:15 +00:00
|
|
|
// Ensure the same user is opening to read results as initiated the query
|
|
|
|
if campaign.UserID != vc.User.ID {
|
|
|
|
level.Info(svc.logger).Log(
|
Add read replica testing helpers and fix non-sso login bug (#4908)
not set on the INSERT.
- OUT: Only sets the ID on the passed session and returns it. (`CreatedAt`, `AccessedAt`, are not set.)
New version:
```go
func (ds *Datastore) NewSession(ctx context.Context, userID uint, sessionKey string) (*fleet.Session, error) {
sqlStatement := `
INSERT INTO sessions (
user_id,
` + "`key`" + `
)
VALUES(?,?)
`
result, err := ds.writer.ExecContext(ctx, sqlStatement, userID, sessionKey)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "inserting session")
}
id, _ := result.LastInsertId() // cannot fail with the mysql driver
return ds.sessionByID(ctx, ds.writer, uint(id))
}
```
- IN: Define arguments that are truly used when creating a session.
- OUT: Load and return the fleet.Session struct with all values set (using the `ds.writer` to support read replicas correctly).
PS: The new `NewSession` version mimics what we already do with other entities, like policies (`Datastore.NewGlobalPolicy`).
2022-04-04 23:52:05 +00:00
|
|
|
"err", "campaign user ID does not match",
|
2021-06-03 23:24:15 +00:00
|
|
|
"expected", campaign.UserID,
|
|
|
|
"got", vc.User.ID,
|
|
|
|
)
|
2021-06-16 17:55:41 +00:00
|
|
|
conn.WriteJSONError(authz.ForbiddenErrorMessage)
|
2021-06-03 23:24:15 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-07-01 17:51:34 +00:00
|
|
|
// Open the channel from which we will receive incoming query results
|
|
|
|
// (probably from the redis pubsub implementation)
|
2021-10-26 14:33:31 +00:00
|
|
|
readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign)
|
2020-07-01 17:51:34 +00:00
|
|
|
if err != nil {
|
2021-10-26 14:33:31 +00:00
|
|
|
conn.WriteJSONError("error getting campaign reader: " + err.Error())
|
2016-12-01 18:31:16 +00:00
|
|
|
return
|
|
|
|
}
|
2021-10-26 14:33:31 +00:00
|
|
|
defer cancelFunc()
|
2016-12-01 18:31:16 +00:00
|
|
|
|
|
|
|
// Setting the status to completed stops the query from being sent to
|
|
|
|
// targets. If this fails, there is a background job that will clean up
|
|
|
|
// this campaign.
|
2021-10-26 14:33:31 +00:00
|
|
|
defer svc.CompleteCampaign(ctx, campaign)
|
2016-12-01 18:31:16 +00:00
|
|
|
|
2017-01-20 18:57:41 +00:00
|
|
|
status := campaignStatus{
|
|
|
|
Status: campaignStatusPending,
|
|
|
|
}
|
2018-05-17 22:54:34 +00:00
|
|
|
lastStatus := status
|
|
|
|
lastTotals := targetTotals{}
|
2017-01-20 18:57:41 +00:00
|
|
|
|
2017-01-23 17:37:03 +00:00
|
|
|
// to improve performance of the frontend rendering the results table, we
|
2021-02-03 16:47:43 +00:00
|
|
|
// add the "host_hostname" field to every row and clean null rows.
|
2021-06-06 22:07:29 +00:00
|
|
|
mapHostnameRows := func(res *fleet.DistributedQueryResult) {
|
2021-02-03 16:47:43 +00:00
|
|
|
filteredRows := []map[string]string{}
|
|
|
|
for _, row := range res.Rows {
|
|
|
|
if row == nil {
|
|
|
|
continue
|
|
|
|
}
|
2021-06-24 00:32:19 +00:00
|
|
|
row["host_hostname"] = res.Host.Hostname
|
2021-02-03 16:47:43 +00:00
|
|
|
filteredRows = append(filteredRows, row)
|
2017-01-23 17:37:03 +00:00
|
|
|
}
|
2021-02-03 16:47:43 +00:00
|
|
|
|
|
|
|
res.Rows = filteredRows
|
2017-01-23 17:37:03 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 12:11:07 +00:00
|
|
|
targets, err := svc.ds.DistributedQueryCampaignTargetIDs(ctx, campaign.ID)
|
2020-03-23 01:33:04 +00:00
|
|
|
if err != nil {
|
|
|
|
conn.WriteJSONError("error retrieving campaign targets: " + err.Error())
|
|
|
|
return
|
|
|
|
}
|
2017-02-10 00:12:13 +00:00
|
|
|
|
2020-03-23 01:33:04 +00:00
|
|
|
updateStatus := func() error {
|
2021-06-03 23:24:15 +00:00
|
|
|
metrics, err := svc.CountHostsInTargets(ctx, &campaign.QueryID, *targets)
|
2017-02-10 00:12:13 +00:00
|
|
|
if err != nil {
|
|
|
|
if err = conn.WriteJSONError("error retrieving target counts"); err != nil {
|
2021-11-22 14:13:26 +00:00
|
|
|
return ctxerr.Wrap(ctx, err, "retrieve target counts, write failed")
|
2017-02-10 00:12:13 +00:00
|
|
|
}
|
2021-11-22 14:13:26 +00:00
|
|
|
return ctxerr.Wrap(ctx, err, "retrieve target counts")
|
2017-02-10 00:12:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
totals := targetTotals{
|
|
|
|
Total: metrics.TotalHosts,
|
|
|
|
Online: metrics.OnlineHosts,
|
|
|
|
Offline: metrics.OfflineHosts,
|
|
|
|
MissingInAction: metrics.MissingInActionHosts,
|
|
|
|
}
|
2018-05-17 22:54:34 +00:00
|
|
|
if lastTotals != totals {
|
|
|
|
lastTotals = totals
|
|
|
|
if err = conn.WriteJSONMessage("totals", totals); err != nil {
|
2021-11-22 14:13:26 +00:00
|
|
|
return ctxerr.Wrap(ctx, err, "write totals")
|
2018-05-17 22:54:34 +00:00
|
|
|
}
|
2017-02-10 00:12:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
status.ExpectedResults = totals.Online
|
|
|
|
if status.ActualResults >= status.ExpectedResults {
|
|
|
|
status.Status = campaignStatusFinished
|
|
|
|
}
|
|
|
|
// only write status message if status has changed
|
2018-05-17 22:54:34 +00:00
|
|
|
if lastStatus != status {
|
|
|
|
lastStatus = status
|
2017-02-10 00:12:13 +00:00
|
|
|
if err = conn.WriteJSONMessage("status", status); err != nil {
|
2021-11-22 14:13:26 +00:00
|
|
|
return ctxerr.Wrap(ctx, err, "write status")
|
2017-02-10 00:12:13 +00:00
|
|
|
}
|
|
|
|
}
|
2018-12-21 01:44:49 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := updateStatus(); err != nil {
|
2020-07-01 17:51:34 +00:00
|
|
|
_ = svc.logger.Log("msg", "error updating status", "err", err)
|
2018-12-21 01:44:49 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Push status updates every 5 seconds at most
|
|
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
|
|
defer ticker.Stop()
|
|
|
|
// Loop, pushing updates to results and expected totals
|
|
|
|
for {
|
|
|
|
// Update the expected hosts total (Should happen before
|
|
|
|
// any results are written, to avoid the frontend showing "x of
|
|
|
|
// 0 Hosts Returning y Records")
|
2016-11-29 18:20:06 +00:00
|
|
|
select {
|
|
|
|
case res := <-readChan:
|
|
|
|
// Receive a result and push it over the websocket
|
|
|
|
switch res := res.(type) {
|
2021-06-06 22:07:29 +00:00
|
|
|
case fleet.DistributedQueryResult:
|
2021-02-03 16:47:43 +00:00
|
|
|
mapHostnameRows(&res)
|
2016-11-29 18:20:06 +00:00
|
|
|
err = conn.WriteJSONMessage("result", res)
|
2021-11-15 14:11:38 +00:00
|
|
|
if ctxerr.Cause(err) == sockjs.ErrSessionNotOpen {
|
2020-07-01 17:51:34 +00:00
|
|
|
// return and stop sending the query if the session was closed
|
|
|
|
// by the client
|
|
|
|
return
|
|
|
|
}
|
2016-11-29 18:20:06 +00:00
|
|
|
if err != nil {
|
2020-07-01 17:51:34 +00:00
|
|
|
_ = svc.logger.Log("msg", "error writing to channel", "err", err)
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
2017-01-20 18:57:41 +00:00
|
|
|
status.ActualResults++
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
|
|
|
|
2018-12-21 01:44:49 +00:00
|
|
|
case <-ticker.C:
|
2020-03-23 01:33:04 +00:00
|
|
|
if conn.GetSessionState() == sockjs.SessionClosed {
|
|
|
|
// return and stop sending the query if the session was closed
|
|
|
|
// by the client
|
|
|
|
return
|
|
|
|
}
|
2018-12-21 01:44:49 +00:00
|
|
|
// Update status
|
|
|
|
if err := updateStatus(); err != nil {
|
|
|
|
svc.logger.Log("msg", "error updating status", "err", err)
|
|
|
|
return
|
|
|
|
}
|
2016-11-29 18:20:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|