Copy log fields into GCP PubSub attributes (#712)

Add a config setting to allow copying message fields and decorations into Google Pub/Sub attributes, making it possible to use these values for subscription filters.
This commit is contained in:
Michael Samuel 2021-05-09 05:29:52 +10:00 committed by GitHub
parent 1cb514c460
commit fb45806088
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 13 deletions

View File

@ -1183,6 +1183,28 @@ The identifier of the pubsub topic that osquery status logs will be published to
status_topic: osquery_status
```
###### `pubsub_add_attributes`
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
Add Pub/Sub attributes to messages. When enabled, the plugin parses the osquery result
messages, and adds the following Pub/Sub message attributes:
- `name` - the `name` attribute from the message body
- `timestamp` - the `unixTime` attribute from the message body, converted to rfc3339 format
- Each decoration from the message
This feature is useful when combined with [subscription filters](https://cloud.google.com/pubsub/docs/filtering).
- Default value: false
- Environment variable: `FLEET_PUBSUB_ADD_ATTRIBUTES`
- Config file format:
```
pubsub:
status_topic: osquery_status
```
##### S3 file carving backend
###### `s3_bucket`

View File

@ -140,9 +140,10 @@ type S3Config struct {
// PubSubConfig defines configs the for Google PubSub logging plugin
type PubSubConfig struct {
Project string
StatusTopic string `yaml:"status_topic"`
ResultTopic string `yaml:"result_topic"`
Project string
StatusTopic string `yaml:"status_topic"`
ResultTopic string `yaml:"result_topic"`
AddAttributes bool `yaml:"add_attributes"`
}
// FilesystemConfig defines configs for the Filesystem logging plugin
@ -328,6 +329,7 @@ func (man Manager) addConfigs() {
man.addConfigString("pubsub.project", "", "Google Cloud Project to use")
man.addConfigString("pubsub.status_topic", "", "PubSub topic for status logs")
man.addConfigString("pubsub.result_topic", "", "PubSub topic for result logs")
man.addConfigBool("pubsub.add_attributes", false, "Add PubSub attributes in addition to the message body")
// Filesystem
man.addConfigString("filesystem.status_log_file", "/tmp/osquery_status",
@ -463,9 +465,10 @@ func (man Manager) LoadConfig() KolideConfig {
StsAssumeRoleArn: man.getConfigString("s3.sts_assume_role_arn"),
},
PubSub: PubSubConfig{
Project: man.getConfigString("pubsub.project"),
StatusTopic: man.getConfigString("pubsub.status_topic"),
ResultTopic: man.getConfigString("pubsub.result_topic"),
Project: man.getConfigString("pubsub.project"),
StatusTopic: man.getConfigString("pubsub.status_topic"),
ResultTopic: man.getConfigString("pubsub.result_topic"),
AddAttributes: man.getConfigBool("pubsub.add_attributes"),
},
Filesystem: FilesystemConfig{
StatusLogFile: man.getConfigString("filesystem.status_log_file"),

View File

@ -74,6 +74,7 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
status, err = NewPubSubLogWriter(
config.PubSub.Project,
config.PubSub.StatusTopic,
false,
logger,
)
if err != nil {
@ -145,6 +146,7 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
result, err = NewPubSubLogWriter(
config.PubSub.Project,
config.PubSub.ResultTopic,
config.PubSub.AddAttributes,
logger,
)
if err != nil {

View File

@ -3,6 +3,7 @@ package logging
import (
"context"
"encoding/json"
"time"
"cloud.google.com/go/pubsub"
"github.com/go-kit/kit/log"
@ -11,11 +12,18 @@ import (
)
type pubSubLogWriter struct {
topic *pubsub.Topic
logger log.Logger
topic *pubsub.Topic
logger log.Logger
addAttributes bool
}
func NewPubSubLogWriter(projectId string, topicName string, logger log.Logger) (*pubSubLogWriter, error) {
type PubSubAttributes struct {
Name string `json:"name"`
UnixTime int64 `json:"unixTime"`
Decorations map[string]string `json:"decorations"`
}
func NewPubSubLogWriter(projectId string, topicName string, addAttributes bool, logger log.Logger) (*pubSubLogWriter, error) {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectId)
@ -29,14 +37,24 @@ func NewPubSubLogWriter(projectId string, topicName string, logger log.Logger) (
"msg", "GCP PubSub writer configured",
"project", projectId,
"topic", topicName,
"add_attributes", addAttributes,
)
return &pubSubLogWriter{
topic: topic,
logger: logger,
topic: topic,
logger: logger,
addAttributes: addAttributes,
}, nil
}
func estimateAttributeSize(attributes map[string]string) int {
var sz int
for k, v := range attributes {
sz += len(k) + len(v) + 2
}
return sz
}
func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
results := make([]*pubsub.PublishResult, len(logs))
@ -47,7 +65,22 @@ func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) err
return errors.Wrap(err, "marshal message into JSON")
}
if len(data) > pubsub.MaxPublishRequestBytes {
attributes := make(map[string]string)
if w.addAttributes {
var unmarshaled PubSubAttributes
if err := json.Unmarshal(log, &unmarshaled); err != nil {
return errors.Wrap(err, "unmarshalling log message JSON")
}
attributes["name"] = unmarshaled.Name
attributes["timestamp"] = time.Unix(unmarshaled.UnixTime, 0).Format(time.RFC3339)
for k, v := range unmarshaled.Decorations {
attributes[k] = v
}
}
if len(data)+estimateAttributeSize(attributes) > pubsub.MaxPublishRequestBytes {
level.Info(w.logger).Log(
"msg", "dropping log over 10MB PubSub limit",
"size", len(data),
@ -57,7 +90,8 @@ func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) err
}
message := &pubsub.Message{
Data: data,
Data: data,
Attributes: attributes,
}
results[i] = w.topic.Publish(ctx, message)