mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 08:55:24 +00:00
Add Google Cloud PubSub logging (#2049)
Adds Google Cloud PubSub logging for status and results. This also changes the Write interface for logging modules to add a context.Context (only used by pubsub currently).
This commit is contained in:
parent
26ff78fcf3
commit
969d5f25af
142
Gopkg.lock
generated
142
Gopkg.lock
generated
@ -1,6 +1,22 @@
|
||||
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
|
||||
|
||||
|
||||
[[projects]]
|
||||
digest = "1:fa2516ad37e0aa2925ba44d2942f59feb079591798091737a07b7e3294ce484f"
|
||||
name = "cloud.google.com/go"
|
||||
packages = [
|
||||
"compute/metadata",
|
||||
"iam",
|
||||
"internal/optional",
|
||||
"internal/version",
|
||||
"pubsub",
|
||||
"pubsub/apiv1",
|
||||
"pubsub/internal/distribution",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "fcb9a2d5f791d07be64506ab54434de65989d370"
|
||||
version = "v0.37.4"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:93e1e2249cdc682b89b9526e12f4016fd1bf74892f90e3e1380ca2f9c4c3e390"
|
||||
name = "git.apache.org/thrift.git"
|
||||
@ -188,13 +204,15 @@
|
||||
version = "v1.7.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:17fe264ee908afc795734e8c4e63db2accabaf57326dbf21763a7d6b86096260"
|
||||
digest = "1:bc4fe35949eeb15fc1f3c87c5f6b041f939af038a1a541f1305790d9914f6344"
|
||||
name = "github.com/golang/protobuf"
|
||||
packages = [
|
||||
"proto",
|
||||
"protoc-gen-go/descriptor",
|
||||
"ptypes",
|
||||
"ptypes/any",
|
||||
"ptypes/duration",
|
||||
"ptypes/empty",
|
||||
"ptypes/timestamp",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
@ -220,6 +238,14 @@
|
||||
revision = "064e2069ce9c359c118179501254f67d7d37ba24"
|
||||
version = "0.2"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:f1f70abea1ab125d48396343b4c053f8fecfbdb943037bf3d29dc80c90fe60b3"
|
||||
name = "github.com/googleapis/gax-go"
|
||||
packages = ["v2"]
|
||||
pruneopts = "UT"
|
||||
revision = "beaecbbdd8af86aa3acf14180d53828ce69400b2"
|
||||
version = "v2.0.4"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:160eabf7a69910fd74f29c692718bc2437c1c1c7d4c9dea9712357752a70e5df"
|
||||
name = "github.com/gorilla/context"
|
||||
@ -244,6 +270,14 @@
|
||||
revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:67474f760e9ac3799f740db2c489e6423a4cde45520673ec123ac831ad849cb8"
|
||||
name = "github.com/hashicorp/golang-lru"
|
||||
packages = ["simplelru"]
|
||||
pruneopts = "UT"
|
||||
revision = "7087cb70de9f7a8bc0a10c375cb0d2280a8edf9c"
|
||||
version = "v0.5.1"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:a361611b8c8c75a1091f00027767f7779b29cb37c456a71b8f2604c88057ab40"
|
||||
@ -577,6 +611,32 @@
|
||||
revision = "cfb38830724cc34fedffe9a2a29fb54fa9169cd1"
|
||||
version = "v1.20.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:b55b18883730e7d4b34687685cb27b9c576ebc29ae75f72918935015eaaccaa2"
|
||||
name = "go.opencensus.io"
|
||||
packages = [
|
||||
".",
|
||||
"internal",
|
||||
"internal/tagencoding",
|
||||
"metric/metricdata",
|
||||
"metric/metricproducer",
|
||||
"plugin/ocgrpc",
|
||||
"plugin/ochttp",
|
||||
"plugin/ochttp/propagation/b3",
|
||||
"resource",
|
||||
"stats",
|
||||
"stats/internal",
|
||||
"stats/view",
|
||||
"tag",
|
||||
"trace",
|
||||
"trace/internal",
|
||||
"trace/propagation",
|
||||
"trace/tracestate",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "75c0cca22312e51bfd4fafdbe9197ae399e18b38"
|
||||
version = "v0.20.2"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:001a4e7a40e50ff2ef32e2556bca50c4f77daa457db3ac6afc8bea9bb2122cfb"
|
||||
@ -591,10 +651,11 @@
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:9349bd005ea80ec1b37032d01daedc81168348a9c295f7846188adfb2f405554"
|
||||
digest = "1:a8e48e266382119a868a837b5bc83c118ae95dff0a1467f59c8fb967c40a4bc3"
|
||||
name = "golang.org/x/net"
|
||||
packages = [
|
||||
"context",
|
||||
"context/ctxhttp",
|
||||
"http/httpguts",
|
||||
"http2",
|
||||
"http2/hpack",
|
||||
@ -606,6 +667,31 @@
|
||||
pruneopts = "UT"
|
||||
revision = "5f9ae10d9af5b1c89ae6904293b14b064d4ada23"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:645cb780e4f3177111b40588f0a7f5950efcfb473e7ff41d8d81b2ba5eaa6ed5"
|
||||
name = "golang.org/x/oauth2"
|
||||
packages = [
|
||||
".",
|
||||
"google",
|
||||
"internal",
|
||||
"jws",
|
||||
"jwt",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "9f3314589c9a9136388751d9adae6b0ed400978a"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:04a5b0e4138f98eef79ce12a955a420ee358e9f787044cc3a553ac3c3ade997e"
|
||||
name = "golang.org/x/sync"
|
||||
packages = [
|
||||
"errgroup",
|
||||
"semaphore",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "56d357773e8497dfd526f0727e187720d1093757"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:a8b6f3b4dc60e2fa266203e846ae58bde7fd2a737201080827375e21455b0d2e"
|
||||
@ -641,23 +727,61 @@
|
||||
version = "v0.3.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:c25289f43ac4a68d88b02245742347c94f1e108c534dda442188015ff80669b3"
|
||||
digest = "1:dd6e1a70c3d069cb58cb5cd234a6aca65c31b87eecce857b0b140aec96fcee91"
|
||||
name = "google.golang.org/api"
|
||||
packages = [
|
||||
"googleapi/transport",
|
||||
"internal",
|
||||
"iterator",
|
||||
"option",
|
||||
"support/bundler",
|
||||
"transport",
|
||||
"transport/grpc",
|
||||
"transport/http",
|
||||
"transport/http/internal/propagation",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "0cbcb99a9ea0c8023c794b2693cbe1def82ed4d7"
|
||||
version = "v0.3.2"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:d2cfb607095cee410f25a3c04eed77f76c4721d1bb0cbc894f5d8624b33394c8"
|
||||
name = "google.golang.org/appengine"
|
||||
packages = ["cloudsql"]
|
||||
packages = [
|
||||
".",
|
||||
"cloudsql",
|
||||
"internal",
|
||||
"internal/app_identity",
|
||||
"internal/base",
|
||||
"internal/datastore",
|
||||
"internal/log",
|
||||
"internal/modules",
|
||||
"internal/remote_api",
|
||||
"internal/socket",
|
||||
"internal/urlfetch",
|
||||
"socket",
|
||||
"urlfetch",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "ae0ab99deb4dc413a2b4bd6c8bdd0eb67f1e4d06"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:cd018653a358d4b743a9d3bee89e825521f2ab2f2ec0770164bf7632d8d73ab7"
|
||||
digest = "1:3225a050ce9eb2c772c3f4a173b89dba31f78be26da6e12651b93a6836cbea50"
|
||||
name = "google.golang.org/genproto"
|
||||
packages = ["googleapis/rpc/status"]
|
||||
packages = [
|
||||
"googleapis/api/annotations",
|
||||
"googleapis/iam/v1",
|
||||
"googleapis/pubsub/v1",
|
||||
"googleapis/rpc/status",
|
||||
"protobuf/field_mask",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "86e600f69ee4704c6efbf6a2a40a5c10700e76c2"
|
||||
revision = "64821d5d210748c883cd2b809589555ae4654203"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:f47fb9bd1a9173285be23a1a1342e019abddfd787216e7f2c555ddba837e98ce"
|
||||
digest = "1:ae6f7c82c76f9b9cead4e57c5c8a3aac3bd3e6c9e33c3281364e7fb4e52554b2"
|
||||
name = "google.golang.org/grpc"
|
||||
packages = [
|
||||
".",
|
||||
@ -667,6 +791,7 @@
|
||||
"codes",
|
||||
"connectivity",
|
||||
"credentials",
|
||||
"credentials/oauth",
|
||||
"encoding",
|
||||
"encoding/proto",
|
||||
"grpclb/grpc_lb_v1/messages",
|
||||
@ -708,6 +833,7 @@
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
input-imports = [
|
||||
"cloud.google.com/go/pubsub",
|
||||
"github.com/VividCortex/mysqlerr",
|
||||
"github.com/WatchBeam/clock",
|
||||
"github.com/aws/aws-sdk-go/aws",
|
||||
|
@ -121,3 +121,7 @@
|
||||
[prune]
|
||||
go-tests = true
|
||||
unused-packages = true
|
||||
|
||||
[[constraint]]
|
||||
name = "cloud.google.com/go"
|
||||
version = "0.37.4"
|
||||
|
@ -482,7 +482,7 @@ The interval at which Fleet will ask osquery agents to update their results for
|
||||
|
||||
Which log output plugin should be used for osquery status logs received from clients.
|
||||
|
||||
Options are `filesystem` and `firehose`.
|
||||
Options are `filesystem`, `firehose`, and 'pubsub'.
|
||||
|
||||
- Default value: `filesystem`
|
||||
- Environment variable: `KOLIDE_OSQUERY_STATUS_LOG_PLUGIN`
|
||||
@ -497,7 +497,7 @@ Options are `filesystem` and `firehose`.
|
||||
|
||||
Which log output plugin should be used for osquery result logs received from clients.
|
||||
|
||||
Options are `filesystem` and `firehose`.
|
||||
Options are `filesystem`, `firehose`, and 'pubsub'.
|
||||
|
||||
- Default value: `filesystem`
|
||||
- Environment variable: `KOLIDE_OSQUERY_RESULT_LOG_PLUGIN`
|
||||
@ -720,3 +720,55 @@ Name of the Firehose stream to write osquery result logs received from clients.
|
||||
firehose:
|
||||
result_stream: osquery_result
|
||||
```
|
||||
|
||||
#### PubSub
|
||||
|
||||
### `pubsub_project`
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
||||
The identifier of the Google Cloud project containing the pubsub topics to
|
||||
publish logs to.
|
||||
|
||||
Note that the pubsub plugin uses [Application Default Credentials (ADCs)](https://cloud.google.com/docs/authentication/production)
|
||||
for authentication with the service.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_PUBSUB_PROJECT`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
pubsub:
|
||||
project: my-gcp-project
|
||||
```
|
||||
|
||||
### `pubsub_result_topic`
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
||||
The identifier of the pubsub topic that client results will be published to.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_PUBSUB_RESULT_TOPIC`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
pubsub:
|
||||
result_topic: osquery_result
|
||||
```
|
||||
|
||||
### `pubsub_status_topic`
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
||||
The identifier of the pubsub topic that osquery status logs will be published to.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_PUBSUB_STATUS_TOPIC`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
pubsub:
|
||||
status_topic: osquery_status
|
||||
```
|
||||
|
||||
|
@ -10,6 +10,7 @@ Fleet supports the following logging plugins for osquery logs:
|
||||
|
||||
- [Filesystem](#filesystem) - Logs are written to the local Fleet server filesystem.
|
||||
- [Firehose](#firehose) - Logs are written to AWS Firehose streams.
|
||||
- [PubSub](#pubsub) - Logs are written to Google Cloud PubSub topics.
|
||||
|
||||
To set the osquery logging plugins, use the `--osquery_result_log_plugin` and `--osquery_status_log_plugin` flags (or [equivalents for environment variables or configuration files](../infrastructure/configuring-the-fleet-binary.md#options)).
|
||||
|
||||
@ -30,3 +31,12 @@ With the filesystem plugin, osquery result and/or status logs are written to the
|
||||
With the Firehose plugin, osquery result and/or status logs are written to [AWS Firehose](https://aws.amazon.com/kinesis/data-firehose/) streams. This is a very good method for aggregating osquery logs into AWS S3 storage.
|
||||
|
||||
Note that Firehose logging has limits [discussed in the documentation](https://docs.aws.amazon.com/firehose/latest/dev/limits.html). When Fleet encounters logs that are too big for Firehose, notifications will be output in the Fleet logs and those logs _will not_ be sent to Firehose.
|
||||
|
||||
### PubSub
|
||||
|
||||
- Plugin name: `pubsub`
|
||||
- Flag namespace: [pubsub](../infrastructure/configuring-the-fleet-binary.md#pubsub)
|
||||
|
||||
With the PubSub plugin, osquery result and/or status logs are written to [PubSub](https://cloud.google.com/pubsub/) topics.
|
||||
|
||||
Note that messages over 10MB will be dropped, with a notification sent to the fleet logs, as these can never be processed by PubSub.
|
||||
|
@ -98,6 +98,13 @@ type FirehoseConfig struct {
|
||||
ResultStream string `yaml:"result_stream"`
|
||||
}
|
||||
|
||||
// PubSubConfig defines configs the for Google PubSub logging plugin
|
||||
type PubSubConfig struct {
|
||||
Project string
|
||||
StatusTopic string `yaml:"status_topic"`
|
||||
ResultTopic string `yaml:"result_topic"`
|
||||
}
|
||||
|
||||
// FilesystemConfig defines configs for the Filesystem logging plugin
|
||||
type FilesystemConfig struct {
|
||||
StatusLogFile string `yaml:"status_log_file"`
|
||||
@ -119,6 +126,7 @@ type KolideConfig struct {
|
||||
Osquery OsqueryConfig
|
||||
Logging LoggingConfig
|
||||
Firehose FirehoseConfig
|
||||
PubSub PubSubConfig
|
||||
Filesystem FilesystemConfig
|
||||
}
|
||||
|
||||
@ -221,6 +229,11 @@ func (man Manager) addConfigs() {
|
||||
man.addConfigString("firehose.result_stream", "",
|
||||
"Firehose stream name for result logs")
|
||||
|
||||
// PubSub
|
||||
man.addConfigString("pubsub.project", "", "Google Cloud Project to use")
|
||||
man.addConfigString("pubsub.status_topic", "", "PubSub topic for status logs")
|
||||
man.addConfigString("pubsub.result_topic", "", "PubSub topic for result logs")
|
||||
|
||||
// Filesystem
|
||||
man.addConfigString("filesystem.status_log_file", "/tmp/osquery_status",
|
||||
"Log file path to use for status logs")
|
||||
@ -294,6 +307,11 @@ func (man Manager) LoadConfig() KolideConfig {
|
||||
StatusStream: man.getConfigString("firehose.status_stream"),
|
||||
ResultStream: man.getConfigString("firehose.result_stream"),
|
||||
},
|
||||
PubSub: PubSubConfig{
|
||||
Project: man.getConfigString("pubsub.project"),
|
||||
StatusTopic: man.getConfigString("pubsub.status_topic"),
|
||||
ResultTopic: man.getConfigString("pubsub.result_topic"),
|
||||
},
|
||||
Filesystem: FilesystemConfig{
|
||||
StatusLogFile: man.getConfigString("filesystem.status_log_file"),
|
||||
ResultLogFile: man.getConfigString("filesystem.result_log_file"),
|
||||
|
@ -1,11 +1,14 @@
|
||||
package kolide
|
||||
|
||||
import "encoding/json"
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// JSONLogger defines an interface for loggers that can write JSON to various
|
||||
// output sources.
|
||||
type JSONLogger interface {
|
||||
// Write writes the JSON log entries to the appropriate destination,
|
||||
// returning any errors that occurred.
|
||||
Write(logs []json.RawMessage) error
|
||||
Write(ctx context.Context, logs []json.RawMessage) error
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package logging
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
@ -59,7 +60,7 @@ type flusher interface {
|
||||
}
|
||||
|
||||
// Write writes the provided logs to the filesystem
|
||||
func (l *filesystemLogWriter) Write(logs []json.RawMessage) error {
|
||||
func (l *filesystemLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
|
||||
for _, log := range logs {
|
||||
// Add newline to separate logs in output file
|
||||
log = append(log, '\n')
|
||||
|
@ -1,6 +1,7 @@
|
||||
package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
@ -14,6 +15,7 @@ import (
|
||||
)
|
||||
|
||||
func TestFilesystemLogger(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tempPath, err := ioutil.TempDir("", "test")
|
||||
require.Nil(t, err)
|
||||
fileName := path.Join(tempPath, "filesystemLogWriter")
|
||||
@ -35,7 +37,7 @@ func TestFilesystemLogger(t *testing.T) {
|
||||
}
|
||||
|
||||
for i := 0; i < batches; i++ {
|
||||
err := lgr.Write(logs)
|
||||
err := lgr.Write(ctx, logs)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
@ -43,7 +45,7 @@ func TestFilesystemLogger(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
// can't write to a closed logger
|
||||
err = lgr.Write(logs)
|
||||
err = lgr.Write(ctx, logs)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// call close twice noop
|
||||
@ -58,6 +60,7 @@ func TestFilesystemLogger(t *testing.T) {
|
||||
}
|
||||
|
||||
func BenchmarkFilesystemLogger(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
tempPath, err := ioutil.TempDir("", "test")
|
||||
if err != nil {
|
||||
b.Fatal("temp dir failed", err)
|
||||
@ -78,7 +81,7 @@ func BenchmarkFilesystemLogger(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := lgr.Write(logs)
|
||||
err := lgr.Write(ctx, logs)
|
||||
if err != nil {
|
||||
b.Fatal("write failed ", err)
|
||||
}
|
||||
@ -90,6 +93,7 @@ func BenchmarkFilesystemLogger(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkLumberjack(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
tempPath, err := ioutil.TempDir("", "test")
|
||||
if err != nil {
|
||||
b.Fatal("temp dir failed", err)
|
||||
@ -109,14 +113,14 @@ func BenchmarkLumberjack(b *testing.B) {
|
||||
}
|
||||
// first lumberjack write opens file so we count that as part of initialization
|
||||
// just to make sure we're comparing apples to apples with our logger
|
||||
err = lgr.Write(logs)
|
||||
err = lgr.Write(ctx, logs)
|
||||
if err != nil {
|
||||
b.Fatal("first write failed ", err)
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := lgr.Write(logs)
|
||||
err := lgr.Write(ctx, logs)
|
||||
if err != nil {
|
||||
b.Fatal("write failed ", err)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"time"
|
||||
@ -78,7 +79,7 @@ func (f *firehoseLogWriter) validateStream() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *firehoseLogWriter) Write(logs []json.RawMessage) error {
|
||||
func (f *firehoseLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
|
||||
var records []*firehose.Record
|
||||
totalBytes := 0
|
||||
for _, log := range logs {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
@ -44,6 +45,7 @@ func getLogsFromInput(input *firehose.PutRecordBatchInput) []json.RawMessage {
|
||||
}
|
||||
|
||||
func TestFirehoseNonRetryableFailure(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
callCount := 0
|
||||
putFunc := func(*firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
|
||||
callCount += 1
|
||||
@ -51,12 +53,13 @@ func TestFirehoseNonRetryableFailure(t *testing.T) {
|
||||
}
|
||||
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 1, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseRetryableFailure(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
callCount := 0
|
||||
putFunc := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
|
||||
callCount += 1
|
||||
@ -72,12 +75,13 @@ func TestFirehoseRetryableFailure(t *testing.T) {
|
||||
}
|
||||
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 3, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseNormalPut(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
callCount := 0
|
||||
putFunc := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
|
||||
callCount += 1
|
||||
@ -87,12 +91,13 @@ func TestFirehoseNormalPut(t *testing.T) {
|
||||
}
|
||||
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseSomeFailures(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
f := &mock.FirehoseMock{}
|
||||
callCount := 0
|
||||
|
||||
@ -145,12 +150,13 @@ func TestFirehoseSomeFailures(t *testing.T) {
|
||||
}
|
||||
f.PutRecordBatchFunc = call1
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseFailAllRecords(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
f := &mock.FirehoseMock{}
|
||||
callCount := 0
|
||||
|
||||
@ -180,12 +186,13 @@ func TestFirehoseFailAllRecords(t *testing.T) {
|
||||
}
|
||||
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 3, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseRecordTooBig(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
newLogs := make([]json.RawMessage, len(logs))
|
||||
copy(newLogs, logs)
|
||||
logs[0] = make(json.RawMessage, firehoseMaxSizeOfRecord+1, firehoseMaxSizeOfRecord+1)
|
||||
@ -198,12 +205,13 @@ func TestFirehoseRecordTooBig(t *testing.T) {
|
||||
}
|
||||
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseSplitBatchBySize(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
// Make each record just under 1 MB so that it takes 3 total batches of
|
||||
// just under 4 MB each
|
||||
logs := make([]json.RawMessage, 12)
|
||||
@ -219,12 +227,13 @@ func TestFirehoseSplitBatchBySize(t *testing.T) {
|
||||
}
|
||||
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, callCount)
|
||||
}
|
||||
|
||||
func TestFirehoseSplitBatchByCount(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logs := make([]json.RawMessage, 2000)
|
||||
for i := 0; i < len(logs); i++ {
|
||||
logs[i] = json.RawMessage(`{}`)
|
||||
@ -238,7 +247,7 @@ func TestFirehoseSplitBatchByCount(t *testing.T) {
|
||||
}
|
||||
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
|
||||
writer := makeFirehoseWriterWithMock(f, "foobar")
|
||||
err := writer.Write(logs)
|
||||
err := writer.Write(ctx, logs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 4, callCount)
|
||||
}
|
||||
|
@ -44,6 +44,15 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create firehose status logger")
|
||||
}
|
||||
case "pubsub":
|
||||
status, err = NewPubSubLogWriter(
|
||||
config.PubSub.Project,
|
||||
config.PubSub.StatusTopic,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create pubsub status logger")
|
||||
}
|
||||
default:
|
||||
return nil, errors.Errorf(
|
||||
"unknown status log plugin: %s", config.Osquery.StatusLogPlugin,
|
||||
@ -75,6 +84,15 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create firehose result logger")
|
||||
}
|
||||
case "pubsub":
|
||||
result, err = NewPubSubLogWriter(
|
||||
config.PubSub.Project,
|
||||
config.PubSub.ResultTopic,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create pubsub result logger")
|
||||
}
|
||||
default:
|
||||
return nil, errors.Errorf(
|
||||
"unknown result log plugin: %s", config.Osquery.StatusLogPlugin,
|
||||
|
75
server/logging/pubsub.go
Normal file
75
server/logging/pubsub.go
Normal file
@ -0,0 +1,75 @@
|
||||
package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"cloud.google.com/go/pubsub"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type pubSubLogWriter struct {
|
||||
topic *pubsub.Topic
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func NewPubSubLogWriter(projectId string, topicName string, logger log.Logger) (*pubSubLogWriter, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
client, err := pubsub.NewClient(ctx, projectId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create pubsub client")
|
||||
}
|
||||
|
||||
topic := client.Topic(topicName)
|
||||
|
||||
level.Info(logger).Log(
|
||||
"msg", "GCP PubSub writer configured",
|
||||
"project", projectId,
|
||||
"topic", topicName,
|
||||
)
|
||||
|
||||
return &pubSubLogWriter{
|
||||
topic: topic,
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *pubSubLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
|
||||
results := make([]*pubsub.PublishResult, len(logs))
|
||||
|
||||
// Add all of the messages to the global pubsub queue
|
||||
for i, log := range logs {
|
||||
data, err := log.MarshalJSON()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "marshal message into JSON")
|
||||
}
|
||||
|
||||
if len(data) > pubsub.MaxPublishRequestBytes {
|
||||
level.Info(w.logger).Log(
|
||||
"msg", "dropping log over 10MB PubSub limit",
|
||||
"size", len(data),
|
||||
"log", string(log[:100])+"...",
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
message := &pubsub.Message{
|
||||
Data: data,
|
||||
}
|
||||
|
||||
results[i] = w.topic.Publish(ctx, message)
|
||||
}
|
||||
|
||||
// Wait for each message to be pushed to the server
|
||||
for _, result := range results {
|
||||
_, err := result.Get(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "pubsub publish")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -211,14 +211,14 @@ func (svc service) GetClientConfig(ctx context.Context) (map[string]interface{},
|
||||
}
|
||||
|
||||
func (svc service) SubmitStatusLogs(ctx context.Context, logs []json.RawMessage) error {
|
||||
if err := svc.osqueryLogWriter.Status.Write(logs); err != nil {
|
||||
if err := svc.osqueryLogWriter.Status.Write(ctx, logs); err != nil {
|
||||
return osqueryError{message: "error writing status logs: " + err.Error()}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc service) SubmitResultLogs(ctx context.Context, logs []json.RawMessage) error {
|
||||
if err := svc.osqueryLogWriter.Result.Write(logs); err != nil {
|
||||
if err := svc.osqueryLogWriter.Result.Write(ctx, logs); err != nil {
|
||||
return osqueryError{message: "error writing result logs: " + err.Error()}
|
||||
}
|
||||
return nil
|
||||
@ -575,8 +575,8 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
|
||||
// Write the results to the pubsub store
|
||||
res := kolide.DistributedQueryResult{
|
||||
DistributedQueryCampaignID: uint(campaignID),
|
||||
Host: host,
|
||||
Rows: rows,
|
||||
Host: host,
|
||||
Rows: rows,
|
||||
}
|
||||
if failed {
|
||||
// osquery errors are not currently helpful, but we should fix
|
||||
@ -612,9 +612,9 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
|
||||
status = kolide.ExecutionFailed
|
||||
}
|
||||
exec := &kolide.DistributedQueryExecution{
|
||||
HostID: host.ID,
|
||||
HostID: host.ID,
|
||||
DistributedQueryCampaignID: uint(campaignID),
|
||||
Status: status,
|
||||
Status: status,
|
||||
}
|
||||
|
||||
_, err = svc.ds.NewDistributedQueryExecution(exec)
|
||||
|
@ -121,7 +121,7 @@ type testJSONLogger struct {
|
||||
logs []json.RawMessage
|
||||
}
|
||||
|
||||
func (n *testJSONLogger) Write(logs []json.RawMessage) error {
|
||||
func (n *testJSONLogger) Write(ctx context.Context, logs []json.RawMessage) error {
|
||||
n.logs = logs
|
||||
return nil
|
||||
}
|
||||
@ -859,12 +859,12 @@ func TestNewDistributedQueryCampaign(t *testing.T) {
|
||||
assert.Equal(t, gotQuery.ID, gotCampaign.QueryID)
|
||||
assert.Equal(t, []*kolide.DistributedQueryCampaignTarget{
|
||||
&kolide.DistributedQueryCampaignTarget{
|
||||
Type: kolide.TargetHost,
|
||||
Type: kolide.TargetHost,
|
||||
DistributedQueryCampaignID: campaign.ID,
|
||||
TargetID: 2,
|
||||
},
|
||||
&kolide.DistributedQueryCampaignTarget{
|
||||
Type: kolide.TargetLabel,
|
||||
Type: kolide.TargetLabel,
|
||||
DistributedQueryCampaignID: campaign.ID,
|
||||
TargetID: 1,
|
||||
},
|
||||
|
@ -1,6 +1,7 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@ -17,6 +18,7 @@ import (
|
||||
// TestRotateLoggerSIGHUP verifies that the osqueryd logfile is rotated by
|
||||
// sending a SIGHUP signal.
|
||||
func TestRotateLoggerSIGHUP(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
filePrefix := "kolide-log-rotate-test"
|
||||
f, err := ioutil.TempFile("/tmp", filePrefix)
|
||||
require.Nil(t, err)
|
||||
@ -25,7 +27,7 @@ func TestRotateLoggerSIGHUP(t *testing.T) {
|
||||
logFile, err := logging.NewFilesystemLogWriter(f.Name(), log.NewNopLogger(), true)
|
||||
|
||||
// write a log line
|
||||
logFile.Write([]json.RawMessage{json.RawMessage("msg1")})
|
||||
logFile.Write(ctx, []json.RawMessage{json.RawMessage("msg1")})
|
||||
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGHUP)
|
||||
@ -42,7 +44,7 @@ func TestRotateLoggerSIGHUP(t *testing.T) {
|
||||
|
||||
// write a new log line and verify that the original file includes
|
||||
// the new log line but not any of the old ones.
|
||||
logFile.Write([]json.RawMessage{json.RawMessage("msg2")})
|
||||
logFile.Write(ctx, []json.RawMessage{json.RawMessage("msg2")})
|
||||
logMsg, err := ioutil.ReadFile(f.Name())
|
||||
require.Nil(t, err)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user