Refactor campaign methods into separate file (#540)

This commit is contained in:
Zachary Wasserman 2016-11-29 10:20:06 -08:00 committed by GitHub
parent 9a769d67a4
commit ed3c696c6e
16 changed files with 674 additions and 599 deletions

View File

@ -0,0 +1,130 @@
package datastore
import (
"testing"
"time"
"github.com/WatchBeam/clock"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/patrickmn/sortutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newQuery(t *testing.T, ds kolide.Datastore, name, q string) *kolide.Query {
query, err := ds.NewQuery(&kolide.Query{
Name: name,
Query: q,
})
require.Nil(t, err)
return query
}
func newCampaign(t *testing.T, ds kolide.Datastore, queryID uint, status kolide.DistributedQueryStatus) *kolide.DistributedQueryCampaign {
campaign, err := ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{
QueryID: queryID,
Status: status,
})
require.Nil(t, err)
return campaign
}
func newHost(t *testing.T, ds kolide.Datastore, name, ip, key, uuid string, tim time.Time) *kolide.Host {
h, err := ds.NewHost(&kolide.Host{
HostName: name,
PrimaryIP: ip,
NodeKey: key,
UUID: uuid,
DetailUpdateTime: tim,
})
require.Nil(t, err)
require.NotZero(t, h.ID)
require.Nil(t, ds.MarkHostSeen(h, tim))
return h
}
func newLabel(t *testing.T, ds kolide.Datastore, name, query string) *kolide.Label {
l, err := ds.NewLabel(&kolide.Label{Name: name, Query: query})
require.Nil(t, err)
require.NotZero(t, l.ID)
return l
}
func addHost(t *testing.T, ds kolide.Datastore, campaignID, hostID uint) {
_, err := ds.NewDistributedQueryCampaignTarget(
&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetHost,
TargetID: hostID,
DistributedQueryCampaignID: campaignID,
})
require.Nil(t, err)
}
func addLabel(t *testing.T, ds kolide.Datastore, campaignID, labelID uint) {
_, err := ds.NewDistributedQueryCampaignTarget(
&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetLabel,
TargetID: labelID,
DistributedQueryCampaignID: campaignID,
})
require.Nil(t, err)
}
func checkTargets(t *testing.T, ds kolide.Datastore, campaignID uint, expectedHostIDs []uint, expectedLabelIDs []uint) {
hostIDs, labelIDs, err := ds.DistributedQueryCampaignTargetIDs(campaignID)
require.Nil(t, err)
sortutil.Asc(expectedHostIDs)
sortutil.Asc(hostIDs)
assert.Equal(t, expectedHostIDs, hostIDs)
sortutil.Asc(expectedLabelIDs)
sortutil.Asc(labelIDs)
assert.Equal(t, expectedLabelIDs, labelIDs)
}
func testDistributedQueryCampaign(t *testing.T, ds kolide.Datastore) {
mockClock := clock.NewMockClock()
query := newQuery(t, ds, "test", "select * from time")
campaign := newCampaign(t, ds, query.ID, kolide.QueryRunning)
{
retrieved, err := ds.DistributedQueryCampaign(campaign.ID)
require.Nil(t, err)
assert.Equal(t, campaign.QueryID, retrieved.QueryID)
assert.Equal(t, campaign.Status, retrieved.Status)
}
h1 := newHost(t, ds, "foo.local", "192.168.1.10", "1", "1", mockClock.Now())
h2 := newHost(t, ds, "bar.local", "192.168.1.11", "2", "2", mockClock.Now().Add(-1*time.Hour))
h3 := newHost(t, ds, "baz.local", "192.168.1.12", "3", "3", mockClock.Now().Add(-13*time.Minute))
l1 := newLabel(t, ds, "label foo", "query foo")
l2 := newLabel(t, ds, "label bar", "query foo")
checkTargets(t, ds, campaign.ID, []uint{}, []uint{})
addHost(t, ds, campaign.ID, h1.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{})
addLabel(t, ds, campaign.ID, l1.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID})
addLabel(t, ds, campaign.ID, l2.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID, l2.ID})
addHost(t, ds, campaign.ID, h2.ID)
addHost(t, ds, campaign.ID, h3.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID, h2.ID, h3.ID}, []uint{l1.ID, l2.ID})
}

View File

