fleet/server/service/service_osquery.go
Zach Wasserman 417ef2c9b6
Refactor teams service methods (#910)
- Move team-related service methods to `ee/server/service`.
- Instantiate different service on startup based on license key.
- Refactor service errors into separate package.
- Add support for running E2E tests in both Core and Basic tiers.
2021-05-31 17:07:51 -07:00

1138 lines
32 KiB
Go

package service
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
hostctx "github.com/fleetdm/fleet/server/contexts/host"
"github.com/fleetdm/fleet/server/kolide"
"github.com/fleetdm/fleet/server/pubsub"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/spf13/cast"
)
type osqueryError struct {
message string
nodeInvalid bool
}
func (e osqueryError) Error() string {
return e.message
}
func (e osqueryError) NodeInvalid() bool {
return e.nodeInvalid
}
// Sometimes osquery gives us empty string where we expect an integer.
// We change the to "0" so it can be handled by the appropriate string to
// integer conversion function, as these will err on ""
func emptyToZero(val string) string {
if val == "" {
return "0"
}
return val
}
func (svc Service) AuthenticateHost(ctx context.Context, nodeKey string) (*kolide.Host, error) {
if nodeKey == "" {
return nil, osqueryError{
message: "authentication error: missing node key",
nodeInvalid: true,
}
}
host, err := svc.ds.AuthenticateHost(nodeKey)
if err != nil {
switch err.(type) {
case kolide.NotFoundError:
return nil, osqueryError{
message: "authentication error: invalid node key: " + nodeKey,
nodeInvalid: true,
}
default:
return nil, osqueryError{
message: "authentication error: " + err.Error(),
}
}
}
// Update the "seen" time used to calculate online status. These updates are
// batched for MySQL performance reasons. Because this is done
// asynchronously, it is possible for the server to shut down before
// updating the seen time for these hosts. This seems to be an acceptable
// tradeoff as an online host will continue to check in and quickly be
// marked online again.
svc.seenHostSet.addHostID(host.ID)
host.SeenTime = svc.clock.Now()
return host, nil
}
func (svc Service) EnrollAgent(ctx context.Context, enrollSecret, hostIdentifier string, hostDetails map[string](map[string]string)) (string, error) {
secret, err := svc.ds.VerifyEnrollSecret(enrollSecret)
if err != nil {
return "", osqueryError{
message: "enroll failed: " + err.Error(),
nodeInvalid: true,
}
}
nodeKey, err := kolide.RandomText(svc.config.Osquery.NodeKeySize)
if err != nil {
return "", osqueryError{
message: "generate node key failed: " + err.Error(),
nodeInvalid: true,
}
}
hostIdentifier = getHostIdentifier(svc.logger, svc.config.Osquery.HostIdentifier, hostIdentifier, hostDetails)
host, err := svc.ds.EnrollHost(hostIdentifier, nodeKey, secret.TeamID, svc.config.Osquery.EnrollCooldown)
if err != nil {
return "", osqueryError{message: "save enroll failed: " + err.Error(), nodeInvalid: true}
}
// Save enrollment details if provided
save := false
if r, ok := hostDetails["os_version"]; ok {
detailQueries["os_version"].IngestFunc(svc.logger, host, []map[string]string{r})
save = true
}
if r, ok := hostDetails["osquery_info"]; ok {
detailQueries["osquery_info"].IngestFunc(svc.logger, host, []map[string]string{r})
save = true
}
if r, ok := hostDetails["system_info"]; ok {
detailQueries["system_info"].IngestFunc(svc.logger, host, []map[string]string{r})
save = true
}
if save {
if err := svc.ds.SaveHost(host); err != nil {
return "", osqueryError{message: "saving host details: " + err.Error(), nodeInvalid: true}
}
}
return host.NodeKey, nil
}
func getHostIdentifier(logger log.Logger, identifierOption, providedIdentifier string, details map[string](map[string]string)) string {
switch identifierOption {
case "provided":
// Use the host identifier already provided in the request.
return providedIdentifier
case "instance":
r, ok := details["osquery_info"]
if !ok {
level.Info(logger).Log(
"msg", "could not get host identifier",
"reason", "missing osquery_info",
"identifier", "instance",
)
} else if r["instance_id"] == "" {
level.Info(logger).Log(
"msg", "could not get host identifier",
"reason", "missing instance_id in osquery_info",
"identifier", "instance",
)
} else {
return r["instance_id"]
}
case "uuid":
r, ok := details["osquery_info"]
if !ok {
level.Info(logger).Log(
"msg", "could not get host identifier",
"reason", "missing osquery_info",
"identifier", "uuid",
)
} else if r["uuid"] == "" {
level.Info(logger).Log(
"msg", "could not get host identifier",
"reason", "missing instance_id in osquery_info",
"identifier", "uuid",
)
} else {
return r["uuid"]
}
case "hostname":
r, ok := details["system_info"]
if !ok {
level.Info(logger).Log(
"msg", "could not get host identifier",
"reason", "missing system_info",
"identifier", "hostname",
)
} else if r["hostname"] == "" {
level.Info(logger).Log(
"msg", "could not get host identifier",
"reason", "missing instance_id in system_info",
"identifier", "hostname",
)
} else {
return r["hostname"]
}
default:
panic("Unknown option for host_identifier: " + identifierOption)
}
return providedIdentifier
}
func (svc *Service) GetClientConfig(ctx context.Context) (map[string]interface{}, error) {
host, ok := hostctx.FromContext(ctx)
if !ok {
return nil, osqueryError{message: "internal error: missing host from request context"}
}
baseConfig, err := svc.AgentOptionsForHost(ctx, &host)
if err != nil {
return nil, osqueryError{message: "internal error: fetching base config: " + err.Error()}
}
var config map[string]interface{}
err = json.Unmarshal(baseConfig, &config)
if err != nil {
return nil, osqueryError{message: "internal error: parsing base configuration: " + err.Error()}
}
packs, err := svc.ds.ListPacksForHost(host.ID)
if err != nil {
return nil, osqueryError{message: "database error: " + err.Error()}
}
packConfig := kolide.Packs{}
for _, pack := range packs {
// first, we must figure out what queries are in this pack
queries, err := svc.ds.ListScheduledQueriesInPack(pack.ID, kolide.ListOptions{})
if err != nil {
return nil, osqueryError{message: "database error: " + err.Error()}
}
// the serializable osquery config struct expects content in a
// particular format, so we do the conversion here
configQueries := kolide.Queries{}
for _, query := range queries {
queryContent := kolide.QueryContent{
Query: query.Query,
Interval: query.Interval,
Platform: query.Platform,
Version: query.Version,
Removed: query.Removed,
Shard: query.Shard,
Denylist: query.Denylist,
}
if query.Removed != nil {
queryContent.Removed = query.Removed
}
if query.Snapshot != nil && *query.Snapshot {
queryContent.Snapshot = query.Snapshot
}
configQueries[query.Name] = queryContent
}
// finally, we add the pack to the client config struct with all of
// the pack's queries
packConfig[pack.Name] = kolide.PackContent{
Platform: pack.Platform,
Queries: configQueries,
}
}
if len(packConfig) > 0 {
packJSON, err := json.Marshal(packConfig)
if err != nil {
return nil, osqueryError{message: "internal error: marshal pack JSON: " + err.Error()}
}
config["packs"] = json.RawMessage(packJSON)
}
// Save interval values if they have been updated.
saveHost := false
if options, ok := config["options"].(map[string]interface{}); ok {
distributedIntervalVal, ok := options["distributed_interval"]
distributedInterval, err := cast.ToUintE(distributedIntervalVal)
if ok && err == nil && host.DistributedInterval != distributedInterval {
host.DistributedInterval = distributedInterval
saveHost = true
}
loggerTLSPeriodVal, ok := options["logger_tls_period"]
loggerTLSPeriod, err := cast.ToUintE(loggerTLSPeriodVal)
if ok && err == nil && host.LoggerTLSPeriod != loggerTLSPeriod {
host.LoggerTLSPeriod = loggerTLSPeriod
saveHost = true
}
// Note config_tls_refresh can only be set in the osquery flags (and has
// also been deprecated in osquery for quite some time) so is ignored
// here.
configRefreshVal, ok := options["config_refresh"]
configRefresh, err := cast.ToUintE(configRefreshVal)
if ok && err == nil && host.ConfigTLSRefresh != configRefresh {
host.ConfigTLSRefresh = configRefresh
saveHost = true
}
}
if saveHost {
err := svc.ds.SaveHost(&host)
if err != nil {
return nil, err
}
}
return config, nil
}
func (svc *Service) SubmitStatusLogs(ctx context.Context, logs []json.RawMessage) error {
if err := svc.osqueryLogWriter.Status.Write(ctx, logs); err != nil {
return osqueryError{message: "error writing status logs: " + err.Error()}
}
return nil
}
func (svc *Service) SubmitResultLogs(ctx context.Context, logs []json.RawMessage) error {
if err := svc.osqueryLogWriter.Result.Write(ctx, logs); err != nil {
return osqueryError{message: "error writing result logs: " + err.Error()}
}
return nil
}
// hostLabelQueryPrefix is appended before the query name when a query is
// provided as a label query. This allows the results to be retrieved when
// osqueryd writes the distributed query results.
const hostLabelQueryPrefix = "fleet_label_query_"
// hostDetailQueryPrefix is appended before the query name when a query is
// provided as a detail query.
const hostDetailQueryPrefix = "fleet_detail_query_"
// hostAdditionalQueryPrefix is appended before the query name when a query is
// provided as an additional query (additional info for hosts to retrieve).
const hostAdditionalQueryPrefix = "fleet_additional_query_"
// hostDistributedQueryPrefix is appended before the query name when a query is
// run from a distributed query campaign
const hostDistributedQueryPrefix = "fleet_distributed_query_"
type detailQuery struct {
Query string
// Platforms is a list of platforms to run the query on. If this value is
// empty, run on all platforms.
Platforms []string
IngestFunc func(logger log.Logger, host *kolide.Host, rows []map[string]string) error
}
// runForPlatform determines whether this detail query should run on the given platform
func (q *detailQuery) runForPlatform(platform string) bool {
if len(q.Platforms) == 0 {
return true
}
for _, p := range q.Platforms {
if p == platform {
return true
}
}
return false
}
// detailQueries defines the detail queries that should be run on the host, as
// well as how the results of those queries should be ingested into the
// kolide.Host data model. This map should not be modified at runtime.
var detailQueries = map[string]detailQuery{
"network_interface": {
Query: `select address, mac
from interface_details id join interface_addresses ia
on ia.interface = id.interface where length(mac) > 0
order by (ibytes + obytes) desc`,
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) (err error) {
if len(rows) == 0 {
logger.Log("component", "service", "method", "IngestFunc", "err",
"detail_query_network_interface expected 1 or more results")
return nil
}
// Rows are ordered by traffic, so we will get the most active
// interface by iterating in order
var firstIPv4, firstIPv6 map[string]string
for _, row := range rows {
ip := net.ParseIP(row["address"])
if ip == nil {
continue
}
// Skip link-local and loopback interfaces
if ip.IsLinkLocalUnicast() || ip.IsLoopback() {
continue
}
if strings.Contains(row["address"], ":") {
//IPv6
if firstIPv6 == nil {
firstIPv6 = row
}
} else {
// IPv4
if firstIPv4 == nil {
firstIPv4 = row
}
}
}
var selected map[string]string
switch {
// Prefer IPv4
case firstIPv4 != nil:
selected = firstIPv4
// Otherwise IPv6
case firstIPv6 != nil:
selected = firstIPv6
// If only link-local and loopback found, still use the first
// interface so that we don't get an empty value.
default:
selected = rows[0]
}
host.PrimaryIP = selected["address"]
host.PrimaryMac = selected["mac"]
return nil
},
},
"os_version": {
Query: "select * from os_version limit 1",
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
if len(rows) != 1 {
logger.Log("component", "service", "method", "IngestFunc", "err",
fmt.Sprintf("detail_query_os_version expected single result got %d", len(rows)))
return nil
}
host.OSVersion = fmt.Sprintf(
"%s %s.%s.%s",
rows[0]["name"],
rows[0]["major"],
rows[0]["minor"],
rows[0]["patch"],
)
host.OSVersion = strings.Trim(host.OSVersion, ".")
if build, ok := rows[0]["build"]; ok {
host.Build = build
}
host.Platform = rows[0]["platform"]
host.PlatformLike = rows[0]["platform_like"]
host.CodeName = rows[0]["code_name"]
// On centos6 there is an osquery bug that leaves
// platform empty. Here we workaround.
if host.Platform == "" &&
strings.Contains(strings.ToLower(rows[0]["name"]), "centos") {
host.Platform = "centos"
}
return nil
},
},
"osquery_flags": {
// Collect the interval info (used for online status
// calculation) from the osquery flags. We typically control
// distributed_interval (but it's not required), and typically
// do not control config_tls_refresh.
Query: `select name, value from osquery_flags where name in ("distributed_interval", "config_tls_refresh", "config_refresh", "logger_tls_period")`,
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
var configTLSRefresh, configRefresh uint
var configRefreshSeen, configTLSRefreshSeen bool
for _, row := range rows {
switch row["name"] {
case "distributed_interval":
interval, err := strconv.Atoi(emptyToZero(row["value"]))
if err != nil {
return errors.Wrap(err, "parsing distributed_interval")
}
host.DistributedInterval = uint(interval)
case "config_tls_refresh":
// Prior to osquery 2.4.6, the flag was
// called `config_tls_refresh`.
interval, err := strconv.Atoi(emptyToZero(row["value"]))
if err != nil {
return errors.Wrap(err, "parsing config_tls_refresh")
}
configTLSRefresh = uint(interval)
configTLSRefreshSeen = true
case "config_refresh":
// After 2.4.6 `config_tls_refresh` was
// aliased to `config_refresh`.
interval, err := strconv.Atoi(emptyToZero(row["value"]))
if err != nil {
return errors.Wrap(err, "parsing config_refresh")
}
configRefresh = uint(interval)
configRefreshSeen = true
case "logger_tls_period":
interval, err := strconv.Atoi(emptyToZero(row["value"]))
if err != nil {
return errors.Wrap(err, "parsing logger_tls_period")
}
host.LoggerTLSPeriod = uint(interval)
}
}
// Since the `config_refresh` flag existed prior to
// 2.4.6 and had a different meaning, we prefer
// `config_tls_refresh` if it was set, and use
// `config_refresh` as a fallback.
if configTLSRefreshSeen {
host.ConfigTLSRefresh = configTLSRefresh
} else if configRefreshSeen {
host.ConfigTLSRefresh = configRefresh
}
return nil
},
},
"osquery_info": {
Query: "select * from osquery_info limit 1",
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
if len(rows) != 1 {
logger.Log("component", "service", "method", "IngestFunc", "err",
fmt.Sprintf("detail_query_osquery_info expected single result got %d", len(rows)))
return nil
}
host.OsqueryVersion = rows[0]["version"]
return nil
},
},
"system_info": {
Query: "select * from system_info limit 1",
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
if len(rows) != 1 {
logger.Log("component", "service", "method", "IngestFunc", "err",
fmt.Sprintf("detail_query_system_info expected single result got %d", len(rows)))
return nil
}
var err error
host.PhysicalMemory, err = strconv.ParseInt(emptyToZero(rows[0]["physical_memory"]), 10, 64)
if err != nil {
return err
}
host.HostName = rows[0]["hostname"]
host.UUID = rows[0]["uuid"]
host.CPUType = rows[0]["cpu_type"]
host.CPUSubtype = rows[0]["cpu_subtype"]
host.CPUBrand = rows[0]["cpu_brand"]
host.CPUPhysicalCores, err = strconv.Atoi(emptyToZero(rows[0]["cpu_physical_cores"]))
if err != nil {
return err
}
host.CPULogicalCores, err = strconv.Atoi(emptyToZero(rows[0]["cpu_logical_cores"]))
if err != nil {
return err
}
host.HardwareVendor = rows[0]["hardware_vendor"]
host.HardwareModel = rows[0]["hardware_model"]
host.HardwareVersion = rows[0]["hardware_version"]
host.HardwareSerial = rows[0]["hardware_serial"]
host.ComputerName = rows[0]["computer_name"]
return nil
},
},
"uptime": {
Query: "select * from uptime limit 1",
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
if len(rows) != 1 {
logger.Log("component", "service", "method", "IngestFunc", "err",
fmt.Sprintf("detail_query_uptime expected single result got %d", len(rows)))
return nil
}
uptimeSeconds, err := strconv.Atoi(emptyToZero(rows[0]["total_seconds"]))
if err != nil {
return err
}
host.Uptime = time.Duration(uptimeSeconds) * time.Second
return nil
},
},
"software_macos": {
Query: `
SELECT
name AS name,
bundle_short_version AS version,
'Application (macOS)' AS type,
'apps' AS source
FROM apps
UNION
SELECT
name AS name,
version AS version,
'Package (Python)' AS type,
'python_packages' AS source
FROM python_packages
UNION
SELECT
name AS name,
version AS version,
'Browser plugin (Chrome)' AS type,
'chrome_extensions' AS source
FROM chrome_extensions
UNION
SELECT
name AS name,
version AS version,
'Browser plugin (Firefox)' AS type,
'firefox_addons' AS source
FROM firefox_addons
UNION
SELECT
name As name,
version AS version,
'Browser plugin (Safari)' AS type,
'safari_extensions' AS source
FROM safari_extensions
UNION
SELECT
name AS name,
version AS version,
'Package (Homebrew)' AS type,
'homebrew_packages' AS source
FROM homebrew_packages;
`,
Platforms: []string{"darwin"},
IngestFunc: ingestSoftware,
},
"software_linux": {
Query: `
SELECT
name AS name,
version AS version,
'Package (APT)' AS type,
'apt_sources' AS source
FROM apt_sources
UNION
SELECT
name AS name,
version AS version,
'Package (deb)' AS type,
'deb_packages' AS source
FROM deb_packages
UNION
SELECT
package AS name,
version AS version,
'Package (Portage)' AS type,
'portage_packages' AS source
FROM portage_packages
UNION
SELECT
name AS name,
version AS version,
'Package (RPM)' AS type,
'rpm_packages' AS source
FROM rpm_packages
UNION
SELECT
name AS name,
'' AS version,
'Package (YUM)' AS type,
'yum_sources' AS source
FROM yum_sources
UNION
SELECT
name AS name,
version AS version,
'Package (NPM)' AS type,
'npm_packages' AS source
FROM npm_packages
UNION
SELECT
name AS name,
version AS version,
'Package (Atom)' AS type,
'atom_packages' AS source
FROM atom_packages
UNION
SELECT
name AS name,
version AS version,
'Package (Python)' AS type,
'python_packages' AS source
FROM python_packages;
`,
Platforms: []string{"linux", "rhel", "ubuntu", "centos"},
IngestFunc: ingestSoftware,
},
"software_windows": {
Query: `
SELECT
name AS name,
version AS version,
'Program (Windows)' AS type,
'programs' AS source
FROM programs
UNION
SELECT
name AS name,
version AS version,
'Package (Python)' AS type,
'python_packages' AS source
FROM python_packages
UNION
SELECT
name AS name,
version AS version,
'Browser plugin (IE)' AS type,
'ie_extensions' AS source
FROM ie_extensions
UNION
SELECT
name AS name,
version AS version,
'Browser plugin (Chrome)' AS type,
'chrome_extensions' AS source
FROM chrome_extensions
UNION
SELECT
name AS name,
version AS version,
'Browser plugin (Firefox)' AS type,
'firefox_addons' AS source
FROM firefox_addons
UNION
SELECT
name AS name,
version AS version,
'Package (Chocolatey)' AS type,
'chocolatey_packages' AS source
FROM chocolatey_packages
UNION
SELECT
name AS name,
version AS version,
'Package (Atom)' AS type,
'atom_packages' AS source
FROM atom_packages
UNION
SELECT
name AS name,
version AS version,
'Package (Python)' AS type,
'python_packages' AS source
FROM python_packages;
`,
Platforms: []string{"windows"},
IngestFunc: ingestSoftware,
},
"scheduled_query_stats": {
Query: `
SELECT *,
(SELECT value from osquery_flags where name = 'pack_delimiter') AS delimiter
FROM osquery_schedule
`,
IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
packs := map[string][]kolide.ScheduledQueryStats{}
for _, row := range rows {
providedName := row["name"]
if providedName == "" {
level.Debug(logger).Log(
"msg", "host reported scheduled query with empty name",
"host", host.HostName,
)
continue
}
delimiter := row["delimiter"]
if delimiter == "" {
level.Debug(logger).Log(
"msg", "host reported scheduled query with empty delimiter",
"host", host.HostName,
)
continue
}
// Split with a limit of 2 in case query name includes the
// delimiter. Not much we can do if pack name includes the
// delimiter.
trimmedName := strings.TrimPrefix(providedName, "pack"+delimiter)
parts := strings.SplitN(trimmedName, delimiter, 2)
if len(parts) != 2 {
level.Debug(logger).Log(
"msg", "could not split pack and query names",
"host", host.HostName,
"name", providedName,
"delimiter", delimiter,
)
continue
}
packName, scheduledName := parts[0], parts[1]
stats := kolide.ScheduledQueryStats{
ScheduledQueryName: scheduledName,
PackName: packName,
AverageMemory: cast.ToInt(row["average_memory"]),
Denylisted: cast.ToBool(row["denylisted"]),
Executions: cast.ToInt(row["executions"]),
Interval: cast.ToInt(row["interval"]),
// Cast to int first to allow cast.ToTime to interpret the unix timestamp.
LastExecuted: time.Unix(cast.ToInt64(row["last_executed"]), 0).UTC(),
OutputSize: cast.ToInt(row["output_size"]),
SystemTime: cast.ToInt(row["system_time"]),
UserTime: cast.ToInt(row["user_time"]),
WallTime: cast.ToInt(row["wall_time"]),
}
packs[packName] = append(packs[packName], stats)
}
host.PackStats = []kolide.PackStats{}
for packName, stats := range packs {
host.PackStats = append(
host.PackStats,
kolide.PackStats{
PackName: packName,
QueryStats: stats,
},
)
}
return nil
},
},
}
func ingestSoftware(logger log.Logger, host *kolide.Host, rows []map[string]string) error {
software := kolide.HostSoftware{Modified: true}
for _, row := range rows {
name := row["name"]
version := row["version"]
source := row["source"]
if name == "" {
level.Debug(logger).Log(
"msg", "host reported software with empty name",
"host", host.HostName,
"version", version,
"source", source,
)
continue
}
if source == "" {
level.Debug(logger).Log(
"msg", "host reported software with empty name",
"host", host.HostName,
"version", version,
"name", name,
)
continue
}
s := kolide.Software{Name: name, Version: version, Source: source}
software.Software = append(software.Software, s)
}
host.HostSoftware = software
return nil
}
// hostDetailQueries returns the map of queries that should be executed by
// osqueryd to fill in the host details
func (svc *Service) hostDetailQueries(host kolide.Host) (map[string]string, error) {
queries := make(map[string]string)
if host.DetailUpdateTime.After(svc.clock.Now().Add(-svc.config.Osquery.DetailUpdateInterval)) && !host.RefetchRequested {
// No need to update already fresh details
return queries, nil
}
for name, query := range detailQueries {
if query.runForPlatform(host.Platform) {
if strings.HasPrefix(name, "software_") {
// Feature flag this because of as-yet-untested performance
// considerations.
if os.Getenv("FLEET_BETA_SOFTWARE_INVENTORY") == "" {
continue
}
}
queries[hostDetailQueryPrefix+name] = query.Query
}
}
// Get additional queries
config, err := svc.ds.AppConfig()
if err != nil {
return nil, osqueryError{message: "get additional queries: " + err.Error()}
}
if config.AdditionalQueries == nil {
// No additional queries set
return queries, nil
}
var additionalQueries map[string]string
if err := json.Unmarshal(*config.AdditionalQueries, &additionalQueries); err != nil {
return nil, osqueryError{message: "unmarshal additional queries: " + err.Error()}
}
for name, query := range additionalQueries {
queries[hostAdditionalQueryPrefix+name] = query
}
return queries, nil
}
func (svc *Service) GetDistributedQueries(ctx context.Context) (map[string]string, uint, error) {
host, ok := hostctx.FromContext(ctx)
if !ok {
return nil, 0, osqueryError{message: "internal error: missing host from request context"}
}
queries, err := svc.hostDetailQueries(host)
if err != nil {
return nil, 0, err
}
// Retrieve the label queries that should be updated
cutoff := svc.clock.Now().Add(-svc.config.Osquery.LabelUpdateInterval)
labelQueries, err := svc.ds.LabelQueriesForHost(&host, cutoff)
if err != nil {
return nil, 0, osqueryError{message: "retrieving label queries: " + err.Error()}
}
for name, query := range labelQueries {
queries[hostLabelQueryPrefix+name] = query
}
liveQueries, err := svc.liveQueryStore.QueriesForHost(host.ID)
if err != nil {
return nil, 0, osqueryError{message: "retrieve live queries: " + err.Error()}
}
for name, query := range liveQueries {
queries[hostDistributedQueryPrefix+name] = query
}
accelerate := uint(0)
if host.HostName == "" || host.Platform == "" {
// Assume this host is just enrolling, and accelerate checkins
// (to allow for platform restricted labels to run quickly
// after platform is retrieved from details)
accelerate = 10
}
return queries, accelerate, nil
}
// ingestDetailQuery takes the results of a detail query and modifies the
// provided kolide.Host appropriately.
func (svc *Service) ingestDetailQuery(host *kolide.Host, name string, rows []map[string]string) error {
trimmedQuery := strings.TrimPrefix(name, hostDetailQueryPrefix)
query, ok := detailQueries[trimmedQuery]
if !ok {
return osqueryError{message: "unknown detail query " + trimmedQuery}
}
err := query.IngestFunc(svc.logger, host, rows)
if err != nil {
return osqueryError{
message: fmt.Sprintf("ingesting query %s: %s", name, err.Error()),
}
}
// Refetch is no longer needed after ingesting details.
host.RefetchRequested = false
return nil
}
// ingestLabelQuery records the results of label queries run by a host
func (svc *Service) ingestLabelQuery(host kolide.Host, query string, rows []map[string]string, results map[uint]bool) error {
trimmedQuery := strings.TrimPrefix(query, hostLabelQueryPrefix)
trimmedQueryNum, err := strconv.Atoi(emptyToZero(trimmedQuery))
if err != nil {
return errors.Wrap(err, "converting query from string to int")
}
// A label query matches if there is at least one result for that
// query. We must also store negative results.
results[uint(trimmedQueryNum)] = len(rows) > 0
return nil
}
// ingestDistributedQuery takes the results of a distributed query and modifies the
// provided kolide.Host appropriately.
func (svc *Service) ingestDistributedQuery(host kolide.Host, name string, rows []map[string]string, failed bool, errMsg string) error {
trimmedQuery := strings.TrimPrefix(name, hostDistributedQueryPrefix)
campaignID, err := strconv.Atoi(emptyToZero(trimmedQuery))
if err != nil {
return osqueryError{message: "unable to parse campaign ID: " + trimmedQuery}
}
// Write the results to the pubsub store
res := kolide.DistributedQueryResult{
DistributedQueryCampaignID: uint(campaignID),
Host: host,
Rows: rows,
}
if failed {
res.Error = &errMsg
}
err = svc.resultStore.WriteResult(res)
if err != nil {
nErr, ok := err.(pubsub.Error)
if !ok || !nErr.NoSubscriber() {
return osqueryError{message: "writing results: " + err.Error()}
}
// If there are no subscribers, the campaign is "orphaned"
// and should be closed so that we don't continue trying to
// execute that query when we can't write to any subscriber
campaign, err := svc.ds.DistributedQueryCampaign(uint(campaignID))
if err != nil {
if err := svc.liveQueryStore.StopQuery(strconv.Itoa(int(campaignID))); err != nil {
return osqueryError{message: "stop orphaned campaign after load failure: " + err.Error()}
}
return osqueryError{message: "loading orphaned campaign: " + err.Error()}
}
if campaign.CreatedAt.After(svc.clock.Now().Add(-5 * time.Second)) {
// Give the client 5 seconds to connect before considering the
// campaign orphaned
return osqueryError{message: "campaign waiting for listener (please retry)"}
}
if campaign.Status != kolide.QueryComplete {
campaign.Status = kolide.QueryComplete
if err := svc.ds.SaveDistributedQueryCampaign(campaign); err != nil {
return osqueryError{message: "closing orphaned campaign: " + err.Error()}
}
}
if err := svc.liveQueryStore.StopQuery(strconv.Itoa(int(campaignID))); err != nil {
return osqueryError{message: "stopping orphaned campaign: " + err.Error()}
}
// No need to record query completion in this case
return nil
}
err = svc.liveQueryStore.QueryCompletedByHost(strconv.Itoa(int(campaignID)), host.ID)
if err != nil {
return osqueryError{message: "record query completion: " + err.Error()}
}
return nil
}
func (svc *Service) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults, statuses map[string]kolide.OsqueryStatus, messages map[string]string) error {
host, ok := hostctx.FromContext(ctx)
if !ok {
return osqueryError{message: "internal error: missing host from request context"}
}
// Check for label queries and if so, load host additional. If we don't do
// this, we will end up unintentionally dropping any existing host
// additional info.
for query := range results {
if strings.HasPrefix(query, hostLabelQueryPrefix) {
fullHost, err := svc.ds.Host(host.ID)
if err != nil {
return osqueryError{message: "internal error: load host additional: " + err.Error()}
}
host = *fullHost
break
}
}
var err error
detailUpdated := false // Whether detail or additional was updated
additionalResults := make(kolide.OsqueryDistributedQueryResults)
labelResults := map[uint]bool{}
for query, rows := range results {
switch {
case strings.HasPrefix(query, hostDetailQueryPrefix):
err = svc.ingestDetailQuery(&host, query, rows)
detailUpdated = true
case strings.HasPrefix(query, hostAdditionalQueryPrefix):
name := strings.TrimPrefix(query, hostAdditionalQueryPrefix)
additionalResults[name] = rows
detailUpdated = true
case strings.HasPrefix(query, hostLabelQueryPrefix):
err = svc.ingestLabelQuery(host, query, rows, labelResults)
case strings.HasPrefix(query, hostDistributedQueryPrefix):
// osquery docs say any nonzero (string) value for
// status indicates a query error
status, ok := statuses[query]
failed := (ok && status != kolide.StatusOK)
err = svc.ingestDistributedQuery(host, query, rows, failed, messages[query])
default:
err = osqueryError{message: "unknown query prefix: " + query}
}
if err != nil {
return osqueryError{message: "failed to ingest result: " + err.Error()}
}
}
if len(labelResults) > 0 {
host.LabelUpdateTime = svc.clock.Now()
err = svc.ds.RecordLabelQueryExecutions(&host, labelResults, svc.clock.Now())
if err != nil {
return osqueryError{message: "failed to save labels: " + err.Error()}
}
}
if detailUpdated {
host.DetailUpdateTime = svc.clock.Now()
additionalJSON, err := json.Marshal(additionalResults)
if err != nil {
return osqueryError{message: "failed to marshal additional: " + err.Error()}
}
additional := json.RawMessage(additionalJSON)
host.Additional = &additional
}
if len(labelResults) > 0 || detailUpdated {
err = svc.ds.SaveHost(&host)
if err != nil {
return osqueryError{message: "failed to update host details: " + err.Error()}
}
if host.HostSoftware.Modified {
if err := svc.ds.SaveHostSoftware(&host); err != nil {
return osqueryError{message: "failed to save host software: " + err.Error()}
}
}
if detailUpdated {
if err := svc.ds.SaveHostAdditional(&host); err != nil {
return osqueryError{message: "failed to save host additional: " + err.Error()}
}
}
}
return nil
}