package logging

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"

	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

const (
	kinesisMaxRetries = 8

	// See
	// https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
	// for documentation on limits.
	kinesisMaxRecordsInBatch = 500
	kinesisMaxSizeOfRecord   = 1000 * 1000     // 1 MB
	kinesisMaxSizeOfBatch    = 5 * 1000 * 1000 // 5 MB
)
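
// Taken together, these limits mean one PutRecords batch can carry up to 500
// small records, but as few as five records that are near the 1 MB
// per-record cap before the 5 MB batch cap forces a flush.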

type kinesisLogWriter struct {
	client kinesisiface.KinesisAPI
	stream string
	logger log.Logger
	rand   *rand.Rand
}
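
// The client field above is the kinesisiface.KinesisAPI interface rather
// than the concrete *kinesis.Kinesis client so that tests can substitute a
// stub implementation of PutRecords and DescribeStream.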

// NewKinesisLogWriter returns a kinesisLogWriter that writes log records to
// the given Kinesis stream, verifying on creation that the stream is active.
func NewKinesisLogWriter(region, endpointURL, id, secret, stsAssumeRoleArn, stream string, logger log.Logger) (*kinesisLogWriter, error) {
	conf := &aws.Config{
		Region:   &region,
		Endpoint: &endpointURL, // empty string or nil will use default values
	}

	// Only provide static credentials if we have them, otherwise use the
	// default credentials provider chain.
	if id != "" && secret != "" {
		conf.Credentials = credentials.NewStaticCredentials(id, secret, "")
	}

	sess, err := session.NewSession(conf)
	if err != nil {
		return nil, fmt.Errorf("create Kinesis client: %w", err)
	}

	// If an STS role ARN was provided, assume that role and rebuild the
	// session with the resulting credentials.
	if stsAssumeRoleArn != "" {
		creds := stscreds.NewCredentials(sess, stsAssumeRoleArn)
		conf.Credentials = creds

		sess, err = session.NewSession(conf)
		if err != nil {
			return nil, fmt.Errorf("create Kinesis client: %w", err)
		}
	}
	client := kinesis.New(sess)

	// This will be used to generate random partition keys to balance
	// records across Kinesis shards.
	rand := rand.New(rand.NewSource(time.Now().UnixNano()))

	k := &kinesisLogWriter{
		client: client,
		stream: stream,
		logger: logger,
		rand:   rand,
	}
	if err := k.validateStream(); err != nil {
		return nil, fmt.Errorf("create Kinesis writer: %w", err)
	}
	return k, nil
}

// validateStream checks that the configured stream exists and is in the
// active state.
func (k *kinesisLogWriter) validateStream() error {
	out, err := k.client.DescribeStream(
		&kinesis.DescribeStreamInput{
			StreamName: &k.stream,
		},
	)
	if err != nil {
		return fmt.Errorf("describe stream %s: %w", k.stream, err)
	}

	if *out.StreamDescription.StreamStatus != kinesis.StreamStatusActive {
		return fmt.Errorf("stream %s not active", k.stream)
	}

	return nil
}

// Write writes the provided logs to the Kinesis stream, splitting them into
// batches that respect the PutRecords limits.
func (k *kinesisLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
	var records []*kinesis.PutRecordsRequestEntry
	totalBytes := 0
	for _, log := range logs {
		// Add a trailing newline so that the records form valid NDJSON.
		log = append(log, '\n')
		// Evenly distribute logs across shards by assigning each
		// kinesis.PutRecordsRequestEntry a random partition key.
		partitionKey := fmt.Sprint(k.rand.Intn(256))

		// We don't really have a good option for what to do with logs
		// that are too big for Kinesis. This behavior is consistent
		// with osquery's behavior in the Kinesis logger plugin, and
		// the beginning bytes of the log should help the Fleet admin
		// diagnose the query generating huge results.
		if len(log)+len(partitionKey) > kinesisMaxSizeOfRecord {
			level.Info(k.logger).Log(
				"msg", "dropping log over 1MB Kinesis limit",
				"size", len(log),
				"log", string(log[:100])+"...",
			)
			continue
		}

		// If adding this log would exceed the limit on the number of
		// records in the batch, or the limit on the total size of the
		// records in the batch, push this batch before adding any more.
		if len(records) >= kinesisMaxRecordsInBatch ||
			totalBytes+len(log)+len(partitionKey) > kinesisMaxSizeOfBatch {
			if err := k.putRecords(0, records); err != nil {
				return ctxerr.Wrap(ctx, err, "put records")
			}
			totalBytes = 0
			records = nil
		}

		records = append(records, &kinesis.PutRecordsRequestEntry{Data: []byte(log), PartitionKey: aws.String(partitionKey)})
		totalBytes += len(log) + len(partitionKey)
	}

	// Push the final batch
	if len(records) > 0 {
		if err := k.putRecords(0, records); err != nil {
			return ctxerr.Wrap(ctx, err, "put records")
		}
	}

	return nil
}
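
// The retry schedule used by putRecords below is exponential: each attempt
// after the first sleeps 100ms * 2^try, i.e. 200ms after the first failure,
// doubling up to 25.6s once try reaches the kinesisMaxRetries cap of 8.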

// putRecords sends a batch of records to Kinesis, retrying failed records
// for up to kinesisMaxRetries attempts.
func (k *kinesisLogWriter) putRecords(try int, records []*kinesis.PutRecordsRequestEntry) error {
	if try > 0 {
		// Back off before retrying.
		time.Sleep(100 * time.Millisecond * time.Duration(math.Pow(2.0, float64(try))))
	}
	input := &kinesis.PutRecordsInput{
		StreamName: &k.stream,
		Records:    records,
	}

	output, err := k.client.PutRecords(input)
	if err != nil {
		var ae awserr.Error
		if errors.As(err, &ae) {
			if try < kinesisMaxRetries {
				// Retry the whole batch with backoff.
				return k.putRecords(try+1, records)
			}
		}

		// Not retryable or retries expired
		return err
	}

	// Check errors on individual records
	if output.FailedRecordCount != nil && *output.FailedRecordCount > 0 {
		if try >= kinesisMaxRetries {
			// Retrieve the first error message to provide to the
			// user. There could be up to kinesisMaxRecordsInBatch
			// errors here and we don't want to flood the output.
			var errMsg string
			for _, record := range output.Records {
				if record.ErrorCode != nil && record.ErrorMessage != nil {
					errMsg = *record.ErrorMessage
					break
				}
			}

			return fmt.Errorf(
				"failed to put %d records, retries exhausted. First error: %s",
				*output.FailedRecordCount, errMsg,
			)
		}

		// Collect the failed records for retry. Entries in
		// output.Records correspond by index to the entries in the
		// request, so records[i] is the record that failed.
		var failedRecords []*kinesis.PutRecordsRequestEntry
		for i, record := range output.Records {
			if record.ErrorCode != nil {
				failedRecords = append(failedRecords, records[i])
			}
		}

		return k.putRecords(try+1, failedRecords)
	}

	return nil
}
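
// Example usage (a minimal sketch; the region, stream name, and empty
// endpoint/credential arguments below are illustrative placeholders, not
// values taken from Fleet's configuration):
//
//	writer, err := NewKinesisLogWriter(
//		"us-east-1",       // region
//		"",                // endpointURL: empty uses the default endpoint
//		"", "",            // id, secret: empty uses the default chain
//		"",                // stsAssumeRoleArn: empty skips role assumption
//		"osquery_results", // stream
//		log.NewNopLogger(),
//	)
//	if err != nil {
//		// handle error
//	}
//	if err := writer.Write(context.Background(), []json.RawMessage{
//		json.RawMessage(`{"name":"example","snapshot":[]}`),
//	}); err != nil {
//		// handle error
//	}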