mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 08:55:24 +00:00
Add statistics for number of hosts that have not submitted results for distributed queries (#6495)
This commit is contained in:
parent
9a1adeb61b
commit
cc34585a47
2
changes/issue-6448-stats-hosts-not-responding
Normal file
2
changes/issue-6448-stats-hosts-not-responding
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
- Added statistics for number of hosts per deployment that have not submitted results for distibuted
|
||||||
|
queries to statistics payload delivered to fleetdm.com
|
@ -29,7 +29,7 @@ func errHandler(ctx context.Context, logger kitlog.Logger, msg string, err error
|
|||||||
ctxerr.Handle(ctx, err)
|
ctxerr.Handle(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string, license *fleet.LicenseInfo, enrollHostLimiter fleet.EnrollHostLimiter) {
|
func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string, config config.FleetConfig, license *fleet.LicenseInfo, enrollHostLimiter fleet.EnrollHostLimiter) {
|
||||||
logger = kitlog.With(logger, "cron", lockKeyLeader)
|
logger = kitlog.With(logger, "cron", lockKeyLeader)
|
||||||
|
|
||||||
ticker := time.NewTicker(10 * time.Second)
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
@ -95,7 +95,7 @@ func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, ident
|
|||||||
|
|
||||||
// NOTE(mna): this is not a route from the fleet server (not in server/service/handler.go) so it
|
// NOTE(mna): this is not a route from the fleet server (not in server/service/handler.go) so it
|
||||||
// will not automatically support the /latest/ versioning. Leaving it as /v1/ for that reason.
|
// will not automatically support the /latest/ versioning. Leaving it as /v1/ for that reason.
|
||||||
err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics", license)
|
err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics", config, license)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errHandler(ctx, logger, "sending statistics", err)
|
errHandler(ctx, logger, "sending statistics", err)
|
||||||
}
|
}
|
||||||
|
@ -649,7 +649,7 @@ const (
|
|||||||
lockKeyWorker = "worker"
|
lockKeyWorker = "worker"
|
||||||
)
|
)
|
||||||
|
|
||||||
func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.Duration, url string, license *fleet.LicenseInfo) error {
|
func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.Duration, url string, config configpkg.FleetConfig, license *fleet.LicenseInfo) error {
|
||||||
ac, err := ds.AppConfig(ctx)
|
ac, err := ds.AppConfig(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -658,7 +658,7 @@ func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.D
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, shouldSend, err := ds.ShouldSendStatistics(ctx, frequency, license)
|
stats, shouldSend, err := ds.ShouldSendStatistics(ctx, frequency, config, license)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -691,7 +691,7 @@ func runCrons(
|
|||||||
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
||||||
task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
|
task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
|
||||||
|
|
||||||
go cronDB(ctx, ds, kitlog.With(logger, "cron", "cleanups"), ourIdentifier, license, enrollHostLimiter)
|
go cronDB(ctx, ds, kitlog.With(logger, "cron", "cleanups"), ourIdentifier, config, license, enrollHostLimiter)
|
||||||
go cronVulnerabilities(
|
go cronVulnerabilities(
|
||||||
ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, config.Vulnerabilities)
|
ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, config.Vulnerabilities)
|
||||||
go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
|
go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
|
||||||
|
@ -26,6 +26,8 @@ import (
|
|||||||
func TestMaybeSendStatistics(t *testing.T) {
|
func TestMaybeSendStatistics(t *testing.T) {
|
||||||
ds := new(mock.Store)
|
ds := new(mock.Store)
|
||||||
|
|
||||||
|
fleetConfig := config.FleetConfig{Osquery: config.OsqueryConfig{DetailUpdateInterval: 1 * time.Hour}}
|
||||||
|
|
||||||
requestBody := ""
|
requestBody := ""
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -39,7 +41,7 @@ func TestMaybeSendStatistics(t *testing.T) {
|
|||||||
return &fleet.AppConfig{ServerSettings: fleet.ServerSettings{EnableAnalytics: true}}, nil
|
return &fleet.AppConfig{ServerSettings: fleet.ServerSettings{EnableAnalytics: true}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ds.ShouldSendStatisticsFunc = func(ctx context.Context, frequency time.Duration, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
ds.ShouldSendStatisticsFunc = func(ctx context.Context, frequency time.Duration, config config.FleetConfig, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
||||||
return fleet.StatisticsPayload{
|
return fleet.StatisticsPayload{
|
||||||
AnonymousIdentifier: "ident",
|
AnonymousIdentifier: "ident",
|
||||||
FleetVersion: "1.2.3",
|
FleetVersion: "1.2.3",
|
||||||
@ -68,15 +70,17 @@ func TestMaybeSendStatistics(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := trySendStatistics(context.Background(), ds, fleet.StatisticsFrequency, ts.URL, &fleet.LicenseInfo{Tier: "premium"})
|
err := trySendStatistics(context.Background(), ds, fleet.StatisticsFrequency, ts.URL, fleetConfig, &fleet.LicenseInfo{Tier: "premium"})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, recorded)
|
assert.True(t, recorded)
|
||||||
assert.Equal(t, `{"anonymousIdentifier":"ident","fleetVersion":"1.2.3","licenseTier":"premium","numHostsEnrolled":999,"numUsers":99,"numTeams":9,"numPolicies":0,"numLabels":3,"softwareInventoryEnabled":true,"vulnDetectionEnabled":true,"systemUsersEnabled":true,"hostsStatusWebHookEnabled":true,"numWeeklyActiveUsers":111,"hostsEnrolledByOperatingSystem":{"linux":[{"version":"1.2.3","numEnrolled":22}]},"storedErrors":[]}`, requestBody)
|
assert.Equal(t, `{"anonymousIdentifier":"ident","fleetVersion":"1.2.3","licenseTier":"premium","numHostsEnrolled":999,"numUsers":99,"numTeams":9,"numPolicies":0,"numLabels":3,"softwareInventoryEnabled":true,"vulnDetectionEnabled":true,"systemUsersEnabled":true,"hostsStatusWebHookEnabled":true,"numWeeklyActiveUsers":111,"hostsEnrolledByOperatingSystem":{"linux":[{"version":"1.2.3","numEnrolled":22}]},"storedErrors":[],"numHostsNotResponding":0}`, requestBody)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMaybeSendStatisticsSkipsSendingIfNotNeeded(t *testing.T) {
|
func TestMaybeSendStatisticsSkipsSendingIfNotNeeded(t *testing.T) {
|
||||||
ds := new(mock.Store)
|
ds := new(mock.Store)
|
||||||
|
|
||||||
|
fleetConfig := config.FleetConfig{Osquery: config.OsqueryConfig{DetailUpdateInterval: 1 * time.Hour}}
|
||||||
|
|
||||||
called := false
|
called := false
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -88,7 +92,7 @@ func TestMaybeSendStatisticsSkipsSendingIfNotNeeded(t *testing.T) {
|
|||||||
return &fleet.AppConfig{ServerSettings: fleet.ServerSettings{EnableAnalytics: true}}, nil
|
return &fleet.AppConfig{ServerSettings: fleet.ServerSettings{EnableAnalytics: true}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ds.ShouldSendStatisticsFunc = func(ctx context.Context, frequency time.Duration, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
ds.ShouldSendStatisticsFunc = func(ctx context.Context, frequency time.Duration, cfg config.FleetConfig, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
||||||
return fleet.StatisticsPayload{}, false, nil
|
return fleet.StatisticsPayload{}, false, nil
|
||||||
}
|
}
|
||||||
recorded := false
|
recorded := false
|
||||||
@ -97,7 +101,7 @@ func TestMaybeSendStatisticsSkipsSendingIfNotNeeded(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := trySendStatistics(context.Background(), ds, fleet.StatisticsFrequency, ts.URL, &fleet.LicenseInfo{Tier: "premium"})
|
err := trySendStatistics(context.Background(), ds, fleet.StatisticsFrequency, ts.URL, fleetConfig, &fleet.LicenseInfo{Tier: "premium"})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, recorded)
|
assert.False(t, recorded)
|
||||||
assert.False(t, called)
|
assert.False(t, called)
|
||||||
@ -106,6 +110,8 @@ func TestMaybeSendStatisticsSkipsSendingIfNotNeeded(t *testing.T) {
|
|||||||
func TestMaybeSendStatisticsSkipsIfNotConfigured(t *testing.T) {
|
func TestMaybeSendStatisticsSkipsIfNotConfigured(t *testing.T) {
|
||||||
ds := new(mock.Store)
|
ds := new(mock.Store)
|
||||||
|
|
||||||
|
fleetConfig := config.FleetConfig{Osquery: config.OsqueryConfig{DetailUpdateInterval: 1 * time.Hour}}
|
||||||
|
|
||||||
called := false
|
called := false
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -117,7 +123,7 @@ func TestMaybeSendStatisticsSkipsIfNotConfigured(t *testing.T) {
|
|||||||
return &fleet.AppConfig{}, nil
|
return &fleet.AppConfig{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := trySendStatistics(context.Background(), ds, fleet.StatisticsFrequency, ts.URL, &fleet.LicenseInfo{Tier: "premium"})
|
err := trySendStatistics(context.Background(), ds, fleet.StatisticsFrequency, ts.URL, fleetConfig, &fleet.LicenseInfo{Tier: "premium"})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, called)
|
assert.False(t, called)
|
||||||
}
|
}
|
||||||
|
@ -12,8 +12,10 @@ import (
|
|||||||
|
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
"github.com/doug-martin/goqu/v9"
|
"github.com/doug-martin/goqu/v9"
|
||||||
|
"github.com/fleetdm/fleet/v4/server/config"
|
||||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
||||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||||
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
@ -1909,3 +1911,40 @@ func (ds *Datastore) ListHostBatteries(ctx context.Context, hid uint) ([]*fleet.
|
|||||||
}
|
}
|
||||||
return batteries, nil
|
return batteries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// countHostNotResponding counts the hosts that haven't been submitting results for sent queries.
|
||||||
|
//
|
||||||
|
// Notes:
|
||||||
|
// - We use `2 * interval`, because of the artificial jitter added to the intervals in Fleet.
|
||||||
|
// - Default values for:
|
||||||
|
// - host.DistributedInterval is usually 10s.
|
||||||
|
// - svc.config.Osquery.DetailUpdateInterval is usually 1h.
|
||||||
|
// - Count only includes hosts seen during the last 7 days.
|
||||||
|
func countHostsNotRespondingDB(ctx context.Context, db sqlx.QueryerContext, logger log.Logger, config config.FleetConfig) (int, error,
|
||||||
|
) {
|
||||||
|
interval := config.Osquery.DetailUpdateInterval.Seconds()
|
||||||
|
|
||||||
|
// The primary `WHERE` clause is intended to capture where Fleet hasn't received a distributed write
|
||||||
|
// from the host during the interval since the host was last seen. Thus we assume the host
|
||||||
|
// is having some issue in executing distributed queries or sending the results.
|
||||||
|
// The subquery `WHERE` clause excludes from the count any hosts that were inactive during the
|
||||||
|
// current seven-day statistics reporting period.
|
||||||
|
sql := `
|
||||||
|
SELECT h.host_id FROM (
|
||||||
|
SELECT * FROM hosts JOIN host_seen_times hst ON hosts.id = hst.host_id
|
||||||
|
WHERE hst.seen_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
|
||||||
|
) h
|
||||||
|
WHERE
|
||||||
|
TIME_TO_SEC(TIMEDIFF(h.seen_time, h.detail_updated_at)) >= (GREATEST(h.distributed_interval, ?) * 2)
|
||||||
|
`
|
||||||
|
|
||||||
|
var ids []int
|
||||||
|
if err := sqlx.SelectContext(ctx, db, &ids, sql, interval); err != nil {
|
||||||
|
return len(ids), ctxerr.Wrap(ctx, err, "count hosts not responding")
|
||||||
|
}
|
||||||
|
if len(ids) > 0 {
|
||||||
|
// We log to help troubleshooting in case this happens.
|
||||||
|
level.Info(logger).Log("err", fmt.Sprintf("hosts detected that are not responding distributed queries %v", ids))
|
||||||
|
}
|
||||||
|
return len(ids), nil
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
|
|
||||||
"github.com/WatchBeam/clock"
|
"github.com/WatchBeam/clock"
|
||||||
"github.com/fleetdm/fleet/v4/server"
|
"github.com/fleetdm/fleet/v4/server"
|
||||||
|
"github.com/fleetdm/fleet/v4/server/config"
|
||||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||||
"github.com/fleetdm/fleet/v4/server/ptr"
|
"github.com/fleetdm/fleet/v4/server/ptr"
|
||||||
"github.com/fleetdm/fleet/v4/server/test"
|
"github.com/fleetdm/fleet/v4/server/test"
|
||||||
@ -119,6 +120,7 @@ func TestHosts(t *testing.T) {
|
|||||||
{"HostIDsByOSVersion", testHostIDsByOSVersion},
|
{"HostIDsByOSVersion", testHostIDsByOSVersion},
|
||||||
{"ShouldCleanTeamPolicies", testShouldCleanTeamPolicies},
|
{"ShouldCleanTeamPolicies", testShouldCleanTeamPolicies},
|
||||||
{"ReplaceHostBatteries", testHostsReplaceHostBatteries},
|
{"ReplaceHostBatteries", testHostsReplaceHostBatteries},
|
||||||
|
{"CountHostsNotResponding", testCountHostsNotResponding},
|
||||||
}
|
}
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
t.Run(c.name, func(t *testing.T) {
|
t.Run(c.name, func(t *testing.T) {
|
||||||
@ -4360,3 +4362,118 @@ func testHostsReplaceHostBatteries(t *testing.T, ds *Datastore) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.ElementsMatch(t, h2Bat, bat2)
|
require.ElementsMatch(t, h2Bat, bat2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCountHostsNotResponding(t *testing.T, ds *Datastore) {
|
||||||
|
ctx := context.Background()
|
||||||
|
config := config.FleetConfig{Osquery: config.OsqueryConfig{DetailUpdateInterval: 1 * time.Hour}}
|
||||||
|
|
||||||
|
// responsive
|
||||||
|
_, err := ds.NewHost(ctx, &fleet.Host{
|
||||||
|
OsqueryHostID: "1",
|
||||||
|
NodeKey: "1",
|
||||||
|
Platform: "linux",
|
||||||
|
Hostname: "host1",
|
||||||
|
DistributedInterval: 10,
|
||||||
|
DetailUpdatedAt: time.Now().Add(-1 * time.Hour),
|
||||||
|
LabelUpdatedAt: time.Now(),
|
||||||
|
PolicyUpdatedAt: time.Now(),
|
||||||
|
SeenTime: time.Now(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
count, err := countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, count)
|
||||||
|
|
||||||
|
// not responsive
|
||||||
|
_, err = ds.NewHost(ctx, &fleet.Host{
|
||||||
|
ID: 2,
|
||||||
|
OsqueryHostID: "2",
|
||||||
|
NodeKey: "2",
|
||||||
|
Platform: "linux",
|
||||||
|
Hostname: "host2",
|
||||||
|
DistributedInterval: 10,
|
||||||
|
DetailUpdatedAt: time.Now().Add(-3 * time.Hour),
|
||||||
|
LabelUpdatedAt: time.Now().Add(-3 * time.Hour),
|
||||||
|
PolicyUpdatedAt: time.Now().Add(-3 * time.Hour),
|
||||||
|
SeenTime: time.Now(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
count, err = countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, count) // count increased by 1
|
||||||
|
|
||||||
|
// responsive
|
||||||
|
_, err = ds.NewHost(ctx, &fleet.Host{
|
||||||
|
OsqueryHostID: "3",
|
||||||
|
NodeKey: "3",
|
||||||
|
Platform: "linux",
|
||||||
|
Hostname: "host3",
|
||||||
|
DistributedInterval: 10,
|
||||||
|
DetailUpdatedAt: time.Now().Add(-49 * time.Hour),
|
||||||
|
LabelUpdatedAt: time.Now().Add(-48 * time.Hour),
|
||||||
|
PolicyUpdatedAt: time.Now().Add(-48 * time.Hour),
|
||||||
|
SeenTime: time.Now().Add(-48 * time.Hour),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
count, err = countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, count) // count unchanged
|
||||||
|
|
||||||
|
// not responsive
|
||||||
|
_, err = ds.NewHost(ctx, &fleet.Host{
|
||||||
|
OsqueryHostID: "4",
|
||||||
|
NodeKey: "4",
|
||||||
|
Platform: "linux",
|
||||||
|
Hostname: "host4",
|
||||||
|
DistributedInterval: 10,
|
||||||
|
DetailUpdatedAt: time.Now().Add(-51 * time.Hour),
|
||||||
|
LabelUpdatedAt: time.Now().Add(-48 * time.Hour),
|
||||||
|
PolicyUpdatedAt: time.Now().Add(-48 * time.Hour),
|
||||||
|
SeenTime: time.Now().Add(-48 * time.Hour),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
count, err = countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, count) // count increased by 1
|
||||||
|
|
||||||
|
// was responsive but hasn't been seen in past 7 days so it is not counted
|
||||||
|
_, err = ds.NewHost(ctx, &fleet.Host{
|
||||||
|
OsqueryHostID: "5",
|
||||||
|
NodeKey: "5",
|
||||||
|
Platform: "linux",
|
||||||
|
Hostname: "host5",
|
||||||
|
DistributedInterval: 10,
|
||||||
|
DetailUpdatedAt: time.Now().Add(-8 * 24 * time.Hour).Add(-1 * time.Hour),
|
||||||
|
LabelUpdatedAt: time.Now().Add(-8 * 24 * time.Hour),
|
||||||
|
PolicyUpdatedAt: time.Now().Add(-8 * 24 * time.Hour),
|
||||||
|
SeenTime: time.Now().Add(-8 * 24 * time.Hour),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
count, err = countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, count) // count unchanged
|
||||||
|
|
||||||
|
// distributed interval (1h1m) is greater than osquery detail interval (1h)
|
||||||
|
// so measurement period for non-responsiveness is 2h2m
|
||||||
|
_, err = ds.NewHost(ctx, &fleet.Host{
|
||||||
|
OsqueryHostID: "6",
|
||||||
|
NodeKey: "6",
|
||||||
|
Platform: "linux",
|
||||||
|
Hostname: "host6",
|
||||||
|
DistributedInterval: uint((1*time.Hour + 1*time.Minute).Seconds()), // 1h1m
|
||||||
|
DetailUpdatedAt: time.Now().Add(-2 * time.Hour).Add(-1 * time.Minute), // 2h1m
|
||||||
|
LabelUpdatedAt: time.Now().Add(-2 * time.Hour).Add(-1 * time.Minute),
|
||||||
|
PolicyUpdatedAt: time.Now().Add(-2 * time.Hour).Add(-1 * time.Minute),
|
||||||
|
SeenTime: time.Now(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
count, err = countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, count) // count unchanged
|
||||||
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fleetdm/fleet/v4/server"
|
"github.com/fleetdm/fleet/v4/server"
|
||||||
|
"github.com/fleetdm/fleet/v4/server/config"
|
||||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
||||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
@ -17,7 +18,7 @@ type statistics struct {
|
|||||||
Identifier string `db:"anonymous_identifier"`
|
Identifier string `db:"anonymous_identifier"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Datastore) ShouldSendStatistics(ctx context.Context, frequency time.Duration, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
func (ds *Datastore) ShouldSendStatistics(ctx context.Context, frequency time.Duration, config config.FleetConfig, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
||||||
computeStats := func(stats *fleet.StatisticsPayload, since time.Time) error {
|
computeStats := func(stats *fleet.StatisticsPayload, since time.Time) error {
|
||||||
enrolledHostsByOS, amountEnrolledHosts, err := amountEnrolledHostsByOSDB(ctx, ds.writer)
|
enrolledHostsByOS, amountEnrolledHosts, err := amountEnrolledHostsByOSDB(ctx, ds.writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -51,6 +52,10 @@ func (ds *Datastore) ShouldSendStatistics(ctx context.Context, frequency time.Du
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return ctxerr.Wrap(ctx, err, "statistics error store")
|
return ctxerr.Wrap(ctx, err, "statistics error store")
|
||||||
}
|
}
|
||||||
|
amountHostsNotResponding, err := countHostsNotRespondingDB(ctx, ds.writer, ds.logger, config)
|
||||||
|
if err != nil {
|
||||||
|
return ctxerr.Wrap(ctx, err, "amount hosts not responding")
|
||||||
|
}
|
||||||
|
|
||||||
stats.NumHostsEnrolled = amountEnrolledHosts
|
stats.NumHostsEnrolled = amountEnrolledHosts
|
||||||
stats.NumUsers = amountUsers
|
stats.NumUsers = amountUsers
|
||||||
@ -64,6 +69,7 @@ func (ds *Datastore) ShouldSendStatistics(ctx context.Context, frequency time.Du
|
|||||||
stats.NumWeeklyActiveUsers = amountWeeklyUsers
|
stats.NumWeeklyActiveUsers = amountWeeklyUsers
|
||||||
stats.HostsEnrolledByOperatingSystem = enrolledHostsByOS
|
stats.HostsEnrolledByOperatingSystem = enrolledHostsByOS
|
||||||
stats.StoredErrors = storedErrs
|
stats.StoredErrors = storedErrs
|
||||||
|
stats.NumHostsNotResponding = amountHostsNotResponding
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fleetdm/fleet/v4/server/config"
|
||||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
||||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||||
"github.com/fleetdm/fleet/v4/server/ptr"
|
"github.com/fleetdm/fleet/v4/server/ptr"
|
||||||
@ -31,7 +32,7 @@ func TestStatistics(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
||||||
var eh = ctxerr.MockHandler{}
|
eh := ctxerr.MockHandler{}
|
||||||
// Mock the error handler to always return an error
|
// Mock the error handler to always return an error
|
||||||
eh.RetrieveImpl = func(flush bool) ([]*ctxerr.StoredError, error) {
|
eh.RetrieveImpl = func(flush bool) ([]*ctxerr.StoredError, error) {
|
||||||
require.False(t, flush)
|
require.False(t, flush)
|
||||||
@ -39,8 +40,10 @@ func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
|||||||
{Count: 10, Chain: json.RawMessage(`[{"stack": ["a","b","c","d"]}]`)},
|
{Count: 10, Chain: json.RawMessage(`[{"stack": ["a","b","c","d"]}]`)},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
var ctxb = context.Background()
|
ctxb := context.Background()
|
||||||
var ctx = ctxerr.NewContext(ctxb, eh)
|
ctx := ctxerr.NewContext(ctxb, eh)
|
||||||
|
|
||||||
|
fleetConfig := config.FleetConfig{Osquery: config.OsqueryConfig{DetailUpdateInterval: 1 * time.Hour}}
|
||||||
|
|
||||||
// Create new host for test
|
// Create new host for test
|
||||||
_, err := ds.NewHost(ctx, &fleet.Host{
|
_, err := ds.NewHost(ctx, &fleet.Host{
|
||||||
@ -125,7 +128,7 @@ func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
|||||||
license := &fleet.LicenseInfo{Tier: "premium"}
|
license := &fleet.LicenseInfo{Tier: "premium"}
|
||||||
|
|
||||||
// First time running, we send statistics
|
// First time running, we send statistics
|
||||||
stats, shouldSend, err := ds.ShouldSendStatistics(ctx, fleet.StatisticsFrequency, license)
|
stats, shouldSend, err := ds.ShouldSendStatistics(ctx, fleet.StatisticsFrequency, fleetConfig, license)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, shouldSend)
|
assert.True(t, shouldSend)
|
||||||
assert.NotEmpty(t, stats.AnonymousIdentifier)
|
assert.NotEmpty(t, stats.AnonymousIdentifier)
|
||||||
@ -149,7 +152,7 @@ func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// If we try right away, it shouldn't ask to send
|
// If we try right away, it shouldn't ask to send
|
||||||
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, fleet.StatisticsFrequency, license)
|
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, fleet.StatisticsFrequency, fleetConfig, license)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, shouldSend)
|
assert.False(t, shouldSend)
|
||||||
|
|
||||||
@ -221,7 +224,7 @@ func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Lower the frequency to trigger an "outdated" sent
|
// Lower the frequency to trigger an "outdated" sent
|
||||||
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, time.Millisecond, license)
|
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, time.Millisecond, fleetConfig, license)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, shouldSend)
|
assert.True(t, shouldSend)
|
||||||
assert.Equal(t, firstIdentifier, stats.AnonymousIdentifier)
|
assert.Equal(t, firstIdentifier, stats.AnonymousIdentifier)
|
||||||
@ -252,7 +255,7 @@ func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
|||||||
// wait a bit and resend statistics
|
// wait a bit and resend statistics
|
||||||
time.Sleep(1100 * time.Millisecond) // ensure the DB timestamp is not in the same second
|
time.Sleep(1100 * time.Millisecond) // ensure the DB timestamp is not in the same second
|
||||||
|
|
||||||
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, time.Millisecond, license)
|
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, time.Millisecond, fleetConfig, license)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, shouldSend)
|
assert.True(t, shouldSend)
|
||||||
assert.Equal(t, firstIdentifier, stats.AnonymousIdentifier)
|
assert.Equal(t, firstIdentifier, stats.AnonymousIdentifier)
|
||||||
@ -260,4 +263,26 @@ func testStatisticsShouldSend(t *testing.T, ds *Datastore) {
|
|||||||
assert.Equal(t, stats.NumUsers, 2)
|
assert.Equal(t, stats.NumUsers, 2)
|
||||||
assert.Equal(t, stats.NumWeeklyActiveUsers, 1)
|
assert.Equal(t, stats.NumWeeklyActiveUsers, 1)
|
||||||
assert.Equal(t, string(stats.StoredErrors), `[{"count":10,"loc":["a","b","c"]}]`)
|
assert.Equal(t, string(stats.StoredErrors), `[{"count":10,"loc":["a","b","c"]}]`)
|
||||||
|
|
||||||
|
// Add host to test hosts not responding stats
|
||||||
|
_, err = ds.NewHost(ctx, &fleet.Host{
|
||||||
|
DetailUpdatedAt: time.Now().Add(-3 * time.Hour),
|
||||||
|
LabelUpdatedAt: time.Now(),
|
||||||
|
PolicyUpdatedAt: time.Now(),
|
||||||
|
SeenTime: time.Now(),
|
||||||
|
NodeKey: "6",
|
||||||
|
UUID: "6",
|
||||||
|
Hostname: "non-responsive.local",
|
||||||
|
PrimaryIP: "192.168.1.6",
|
||||||
|
PrimaryMac: "30-65-EC-6F-C4-66",
|
||||||
|
OsqueryHostID: "NR",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
stats, shouldSend, err = ds.ShouldSendStatistics(ctx, time.Millisecond, fleetConfig, license)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, shouldSend)
|
||||||
|
assert.Equal(t, firstIdentifier, stats.AnonymousIdentifier)
|
||||||
|
assert.Equal(t, 6, stats.NumHostsEnrolled)
|
||||||
|
assert.Equal(t, 1, stats.NumHostsNotResponding)
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fleetdm/fleet/v4/server/config"
|
||||||
"github.com/fleetdm/fleet/v4/server/health"
|
"github.com/fleetdm/fleet/v4/server/health"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -399,7 +400,7 @@ type Datastore interface {
|
|||||||
///////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
// StatisticsStore
|
// StatisticsStore
|
||||||
|
|
||||||
ShouldSendStatistics(ctx context.Context, frequency time.Duration, license *LicenseInfo) (StatisticsPayload, bool, error)
|
ShouldSendStatistics(ctx context.Context, frequency time.Duration, config config.FleetConfig, license *LicenseInfo) (StatisticsPayload, bool, error)
|
||||||
RecordStatisticsSent(ctx context.Context) error
|
RecordStatisticsSent(ctx context.Context) error
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -21,6 +21,8 @@ type StatisticsPayload struct {
|
|||||||
NumWeeklyActiveUsers int `json:"numWeeklyActiveUsers"`
|
NumWeeklyActiveUsers int `json:"numWeeklyActiveUsers"`
|
||||||
HostsEnrolledByOperatingSystem map[string][]HostsCountByOSVersion `json:"hostsEnrolledByOperatingSystem"`
|
HostsEnrolledByOperatingSystem map[string][]HostsCountByOSVersion `json:"hostsEnrolledByOperatingSystem"`
|
||||||
StoredErrors json.RawMessage `json:"storedErrors"`
|
StoredErrors json.RawMessage `json:"storedErrors"`
|
||||||
|
// NumHostsNotResponding is a count of hosts that connect to Fleet successfully but fail to submit results for distributed queries.
|
||||||
|
NumHostsNotResponding int `json:"numHostsNotResponding"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type HostsCountByOSVersion struct {
|
type HostsCountByOSVersion struct {
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fleetdm/fleet/v4/server/config"
|
||||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -314,7 +315,7 @@ type NewActivityFunc func(ctx context.Context, user *fleet.User, activityType st
|
|||||||
|
|
||||||
type ListActivitiesFunc func(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error)
|
type ListActivitiesFunc func(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error)
|
||||||
|
|
||||||
type ShouldSendStatisticsFunc func(ctx context.Context, frequency time.Duration, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error)
|
type ShouldSendStatisticsFunc func(ctx context.Context, frequency time.Duration, config config.FleetConfig, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error)
|
||||||
|
|
||||||
type RecordStatisticsSentFunc func(ctx context.Context) error
|
type RecordStatisticsSentFunc func(ctx context.Context) error
|
||||||
|
|
||||||
@ -1794,9 +1795,9 @@ func (s *DataStore) ListActivities(ctx context.Context, opt fleet.ListOptions) (
|
|||||||
return s.ListActivitiesFunc(ctx, opt)
|
return s.ListActivitiesFunc(ctx, opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DataStore) ShouldSendStatistics(ctx context.Context, frequency time.Duration, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
func (s *DataStore) ShouldSendStatistics(ctx context.Context, frequency time.Duration, config config.FleetConfig, license *fleet.LicenseInfo) (fleet.StatisticsPayload, bool, error) {
|
||||||
s.ShouldSendStatisticsFuncInvoked = true
|
s.ShouldSendStatisticsFuncInvoked = true
|
||||||
return s.ShouldSendStatisticsFunc(ctx, frequency, license)
|
return s.ShouldSendStatisticsFunc(ctx, frequency, config, license)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DataStore) RecordStatisticsSent(ctx context.Context) error {
|
func (s *DataStore) RecordStatisticsSent(ctx context.Context) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user