@ -3,13 +3,9 @@ package datastore
import (
"fmt"
"testing"
"time"
"github.com/WatchBeam/clock"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/patrickmn/sortutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func testDeleteQuery(t *testing.T, ds kolide.Datastore) {
@ -63,121 +59,3 @@ func testListQuery(t *testing.T, ds kolide.Datastore) {
assert.Nil(t, err)
assert.Equal(t, 10, len(results))
}
func newQuery(t *testing.T, ds kolide.Datastore, name, q string) *kolide.Query {
query, err := ds.NewQuery(&kolide.Query{
Name: name,
Query: q,
})
require.Nil(t, err)
return query
}
func newCampaign(t *testing.T, ds kolide.Datastore, queryID uint, status kolide.DistributedQueryStatus) *kolide.DistributedQueryCampaign {
campaign, err := ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{
QueryID: queryID,
Status: status,
})
require.Nil(t, err)
return campaign
}
func newHost(t *testing.T, ds kolide.Datastore, name, ip, key, uuid string, tim time.Time) *kolide.Host {
h, err := ds.NewHost(&kolide.Host{
HostName: name,
PrimaryIP: ip,
NodeKey: key,
UUID: uuid,
DetailUpdateTime: tim,
})
require.Nil(t, err)
require.NotZero(t, h.ID)
require.Nil(t, ds.MarkHostSeen(h, tim))
return h
}
func newLabel(t *testing.T, ds kolide.Datastore, name, query string) *kolide.Label {
l, err := ds.NewLabel(&kolide.Label{Name: name, Query: query})
require.Nil(t, err)
require.NotZero(t, l.ID)
return l
}
func addHost(t *testing.T, ds kolide.Datastore, campaignID, hostID uint) {
_, err := ds.NewDistributedQueryCampaignTarget(
&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetHost,
TargetID: hostID,
DistributedQueryCampaignID: campaignID,
})
require.Nil(t, err)
}
func addLabel(t *testing.T, ds kolide.Datastore, campaignID, labelID uint) {
_, err := ds.NewDistributedQueryCampaignTarget(
&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetLabel,
TargetID: labelID,
DistributedQueryCampaignID: campaignID,
})
require.Nil(t, err)
}
func checkTargets(t *testing.T, ds kolide.Datastore, campaignID uint, expectedHostIDs []uint, expectedLabelIDs []uint) {
hostIDs, labelIDs, err := ds.DistributedQueryCampaignTargetIDs(campaignID)
require.Nil(t, err)
sortutil.Asc(expectedHostIDs)
sortutil.Asc(hostIDs)
assert.Equal(t, expectedHostIDs, hostIDs)
sortutil.Asc(expectedLabelIDs)
sortutil.Asc(labelIDs)
assert.Equal(t, expectedLabelIDs, labelIDs)
}
func testDistributedQueryCampaign(t *testing.T, ds kolide.Datastore) {
mockClock := clock.NewMockClock()
query := newQuery(t, ds, "test", "select * from time")
campaign := newCampaign(t, ds, query.ID, kolide.QueryRunning)
{
retrieved, err := ds.DistributedQueryCampaign(campaign.ID)
require.Nil(t, err)
assert.Equal(t, campaign.QueryID, retrieved.QueryID)
assert.Equal(t, campaign.Status, retrieved.Status)
}
h1 := newHost(t, ds, "foo.local", "192.168.1.10", "1", "1", mockClock.Now())
h2 := newHost(t, ds, "bar.local", "192.168.1.11", "2", "2", mockClock.Now().Add(-1*time.Hour))
h3 := newHost(t, ds, "baz.local", "192.168.1.12", "3", "3", mockClock.Now().Add(-13*time.Minute))
l1 := newLabel(t, ds, "label foo", "query foo")
l2 := newLabel(t, ds, "label bar", "query foo")
checkTargets(t, ds, campaign.ID, []uint{}, []uint{})
addHost(t, ds, campaign.ID, h1.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{})
addLabel(t, ds, campaign.ID, l1.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID})
addLabel(t, ds, campaign.ID, l2.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID, l2.ID})
addHost(t, ds, campaign.ID, h2.ID)
addHost(t, ds, campaign.ID, h3.ID)
checkTargets(t, ds, campaign.ID, []uint{h1.ID, h2.ID, h3.ID}, []uint{l1.ID, l2.ID})
}

View File

