Add AWS Lambda as logging plugin (#347)

This plugin invokes the provided function with each log line as the
payload.

Closes #342
This commit is contained in:
Zach Wasserman 2021-02-24 10:02:26 -08:00 committed by GitHub
parent e8b510a108
commit de0b3324b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 470 additions and 7 deletions

View File

@ -62,7 +62,7 @@ jobs:
"helm-temp/output-defaults.yaml" \
'FLEET_FILESYSTEM_STATUS_LOG_FILE FLEET_FILESYSTEM_RESULT_LOG_FILE FLEET_FILESYSTEM_ENABLE_LOG_ROTATION FLEET_FILESYSTEM_ENABLE_LOG_COMPRESSION' \
'fleet-tls osquery-logs'
- name: check pubusb values
- name: check pubsub values
run: |
.github/scripts/helm-check-expected.sh \
"helm-temp/logger-pubsub.yaml" \

View File

@ -188,6 +188,7 @@ spec:
- name: FLEET_FILESYSTEM_ENABLE_LOG_COMPRESSION
value: "{{ .Values.osquery.logging.filesystem.enableCompression }}"
{{- end }}
{{- if or (eq .Values.osquery.logging.statusPlugin "firehose") (eq .Values.osquery.logging.resultPlugin "firehose") }}
- name: FLEET_FIREHOSE_REGION
value: "{{ .Values.osquery.logging.firehose.region }}"
@ -238,6 +239,32 @@ spec:
{{- end }}
{{- end }}
{{- if or (eq .Values.osquery.logging.statusPlugin "lambda") (eq .Values.osquery.logging.resultPlugin "lambda") }}
- name: FLEET_LAMBDA_REGION
value: "{{ .Values.osquery.logging.lambda.region }}"
{{- if eq .Values.osquery.logging.statusPlugin "lambda" }}
- name: FLEET_LAMBDA_STATUS_FUNCTION
value: "{{ .Values.osquery.logging.lambda.statusFunction }}"
{{- end }}
{{- if eq .Values.osquery.logging.resultPlugin "lambda" }}
- name: FLEET_LAMBDA_RESULT_FUNCTION
value: "{{ .Values.osquery.logging.lambda.resultFunction }}"
{{- end }}
{{- if ne .Values.osquery.logging.lambda.accessKeyID "" }}
- name: FLEET_LAMBDA_ACCESS_KEY_ID
value: "{{ .Values.osquery.logging.lambda.accessKeyID }}"
- name: FLEET_LAMBDA_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: "{{ .Values.osquery.secretName }}"
key: "{{ .Values.osquery.logging.lambda.secretKey }}"
{{ else }}
- name: FLEET_LAMBDA_STS_ASSUME_ROLE_ARN
value: "{{ .Values.osquery.logging.lambda.stsAssumeRoleARN }}"
{{- end }}
{{- end }}
{{- if or (eq .Values.osquery.logging.statusPlugin "pubsub") (eq .Values.osquery.logging.resultPlugin "pubsub") }}
- name: FLEET_PUBSUB_PROJECT
value: "{{ .Values.osquery.logging.pubsub.project }}"

View File

@ -91,6 +91,15 @@ osquery:
statusStream: ""
resultStream: ""
# To configure the AWS Lambda logger, change the values below
lambda:
region: ""
accessKeyID: ""
secretKey: lambda
stsAssumeRoleARN: ""
statusFunction: ""
resultFunction: ""
# To configure the GCP PubSub logger, change the values below
pubsub:
project: ""

View File

@ -3,6 +3,7 @@
- [Filesystem](#filesystem)
- [Firehose](#firehose)
- [Kinesis](#kinesis)
- [Lambda](#lambda)
- [PubSub](#pubsub)
- [Stdout](#stdout)
@ -17,6 +18,7 @@ Fleet supports the following logging plugins for osquery logs:
- [Filesystem](#filesystem) - Logs are written to the local Fleet server filesystem.
- [Firehose](#firehose) - Logs are written to AWS Firehose streams.
- [Kinesis](#kinesis) - Logs are written to AWS Kinesis streams.
- [Lambda](#lambda) - Logs are written to AWS Lambda functions.
- [PubSub](#pubsub) - Logs are written to Google Cloud PubSub topics.
- [Stdout](#stdout) - Logs are written to stdout.
@ -53,6 +55,21 @@ documentation](https://docs.aws.amazon.com/kinesis/latest/dev/limits.html).
When Fleet encounters logs that are too big for Kinesis, notifications will be
output in the Fleet logs and those logs _will not_ be sent to Kinesis.
### Lambda
- Plugin name: `lambda`
- Flag namespace: [lambda](../2-Deployment/2-Configuration.md#lambda)
With the Lambda plugin, osquery result and/or status logs are written to
[AWS Lambda](https://aws.amazon.com/lambda/) functions.
Lambda processes logs from Fleet synchronously, so the Lambda function used must not take enough processing time that the osquery client times out while writing logs. If there is heavy processing to be done, use Lambda to store the logs in another datastore/queue before performing the long-running process.
Note that Lambda logging has limits [discussed in the
documentation](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html). The maximum size of a log sent to Lambda is 6MB.
When Fleet encounters logs that are too big for Lambda, notifications will be
output in the Fleet logs and those logs _will not_ be sent to Lambda.
### PubSub
- Plugin name: `pubsub`

View File

@ -579,7 +579,7 @@ Setting this to a higher value can reduce baseline load on the Fleet server in l
Which log output plugin should be used for osquery status logs received from clients.
Options are `filesystem`, `firehose`, `kinesis`, `pubsub`, and `stdout`.
Options are `filesystem`, `firehose`, `kinesis`, `lambda`, `pubsub`, and `stdout`.
- Default value: `filesystem`
- Environment variable: `FLEET_OSQUERY_STATUS_LOG_PLUGIN`
@ -594,7 +594,7 @@ Options are `filesystem`, `firehose`, `kinesis`, `pubsub`, and `stdout`.
Which log output plugin should be used for osquery result logs received from clients.
Options are `filesystem`, `firehose`, `kinesis`, `pubsub`, and `stdout`.
Options are `filesystem`, `firehose`, `kinesis`, `lambda`, `pubsub`, and `stdout`.
- Default value: `filesystem`
- Environment variable: `FLEET_OSQUERY_RESULT_LOG_PLUGIN`
@ -974,6 +974,117 @@ the stream listed:
* `kinesis:DescribeStream`
* `kinesis:PutRecords`
##### Lambda
###### `lambda_region`
This flag only has effect if `osquery_status_log_plugin` is set to `lambda`.
AWS region to use for Lambda connection
- Default value: none
- Environment variable: `FLEET_LAMBDA_REGION`
- Config file format:
```
lambda:
region: ca-central-1
```
###### `lambda_access_key_id`
This flag only has effect if `osquery_status_log_plugin` or
`osquery_result_log_plugin` are set to `lambda`.
If `lambda_access_key_id` and `lambda_secret_access_key` are omitted, Fleet
will try to use
[AWS STS](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html)
credentials.
AWS access key ID to use for Lambda authentication.
- Default value: none
- Environment variable: `FLEET_LAMBDA_ACCESS_KEY_ID`
- Config file format:
```
lambda:
access_key_id: AKIAIOSFODNN7EXAMPLE
```
###### `lambda_secret_access_key`
This flag only has effect if `osquery_status_log_plugin` or
`osquery_result_log_plugin` are set to `lambda`.
AWS secret access key to use for Lambda authentication.
- Default value: none
- Environment variable: `FLEET_LAMBDA_SECRET_ACCESS_KEY`
- Config file format:
```
lambda:
secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```
###### `lambda_sts_assume_role_arn`
This flag only has effect if `osquery_status_log_plugin` or
`osquery_result_log_plugin` are set to `lambda`.
AWS STS role ARN to use for Lambda authentication.
- Default value: none
- Environment variable: `FLEET_LAMBDA_STS_ASSUME_ROLE_ARN`
- Config file format:
```
lambda:
sts_assume_role_arn: arn:aws:iam::1234567890:role/lambda-role
```
###### `lambda_status_function`
This flag only has effect if `osquery_status_log_plugin` is set to `lambda`.
Name of the Lambda function to write osquery status logs received from clients.
- Default value: none
- Environment variable: `FLEET_LAMBDA_STATUS_FUNCTION`
- Config file format:
```
lambda:
status_function: statusFunction
```
The IAM role used to send to Lambda must allow the following permissions on
the function listed:
* `lambda:InvokeFunction`
###### `lambda_result_function`
This flag only has effect if `osquery_result_log_plugin` is set to `lambda`.
Name of the Lambda function to write osquery result logs received from clients.
- Default value: none
- Environment variable: `FLEET_LAMBDA_RESULT_FUNCTION`
- Config file format:
```
lambda:
result_function: resultFunction
```
The IAM role used to send to Lambda must allow the following permissions on
the function listed:
* `lambda:InvokeFunction`
##### PubSub
###### `pubsub_project`

View File

@ -116,6 +116,16 @@ type KinesisConfig struct {
ResultStream string `yaml:"result_stream"`
}
// LambdaConfig defines configs for the AWS Lambda logging plugin
type LambdaConfig struct {
Region string
AccessKeyID string `yaml:"access_key_id"`
SecretAccessKey string `yaml:"secret_access_key"`
StsAssumeRoleArn string `yaml:"sts_assume_role_arn"`
StatusFunction string `yaml:"status_function"`
ResultFunction string `yaml:"result_function"`
}
// S3Config defines config to enable file carving storage to an S3 bucket
type S3Config struct {
Bucket string
@ -155,6 +165,7 @@ type KolideConfig struct {
Logging LoggingConfig
Firehose FirehoseConfig
Kinesis KinesisConfig
Lambda LambdaConfig
S3 S3Config
PubSub PubSubConfig
Filesystem FilesystemConfig
@ -286,6 +297,17 @@ func (man Manager) addConfigs() {
man.addConfigString("kinesis.result_stream", "",
"Kinesis stream name for result logs")
// Lambda
man.addConfigString("lambda.region", "", "AWS Region to use")
man.addConfigString("lambda.access_key_id", "", "Access Key ID for AWS authentication")
man.addConfigString("lambda.secret_access_key", "", "Secret Access Key for AWS authentication")
man.addConfigString("lambda.sts_assume_role_arn", "",
"ARN of role to assume for AWS")
man.addConfigString("lambda.status_function", "",
"Lambda function name for status logs")
man.addConfigString("lambda.result_function", "",
"Lambda function name for result logs")
// S3 for file carving
man.addConfigString("s3.bucket", "", "Bucket where to store file carves")
man.addConfigString("s3.prefix", "", "Prefix under which carves are stored")
@ -413,6 +435,14 @@ func (man Manager) LoadConfig() KolideConfig {
ResultStream: man.getConfigString("kinesis.result_stream"),
StsAssumeRoleArn: man.getConfigString("kinesis.sts_assume_role_arn"),
},
Lambda: LambdaConfig{
Region: man.getConfigString("lambda.region"),
AccessKeyID: man.getConfigString("lambda.access_key_id"),
SecretAccessKey: man.getConfigString("lambda.secret_access_key"),
StatusFunction: man.getConfigString("lambda.status_function"),
ResultFunction: man.getConfigString("lambda.result_function"),
StsAssumeRoleArn: man.getConfigString("lambda.sts_assume_role_arn"),
},
S3: S3Config{
Bucket: man.getConfigString("s3.bucket"),
Prefix: man.getConfigString("s3.prefix"),

126
server/logging/lambda.go Normal file
View File

@ -0,0 +1,126 @@
package logging
import (
"context"
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/lambda/lambdaiface"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
)
const (
// See
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
// for documentation on limits.
//
// (Payload size is lower for async requests)
lambdaMaxSizeOfPayload = 6 * 1000 * 1000 // 6MB
)
type lambdaLogWriter struct {
client lambdaiface.LambdaAPI
functionName string
logger log.Logger
}
func NewLambdaLogWriter(region, id, secret, stsAssumeRoleArn, functionName string, logger log.Logger) (*lambdaLogWriter, error) {
conf := &aws.Config{
Region: &region,
}
// Only provide static credentials if we have them
// otherwise use the default credentials provider chain
if id != "" && secret != "" {
conf.Credentials = credentials.NewStaticCredentials(id, secret, "")
}
sess, err := session.NewSession(conf)
if err != nil {
return nil, errors.Wrap(err, "create Lambda client")
}
if stsAssumeRoleArn != "" {
creds := stscreds.NewCredentials(sess, stsAssumeRoleArn)
conf.Credentials = creds
sess, err = session.NewSession(conf)
if err != nil {
return nil, errors.Wrap(err, "create Lambda client")
}
}
client := lambda.New(sess)
f := &lambdaLogWriter{
client: client,
functionName: functionName,
logger: logger,
}
if err := f.validateFunction(); err != nil {
return nil, errors.Wrap(err, "validate lambda")
}
return f, nil
}
func (f *lambdaLogWriter) validateFunction() error {
out, err := f.client.Invoke(
&lambda.InvokeInput{
FunctionName: &f.functionName,
InvocationType: aws.String("DryRun"),
},
)
if err != nil {
return errors.Wrapf(err, "dry run %s", f.functionName)
}
if out.FunctionError != nil {
return errors.Errorf(
"dry run %s function error: %s",
f.functionName,
*out.FunctionError,
)
}
return nil
}
func (f *lambdaLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
for _, log := range logs {
// We don't really have a good option for what to do with logs
// that are too big for Lambda. This behavior is consistent
// with other logging plugins.
if len(log) > lambdaMaxSizeOfPayload {
level.Info(f.logger).Log(
"msg", "dropping log over 6MB Lambda limit",
"size", len(log),
"log", string(log[:100])+"...",
)
continue
}
out, err := f.client.Invoke(
&lambda.InvokeInput{
FunctionName: &f.functionName,
Payload: []byte(log),
},
)
if err != nil {
return errors.Wrapf(err, "run %s", f.functionName)
}
if out.FunctionError != nil {
return errors.Errorf(
"run %s function error: %s",
f.functionName,
*out.FunctionError,
)
}
}
return nil
}

View File

@ -0,0 +1,81 @@
package logging
import (
"context"
"errors"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/lambda/lambdaiface"
"github.com/fleetdm/fleet/server/logging/mock"
"github.com/fleetdm/fleet/server/test"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
)
func makeLambdaWriterWithMock(client lambdaiface.LambdaAPI, functionName string) *lambdaLogWriter {
return &lambdaLogWriter{
client: client,
functionName: functionName,
logger: log.NewNopLogger(),
}
}
func TestLambdaValidateFunctionError(t *testing.T) {
m := &mock.LambdaMock{}
m.On("Invoke", &lambda.InvokeInput{FunctionName: aws.String("foobar"), InvocationType: aws.String("DryRun")}).
Return(nil, errors.New("failed"))
writer := makeLambdaWriterWithMock(m, "foobar")
err := writer.validateFunction()
assert.Error(t, err)
m.AssertExpectations(test.Quiet(t))
}
func TestLambdaValidateFunctionErrorFunction(t *testing.T) {
m := &mock.LambdaMock{}
m.On("Invoke", &lambda.InvokeInput{FunctionName: aws.String("foobar"), InvocationType: aws.String("DryRun")}).
Return(&lambda.InvokeOutput{FunctionError: aws.String("failed")}, nil)
writer := makeLambdaWriterWithMock(m, "foobar")
err := writer.validateFunction()
assert.Error(t, err)
m.AssertExpectations(test.Quiet(t))
}
func TestLambdaValidateFunctionSuccess(t *testing.T) {
m := &mock.LambdaMock{}
m.On("Invoke", &lambda.InvokeInput{FunctionName: aws.String("foobar"), InvocationType: aws.String("DryRun")}).
Return(&lambda.InvokeOutput{}, nil)
writer := makeLambdaWriterWithMock(m, "foobar")
err := writer.validateFunction()
assert.NoError(t, err)
m.AssertExpectations(test.Quiet(t))
}
func TestLambdaError(t *testing.T) {
m := &mock.LambdaMock{}
m.On("Invoke", tmock.MatchedBy(
func(in *lambda.InvokeInput) bool {
return *in.FunctionName == "foobar" && in.InvocationType == nil
},
)).Return(nil, errors.New("failed"))
writer := makeLambdaWriterWithMock(m, "foobar")
err := writer.Write(context.Background(), logs)
assert.Error(t, err)
m.AssertExpectations(test.Quiet(t))
}
func TestLambdaSuccess(t *testing.T) {
m := &mock.LambdaMock{}
m.On("Invoke", tmock.MatchedBy(
func(in *lambda.InvokeInput) bool {
return len(in.Payload) > 0 && *in.FunctionName == "foobar" && in.InvocationType == nil
},
)).Return(&lambda.InvokeOutput{}, nil).
Times(len(logs))
writer := makeLambdaWriterWithMock(m, "foobar")
err := writer.Write(context.Background(), logs)
assert.NoError(t, err)
m.AssertExpectations(test.Quiet(t))
}

View File

@ -3,10 +3,10 @@
package logging
import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/fleetdm/fleet/server/config"
"github.com/fleetdm/fleet/server/kolide"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
)
@ -22,7 +22,7 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
switch config.Osquery.StatusLogPlugin {
case "":
// Allow "" to mean filesystem for backwards compatibility
level.Info(logger).Log("msg", "kolide_status_log_plugin not explicitly specified. Assuming 'filesystem'")
level.Info(logger).Log("msg", "fleet_status_log_plugin not explicitly specified. Assuming 'filesystem'")
fallthrough
case "filesystem":
status, err = NewFilesystemLogWriter(
@ -58,6 +58,18 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
if err != nil {
return nil, errors.Wrap(err, "create kinesis status logger")
}
case "lambda":
status, err = NewLambdaLogWriter(
config.Lambda.Region,
config.Lambda.AccessKeyID,
config.Lambda.SecretAccessKey,
config.Lambda.StsAssumeRoleArn,
config.Lambda.StatusFunction,
logger,
)
if err != nil {
return nil, errors.Wrap(err, "create lambda status logger")
}
case "pubsub":
status, err = NewPubSubLogWriter(
config.PubSub.Project,
@ -81,7 +93,7 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
switch config.Osquery.ResultLogPlugin {
case "":
// Allow "" to mean filesystem for backwards compatibility
level.Info(logger).Log("msg", "kolide_result_log_plugin not explicitly specified. Assuming 'filesystem'")
level.Info(logger).Log("msg", "fleet_result_log_plugin not explicitly specified. Assuming 'filesystem'")
fallthrough
case "filesystem":
result, err = NewFilesystemLogWriter(
@ -117,6 +129,18 @@ func New(config config.KolideConfig, logger log.Logger) (*OsqueryLogger, error)
if err != nil {
return nil, errors.Wrap(err, "create kinesis result logger")
}
case "lambda":
result, err = NewLambdaLogWriter(
config.Lambda.Region,
config.Lambda.AccessKeyID,
config.Lambda.SecretAccessKey,
config.Lambda.StsAssumeRoleArn,
config.Lambda.ResultFunction,
logger,
)
if err != nil {
return nil, errors.Wrap(err, "create lambda result logger")
}
case "pubsub":
result, err = NewPubSubLogWriter(
config.PubSub.Project,

View File

@ -0,0 +1,21 @@
package mock
import (
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/lambda/lambdaiface"
"github.com/stretchr/testify/mock"
)
type LambdaMock struct {
mock.Mock
lambdaiface.LambdaAPI
}
func (l *LambdaMock) Invoke(input *lambda.InvokeInput) (*lambda.InvokeOutput, error) {
args := l.Called(input)
out, err := args.Get(0), args.Error(1)
if out == nil {
return nil, err
}
return out.(*lambda.InvokeOutput), err
}

17
server/test/testingt.go Normal file
View File

@ -0,0 +1,17 @@
package test
import "testing"
type quiet struct {
*testing.T
}
// Quiet returns a wrapper around testing.T that silences Logf calls
func Quiet(t *testing.T) *quiet {
return &quiet{t}
}
func (q *quiet) Logf(format string, args ...interface{}) {
// No logging
return
}