fleet/server/logging/firehose_test.go
Zachary Wasserman e59714242e
Add Firehose logging capabilities for result and status logs (#2022)
- Refactor configuration for logging to use separate plugins
- Move existing filesystem logging to filesystem plugin
- Create new AWS firehose plugin
- Update documentation around logging
2019-04-08 11:47:15 -07:00

284 lines
9.1 KiB
Go

package logging
import (
"encoding/json"
"errors"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface"
"github.com/go-kit/kit/log"
"github.com/kolide/fleet/server/logging/mock"
"github.com/stretchr/testify/assert"
)
// logs is the raw JSON fixture shared by the tests in this file. Tests that
// need different input should build their own slice instead of editing it.
var logs = []json.RawMessage{
	json.RawMessage(`{"foo": "bar"}`),
	json.RawMessage(`{"flim": "flam"}`),
	json.RawMessage(`{"jim": "jom"}`),
}
// makeFirehoseWriterWithMock returns a firehoseLogWriter whose client and
// stream fields are set to the given values and whose logger is a no-op
// logger.
func makeFirehoseWriterWithMock(client firehoseiface.FirehoseAPI, stream string) *firehoseLogWriter {
	writer := new(firehoseLogWriter)
	writer.client = client
	writer.stream = stream
	writer.logger = log.NewNopLogger()
	return writer
}
// getLogsFromInput collects the Data payload of every record in input, in
// record order. The result is nil when input carries no records.
func getLogsFromInput(input *firehose.PutRecordBatchInput) []json.RawMessage {
	var payloads []json.RawMessage
	for i := range input.Records {
		payloads = append(payloads, input.Records[i].Data)
	}
	return payloads
}
func TestFirehoseNonRetryableFailure(t *testing.T) {
callCount := 0
putFunc := func(*firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
callCount += 1
return nil, errors.New("generic error")
}
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
writer := makeFirehoseWriterWithMock(f, "foobar")
err := writer.Write(logs)
assert.Error(t, err)
assert.Equal(t, 1, callCount)
}
func TestFirehoseRetryableFailure(t *testing.T) {
callCount := 0
putFunc := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
callCount += 1
assert.Equal(t, logs, getLogsFromInput(input))
assert.Equal(t, "foobar", *input.DeliveryStreamName)
if callCount < 3 {
return nil, awserr.New(firehose.ErrCodeServiceUnavailableException, "", nil)
} else {
// Returning a non-retryable error earlier helps keep
// this test faster
return nil, errors.New("generic error")
}
}
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
writer := makeFirehoseWriterWithMock(f, "foobar")
err := writer.Write(logs)
assert.Error(t, err)
assert.Equal(t, 3, callCount)
}
func TestFirehoseNormalPut(t *testing.T) {
callCount := 0
putFunc := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
callCount += 1
assert.Equal(t, logs, getLogsFromInput(input))
assert.Equal(t, "foobar", *input.DeliveryStreamName)
return &firehose.PutRecordBatchOutput{FailedPutCount: aws.Int64(0)}, nil
}
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
writer := makeFirehoseWriterWithMock(f, "foobar")
err := writer.Write(logs)
assert.NoError(t, err)
assert.Equal(t, 1, callCount)
}
// TestFirehoseSomeFailures expects Write to resend only the records whose
// response entry carries an ErrorCode. The mock swaps its own
// PutRecordBatchFunc on every call to script three responses:
//
//	call 1: receives all logs,  reports records 1 and 2 as failed
//	call 2: receives logs[1:],  reports its first record as failed
//	call 3: receives logs[1:2], reports success
//
// FailedPutCount in each response matches the number of entries that carry an
// ErrorCode, so the scripted responses are internally consistent.
func TestFirehoseSomeFailures(t *testing.T) {
	f := &mock.FirehoseMock{}
	callCount := 0

	// The closures are declared last-to-first because each one installs its
	// successor, which must already be in scope.
	call3 := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
		// final invocation
		callCount++
		assert.Equal(t, logs[1:2], getLogsFromInput(input))
		return &firehose.PutRecordBatchOutput{
			FailedPutCount: aws.Int64(0),
		}, nil
	}

	call2 := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
		// Set to invoke call3 next time
		f.PutRecordBatchFunc = call3
		callCount++
		assert.Equal(t, logs[1:], getLogsFromInput(input))
		return &firehose.PutRecordBatchOutput{
			FailedPutCount: aws.Int64(1),
			RequestResponses: []*firehose.PutRecordBatchResponseEntry{
				{ErrorCode: aws.String("error")},
				{RecordId: aws.String("foo")},
			},
		}, nil
	}

	call1 := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
		// Use call2 function for next call
		f.PutRecordBatchFunc = call2
		callCount++
		assert.Equal(t, logs, getLogsFromInput(input))
		return &firehose.PutRecordBatchOutput{
			// Two of the three entries below failed.
			FailedPutCount: aws.Int64(2),
			RequestResponses: []*firehose.PutRecordBatchResponseEntry{
				{RecordId: aws.String("foo")},
				{ErrorCode: aws.String("error")},
				{ErrorCode: aws.String("error")},
			},
		}, nil
	}

	f.PutRecordBatchFunc = call1
	writer := makeFirehoseWriterWithMock(f, "foobar")
	err := writer.Write(logs)
	assert.NoError(t, err)
	assert.Equal(t, 3, callCount)
}
// TestFirehoseFailAllRecords expects Write to resend the whole batch when
// every record in the response carries an ErrorCode, and to return an error
// once the client finally fails with a plain error on the third call.
func TestFirehoseFailAllRecords(t *testing.T) {
	f := &mock.FirehoseMock{}
	callCount := 0
	f.PutRecordBatchFunc = func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
		callCount++
		assert.Equal(t, logs, getLogsFromInput(input))
		if callCount >= 3 {
			// Make test quicker by returning non-retryable error
			// before all retries are exhausted.
			return nil, errors.New("generic error")
		}
		return &firehose.PutRecordBatchOutput{
			// All three entries failed, so the count is 3.
			FailedPutCount: aws.Int64(3),
			RequestResponses: []*firehose.PutRecordBatchResponseEntry{
				{ErrorCode: aws.String("error")},
				{ErrorCode: aws.String("error")},
				{ErrorCode: aws.String("error")},
			},
		}, nil
	}

	writer := makeFirehoseWriterWithMock(f, "foobar")
	err := writer.Write(logs)
	assert.Error(t, err)
	assert.Equal(t, 3, callCount)
}
// TestFirehoseRecordTooBig expects Write to leave out a record that is one
// byte larger than firehoseMaxSizeOfRecord and still deliver the remaining
// records in a single successful PutRecordBatch call.
func TestFirehoseRecordTooBig(t *testing.T) {
	// Work on a private copy: the package-level logs fixture is shared with
	// other tests and must not be left holding an oversized record.
	newLogs := make([]json.RawMessage, len(logs))
	copy(newLogs, logs)
	newLogs[0] = make(json.RawMessage, firehoseMaxSizeOfRecord+1)

	callCount := 0
	putFunc := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
		callCount++
		// Only the records after the oversized one should arrive.
		assert.Equal(t, newLogs[1:], getLogsFromInput(input))
		assert.Equal(t, "foobar", *input.DeliveryStreamName)
		return &firehose.PutRecordBatchOutput{FailedPutCount: aws.Int64(0)}, nil
	}
	f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
	writer := makeFirehoseWriterWithMock(f, "foobar")
	err := writer.Write(newLogs)
	assert.NoError(t, err)
	assert.Equal(t, 1, callCount)
}
// TestFirehoseSplitBatchBySize writes 12 records, each one byte smaller than
// firehoseMaxSizeOfRecord, and expects them to reach the client as 3 calls of
// 4 records each.
func TestFirehoseSplitBatchBySize(t *testing.T) {
	// Per the original author's note, each record is just under 1 MB so a
	// batch of four is just under 4 MB.
	bigLogs := make([]json.RawMessage, 12)
	for i := range bigLogs {
		bigLogs[i] = make(json.RawMessage, firehoseMaxSizeOfRecord-1)
	}

	var calls int
	client := &mock.FirehoseMock{
		PutRecordBatchFunc: func(in *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
			calls++
			assert.Len(t, getLogsFromInput(in), 4)
			assert.Equal(t, "foobar", *in.DeliveryStreamName)
			return &firehose.PutRecordBatchOutput{FailedPutCount: aws.Int64(0)}, nil
		},
	}

	err := makeFirehoseWriterWithMock(client, "foobar").Write(bigLogs)

	assert.NoError(t, err)
	assert.Equal(t, 3, calls)
}
func TestFirehoseSplitBatchByCount(t *testing.T) {
logs := make([]json.RawMessage, 2000)
for i := 0; i < len(logs); i++ {
logs[i] = json.RawMessage(`{}`)
}
callCount := 0
putFunc := func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
callCount += 1
assert.Len(t, getLogsFromInput(input), 500)
assert.Equal(t, "foobar", *input.DeliveryStreamName)
return &firehose.PutRecordBatchOutput{FailedPutCount: aws.Int64(0)}, nil
}
f := &mock.FirehoseMock{PutRecordBatchFunc: putFunc}
writer := makeFirehoseWriterWithMock(f, "foobar")
err := writer.Write(logs)
assert.NoError(t, err)
assert.Equal(t, 4, callCount)
}
func TestFirehoseValidateStreamActive(t *testing.T) {
describeFunc := func(input *firehose.DescribeDeliveryStreamInput) (*firehose.DescribeDeliveryStreamOutput, error) {
assert.Equal(t, "test", *input.DeliveryStreamName)
return &firehose.DescribeDeliveryStreamOutput{
DeliveryStreamDescription: &firehose.DeliveryStreamDescription{
DeliveryStreamStatus: aws.String(firehose.DeliveryStreamStatusActive),
},
}, nil
}
f := &mock.FirehoseMock{DescribeDeliveryStreamFunc: describeFunc}
writer := makeFirehoseWriterWithMock(f, "test")
err := writer.validateStream()
assert.NoError(t, err)
assert.True(t, f.DescribeDeliveryStreamFuncInvoked)
}
func TestFirehoseValidateStreamNotActive(t *testing.T) {
describeFunc := func(input *firehose.DescribeDeliveryStreamInput) (*firehose.DescribeDeliveryStreamOutput, error) {
assert.Equal(t, "test", *input.DeliveryStreamName)
return &firehose.DescribeDeliveryStreamOutput{
DeliveryStreamDescription: &firehose.DeliveryStreamDescription{
DeliveryStreamStatus: aws.String(firehose.DeliveryStreamStatusCreating),
},
}, nil
}
f := &mock.FirehoseMock{DescribeDeliveryStreamFunc: describeFunc}
writer := makeFirehoseWriterWithMock(f, "test")
err := writer.validateStream()
assert.Error(t, err)
assert.True(t, f.DescribeDeliveryStreamFuncInvoked)
}
func TestFirehoseValidateStreamError(t *testing.T) {
describeFunc := func(input *firehose.DescribeDeliveryStreamInput) (*firehose.DescribeDeliveryStreamOutput, error) {
assert.Equal(t, "test", *input.DeliveryStreamName)
return nil, errors.New("boom!")
}
f := &mock.FirehoseMock{DescribeDeliveryStreamFunc: describeFunc}
writer := makeFirehoseWriterWithMock(f, "test")
err := writer.validateStream()
assert.Error(t, err)
assert.True(t, f.DescribeDeliveryStreamFuncInvoked)
}