diff --git a/docs/3-Deployment/2-Configuration.md b/docs/3-Deployment/2-Configuration.md index 4bb32e573..8b9596c29 100644 --- a/docs/3-Deployment/2-Configuration.md +++ b/docs/3-Deployment/2-Configuration.md @@ -1183,6 +1183,28 @@ The identifier of the pubsub topic that osquery status logs will be published to status_topic: osquery_status ``` +###### `pubsub_add_attributes` + +This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`. + +Add Pub/Sub attributes to messages. When enabled, the plugin parses the osquery result +messages, and adds the following Pub/Sub message attributes: + +- `name` - the `name` attribute from the message body +- `timestamp` - the `unixTime` attribute from the message body, converted to rfc3339 format +- Each decoration from the message + +This feature is useful when combined with [subscription filters](https://cloud.google.com/pubsub/docs/filtering). + +- Default value: false +- Environment variable: `FLEET_PUBSUB_ADD_ATTRIBUTES` +- Config file format: + + ``` + pubsub: + status_topic: osquery_status + ``` + ##### S3 file carving backend ###### `s3_bucket` diff --git a/server/config/config.go b/server/config/config.go index 011cca4bc..b0d94011c 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -140,9 +140,10 @@ type S3Config struct { // PubSubConfig defines configs the for Google PubSub logging plugin type PubSubConfig struct { - Project string - StatusTopic string `yaml:"status_topic"` - ResultTopic string `yaml:"result_topic"` + Project string + StatusTopic string `yaml:"status_topic"` + ResultTopic string `yaml:"result_topic"` + AddAttributes bool `yaml:"add_attributes"` } // FilesystemConfig defines configs for the Filesystem logging plugin @@ -328,6 +329,7 @@ func (man Manager) addConfigs() { man.addConfigString("pubsub.project", "", "Google Cloud Project to use") man.addConfigString("pubsub.status_topic", "", "PubSub topic for status logs") man.addConfigString("pubsub.result_topic", "", "PubSub topic for result logs") + man.addConfigBool("pubsub.add_attributes", false, "Add PubSub attributes in addition to the message body") // Filesystem man.addConfigString("filesystem.status_log_file", "/tmp/osquery_status", @@ -463,9 +465,10 @@ func (man Manager) LoadConfig() KolideConfig { StsAssumeRoleArn: man.getConfigString("s3.sts_assume_role_arn"), }, PubSub: PubSubConfig{ - Project: man.getConfigString("pubsub.project"), - StatusTopic: man.getConfigString("pubsub.status_topic"), - ResultTopic: man.getConfigString("pubsub.result_topic"), + Project: man.getConfigString("pubsub.project"), + StatusTopic: man.getConfigString("pubsub.status_topic"), + ResultTopic: man.getConfigString("pubsub.result_topic"), + AddAttributes: man.getConfigBool("pubsub.add_attributes"), }, Filesystem: FilesystemConfig{ StatusLogFile: man.getConfigString("filesystem.status_log_file"), diff --git a/server/logging/logging.go b/server/logging/logging.go index 33755208d..b870698bd 100644 --- a/server/logging/logging.go +++ b/server/logging/logging.go @@ -74,6 +74,7 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error) status, err = NewPubSubLogWriter( config.PubSub.Project, config.PubSub.StatusTopic, + false, logger, ) if err != nil { @@ -145,6 +146,7 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error) result, err = NewPubSubLogWriter( config.PubSub.Project, config.PubSub.ResultTopic, + config.PubSub.AddAttributes, logger, ) if err != nil { diff --git a/server/logging/pubsub.go b/server/logging/pubsub.go index 9ea8d0e43..981c236d8 100644 --- a/server/logging/pubsub.go +++ b/server/logging/pubsub.go @@ -3,6 +3,7 @@ package logging import ( "context" "encoding/json" + "time" "cloud.google.com/go/pubsub" "github.com/go-kit/kit/log" @@ -11,11 +12,18 @@ import ( ) type pubSubLogWriter struct { - topic *pubsub.Topic - logger log.Logger + topic *pubsub.Topic + logger log.Logger + addAttributes bool } -func NewPubSubLogWriter(projectId string, topicName string, logger log.Logger) (*pubSubLogWriter, error) { +type PubSubAttributes struct { + Name string `json:"name"` + UnixTime int64 `json:"unixTime"` + Decorations map[string]string `json:"decorations"` +} + +func NewPubSubLogWriter(projectId string, topicName string, addAttributes bool, logger log.Logger) (*pubSubLogWriter, error) { ctx := context.Background() client, err := pubsub.NewClient(ctx, projectId) @@ -29,14 +37,24 @@ func NewPubSubLogWriter(projectId string, topicName string, logger log.Logger) ( "msg", "GCP PubSub writer configured", "project", projectId, "topic", topicName, + "add_attributes", addAttributes, ) return &pubSubLogWriter{ - topic: topic, - logger: logger, + topic: topic, + logger: logger, + addAttributes: addAttributes, }, nil } +func estimateAttributeSize(attributes map[string]string) int { + var sz int + for k, v := range attributes { + sz += len(k) + len(v) + 2 + } + return sz +} + func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) error { results := make([]*pubsub.PublishResult, len(logs)) @@ -47,7 +65,22 @@ func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) err return errors.Wrap(err, "marshal message into JSON") } - if len(data) > pubsub.MaxPublishRequestBytes { + attributes := make(map[string]string) + + if w.addAttributes { + var unmarshaled PubSubAttributes + + if err := json.Unmarshal(log, &unmarshaled); err != nil { + return errors.Wrap(err, "unmarshalling log message JSON") + } + attributes["name"] = unmarshaled.Name + attributes["timestamp"] = time.Unix(unmarshaled.UnixTime, 0).Format(time.RFC3339) + for k, v := range unmarshaled.Decorations { + attributes[k] = v + } + } + + if len(data)+estimateAttributeSize(attributes) > pubsub.MaxPublishRequestBytes { level.Info(w.logger).Log( "msg", "dropping log over 10MB PubSub limit", "size", len(data), @@ -57,7 +90,8 @@ func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) err } message := &pubsub.Message{ - Data: data, + Data: data, + Attributes: attributes, } results[i] = w.topic.Publish(ctx, message)