@ -0,0 +1,89 @@
package inmem
import (
"fmt"
"github.com/kolide/kolide-ose/server/errors"
"github.com/kolide/kolide-ose/server/kolide"
)
func (orm *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
camp.ID = orm.nextID(camp)
orm.distributedQueryCampaigns[camp.ID] = *camp
return camp, nil
}
func (orm *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
campaign, ok := orm.distributedQueryCampaigns[id]
if !ok {
return nil, errors.ErrNotFound
}
return &campaign, nil
}
func (orm *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error {
orm.mtx.Lock()
defer orm.mtx.Unlock()
if _, ok := orm.distributedQueryCampaigns[camp.ID]; !ok {
return errors.ErrNotFound
}
orm.distributedQueryCampaigns[camp.ID] = *camp
return nil
}
func (orm *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
hostIDs = []uint{}
labelIDs = []uint{}
for _, target := range orm.distributedQueryCampaignTargets {
if target.DistributedQueryCampaignID == id {
if target.Type == kolide.TargetHost {
hostIDs = append(hostIDs, target.TargetID)
} else if target.Type == kolide.TargetLabel {
labelIDs = append(labelIDs, target.TargetID)
} else {
return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type)
}
}
}
return hostIDs, labelIDs, nil
}
func (orm *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
target.ID = orm.nextID(target)
orm.distributedQueryCampaignTargets[target.ID] = *target
return target, nil
}
func (orm *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
for _, e := range orm.distributedQueryExecutions {
if exec.HostID == e.ID && exec.DistributedQueryCampaignID == e.DistributedQueryCampaignID {
return exec, errors.ErrExists
}
}
exec.ID = orm.nextID(exec)
orm.distributedQueryExecutions[exec.ID] = *exec
return exec, nil
}

View File

@ -1,7 +1,6 @@
package inmem
import (
"fmt"
"sort"
"github.com/kolide/kolide-ose/server/errors"
@ -103,84 +102,3 @@ func (orm *Datastore) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, erro
return queries, nil
}
func (orm *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
camp.ID = orm.nextID(camp)
orm.distributedQueryCampaigns[camp.ID] = *camp
return camp, nil
}
func (orm *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
campaign, ok := orm.distributedQueryCampaigns[id]
if !ok {
return nil, errors.ErrNotFound
}
return &campaign, nil
}
func (orm *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error {
orm.mtx.Lock()
defer orm.mtx.Unlock()
if _, ok := orm.distributedQueryCampaigns[camp.ID]; !ok {
return errors.ErrNotFound
}
orm.distributedQueryCampaigns[camp.ID] = *camp
return nil
}
func (orm *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
hostIDs = []uint{}
labelIDs = []uint{}
for _, target := range orm.distributedQueryCampaignTargets {
if target.DistributedQueryCampaignID == id {
if target.Type == kolide.TargetHost {
hostIDs = append(hostIDs, target.TargetID)
} else if target.Type == kolide.TargetLabel {
labelIDs = append(labelIDs, target.TargetID)
} else {
return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type)
}
}
}
return hostIDs, labelIDs, nil
}
func (orm *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
target.ID = orm.nextID(target)
orm.distributedQueryCampaignTargets[target.ID] = *target
return target, nil
}
func (orm *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
for _, e := range orm.distributedQueryExecutions {
if exec.HostID == e.ID && exec.DistributedQueryCampaignID == e.DistributedQueryCampaignID {
return exec, errors.ErrExists
}
}
exec.ID = orm.nextID(exec)
orm.distributedQueryExecutions[exec.ID] = *exec
return exec, nil
}

View File

@ -0,0 +1,123 @@
package mysql
import (
"fmt"
"github.com/kolide/kolide-ose/server/errors"
"github.com/kolide/kolide-ose/server/kolide"
)
func (d *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) {
sqlStatement := `
INSERT INTO distributed_query_campaigns (
query_id,
status,
user_id
)
VALUES(?,?,?)
`
result, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID)
if err != nil {
return nil, errors.DatabaseError(err)
}
id, _ := result.LastInsertId()
camp.ID = uint(id)
return camp, nil
}
func (d *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) {
sql := `
SELECT * FROM distributed_query_campaigns WHERE id = ? AND NOT deleted
`
campaign := &kolide.DistributedQueryCampaign{}
if err := d.db.Get(campaign, sql, id); err != nil {
return nil, errors.DatabaseError(err)
}
return campaign, nil
}
func (d *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error {
sqlStatement := `
UPDATE distributed_query_campaigns SET
query_id = ?,
status = ?,
user_id = ?
WHERE id = ?
AND NOT deleted
`
_, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID, camp.ID)
if err != nil {
return errors.DatabaseError(err)
}
return nil
}
func (d *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) {
sqlStatement := `
SELECT * FROM distributed_query_campaign_targets WHERE distributed_query_campaign_id = ?
`
targets := []kolide.DistributedQueryCampaignTarget{}
if err = d.db.Select(&targets, sqlStatement, id); err != nil {
return nil, nil, errors.DatabaseError(err)
}
hostIDs = []uint{}
labelIDs = []uint{}
for _, target := range targets {
if target.Type == kolide.TargetHost {
hostIDs = append(hostIDs, target.TargetID)
} else if target.Type == kolide.TargetLabel {
labelIDs = append(labelIDs, target.TargetID)
} else {
return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type)
}
}
return hostIDs, labelIDs, nil
}
func (d *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) {
sqlStatement := `
INSERT into distributed_query_campaign_targets (
type,
distributed_query_campaign_id,
target_id
)
VALUES (?,?,?)
`
result, err := d.db.Exec(sqlStatement, target.Type, target.DistributedQueryCampaignID, target.TargetID)
if err != nil {
return nil, errors.DatabaseError(err)
}
id, _ := result.LastInsertId()
target.ID = uint(id)
return target, nil
}
func (d *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) {
sqlStatement := `
INSERT INTO distributed_query_executions (
host_id,
distributed_query_campaign_id,
status,
error,
execution_duration
) VALUES (?,?,?,?,?)
`
result, err := d.db.Exec(sqlStatement, exec.HostID, exec.DistributedQueryCampaignID,
exec.Status, exec.Error, exec.ExecutionDuration)
if err != nil {
return nil, errors.DatabaseError(err)
}
id, _ := result.LastInsertId()
exec.ID = uint(id)
return exec, nil
}

