Fix 3568: Kinesis/Firehose record size check failure (#3599)

Author: Alessandro Gario, 2017-08-27 20:01:52 +02:00; committed by: Teddy Reed
Parent: 15485e3eb0
Commit: a3e4310188
9 changed files with 655 additions and 412 deletions
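
This change replaces the hand-rolled send loops in the Kinesis and Firehose logger plugins with a shared AwsLogForwarder template. The template owns client setup, splits outgoing logs into batches that respect the per-record and per-batch service limits (fixing the size check from #3568, where the old code logged "too big, discarding!" for oversized records but never actually discarded them), and retries failed records with a linear back-off, pruning already-delivered records between attempts. Each plugin now supplies only its limits, record initialization, and the PutRecords/PutRecordBatch call; the two per-plugin mock tests are consolidated into aws_logger_tests.cpp, which exercises the shared batching logic.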

File: osquery/CMakeLists.txt

@@ -48,8 +48,7 @@ if(NOT SKIP_AWS AND NOT WINDOWS)
)
set(OSQUERY_LOGGER_AWS_PLUGIN_TESTS
"logger/plugins/tests/aws_firehose_tests.cpp"
"logger/plugins/tests/aws_kinesis_tests.cpp"
"logger/plugins/tests/aws_logger_tests.cpp"
)
ADD_OSQUERY_TEST_ADDITIONAL(${OSQUERY_LOGGER_AWS_PLUGIN_TESTS})

File: osquery/logger/plugins/aws_firehose.cpp

@@ -14,15 +14,12 @@
#include <aws/core/utils/Outcome.h>
#include <aws/firehose/model/PutRecordBatchRequest.h>
#include <aws/firehose/model/PutRecordBatchResponseEntry.h>
#include <aws/firehose/model/PutRecordBatchResult.h>
#include <aws/firehose/model/Record.h>
#include <osquery/flags.h>
#include <osquery/registry.h>
#include "osquery/logger/plugins/aws_firehose.h"
#include "osquery/utils/aws_util.h"
namespace osquery {
@@ -32,21 +29,20 @@ FLAG(uint64,
aws_firehose_period,
10,
"Seconds between flushing logs to Firehose (default 10)");
FLAG(string, aws_firehose_stream, "", "Name of Firehose stream for logging");
// This is the max per AWS docs
const size_t FirehoseLogForwarder::kFirehoseMaxRecords = 500;
// Max size of log + partition key is 1MB. Max size of partition key is 256B.
const size_t FirehoseLogForwarder::kFirehoseMaxLogBytes = 1000000 - 256;
FLAG(string, aws_firehose_stream, "", "Name of Firehose stream for logging");
Status FirehoseLoggerPlugin::setUp() {
initAwsSdk();
forwarder_ = std::make_shared<FirehoseLogForwarder>();
forwarder_ = std::make_shared<FirehoseLogForwarder>(
"aws_firehose", FLAGS_aws_firehose_period, 500);
Status s = forwarder_->setUp();
if (!s.ok()) {
LOG(ERROR) << "Error initializing Firehose logger: " << s.getMessage();
return s;
}
Dispatcher::addService(forwarder_);
return Status(0);
}
@@ -66,75 +62,7 @@ void FirehoseLoggerPlugin::init(const std::string& name,
logStatus(log);
}
Status FirehoseLogForwarder::send(std::vector<std::string>& log_data,
const std::string& log_type) {
std::vector<Aws::Firehose::Model::Record> records;
for (std::string& log : log_data) {
Status status = appendLogTypeToJson(log_type, log);
if (!status.ok()) {
LOG(ERROR)
<< "Failed to append log_type key to status log JSON in Firehose";
// To achieve behavior parity with TLS logger plugin, skip non-JSON
// content
continue;
}
if (log.size() + 1 > kFirehoseMaxLogBytes) {
LOG(ERROR) << "Firehose log too big, discarding!";
}
Aws::Firehose::Model::Record record;
auto buffer =
Aws::Utils::ByteBuffer((unsigned char*)log.c_str(), log.length() + 1);
// Firehose buffers together the individual records, so we must insert
// newlines here if we want newlines in the resultant files after Firehose
// processing. See http://goo.gl/Pz6XOj
buffer[log.length()] = '\n';
record.SetData(buffer);
records.emplace_back(std::move(record));
}
Aws::Firehose::Model::PutRecordBatchRequest request;
request.WithDeliveryStreamName(FLAGS_aws_firehose_stream)
.WithRecords(std::move(records));
Aws::Firehose::Model::PutRecordBatchOutcome outcome =
client_->PutRecordBatch(request);
if (!outcome.IsSuccess()) {
LOG(ERROR) << "Firehose write failed: " << outcome.GetError().GetMessage();
return Status(1, outcome.GetError().GetMessage());
}
Aws::Firehose::Model::PutRecordBatchResult result = outcome.GetResult();
if (result.GetFailedPutCount() != 0) {
for (const auto& record : result.GetRequestResponses()) {
if (!record.GetErrorMessage().empty()) {
VLOG(1) << "Firehose write for " << result.GetFailedPutCount() << " of "
<< result.GetRequestResponses().size()
<< " records failed with error " << record.GetErrorMessage();
return Status(1, record.GetErrorMessage());
}
}
}
VLOG(1) << "Successfully sent " << result.GetRequestResponses().size()
<< " logs to Firehose";
return Status(0);
}
Status FirehoseLogForwarder::setUp() {
Status s = BufferedLogForwarder::setUp();
if (!s.ok()) {
return s;
}
// Set up client
s = makeAWSClient<Aws::Firehose::FirehoseClient>(client_);
if (!s.ok()) {
return s;
}
Status FirehoseLogForwarder::internalSetup() {
if (FLAGS_aws_firehose_stream.empty()) {
return Status(1,
"Stream name must be specified with --aws_firehose_stream");
@@ -145,4 +73,50 @@ Status FirehoseLogForwarder::setUp() {
<< FLAGS_aws_firehose_stream;
return Status(0);
}
FirehoseLogForwarder::Outcome FirehoseLogForwarder::internalSend(
const Batch& batch) {
Aws::Firehose::Model::PutRecordBatchRequest request;
request.WithDeliveryStreamName(FLAGS_aws_firehose_stream)
.WithRecords(batch); // batch is a const ref; a std::move here would copy anyway
return client_->PutRecordBatch(request);
}
void FirehoseLogForwarder::initializeRecord(
Record& record, Aws::Utils::ByteBuffer& buffer) const {
record.SetData(buffer);
}
size_t FirehoseLogForwarder::getMaxBytesPerRecord() const {
return 1000000U;
}
size_t FirehoseLogForwarder::getMaxRecordsPerBatch() const {
return 500U;
}
size_t FirehoseLogForwarder::getMaxBytesPerBatch() const {
return 4000000U;
}
size_t FirehoseLogForwarder::getMaxRetryCount() const {
return 100U;
}
size_t FirehoseLogForwarder::getInitialRetryDelay() const {
return 3000U;
}
bool FirehoseLogForwarder::appendNewlineSeparators() const {
return false;
}
size_t FirehoseLogForwarder::getFailedRecordCount(Outcome& outcome) const {
return static_cast<size_t>(outcome.GetResult().GetFailedPutCount());
}
FirehoseLogForwarder::Result FirehoseLogForwarder::getResult(
Outcome& outcome) const {
return outcome.GetResult().GetRequestResponses();
}
}
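
The Firehose overrides above hard-code the service quotas: 1,000,000 bytes per record, 500 records and 4,000,000 bytes per PutRecordBatch call. As a minimal sketch of how the shared forwarder applies such limits — the constants and the splitIntoBatches helper below are illustrative names, not part of this change:

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Illustrative copies of the values returned by the overrides above.
const size_t kMaxBytesPerRecord = 1000000U;
const size_t kMaxRecordsPerBatch = 500U;
const size_t kMaxBytesPerBatch = 4000000U;

// Groups records the way AwsLogForwarder::consumeDataAndGenerateBatches does:
// oversized records are dropped (the real code also logs the discards and may
// reserve one byte for a newline separator), and a batch is closed as soon as
// adding a record would exceed either batch limit.
std::vector<std::vector<std::string>> splitIntoBatches(
    std::vector<std::string> records) {
  std::vector<std::vector<std::string>> batches;
  std::vector<std::string> current;
  size_t current_bytes = 0U;
  for (auto& record : records) {
    if (record.size() >= kMaxBytesPerRecord) {
      continue; // discarded: too big for a single record
    }
    if (current_bytes + record.size() >= kMaxBytesPerBatch ||
        current.size() >= kMaxRecordsPerBatch) {
      batches.push_back(std::move(current));
      current.clear();
      current_bytes = 0U;
    }
    current_bytes += record.size();
    current.emplace_back(std::move(record));
  }
  if (!current.empty()) {
    batches.push_back(std::move(current));
  }
  return batches;
}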

File: osquery/logger/plugins/aws_firehose.h

@@ -15,36 +15,49 @@
#include <vector>
#include <aws/firehose/FirehoseClient.h>
#include <aws/firehose/model/PutRecordBatchResponseEntry.h>
#include <aws/firehose/model/Record.h>
#include <osquery/core.h>
#include <osquery/dispatcher.h>
#include <osquery/logger.h>
#include "osquery/logger/plugins/buffered.h"
#include "osquery/logger/plugins/aws_log_forwarder.h"
namespace osquery {
DECLARE_uint64(aws_firehose_period);
class FirehoseLogForwarder : public BufferedLogForwarder {
private:
static const size_t kFirehoseMaxLogBytes;
static const size_t kFirehoseMaxRecords;
using IFirehoseLogForwarder = AwsLogForwarder<
Aws::Firehose::Model::Record,
Aws::Firehose::FirehoseClient,
Aws::Firehose::Model::PutRecordBatchOutcome,
Aws::Vector<Aws::Firehose::Model::PutRecordBatchResponseEntry>>;
class FirehoseLogForwarder final : public IFirehoseLogForwarder {
public:
FirehoseLogForwarder()
: BufferedLogForwarder("firehose",
std::chrono::seconds(FLAGS_aws_firehose_period),
kFirehoseMaxRecords) {}
Status setUp() override;
FirehoseLogForwarder(const std::string& name,
size_t log_period,
size_t max_lines)
: IFirehoseLogForwarder(name, log_period, max_lines) {}
protected:
Status send(std::vector<std::string>& log_data,
const std::string& log_type) override;
Status internalSetup() override;
Outcome internalSend(const Batch& batch) override;
void initializeRecord(Record& record,
Aws::Utils::ByteBuffer& buffer) const override;
size_t getMaxBytesPerRecord() const override;
size_t getMaxRecordsPerBatch() const override;
size_t getMaxBytesPerBatch() const override;
size_t getMaxRetryCount() const override;
size_t getInitialRetryDelay() const override;
bool appendNewlineSeparators() const override;
size_t getFailedRecordCount(Outcome& outcome) const override;
Result getResult(Outcome& outcome) const override;
private:
std::shared_ptr<Aws::Firehose::FirehoseClient> client_{nullptr};
FRIEND_TEST(FirehoseTests, test_send);
};
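
For readers tracking the template arguments, this is what the aliases inherited from AwsLogForwarder resolve to for the Firehose instantiation above (an illustrative restatement, not code from this change):

using Record = Aws::Firehose::Model::Record;
using Client = Aws::Firehose::FirehoseClient;
using Outcome = Aws::Firehose::Model::PutRecordBatchOutcome;
using Result =
    Aws::Vector<Aws::Firehose::Model::PutRecordBatchResponseEntry>;
using Batch = std::vector<Record>; // the payload of one PutRecordBatch call
using BatchList = std::vector<Batch>;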

File: osquery/logger/plugins/aws_kinesis.cpp

@@ -9,11 +9,14 @@
*/
#include <algorithm>
#include <chrono>
#include <iterator>
#include <thread>
#include <aws/core/utils/Outcome.h>
#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/model/PutRecordsRequestEntry.h>
#include <aws/kinesis/model/PutRecordsResult.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
@@ -24,7 +27,6 @@
#include "osquery/core/process.h"
#include "osquery/logger/plugins/aws_kinesis.h"
#include "osquery/utils/aws_util.h"
namespace osquery {
@@ -42,18 +44,10 @@ FLAG(bool,
false,
"Enable random kinesis partition keys");
// This is the max per AWS docs
const size_t KinesisLogForwarder::kKinesisMaxRecords = 500;
// Max size of log + partition key is 1MB. Max size of partition key is 256B.
const size_t KinesisLogForwarder::kKinesisMaxLogBytes = 1000000 - 256;
const size_t KinesisLogForwarder::kKinesisMaxRetryCount = 100;
const size_t KinesisLogForwarder::kKinesisInitialRetryDelay = 3000;
Status KinesisLoggerPlugin::setUp() {
initAwsSdk();
forwarder_ = std::make_shared<KinesisLogForwarder>();
forwarder_ = std::make_shared<KinesisLogForwarder>(
"aws_kinesis", FLAGS_aws_kinesis_period, 500);
Status s = forwarder_->setUp();
if (!s.ok()) {
LOG(ERROR) << "Error initializing Kinesis logger: " << s.getMessage();
@@ -78,119 +72,7 @@ void KinesisLoggerPlugin::init(const std::string& name,
logStatus(log);
}
Status KinesisLogForwarder::send(std::vector<std::string>& log_data,
const std::string& log_type) {
size_t retry_count = kKinesisMaxRetryCount;
size_t retry_delay = kKinesisInitialRetryDelay;
size_t original_data_size = log_data.size();
// exit if we sent all the data
while (log_data.size() > 0) {
std::vector<Aws::Kinesis::Model::PutRecordsRequestEntry> entries;
std::vector<size_t> valid_log_data_indices;
size_t log_data_index = 0;
for (std::string& log : log_data) {
if (retry_count == kKinesisMaxRetryCount) {
// On the first send attempt, append log_type to the JSON log content.
// Subsequent send attempts will already have log_type appended.
Status status = appendLogTypeToJson(log_type, log);
if (!status.ok()) {
LOG(ERROR)
<< "Failed to append log_type key to status log JSON in Kinesis";
log_data_index++;
continue;
}
}
if (log.size() > kKinesisMaxLogBytes) {
LOG(ERROR) << "Kinesis log too big, discarding!";
}
std::string record_partition_key = partition_key_;
if (FLAGS_aws_kinesis_random_partition_key) {
// Generate a random partition key for each record, ensuring that
// records are spread evenly across shards.
boost::uuids::uuid uuid = boost::uuids::random_generator()();
record_partition_key = boost::uuids::to_string(uuid);
}
Aws::Kinesis::Model::PutRecordsRequestEntry entry;
entry.WithPartitionKey(record_partition_key)
.WithData(Aws::Utils::ByteBuffer((unsigned char*)log.c_str(),
log.length()));
entries.emplace_back(std::move(entry));
valid_log_data_indices.push_back(log_data_index);
log_data_index++;
}
Aws::Kinesis::Model::PutRecordsRequest request;
request.WithStreamName(FLAGS_aws_kinesis_stream)
.WithRecords(std::move(entries));
Aws::Kinesis::Model::PutRecordsOutcome outcome =
client_->PutRecords(request);
if (!outcome.IsSuccess()) {
LOG(ERROR) << "Kinesis write failed: " << outcome.GetError().GetMessage();
return Status(1, outcome.GetError().GetMessage());
}
Aws::Kinesis::Model::PutRecordsResult result = outcome.GetResult();
VLOG(1) << "Successfully sent "
<< result.GetRecords().size() - result.GetFailedRecordCount()
<< " of " << result.GetRecords().size() << " logs to Kinesis";
if (result.GetFailedRecordCount() != 0) {
std::vector<std::string> resend;
std::string error_msg = "";
log_data_index = 0;
for (const auto& record : result.GetRecords()) {
if (!record.GetErrorMessage().empty()) {
size_t valid_log_data_index = valid_log_data_indices[log_data_index];
resend.push_back(log_data[valid_log_data_index]);
error_msg = record.GetErrorMessage();
}
log_data_index++;
}
// exit if we have tried too many times
// exit if all uploads fail right off the bat
// note, this will go back to the default logger batch retry code
if (retry_count == 0 ||
static_cast<int>(original_data_size) ==
result.GetFailedRecordCount()) {
LOG(ERROR) << "Kinesis write for " << result.GetFailedRecordCount()
<< " of " << result.GetRecords().size()
<< " records failed with error " << error_msg;
return Status(1, error_msg);
}
VLOG(1) << "Resending " << result.GetFailedRecordCount()
<< " records to Kinesis";
log_data = resend;
sleepFor(retry_delay);
} else {
log_data.clear();
}
--retry_count;
retry_delay += 1000;
}
return Status(0);
}
Status KinesisLogForwarder::setUp() {
Status s = BufferedLogForwarder::setUp();
if (!s.ok()) {
return s;
}
// Set up client
s = makeAWSClient<Aws::Kinesis::KinesisClient>(client_);
if (!s.ok()) {
return s;
}
Status KinesisLogForwarder::internalSetup() {
partition_key_ = getHostIdentifier();
if (FLAGS_aws_kinesis_stream.empty()) {
@@ -199,6 +81,63 @@ Status KinesisLogForwarder::setUp() {
VLOG(1) << "Kinesis logging initialized with stream: "
<< FLAGS_aws_kinesis_stream;
return Status(0);
return Status(0, "OK");
}
KinesisLogForwarder::Outcome KinesisLogForwarder::internalSend(
const Batch& batch) {
Aws::Kinesis::Model::PutRecordsRequest request;
request.WithStreamName(FLAGS_aws_kinesis_stream).SetRecords(batch);
return client_->PutRecords(request);
}
void KinesisLogForwarder::initializeRecord(
Record& record, Aws::Utils::ByteBuffer& buffer) const {
std::string record_partition_key;
if (FLAGS_aws_kinesis_random_partition_key) {
// Generate a random partition key for each record, ensuring that
// records are spread evenly across shards.
boost::uuids::uuid uuid = boost::uuids::random_generator()();
record_partition_key = boost::uuids::to_string(uuid);
} else {
record_partition_key = partition_key_;
}
record.WithPartitionKey(record_partition_key).WithData(buffer);
}
size_t KinesisLogForwarder::getMaxBytesPerRecord() const {
// Max size of log + partition key is 1MB. Max size of partition key is 256B.
return (1000000U - 256U);
}
size_t KinesisLogForwarder::getMaxRecordsPerBatch() const {
return 500U;
}
size_t KinesisLogForwarder::getMaxBytesPerBatch() const {
return 5000000U;
}
size_t KinesisLogForwarder::getMaxRetryCount() const {
return 100U;
}
size_t KinesisLogForwarder::getInitialRetryDelay() const {
return 3000U;
}
bool KinesisLogForwarder::appendNewlineSeparators() const {
return false;
}
size_t KinesisLogForwarder::getFailedRecordCount(Outcome& outcome) const {
return static_cast<size_t>(outcome.GetResult().GetFailedRecordCount());
}
KinesisLogForwarder::Result KinesisLogForwarder::getResult(
Outcome& outcome) const {
return outcome.GetResult().GetRecords();
}
}
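
initializeRecord above chooses the partition key per record: with --aws_kinesis_random_partition_key a fresh UUID spreads records evenly across shards, otherwise every record shares the host identifier set in internalSetup. The same choice as a standalone sketch (the function name is illustrative):

#include <string>

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

// Mirrors KinesisLogForwarder::initializeRecord: a random UUID per record
// when random keys are enabled (even shard distribution), otherwise the
// fixed per-host key.
std::string choosePartitionKey(bool random_key, const std::string& host_key) {
  if (random_key) {
    boost::uuids::uuid uuid = boost::uuids::random_generator()();
    return boost::uuids::to_string(uuid);
  }
  return host_key;
}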

File: osquery/logger/plugins/aws_kinesis.h

@@ -15,38 +15,49 @@
#include <vector>
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/PutRecordsRequestEntry.h>
#include <osquery/core.h>
#include <osquery/dispatcher.h>
#include <osquery/logger.h>
#include "osquery/logger/plugins/buffered.h"
#include "osquery/logger/plugins/aws_log_forwarder.h"
namespace osquery {
DECLARE_uint64(aws_kinesis_period);
class KinesisLogForwarder : public BufferedLogForwarder {
private:
static const size_t kKinesisMaxLogBytes;
static const size_t kKinesisMaxRecords;
static const size_t kKinesisMaxRetryCount;
static const size_t kKinesisInitialRetryDelay;
using IKinesisLogForwarder =
AwsLogForwarder<Aws::Kinesis::Model::PutRecordsRequestEntry,
Aws::Kinesis::KinesisClient,
Aws::Kinesis::Model::PutRecordsOutcome,
Aws::Vector<Aws::Kinesis::Model::PutRecordsResultEntry>>;
class KinesisLogForwarder final : public IKinesisLogForwarder {
public:
KinesisLogForwarder()
: BufferedLogForwarder("kinesis",
std::chrono::seconds(FLAGS_aws_kinesis_period),
kKinesisMaxRecords) {}
Status setUp() override;
KinesisLogForwarder(const std::string& name,
size_t log_period,
size_t max_lines)
: IKinesisLogForwarder(name, log_period, max_lines) {}
protected:
Status send(std::vector<std::string>& log_data,
const std::string& log_type) override;
Status internalSetup() override;
Outcome internalSend(const Batch& batch) override;
void initializeRecord(Record& record,
Aws::Utils::ByteBuffer& buffer) const override;
size_t getMaxBytesPerRecord() const override;
size_t getMaxRecordsPerBatch() const override;
size_t getMaxBytesPerBatch() const override;
size_t getMaxRetryCount() const override;
size_t getInitialRetryDelay() const override;
bool appendNewlineSeparators() const override;
size_t getFailedRecordCount(Outcome& outcome) const override;
Result getResult(Outcome& outcome) const override;
private:
/// The partition key; ignored if aws_kinesis_random_partition_key is set
std::string partition_key_;
std::shared_ptr<Aws::Kinesis::KinesisClient> client_{nullptr};
FRIEND_TEST(KinesisTests, test_send);
};
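
The 1,000,000 − 256 value returned by getMaxBytesPerRecord in aws_kinesis.cpp comes from the PutRecords quota: each record may be at most 1 MB including its partition key, and a partition key may be up to 256 bytes. Restated with illustrative constants:

#include <cstddef>

// Kinesis PutRecords: 1 MB per record, counting the partition key, which can
// itself be up to 256 bytes; the forwarder therefore caps the log payload at
// the difference.
const size_t kKinesisRecordLimit = 1000000U; // data + partition key
const size_t kMaxPartitionKeyBytes = 256U;
const size_t kMaxLogPayloadBytes =
    kKinesisRecordLimit - kMaxPartitionKeyBytes; // 999744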

File: osquery/logger/plugins/aws_log_forwarder.h (new file)

@@ -0,0 +1,310 @@
/*
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <chrono>
#include <memory>
#include <vector>
#include <osquery/core.h>
#include <osquery/dispatcher.h>
#include <osquery/logger.h>
#include "osquery/logger/plugins/buffered.h"
#include "osquery/utils/aws_util.h"
namespace osquery {
template <typename RecordType,
typename ClientType,
typename OutcomeType,
typename ResultType>
class AwsLogForwarder : public BufferedLogForwarder {
public:
using Outcome = OutcomeType;
using Client = ClientType;
using Record = RecordType;
using Batch = std::vector<RecordType>;
using BatchList = std::vector<Batch>;
using Result = ResultType;
public:
AwsLogForwarder(const std::string& name, size_t log_period, size_t max_lines)
: BufferedLogForwarder(name, std::chrono::seconds(log_period), max_lines),
name_(name) {}
/// Common plugin initialization
Status setUp() override {
Status s = BufferedLogForwarder::setUp();
if (!s.ok()) {
return s;
}
s = makeAWSClient<Client>(client_);
if (!s.ok()) {
return s;
}
return internalSetup();
}
private:
/// Dumps the given batch to the error log
void dumpBatchToErrorLog(const Batch& batch) const {
std::stringstream error_output;
error_output << name_
<< " logger: Failed to write the following records:\n";
dumpBatch(error_output, batch);
LOG(ERROR) << error_output.str();
}
/// Dumps the discarded records to the error log
void dumpDiscardedRecordsToErrorLog(
const std::vector<std::string>& discarded_records) const {
if (discarded_records.empty()) {
return;
}
std::stringstream output;
output << name_ << ": The following log records have been discarded "
"because they were too big:\n";
for (const auto& record : discarded_records) {
output << record << "\n";
}
LOG(ERROR) << output.str();
}
/// Dumps the specified batch to the given stream
std::ostream& dumpBatch(std::ostream& stream, const Batch& batch) const {
size_t index = 0;
for (auto it = batch.begin(); it != batch.end(); it++) {
const auto& record = *it;
const auto& raw_buffer = record.GetData();
std::string buffer(
reinterpret_cast<const char*>(raw_buffer.GetUnderlyingData()),
raw_buffer.GetLength());
stream << "Record #" << index << ": " << buffer;
if (std::next(it, 1) != batch.end()) {
stream << "\n";
}
}
return stream;
}
/// Consumes the specified log records generating one or more batches
BatchList consumeDataAndGenerateBatches(
std::vector<std::string>& discarded_records,
const std::string& log_type,
std::vector<std::string>& log_data) const {
BatchList batch_list;
Batch current_batch;
size_t current_batch_byte_size = 0U;
for (auto& record : log_data) {
// Initialize the line and make sure we are still within protocol limits
Status status = appendLogTypeToJson(log_type, record);
if (!status.ok()) {
// To achieve behavior parity with TLS logger plugin, skip non-JSON
// content
LOG(ERROR) << name_ << ": The following log record has been discarded "
"because it was not in JSON format: "
<< record;
continue;
}
size_t record_size = record.size();
if (appendNewlineSeparators()) {
++record_size;
}
if (record_size >= getMaxBytesPerRecord() ||
record_size >= getMaxBytesPerBatch()) {
discarded_records.push_back(std::move(record));
continue;
}
// Complete the current batch if it's full
if (current_batch_byte_size + record_size >= getMaxBytesPerBatch() ||
(current_batch.size() >= getMaxRecordsPerBatch())) {
batch_list.push_back(current_batch);
current_batch.clear();
current_batch_byte_size = 0U;
}
// Initialize and store the new log record
auto buffer = Aws::Utils::ByteBuffer(
reinterpret_cast<unsigned char*>(&record[0]), record_size);
if (appendNewlineSeparators()) {
buffer[record_size - 1] = '\n';
}
RecordType aws_record;
initializeRecord(aws_record, buffer);
current_batch.emplace_back(std::move(aws_record));
current_batch_byte_size += record_size;
}
if (!current_batch.empty()) {
batch_list.push_back(current_batch);
}
log_data.clear();
return batch_list;
}
protected:
/// Sends a single batch
bool sendBatch(Batch& batch, std::stringstream& status_output) {
bool success = false;
auto max_retry_count = getMaxRetryCount();
auto base_retry_delay = getInitialRetryDelay();
for (size_t retry = 0; retry < max_retry_count; retry++) {
bool is_last_retry = (retry + 1 >= max_retry_count);
// Increase the resend delay at each retry
size_t retry_delay =
(retry == 0 ? 0 : base_retry_delay) + (retry * 1000U);
if (retry_delay != 0) {
pauseMilli(retry_delay);
}
// Attempt to send the batch
auto outcome = internalSend(batch);
size_t failed_record_count = getFailedRecordCount(outcome);
size_t sent_record_count = batch.size() - failed_record_count;
if (sent_record_count > 0) {
VLOG(1) << name_ << ": Successfully sent " << sent_record_count
<< " out of " << batch.size() << " log records";
}
if (failed_record_count == 0) {
success = true;
break;
}
// Only log final errors
if (is_last_retry) {
if (!status_output.str().empty()) {
status_output << "\n";
}
status_output << outcome.GetError().GetMessage();
}
// We didn't manage to send all records; remove the ones that succeeded
// (so that we do not duplicate them) and try again
const auto& result_record_list = getResult(outcome);
for (size_t i = batch.size(); i-- > 0;) {
if (!result_record_list[i].GetErrorCode().empty()) {
continue;
}
auto it = std::next(batch.begin(), i);
batch.erase(it);
}
}
return success;
}
/// Sends the specified data in one or more batches, depending on the log size
Status send(std::vector<std::string>& log_data,
const std::string& log_type) override {
// Generate the batches, according to the protocol limits
std::vector<std::string> discarded_records;
auto batch_list =
consumeDataAndGenerateBatches(discarded_records, log_type, log_data);
dumpDiscardedRecordsToErrorLog(discarded_records);
discarded_records.clear();
// Send each batch
size_t error_count = 0;
std::stringstream status_output;
for (auto batch_it = batch_list.begin(); batch_it != batch_list.end();) {
auto& batch = *batch_it;
if (!sendBatch(batch, status_output)) {
// We couldn't write some of the records; log them locally so that the
// administrator will at least be able to inspect them
dumpBatchToErrorLog(batch);
error_count++;
}
batch_it = batch_list.erase(batch_it);
}
if (error_count != 0) {
return Status(1, status_output.str());
}
return Status(0, "OK");
}
/// Plugin-specific initialization is performed here
virtual Status internalSetup() = 0;
/// Plugin-specific send method
virtual Outcome internalSend(const Batch& batch) = 0;
/// Plugin-specific record initialization is performed here
virtual void initializeRecord(Record& record,
Aws::Utils::ByteBuffer& buffer) const = 0;
/// Must return the number of bytes that can fit in a single record
virtual size_t getMaxBytesPerRecord() const = 0;
/// Must return the number of records that can be inserted into a single batch
virtual size_t getMaxRecordsPerBatch() const = 0;
/// Must return the number of bytes that can fit in a single batch
virtual size_t getMaxBytesPerBatch() const = 0;
/// Must return the maximum number of retries when sending records
virtual size_t getMaxRetryCount() const = 0;
/// Must return the initial delay, in milliseconds, between retries
virtual size_t getInitialRetryDelay() const = 0;
/// Must return true if records should be terminated with newlines
virtual bool appendNewlineSeparators() const = 0;
/// Must return the number of records that could not be sent
virtual size_t getFailedRecordCount(Outcome& outcome) const = 0;
/// Must return the vector containing the upload result for each record
virtual Result getResult(Outcome& outcome) const = 0;
protected:
/// Plugin-specific service client
std::shared_ptr<Client> client_{nullptr};
/// Logger name; used when printing messages
std::string name_;
};
}
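
The retry loop in sendBatch backs off linearly: the first attempt runs immediately, and attempt n (counting from zero) then waits the initial delay plus n·1000 ms. A small sketch of that schedule, assuming an illustrative helper name; with the 3000 ms both plugins return from getInitialRetryDelay, the delays are 0, 4000, 5000, 6000, … ms:

#include <cstddef>

// Mirrors the delay computation in AwsLogForwarder::sendBatch; the value is
// passed to pauseMilli before each retry.
size_t retryDelayMs(size_t retry, size_t initial_delay_ms) {
  return (retry == 0 ? 0 : initial_delay_ms) + retry * 1000U;
}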

File: osquery/logger/plugins/tests/aws_firehose_tests.cpp (deleted)

@@ -1,86 +0,0 @@
/*
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <iostream>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <aws/core/utils/Outcome.h>
#include <aws/firehose/model/PutRecordBatchRequest.h>
#include <aws/firehose/model/PutRecordBatchResponseEntry.h>
#include <aws/firehose/model/PutRecordBatchResult.h>
#include <osquery/logger.h>
#include "osquery/logger/plugins/aws_firehose.h"
#include "osquery/tests/test_util.h"
#include "osquery/utils/aws_util.h"
using namespace testing;
namespace osquery {
// Match on just the data element of a PutRecordBatchEntry
MATCHER_P(MatchesEntry, data, "") {
return data ==
std::string(reinterpret_cast<char*>(arg.GetData().GetUnderlyingData()),
arg.GetData().GetLength());
}
class MockFirehoseClient : public Aws::Firehose::FirehoseClient {
public:
MOCK_CONST_METHOD1(
PutRecordBatch,
Aws::Firehose::Model::PutRecordBatchOutcome(
const Aws::Firehose::Model::PutRecordBatchRequest& request));
};
class FirehoseTests : public testing::Test {
public:
void SetUp() override { initAwsSdk(); }
};
TEST_F(FirehoseTests, test_send) {
FirehoseLogForwarder forwarder;
auto client = std::make_shared<StrictMock<MockFirehoseClient>>();
forwarder.client_ = client;
std::vector<std::string> logs{"{\"foo\":\"bar\"}"};
Aws::Firehose::Model::PutRecordBatchOutcome outcome(
Aws::Firehose::Model::PutRecordBatchResult{});
outcome.GetResult().SetFailedPutCount(0);
EXPECT_CALL(*client,
PutRecordBatch(Property(
&Aws::Firehose::Model::PutRecordBatchRequest::GetRecords,
ElementsAre(MatchesEntry(
"{\"foo\":\"bar\",\"log_type\":\"results\"}\n")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(0), forwarder.send(logs, "results"));
logs = {"{\"bar\":\"foo\"}", "{\"foo\":\"bar\"}"};
Aws::Firehose::Model::PutRecordBatchResponseEntry entry;
outcome.GetResult().AddRequestResponses(entry);
entry.SetErrorCode("foo");
entry.SetErrorMessage("Foo error");
outcome.GetResult().SetFailedPutCount(1);
outcome.GetResult().AddRequestResponses(entry);
EXPECT_CALL(
*client,
PutRecordBatch(Property(
&Aws::Firehose::Model::PutRecordBatchRequest::GetRecords,
ElementsAre(
MatchesEntry("{\"bar\":\"foo\",\"log_type\":\"results\"}\n"),
MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}\n")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(1, "Foo error"), forwarder.send(logs, "results"));
}
}

File: osquery/logger/plugins/tests/aws_kinesis_tests.cpp (deleted)

@@ -1,90 +0,0 @@
/*
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <aws/core/utils/Outcome.h>
#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/model/PutRecordsRequestEntry.h>
#include <aws/kinesis/model/PutRecordsResult.h>
#include <osquery/logger.h>
#include "osquery/logger/plugins/aws_kinesis.h"
#include "osquery/tests/test_util.h"
#include "osquery/utils/aws_util.h"
using namespace testing;
namespace osquery {
// Match on just the partition key and data elements of a PutRecordsRequestEntry
MATCHER_P2(MatchesEntry, data, key, "") {
return arg.GetPartitionKey() == key &&
data == std::string(
reinterpret_cast<char*>(arg.GetData().GetUnderlyingData()),
arg.GetData().GetLength());
}
class MockKinesisClient : public Aws::Kinesis::KinesisClient {
public:
MOCK_CONST_METHOD1(
PutRecords,
Aws::Kinesis::Model::PutRecordsOutcome(
const Aws::Kinesis::Model::PutRecordsRequest& request));
};
class KinesisTests : public testing::Test {
public:
void SetUp() override {
initAwsSdk();
}
};
TEST_F(KinesisTests, test_send) {
KinesisLogForwarder forwarder;
forwarder.partition_key_ = "fake_partition_key";
auto client = std::make_shared<StrictMock<MockKinesisClient>>();
forwarder.client_ = client;
std::vector<std::string> logs{"{\"foo\":\"bar\"}"};
Aws::Kinesis::Model::PutRecordsOutcome outcome(
Aws::Kinesis::Model::PutRecordsResult{});
outcome.GetResult().SetFailedRecordCount(0);
EXPECT_CALL(
*client,
PutRecords(Property(
&Aws::Kinesis::Model::PutRecordsRequest::GetRecords,
ElementsAre(MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}",
"fake_partition_key")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(0), forwarder.send(logs, "results"));
logs = {"{\"bar\":\"foo\"}", "{\"foo\":\"bar\"}"};
Aws::Kinesis::Model::PutRecordsResultEntry entry;
outcome.GetResult().AddRecords(entry);
entry.SetErrorCode("foo");
entry.SetErrorMessage("Foo error");
outcome.GetResult().SetFailedRecordCount(2);
outcome.GetResult().AddRecords(entry);
EXPECT_CALL(
*client,
PutRecords(Property(
&Aws::Kinesis::Model::PutRecordsRequest::GetRecords,
ElementsAre(MatchesEntry("{\"bar\":\"foo\",\"log_type\":\"results\"}",
"fake_partition_key"),
MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}",
"fake_partition_key")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(1, "Foo error"), forwarder.send(logs, "results"));
}
}

File: osquery/logger/plugins/tests/aws_logger_tests.cpp (new file)

@@ -0,0 +1,173 @@
/*
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <chrono>
#include <iostream>
#include <memory>
#include <vector>
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/PutRecordsRequestEntry.h>
#include <gtest/gtest.h>
#include <osquery/core.h>
#include <osquery/logger.h>
#include "osquery/logger/plugins/aws_log_forwarder.h"
#include "osquery/tests/test_util.h"
using namespace testing;
namespace osquery {
class AwsLoggerTests : public testing::Test {
public:
void SetUp() override {}
};
using RawBatch = std::vector<std::string>;
using RawBatchList = std::vector<RawBatch>;
using IDummyLogForwarder =
AwsLogForwarder<Aws::Kinesis::Model::PutRecordsRequestEntry,
Aws::Kinesis::KinesisClient,
Aws::Kinesis::Model::PutRecordsOutcome,
Aws::Vector<Aws::Kinesis::Model::PutRecordsResultEntry>>;
class DummyLogForwarder final : public IDummyLogForwarder {
public:
DummyLogForwarder() : IDummyLogForwarder("dummy", 10, 50) {}
protected:
Status internalSetup() override {
return Status(0, "OK");
}
Outcome internalSend(const Batch& batch) override {
RawBatch raw_batch;
for (const auto& record : batch) {
std::string buffer(
reinterpret_cast<const char*>(record.GetData().GetUnderlyingData()),
record.GetData().GetLength());
raw_batch.push_back(buffer);
}
emitted_batch_list_.push_back(raw_batch);
return Outcome();
}
void initializeRecord(Record& record,
Aws::Utils::ByteBuffer& buffer) const override {
record.SetData(buffer);
}
std::size_t getMaxBytesPerRecord() const override {
return 80U;
}
std::size_t getMaxRecordsPerBatch() const override {
return 3U;
}
std::size_t getMaxBytesPerBatch() const override {
return 128U;
}
std::size_t getMaxRetryCount() const override {
return 1U;
}
std::size_t getInitialRetryDelay() const override {
return 0U;
}
bool appendNewlineSeparators() const override {
return true;
}
std::size_t getFailedRecordCount(Outcome& outcome) const override {
return 0U;
}
Result getResult(Outcome& outcome) const override {
return outcome.GetResult().GetRecords();
}
public:
RawBatchList emitted_batch_list_;
FRIEND_TEST(AwsLoggerTests, test_send);
};
TEST_F(AwsLoggerTests, test_send) {
DummyLogForwarder log_forwarder;
// The following 3 lines fit nicely inside a single batch
log_forwarder.logString("{ \"batch1\": \"1\" }");
log_forwarder.logString("{ \"batch1\": \"2\" }");
log_forwarder.logString("{ \"batch1\": \"3\" }");
log_forwarder.check();
// The following two lines will be discarded
std::cout << "Emitting two lines that will be discarded..." << std::endl;
log_forwarder.logString(
"{ \"test\": \"This line will be discarded because it is too long "
"according to the protocol\" }");
log_forwarder.logString(
"This line will be discarded because it is not in JSON format");
log_forwarder.check();
// The next 3 lines will be split into two batches because the whole
// batch byte size would be too big
log_forwarder.logString(
"{ \"batch2\": \"1\", \"test test test test test\": \"1\" }");
log_forwarder.logString(
"{ \"batch2\": \"2\", \"test test test test test\": \"2\" }");
log_forwarder.logString("{ \"batch3\": \"3\" }");
log_forwarder.check();
//
// Make sure we have sent the correct data. Remember that we have
// requested to add newlines at the end of each record!
//
// We expect to have sent three batches
EXPECT_EQ(log_forwarder.emitted_batch_list_.size(), 3U);
// The first batch should contain 3 items
auto first_batch = log_forwarder.emitted_batch_list_[0];
EXPECT_EQ(first_batch.size(), 3U);
EXPECT_EQ(first_batch[0], "{\"batch1\":\"1\",\"log_type\":\"result\"}\n");
EXPECT_EQ(first_batch[1], "{\"batch1\":\"2\",\"log_type\":\"result\"}\n");
EXPECT_EQ(first_batch[2], "{\"batch1\":\"3\",\"log_type\":\"result\"}\n");
// The second batch should contain only one item, because it has been split
auto second_batch = log_forwarder.emitted_batch_list_[1];
EXPECT_EQ(second_batch.size(), 1U);
EXPECT_EQ(second_batch[0],
"{\"batch2\":\"1\",\"test test test test "
"test\":\"1\",\"log_type\":\"result\"}\n");
// The third and last batch should contain the remaining 2 items
auto third_batch = log_forwarder.emitted_batch_list_[2];
EXPECT_EQ(third_batch.size(), 2U);
EXPECT_EQ(third_batch[0],
"{\"batch2\":\"2\",\"test test test test "
"test\":\"2\",\"log_type\":\"result\"}\n");
EXPECT_EQ(third_batch[1], "{\"batch3\":\"3\",\"log_type\":\"result\"}\n");
}
}