package service import ( "context" "encoding/json" "fmt" "net" "strconv" "strings" "time" hostctx "github.com/fleetdm/fleet/server/contexts/host" "github.com/fleetdm/fleet/server/kolide" "github.com/fleetdm/fleet/server/pubsub" "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/spf13/cast" ) type osqueryError struct { message string nodeInvalid bool } func (e osqueryError) Error() string { return e.message } func (e osqueryError) NodeInvalid() bool { return e.nodeInvalid } // Sometimes osquery gives us empty string where we expect an integer. // We change the to "0" so it can be handled by the appropriate string to // integer conversion function, as these will err on "" func emptyToZero(val string) string { if val == "" { return "0" } return val } func (svc service) AuthenticateHost(ctx context.Context, nodeKey string) (*kolide.Host, error) { if nodeKey == "" { return nil, osqueryError{ message: "authentication error: missing node key", nodeInvalid: true, } } host, err := svc.ds.AuthenticateHost(nodeKey) if err != nil { switch err.(type) { case kolide.NotFoundError: return nil, osqueryError{ message: "authentication error: invalid node key: " + nodeKey, nodeInvalid: true, } default: return nil, osqueryError{ message: "authentication error: " + err.Error(), } } } // Update the "seen" time used to calculate online status err = svc.ds.MarkHostSeen(host, svc.clock.Now()) if err != nil { return nil, osqueryError{message: "failed to mark host seen: " + err.Error()} } return host, nil } func (svc service) EnrollAgent(ctx context.Context, enrollSecret, hostIdentifier string, hostDetails map[string](map[string]string)) (string, error) { secretName, err := svc.ds.VerifyEnrollSecret(enrollSecret) if err != nil { return "", osqueryError{ message: "enroll failed: " + err.Error(), nodeInvalid: true, } } nodeKey, err := kolide.RandomText(svc.config.Osquery.NodeKeySize) if err != nil { return "", osqueryError{ message: "generate node key failed: " + err.Error(), nodeInvalid: true, } } host, err := svc.ds.EnrollHost(hostIdentifier, nodeKey, secretName) if err != nil { return "", osqueryError{message: "save enroll failed: " + err.Error(), nodeInvalid: true} } // Save enrollment details if provided save := false if r, ok := hostDetails["os_version"]; ok { detailQueries["os_version"].IngestFunc(svc.logger, host, []map[string]string{r}) save = true } if r, ok := hostDetails["osquery_info"]; ok { detailQueries["osquery_info"].IngestFunc(svc.logger, host, []map[string]string{r}) save = true } if r, ok := hostDetails["system_info"]; ok { detailQueries["system_info"].IngestFunc(svc.logger, host, []map[string]string{r}) save = true } if save { if err := svc.ds.SaveHost(host); err != nil { return "", osqueryError{message: "saving host details: " + err.Error(), nodeInvalid: true} } } return host.NodeKey, nil } func (svc service) GetClientConfig(ctx context.Context) (map[string]interface{}, error) { host, ok := hostctx.FromContext(ctx) if !ok { return nil, osqueryError{message: "internal error: missing host from request context"} } baseConfig, err := svc.ds.OptionsForPlatform(host.Platform) if err != nil { return nil, osqueryError{message: "internal error: fetching base config: " + err.Error()} } var config map[string]interface{} err = json.Unmarshal(baseConfig, &config) if err != nil { return nil, osqueryError{message: "internal error: parsing base configuration: " + err.Error()} } packs, err := svc.ds.ListPacksForHost(host.ID) if err != nil { return nil, osqueryError{message: "database error: " + err.Error()} } packConfig := kolide.Packs{} for _, pack := range packs { // first, we must figure out what queries are in this pack queries, err := svc.ds.ListScheduledQueriesInPack(pack.ID, kolide.ListOptions{}) if err != nil { return nil, osqueryError{message: "database error: " + err.Error()} } // the serializable osquery config struct expects content in a // particular format, so we do the conversion here configQueries := kolide.Queries{} for _, query := range queries { queryContent := kolide.QueryContent{ Query: query.Query, Interval: query.Interval, Platform: query.Platform, Version: query.Version, Removed: query.Removed, Shard: query.Shard, Denylist: query.Denylist, } if query.Removed != nil { queryContent.Removed = query.Removed } if query.Snapshot != nil && *query.Snapshot { queryContent.Snapshot = query.Snapshot } configQueries[query.Name] = queryContent } // finally, we add the pack to the client config struct with all of // the pack's queries packConfig[pack.Name] = kolide.PackContent{ Platform: pack.Platform, Queries: configQueries, } } if len(packConfig) > 0 { packJSON, err := json.Marshal(packConfig) if err != nil { return nil, osqueryError{message: "internal error: marshal pack JSON: " + err.Error()} } config["packs"] = json.RawMessage(packJSON) } // Save interval values if they have been updated. Note // config_tls_refresh can only be set in the osquery flags so is // ignored here. saveHost := false if options, ok := config["options"].(map[string]interface{}); ok { distributedIntervalVal, ok := options["distributed_interval"] distributedInterval, err := cast.ToUintE(distributedIntervalVal) if ok && err == nil && host.DistributedInterval != distributedInterval { host.DistributedInterval = distributedInterval saveHost = true } loggerTLSPeriodVal, ok := options["logger_tls_period"] loggerTLSPeriod, err := cast.ToUintE(loggerTLSPeriodVal) if ok && err == nil && host.LoggerTLSPeriod != loggerTLSPeriod { host.LoggerTLSPeriod = loggerTLSPeriod saveHost = true } } if saveHost { err := svc.ds.SaveHost(&host) if err != nil { return nil, err } } return config, nil } func (svc service) SubmitStatusLogs(ctx context.Context, logs []json.RawMessage) error { if err := svc.osqueryLogWriter.Status.Write(ctx, logs); err != nil { return osqueryError{message: "error writing status logs: " + err.Error()} } return nil } func (svc service) SubmitResultLogs(ctx context.Context, logs []json.RawMessage) error { if err := svc.osqueryLogWriter.Result.Write(ctx, logs); err != nil { return osqueryError{message: "error writing result logs: " + err.Error()} } return nil } // hostLabelQueryPrefix is appended before the query name when a query is // provided as a label query. This allows the results to be retrieved when // osqueryd writes the distributed query results. const hostLabelQueryPrefix = "fleet_label_query_" // hostDetailQueryPrefix is appended before the query name when a query is // provided as a detail query. const hostDetailQueryPrefix = "fleet_detail_query_" // hostAdditionalQueryPrefix is appended before the query name when a query is // provided as an additional query (additional info for hosts to retrieve). const hostAdditionalQueryPrefix = "fleet_additional_query_" // hostDistributedQueryPrefix is appended before the query name when a query is // run from a distributed query campaign const hostDistributedQueryPrefix = "fleet_distributed_query_" // detailQueries defines the detail queries that should be run on the host, as // well as how the results of those queries should be ingested into the // kolide.Host data model. This map should not be modified at runtime. var detailQueries = map[string]struct { Query string IngestFunc func(logger log.Logger, host *kolide.Host, rows []map[string]string) error }{ "network_interface": { Query: `select address, mac from interface_details id join interface_addresses ia on ia.interface = id.interface where length(mac) > 0 order by (ibytes + obytes) desc`, IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) (err error) { if len(rows) == 0 { logger.Log("component", "service", "method", "IngestFunc", "err", "detail_query_network_interface expected 1 or more results") return nil } for _, row := range rows { ip := net.ParseIP(row["address"]) if ip == nil { continue } // Skip link-local and loopback interfaces if ip.IsLinkLocalUnicast() || ip.IsLoopback() { continue } // Rows are ordered by traffic, so we will get the most active // interface by iterating in order host.PrimaryIP = row["address"] host.PrimaryMac = row["mac"] return nil } // If only link-local and loopback found, still use the first // interface. host.PrimaryIP = rows[0]["address"] host.PrimaryMac = rows[0]["mac"] return nil }, }, "os_version": { Query: "select * from os_version limit 1", IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error { if len(rows) != 1 { logger.Log("component", "service", "method", "IngestFunc", "err", fmt.Sprintf("detail_query_os_version expected single result got %d", len(rows))) return nil } host.OSVersion = fmt.Sprintf( "%s %s.%s.%s", rows[0]["name"], rows[0]["major"], rows[0]["minor"], rows[0]["patch"], ) host.OSVersion = strings.Trim(host.OSVersion, ".") if build, ok := rows[0]["build"]; ok { host.Build = build } host.Platform = rows[0]["platform"] host.PlatformLike = rows[0]["platform_like"] host.CodeName = rows[0]["code_name"] // On centos6 there is an osquery bug that leaves // platform empty. Here we workaround. if host.Platform == "" && strings.Contains(strings.ToLower(rows[0]["name"]), "centos") { host.Platform = "centos" } return nil }, }, "osquery_flags": { // Collect the interval info (used for online status // calculation) from the osquery flags. We typically control // distributed_interval (but it's not required), and typically // do not control config_tls_refresh. Query: `select name, value from osquery_flags where name in ("distributed_interval", "config_tls_refresh", "config_refresh", "logger_tls_period")`, IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error { var configTLSRefresh, configRefresh uint for _, row := range rows { switch row["name"] { case "distributed_interval": interval, err := strconv.Atoi(emptyToZero(row["value"])) if err != nil { return errors.Wrap(err, "parsing distributed_interval") } host.DistributedInterval = uint(interval) case "config_tls_refresh": // Prior to osquery 2.4.6, the flag was // called `config_tls_refresh`. interval, err := strconv.Atoi(emptyToZero(row["value"])) if err != nil { return errors.Wrap(err, "parsing config_tls_refresh") } configTLSRefresh = uint(interval) case "config_refresh": // After 2.4.6 `config_tls_refresh` was // aliased to `config_refresh`. interval, err := strconv.Atoi(emptyToZero(row["value"])) if err != nil { return errors.Wrap(err, "parsing config_refresh") } configRefresh = uint(interval) case "logger_tls_period": interval, err := strconv.Atoi(emptyToZero(row["value"])) if err != nil { return errors.Wrap(err, "parsing logger_tls_period") } host.LoggerTLSPeriod = uint(interval) } } // Since the `config_refresh` flag existed prior to // 2.4.6 and had a different meaning, we prefer // `config_tls_refresh` if it was set, and use // `config_refresh` as a fallback. if configTLSRefresh != 0 { host.ConfigTLSRefresh = configTLSRefresh } else { host.ConfigTLSRefresh = configRefresh } return nil }, }, "osquery_info": { Query: "select * from osquery_info limit 1", IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error { if len(rows) != 1 { logger.Log("component", "service", "method", "IngestFunc", "err", fmt.Sprintf("detail_query_osquery_info expected single result got %d", len(rows))) return nil } host.OsqueryVersion = rows[0]["version"] return nil }, }, "system_info": { Query: "select * from system_info limit 1", IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error { if len(rows) != 1 { logger.Log("component", "service", "method", "IngestFunc", "err", fmt.Sprintf("detail_query_system_info expected single result got %d", len(rows))) return nil } var err error host.PhysicalMemory, err = strconv.ParseInt(emptyToZero(rows[0]["physical_memory"]), 10, 64) if err != nil { return err } host.HostName = rows[0]["hostname"] host.UUID = rows[0]["uuid"] host.CPUType = rows[0]["cpu_type"] host.CPUSubtype = rows[0]["cpu_subtype"] host.CPUBrand = rows[0]["cpu_brand"] host.CPUPhysicalCores, err = strconv.Atoi(emptyToZero(rows[0]["cpu_physical_cores"])) if err != nil { return err } host.CPULogicalCores, err = strconv.Atoi(emptyToZero(rows[0]["cpu_logical_cores"])) if err != nil { return err } host.HardwareVendor = rows[0]["hardware_vendor"] host.HardwareModel = rows[0]["hardware_model"] host.HardwareVersion = rows[0]["hardware_version"] host.HardwareSerial = rows[0]["hardware_serial"] host.ComputerName = rows[0]["computer_name"] return nil }, }, "uptime": { Query: "select * from uptime limit 1", IngestFunc: func(logger log.Logger, host *kolide.Host, rows []map[string]string) error { if len(rows) != 1 { logger.Log("component", "service", "method", "IngestFunc", "err", fmt.Sprintf("detail_query_uptime expected single result got %d", len(rows))) return nil } uptimeSeconds, err := strconv.Atoi(emptyToZero(rows[0]["total_seconds"])) if err != nil { return err } host.Uptime = time.Duration(uptimeSeconds) * time.Second return nil }, }, } // hostDetailQueries returns the map of queries that should be executed by // osqueryd to fill in the host details func (svc service) hostDetailQueries(host kolide.Host) (map[string]string, error) { queries := make(map[string]string) if host.DetailUpdateTime.After(svc.clock.Now().Add(-svc.config.Osquery.DetailUpdateInterval)) { // No need to update already fresh details return queries, nil } for name, query := range detailQueries { queries[hostDetailQueryPrefix+name] = query.Query } // Get additional queries config, err := svc.ds.AppConfig() if err != nil { return nil, osqueryError{message: "get additional queries: " + err.Error()} } if config.AdditionalQueries == nil { // No additional queries set return queries, nil } var additionalQueries map[string]string if err := json.Unmarshal(*config.AdditionalQueries, &additionalQueries); err != nil { return nil, osqueryError{message: "unmarshal additional queries: " + err.Error()} } for name, query := range additionalQueries { queries[hostAdditionalQueryPrefix+name] = query } return queries, nil } func (svc service) GetDistributedQueries(ctx context.Context) (map[string]string, uint, error) { host, ok := hostctx.FromContext(ctx) if !ok { return nil, 0, osqueryError{message: "internal error: missing host from request context"} } queries, err := svc.hostDetailQueries(host) if err != nil { return nil, 0, err } // Retrieve the label queries that should be updated cutoff := svc.clock.Now().Add(-svc.config.Osquery.LabelUpdateInterval) labelQueries, err := svc.ds.LabelQueriesForHost(&host, cutoff) if err != nil { return nil, 0, osqueryError{message: "retrieving label queries: " + err.Error()} } for name, query := range labelQueries { queries[hostLabelQueryPrefix+name] = query } liveQueries, err := svc.liveQueryStore.QueriesForHost(host.ID) if err != nil { return nil, 0, osqueryError{message: "retrieve live queries: " + err.Error()} } for name, query := range liveQueries { queries[hostDistributedQueryPrefix+name] = query } accelerate := uint(0) if host.HostName == "" && host.Platform == "" { // Assume this host is just enrolling, and accelerate checkins // (to allow for platform restricted labels to run quickly // after platform is retrieved from details) accelerate = 10 } return queries, accelerate, nil } // ingestDetailQuery takes the results of a detail query and modifies the // provided kolide.Host appropriately. func (svc service) ingestDetailQuery(host *kolide.Host, name string, rows []map[string]string) error { trimmedQuery := strings.TrimPrefix(name, hostDetailQueryPrefix) query, ok := detailQueries[trimmedQuery] if !ok { return osqueryError{message: "unknown detail query " + trimmedQuery} } err := query.IngestFunc(svc.logger, host, rows) if err != nil { return osqueryError{ message: fmt.Sprintf("ingesting query %s: %s", name, err.Error()), } } return nil } // ingestLabelQuery records the results of label queries run by a host func (svc service) ingestLabelQuery(host kolide.Host, query string, rows []map[string]string, results map[uint]bool) error { trimmedQuery := strings.TrimPrefix(query, hostLabelQueryPrefix) trimmedQueryNum, err := strconv.Atoi(emptyToZero(trimmedQuery)) if err != nil { return errors.Wrap(err, "converting query from string to int") } // A label query matches if there is at least one result for that // query. We must also store negative results. results[uint(trimmedQueryNum)] = len(rows) > 0 return nil } // ingestDistributedQuery takes the results of a distributed query and modifies the // provided kolide.Host appropriately. func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []map[string]string, failed bool, errMsg string) error { trimmedQuery := strings.TrimPrefix(name, hostDistributedQueryPrefix) campaignID, err := strconv.Atoi(emptyToZero(trimmedQuery)) if err != nil { return osqueryError{message: "unable to parse campaign ID: " + trimmedQuery} } // Write the results to the pubsub store res := kolide.DistributedQueryResult{ DistributedQueryCampaignID: uint(campaignID), Host: host, Rows: rows, } if failed { res.Error = &errMsg } err = svc.resultStore.WriteResult(res) if err != nil { nErr, ok := err.(pubsub.Error) if !ok || !nErr.NoSubscriber() { return osqueryError{message: "writing results: " + err.Error()} } // If there are no subscribers, the campaign is "orphaned" // and should be closed so that we don't continue trying to // execute that query when we can't write to any subscriber campaign, err := svc.ds.DistributedQueryCampaign(uint(campaignID)) if err != nil { return osqueryError{message: "loading orphaned campaign: " + err.Error()} } if campaign.CreatedAt.After(svc.clock.Now().Add(-5 * time.Second)) { // Give the client 5 seconds to connect before considering the // campaign orphaned return osqueryError{message: "campaign waiting for listener (please retry)"} } if campaign.Status != kolide.QueryComplete { campaign.Status = kolide.QueryComplete if err := svc.ds.SaveDistributedQueryCampaign(campaign); err != nil { return osqueryError{message: "closing orphaned campaign: " + err.Error()} } } if err := svc.liveQueryStore.StopQuery(strconv.Itoa(int(campaignID))); err != nil { return osqueryError{message: "stopping orphaned campaign: " + err.Error()} } // No need to record query completion in this case return nil } err = svc.liveQueryStore.QueryCompletedByHost(strconv.Itoa(int(campaignID)), host.ID) if err != nil { return osqueryError{message: "record query completion: " + err.Error()} } return nil } func (svc service) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults, statuses map[string]kolide.OsqueryStatus, messages map[string]string) error { host, ok := hostctx.FromContext(ctx) if !ok { return osqueryError{message: "internal error: missing host from request context"} } var err error detailUpdated := false // Whether detail or additional was updated additionalResults := make(kolide.OsqueryDistributedQueryResults) labelResults := map[uint]bool{} for query, rows := range results { switch { case strings.HasPrefix(query, hostDetailQueryPrefix): err = svc.ingestDetailQuery(&host, query, rows) detailUpdated = true case strings.HasPrefix(query, hostAdditionalQueryPrefix): name := strings.TrimPrefix(query, hostAdditionalQueryPrefix) additionalResults[name] = rows detailUpdated = true case strings.HasPrefix(query, hostLabelQueryPrefix): err = svc.ingestLabelQuery(host, query, rows, labelResults) case strings.HasPrefix(query, hostDistributedQueryPrefix): // osquery docs say any nonzero (string) value for // status indicates a query error status, ok := statuses[query] failed := (ok && status != kolide.StatusOK) err = svc.ingestDistributedQuery(host, query, rows, failed, messages[query]) default: err = osqueryError{message: "unknown query prefix: " + query} } if err != nil { return osqueryError{message: "failed to ingest result: " + err.Error()} } } if len(labelResults) > 0 { host.LabelUpdateTime = svc.clock.Now() err = svc.ds.RecordLabelQueryExecutions(&host, labelResults, svc.clock.Now()) if err != nil { return osqueryError{message: "failed to save labels: " + err.Error()} } } if detailUpdated { host.DetailUpdateTime = svc.clock.Now() additionalJSON, err := json.Marshal(additionalResults) if err != nil { return osqueryError{message: "failed to marshal additional: " + err.Error()} } additional := json.RawMessage(additionalJSON) host.Additional = &additional } if len(labelResults) > 0 || detailUpdated { err = svc.ds.SaveHost(&host) if err != nil { return osqueryError{message: "failed to update host details: " + err.Error()} } } return nil }