View File

@ -1,8 +1,6 @@
package mysql
import (
"fmt"
"github.com/kolide/kolide-ose/server/errors"
"github.com/kolide/kolide-ose/server/kolide"
)
@ -90,118 +88,3 @@ func (d *Datastore) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, error)
return results, nil
}
func (d *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) {
sqlStatement := `
INSERT INTO distributed_query_campaigns (
query_id,
status,
user_id
)
VALUES(?,?,?)
`
result, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID)
if err != nil {
return nil, errors.DatabaseError(err)
}
id, _ := result.LastInsertId()
camp.ID = uint(id)
return camp, nil
}
func (d *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) {
sql := `
SELECT * FROM distributed_query_campaigns WHERE id = ? AND NOT deleted
`
campaign := &kolide.DistributedQueryCampaign{}
if err := d.db.Get(campaign, sql, id); err != nil {
return nil, errors.DatabaseError(err)
}
return campaign, nil
}
func (d *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error {
sqlStatement := `
UPDATE distributed_query_campaigns SET
query_id = ?,
status = ?,
user_id = ?
WHERE id = ?
AND NOT deleted
`
_, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID, camp.ID)
if err != nil {
return errors.DatabaseError(err)
}
return nil
}
func (d *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) {
sqlStatement := `
SELECT * FROM distributed_query_campaign_targets WHERE distributed_query_campaign_id = ?
`
targets := []kolide.DistributedQueryCampaignTarget{}
if err = d.db.Select(&targets, sqlStatement, id); err != nil {
return nil, nil, errors.DatabaseError(err)
}
hostIDs = []uint{}
labelIDs = []uint{}
for _, target := range targets {
if target.Type == kolide.TargetHost {
hostIDs = append(hostIDs, target.TargetID)
} else if target.Type == kolide.TargetLabel {
labelIDs = append(labelIDs, target.TargetID)
} else {
return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type)
}
}
return hostIDs, labelIDs, nil
}
func (d *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) {
sqlStatement := `
INSERT into distributed_query_campaign_targets (
type,
distributed_query_campaign_id,
target_id
)
VALUES (?,?,?)
`
result, err := d.db.Exec(sqlStatement, target.Type, target.DistributedQueryCampaignID, target.TargetID)
if err != nil {
return nil, errors.DatabaseError(err)
}
id, _ := result.LastInsertId()
target.ID = uint(id)
return target, nil
}
func (d *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) {
sqlStatement := `
INSERT INTO distributed_query_executions (
host_id,
distributed_query_campaign_id,
status,
error,
execution_duration
) VALUES (?,?,?,?,?)
`
result, err := d.db.Exec(sqlStatement, exec.HostID, exec.DistributedQueryCampaignID,
exec.Status, exec.Error, exec.ExecutionDuration)
if err != nil {
return nil, errors.DatabaseError(err)
}
id, _ := result.LastInsertId()
exec.ID = uint(id)
return exec, nil
}

106
server/kolide/campaigns.go Normal file
View File

@ -0,0 +1,106 @@
package kolide
import (
"time"
"github.com/kolide/kolide-ose/server/websocket"
"golang.org/x/net/context"
)
// CampaignStore defines the distributed query campaign related datastore
// methods
type CampaignStore interface {
// NewDistributedQueryCampaign creates a new distributed query campaign
NewDistributedQueryCampaign(camp *DistributedQueryCampaign) (*DistributedQueryCampaign, error)
// DistributedQueryCampaign loads a distributed query campaign by ID
DistributedQueryCampaign(id uint) (*DistributedQueryCampaign, error)
// SaveDistributedQueryCampaign updates an existing distributed query
// campaign
SaveDistributedQueryCampaign(camp *DistributedQueryCampaign) error
// DistributedQueryCampaignTargetIDs gets the IDs of the targets for
// the query campaign of the provided ID
DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error)
// NewDistributedQueryCampaignTarget adds a new target to an existing
// distributed query campaign
NewDistributedQueryCampaignTarget(target *DistributedQueryCampaignTarget) (*DistributedQueryCampaignTarget, error)
// NewDistributedQueryCampaignExecution records a new execution for a
// distributed query campaign
NewDistributedQueryExecution(exec *DistributedQueryExecution) (*DistributedQueryExecution, error)
}
// CampaignService defines the distributed query campaign related service
// methods
type CampaignService interface {
// NewDistributedQueryCampaign creates a new distributed query campaign
// with the provided query and host/label targets
NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*DistributedQueryCampaign, error)
// StreamCampaignResults streams updates with query results and
// expected host totals over the provided websocket. Note that the type
// signature is somewhat inconsistent due to this being a streaming API
// and not the typical go-kit RPC style.
StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint)
}
// DistributedQueryStatus is the lifecycle status of a distributed query
// campaign.
type DistributedQueryStatus int
const (
QueryRunning DistributedQueryStatus = iota
QueryComplete DistributedQueryStatus = iota
QueryError DistributedQueryStatus = iota
)
// DistributedQueryCampaign is the basic metadata associated with a distributed
// query.
type DistributedQueryCampaign struct {
UpdateCreateTimestamps
DeleteFields
ID uint `json:"id"`
QueryID uint `json:"query_id" db:"query_id"`
Status DistributedQueryStatus `json:"status"`
UserID uint `json:"user_id" db:"user_id"`
}
// DistributedQueryCampaignTarget stores a target (host or label) for a
// distributed query campaign. There is a one -> many mapping of campaigns to
// targets.
type DistributedQueryCampaignTarget struct {
ID uint
Type TargetType
DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"`
TargetID uint `db:"target_id"`
}
// DistributedQueryExecutionStatus is the status of a distributed query
// execution on a single host.
type DistributedQueryExecutionStatus int
const (
ExecutionWaiting DistributedQueryExecutionStatus = iota
ExecutionRequested
ExecutionSucceeded
ExecutionFailed
)
// DistributedQueryResult is the result returned from the execution of a
// distributed query on a single host.
type DistributedQueryResult struct {
DistributedQueryCampaignID uint `json:"distributed_query_execution_id"`
Host Host `json:"host"`
Rows []map[string]string `json:"rows"`
}
// DistributedQueryExecution is the metadata associated with a distributed
// query execution on a single host.
type DistributedQueryExecution struct {
ID uint
HostID uint `db:"host_id"`
DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"`
Status DistributedQueryExecutionStatus
Error string
ExecutionDuration time.Duration `db:"execution_duration"`
}

View File

@ -4,6 +4,7 @@ package kolide
type Datastore interface {
UserStore
QueryStore
CampaignStore
PackStore
LabelStore
HostStore

View File

@ -3,7 +3,6 @@ package kolide
import (
"time"
"github.com/kolide/kolide-ose/server/websocket"
"golang.org/x/net/context"
)
@ -14,25 +13,6 @@ type QueryStore interface {
DeleteQuery(query *Query) error
Query(id uint) (*Query, error)
ListQueries(opt ListOptions) ([]*Query, error)
// NewDistributedQueryCampaign creates a new distributed query campaign
NewDistributedQueryCampaign(camp *DistributedQueryCampaign) (*DistributedQueryCampaign, error)
// DistributedQueryCampaign loads a distributed query campaign by ID
DistributedQueryCampaign(id uint) (*DistributedQueryCampaign, error)
// SaveDistributedQueryCampaign updates an existing distributed query
// campaign
SaveDistributedQueryCampaign(camp *DistributedQueryCampaign) error
// DistributedQueryCampaignTargetIDs gets the IDs of the targets for
// the query campaign of the provided ID
DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error)
// NewDistributedQueryCampaignTarget adds a new target to an existing
// distributed query campaign
NewDistributedQueryCampaignTarget(target *DistributedQueryCampaignTarget) (*DistributedQueryCampaignTarget, error)
// NewDistributedQueryCampaignExecution records a new execution for a
// distributed query campaign
NewDistributedQueryExecution(exec *DistributedQueryExecution) (*DistributedQueryExecution, error)
}
type QueryService interface {
@ -41,13 +21,6 @@ type QueryService interface {
NewQuery(ctx context.Context, p QueryPayload) (*Query, error)
ModifyQuery(ctx context.Context, id uint, p QueryPayload) (*Query, error)
DeleteQuery(ctx context.Context, id uint) error
NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*DistributedQueryCampaign, error)
// StreamCampaignResults streams updates with query results and
// expected host totals over the provided websocket. Note that the type
// signature is somewhat inconsistent due to this being a streaming API
// and not the typical go-kit RPC style.
StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint)
}
type QueryPayload struct {
@ -75,54 +48,6 @@ type Query struct {
Version string `json:"version"`
}
type DistributedQueryStatus int
const (
QueryRunning DistributedQueryStatus = iota
QueryComplete DistributedQueryStatus = iota
QueryError DistributedQueryStatus = iota
)
type DistributedQueryCampaign struct {
UpdateCreateTimestamps
DeleteFields
ID uint `json:"id"`
QueryID uint `json:"query_id" db:"query_id"`
Status DistributedQueryStatus `json:"status"`
UserID uint `json:"user_id" db:"user_id"`
}
type DistributedQueryCampaignTarget struct {
ID uint
Type TargetType
DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"`
TargetID uint `db:"target_id"`
}
type DistributedQueryExecutionStatus int
const (
ExecutionWaiting DistributedQueryExecutionStatus = iota
ExecutionRequested
ExecutionSucceeded
ExecutionFailed
)
type DistributedQueryResult struct {
DistributedQueryCampaignID uint `json:"distributed_query_execution_id"`
Host Host `json:"host"`
Rows []map[string]string `json:"rows"`
}
type DistributedQueryExecution struct {
ID uint
HostID uint `db:"host_id"`
DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"`
Status DistributedQueryExecutionStatus
Error string
ExecutionDuration time.Duration `db:"execution_duration"`
}
type Option struct {
ID uint
CreatedAt time.Time

View File

@ -7,6 +7,7 @@ type Service interface {
PackService
LabelService
QueryService
CampaignService
OsqueryService
HostService
AppConfigService

View File

@ -0,0 +1,81 @@
package service
import (
"net/http"
"github.com/go-kit/kit/endpoint"
"github.com/kolide/kolide-ose/server/contexts/viewer"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/kolide/kolide-ose/server/websocket"
"golang.org/x/net/context"
)
////////////////////////////////////////////////////////////////////////////////
// Create Distributed Query Campaign
////////////////////////////////////////////////////////////////////////////////
type createDistributedQueryCampaignRequest struct {
UserID uint
Query string `json:"query"`
Selected struct {
Labels []uint `json:"labels"`
Hosts []uint `json:"hosts"`
} `json:"selected"`
}
type createDistributedQueryCampaignResponse struct {
Campaign *kolide.DistributedQueryCampaign `json:"campaign,omitempty"`
Err error `json:"error,omitempty"`
}
func (r createDistributedQueryCampaignResponse) error() error { return r.Err }
func makeCreateDistributedQueryCampaignEndpoint(svc kolide.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(createDistributedQueryCampaignRequest)
campaign, err := svc.NewDistributedQueryCampaign(ctx, req.Query, req.Selected.Hosts, req.Selected.Labels)
if err != nil {
return createQueryResponse{Err: err}, nil
}
return createDistributedQueryCampaignResponse{campaign, nil}, nil
}
}
////////////////////////////////////////////////////////////////////////////////
// Stream Distributed Query Campaign Results and Metadata
////////////////////////////////////////////////////////////////////////////////
func makeStreamDistributedQueryCampaignResultsHandler(svc kolide.Service, jwtKey string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Upgrade to websocket connection
conn, err := websocket.Upgrade(w, r)
if err != nil {
return
}
defer conn.Close()
// Receive the auth bearer token
token, err := conn.ReadAuthToken()
if err != nil {
return
}
// Authenticate with the token
vc, err := authViewer(context.Background(), jwtKey, string(token), svc)
if err != nil || !vc.CanPerformActions() {
conn.WriteJSONError("unauthorized")
return
}
ctx := viewer.NewContext(context.Background(), *vc)
campaignID, err := idFromRequest(r, "id")
if err != nil {
conn.WriteJSONError("invalid campaign ID")
return
}
svc.StreamCampaignResults(ctx, conn, campaignID)
}
}

View File

@ -1,12 +1,8 @@
package service
import (
"net/http"
"github.com/go-kit/kit/endpoint"
"github.com/kolide/kolide-ose/server/contexts/viewer"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/kolide/kolide-ose/server/websocket"
"golang.org/x/net/context"
)
@ -143,73 +139,3 @@ func makeDeleteQueryEndpoint(svc kolide.Service) endpoint.Endpoint {
return deleteQueryResponse{}, nil
}
}
////////////////////////////////////////////////////////////////////////////////
// Create Distributed Query Campaign
////////////////////////////////////////////////////////////////////////////////
type createDistributedQueryCampaignRequest struct {
UserID uint
Query string `json:"query"`
Selected struct {
Labels []uint `json:"labels"`
Hosts []uint `json:"hosts"`
} `json:"selected"`
}
type createDistributedQueryCampaignResponse struct {
Campaign *kolide.DistributedQueryCampaign `json:"campaign,omitempty"`
Err error `json:"error,omitempty"`
}
func (r createDistributedQueryCampaignResponse) error() error { return r.Err }
func makeCreateDistributedQueryCampaignEndpoint(svc kolide.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(createDistributedQueryCampaignRequest)
campaign, err := svc.NewDistributedQueryCampaign(ctx, req.Query, req.Selected.Hosts, req.Selected.Labels)
if err != nil {
return createQueryResponse{Err: err}, nil
}
return createDistributedQueryCampaignResponse{campaign, nil}, nil
}
}
////////////////////////////////////////////////////////////////////////////////
// Stream Distributed Query Campaign Results and Metadata
////////////////////////////////////////////////////////////////////////////////
func makeStreamDistributedQueryCampaignResultsHandler(svc kolide.Service, jwtKey string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Upgrade to websocket connection
conn, err := websocket.Upgrade(w, r)
if err != nil {
return
}
defer conn.Close()
// Receive the auth bearer token
token, err := conn.ReadAuthToken()
if err != nil {
return
}
// Authenticate with the token
vc, err := authViewer(context.Background(), jwtKey, string(token), svc)
if err != nil || !vc.CanPerformActions() {
conn.WriteJSONError("unauthorized")
return
}
ctx := viewer.NewContext(context.Background(), *vc)
campaignID, err := idFromRequest(r, "id")
if err != nil {
conn.WriteJSONError("invalid campaign ID")
return
}
svc.StreamCampaignResults(ctx, conn, campaignID)
}
}

View File

@ -0,0 +1,127 @@
package service
import (
"fmt"
"time"
"github.com/kolide/kolide-ose/server/contexts/viewer"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/kolide/kolide-ose/server/websocket"
"golang.org/x/net/context"
)
func (svc service) NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*kolide.DistributedQueryCampaign, error) {
vc, ok := viewer.FromContext(ctx)
if !ok {
return nil, errNoContext
}
query, err := svc.NewQuery(ctx, kolide.QueryPayload{
Name: &queryString,
Query: &queryString,
})
if err != nil {
return nil, err
}
campaign, err := svc.ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{
QueryID: query.ID,
Status: kolide.QueryRunning,
UserID: vc.UserID(),
})
if err != nil {
return nil, err
}
// Add host targets
for _, hid := range hosts {
_, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetHost,
DistributedQueryCampaignID: campaign.ID,
TargetID: hid,
})
if err != nil {
return nil, err
}
}
// Add label targets
for _, lid := range labels {
_, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetLabel,
DistributedQueryCampaignID: campaign.ID,
TargetID: lid,
})
if err != nil {
return nil, err
}
}
return campaign, nil
}
type targetTotals struct {
Total uint `json:"count"`
Online uint `json:"online"`
}
func (svc service) StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) {
// Find the campaign and ensure it is active
campaign, err := svc.ds.DistributedQueryCampaign(campaignID)
if err != nil {
conn.WriteJSONError(fmt.Sprintf("cannot find campaign for ID %d", campaignID))
return
}
if campaign.Status != kolide.QueryRunning {
conn.WriteJSONError(fmt.Sprintf("campaign %d not running", campaignID))
return
}
// Open the channel from which we will receive incoming query results
// (probably from the redis pubsub implementation)
readChan, err := svc.resultStore.ReadChannel(context.Background(), *campaign)
if err != nil {
conn.WriteJSONError(fmt.Sprintf("cannot open read channel for campaign %d ", campaignID))
return
}
// Loop, pushing updates to results and expected totals
for {
select {
case res := <-readChan:
// Receive a result and push it over the websocket
switch res := res.(type) {
case kolide.DistributedQueryResult:
err = conn.WriteJSONMessage("result", res)
if err != nil {
fmt.Println("error writing to channel")
}
}
case <-time.After(1 * time.Second):
// Update the expected hosts total
hostIDs, labelIDs, err := svc.ds.DistributedQueryCampaignTargetIDs(campaign.ID)
if err != nil {
if err = conn.WriteJSONError("error retrieving campaign targets"); err != nil {
return
}
}
var totals targetTotals
totals.Total, totals.Online, err = svc.CountHostsInTargets(
context.Background(), hostIDs, labelIDs,
)
if err != nil {
if err = conn.WriteJSONError("error retrieving target counts"); err != nil {
return
}
}
if err = conn.WriteJSONMessage("totals", totals); err != nil {
return
}
}
}
}

View File

@ -1,12 +1,7 @@
package service
import (
"fmt"
"time"
"github.com/kolide/kolide-ose/server/contexts/viewer"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/kolide/kolide-ose/server/websocket"
"golang.org/x/net/context"
)
@ -119,119 +114,3 @@ func (svc service) DeleteQuery(ctx context.Context, id uint) error {
return nil
}
func (svc service) NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*kolide.DistributedQueryCampaign, error) {
vc, ok := viewer.FromContext(ctx)
if !ok {
return nil, errNoContext
}
query, err := svc.NewQuery(ctx, kolide.QueryPayload{
Name: &queryString,
Query: &queryString,
})
if err != nil {
return nil, err
}
campaign, err := svc.ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{
QueryID: query.ID,
Status: kolide.QueryRunning,
UserID: vc.UserID(),
})
if err != nil {
return nil, err
}
// Add host targets
for _, hid := range hosts {
_, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetHost,
DistributedQueryCampaignID: campaign.ID,
TargetID: hid,
})
if err != nil {
return nil, err
}
}
// Add label targets
for _, lid := range labels {
_, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetLabel,
DistributedQueryCampaignID: campaign.ID,
TargetID: lid,
})
if err != nil {
return nil, err
}
}
return campaign, nil
}
type targetTotals struct {
Total uint `json:"count"`
Online uint `json:"online"`
}
func (svc service) StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) {
// Find the campaign and ensure it is active
campaign, err := svc.ds.DistributedQueryCampaign(campaignID)
if err != nil {
conn.WriteJSONError(fmt.Sprintf("cannot find campaign for ID %d", campaignID))
return
}
if campaign.Status != kolide.QueryRunning {
conn.WriteJSONError(fmt.Sprintf("campaign %d not running", campaignID))
return
}
// Open the channel from which we will receive incoming query results
// (probably from the redis pubsub implementation)
readChan, err := svc.resultStore.ReadChannel(context.Background(), *campaign)
if err != nil {
conn.WriteJSONError(fmt.Sprintf("cannot open read channel for campaign %d ", campaignID))
return
}
// Loop, pushing updates to results and expected totals
for {
select {
case res := <-readChan:
// Receive a result and push it over the websocket
switch res := res.(type) {
case kolide.DistributedQueryResult:
err = conn.WriteJSONMessage("result", res)
if err != nil {
fmt.Println("error writing to channel")
}
}
case <-time.After(1 * time.Second):
// Update the expected hosts total
hostIDs, labelIDs, err := svc.ds.DistributedQueryCampaignTargetIDs(campaign.ID)
if err != nil {
if err = conn.WriteJSONError("error retrieving campaign targets"); err != nil {
return
}
}
var totals targetTotals
totals.Total, totals.Online, err = svc.CountHostsInTargets(
context.Background(), hostIDs, labelIDs,
)
if err != nil {
if err = conn.WriteJSONError("error retrieving target counts"); err != nil {
return
}
}
if err = conn.WriteJSONMessage("totals", totals); err != nil {
return
}
}
}
}

View File

@ -0,0 +1,16 @@
package service
import (
"encoding/json"
"net/http"
"golang.org/x/net/context"
)
func decodeCreateDistributedQueryCampaignRequest(ctx context.Context, r *http.Request) (interface{}, error) {
var req createDistributedQueryCampaignRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
return req, nil
}

View File

@ -55,11 +55,3 @@ func decodeListQueriesRequest(ctx context.Context, r *http.Request) (interface{}
}
return listQueriesRequest{ListOptions: opt}, nil
}
func decodeCreateDistributedQueryCampaignRequest(ctx context.Context, r *http.Request) (interface{}, error) {
var req createDistributedQueryCampaignRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
return req, nil
}