Adding Status Logging to AWS Kinesis/Firehose Logger Plugins (#3115)

This commit is contained in:
yying 2017-04-04 12:54:56 -04:00 committed by Teddy Reed
parent 5d6ea77efd
commit 20f8e6cd5a
9 changed files with 184 additions and 27 deletions

View File

@ -55,10 +55,31 @@ Status FirehoseLoggerPlugin::logString(const std::string& s) {
return forwarder_->logString(s);
}
Status FirehoseLoggerPlugin::logStatus(const std::vector<StatusLogLine>& log) {
return forwarder_->logStatus(log);
}
void FirehoseLoggerPlugin::init(const std::string& name,
const std::vector<StatusLogLine>& log) {
google::ShutdownGoogleLogging();
google::InitGoogleLogging(name.c_str());
logStatus(log);
}
Status FirehoseLogForwarder::send(std::vector<std::string>& log_data,
const std::string& log_type) {
std::vector<Aws::Firehose::Model::Record> records;
for (const std::string& log : log_data) {
for (std::string& log : log_data) {
Status status = appendLogTypeToJson(log_type, log);
if (!status.ok()) {
LOG(ERROR)
<< "Failed to append log_type key to status log JSON in Firehose";
// To achieve behavior parity with TLS logger plugin, skip non-JSON
// content
continue;
}
if (log.size() + 1 > kFirehoseMaxLogBytes) {
LOG(ERROR) << "Firehose log too big, discarding!";
}

View File

@ -54,12 +54,19 @@ class FirehoseLoggerPlugin : public LoggerPlugin {
Status setUp() override;
bool usesLogStatus() override {
return true;
}
protected:
void init(const std::string& name,
const std::vector<StatusLogLine>& log) override {}
const std::vector<StatusLogLine>& log) override;
Status logString(const std::string& s) override;
/// Log a status (ERROR/WARNING/INFO) message.
Status logStatus(const std::vector<StatusLogLine>& log) override;
private:
std::shared_ptr<FirehoseLogForwarder> forwarder_{nullptr};
};

View File

@ -48,6 +48,9 @@ const size_t KinesisLogForwarder::kKinesisMaxRecords = 500;
// Max size of log + partition key is 1MB. Max size of partition key is 256B.
const size_t KinesisLogForwarder::kKinesisMaxLogBytes = 1000000 - 256;
const size_t KinesisLogForwarder::kKinesisMaxRetryCount = 100;
const size_t KinesisLogForwarder::kKinesisInitialRetryDelay = 3000;
Status KinesisLoggerPlugin::setUp() {
initAwsSdk();
forwarder_ = std::make_shared<KinesisLogForwarder>();
@ -64,15 +67,43 @@ Status KinesisLoggerPlugin::logString(const std::string& s) {
return forwarder_->logString(s);
}
Status KinesisLoggerPlugin::logStatus(const std::vector<StatusLogLine>& log) {
return forwarder_->logStatus(log);
}
void KinesisLoggerPlugin::init(const std::string& name,
const std::vector<StatusLogLine>& log) {
google::ShutdownGoogleLogging();
google::InitGoogleLogging(name.c_str());
logStatus(log);
}
Status KinesisLogForwarder::send(std::vector<std::string>& log_data,
const std::string& log_type) {
size_t retry_count = 100;
size_t retry_delay = 3000;
size_t retry_count = kKinesisMaxRetryCount;
size_t retry_delay = kKinesisInitialRetryDelay;
size_t original_data_size = log_data.size();
// exit if we sent all the data
while (log_data.size() > 0) {
std::vector<Aws::Kinesis::Model::PutRecordsRequestEntry> entries;
for (const std::string& log : log_data) {
std::vector<size_t> valid_log_data_indices;
size_t log_data_index = 0;
for (std::string& log : log_data) {
if (retry_count == kKinesisMaxRetryCount) {
// On first send attempt, append log_type to the JSON log content
// Other send attempts will be already have log_type appended
Status status = appendLogTypeToJson(log_type, log);
if (!status.ok()) {
LOG(ERROR)
<< "Failed to append log_type key to status log JSON in Kinesis";
log_data_index++;
continue;
}
}
if (log.size() > kKinesisMaxLogBytes) {
LOG(ERROR) << "Kinesis log too big, discarding!";
}
@ -90,6 +121,9 @@ Status KinesisLogForwarder::send(std::vector<std::string>& log_data,
.WithData(Aws::Utils::ByteBuffer((unsigned char*)log.c_str(),
log.length()));
entries.push_back(std::move(entry));
valid_log_data_indices.push_back(log_data_index);
log_data_index++;
}
Aws::Kinesis::Model::PutRecordsRequest request;
@ -105,13 +139,14 @@ Status KinesisLogForwarder::send(std::vector<std::string>& log_data,
if (result.GetFailedRecordCount() != 0) {
std::vector<std::string> resend;
std::string error_msg = "";
int i = 0;
log_data_index = 0;
for (const auto& record : result.GetRecords()) {
if (!record.GetErrorMessage().empty()) {
resend.push_back(log_data[i]);
size_t valid_log_data_index = valid_log_data_indices[log_data_index];
resend.push_back(log_data[valid_log_data_index]);
error_msg = record.GetErrorMessage();
}
i++;
log_data_index++;
}
// exit if we have tried too many times
// exit if all uploads fail right off the bat

View File

@ -30,6 +30,8 @@ class KinesisLogForwarder : public BufferedLogForwarder {
private:
static const size_t kKinesisMaxLogBytes;
static const size_t kKinesisMaxRecords;
static const size_t kKinesisMaxRetryCount;
static const size_t kKinesisInitialRetryDelay;
public:
KinesisLogForwarder()
@ -55,12 +57,19 @@ class KinesisLoggerPlugin : public LoggerPlugin {
Status setUp() override;
bool usesLogStatus() override {
return true;
}
private:
void init(const std::string& name,
const std::vector<StatusLogLine>& log) override {}
const std::vector<StatusLogLine>& log) override;
Status logString(const std::string& s) override;
/// Log a status (ERROR/WARNING/INFO) message.
Status logStatus(const std::vector<StatusLogLine>& log) override;
private:
std::shared_ptr<KinesisLogForwarder> forwarder_{nullptr};
};

View File

@ -29,6 +29,7 @@
#include <osquery/logger.h>
#include <osquery/system.h>
#include "osquery/core/json.h"
#include "osquery/logger/plugins/aws_util.h"
#include "osquery/remote/transports/tls.h"
@ -356,4 +357,41 @@ Status getAWSRegion(std::string& region, bool sts) {
VLOG(1) << "Using default AWS region: " << region;
return Status(0);
}
Status appendLogTypeToJson(const std::string& log_type, std::string& log) {
if (log_type.empty()) {
return Status(1, "log_type is empty");
}
if (log.empty()) {
return Status(1, "original JSON is empty");
}
pt::ptree params;
try {
std::stringstream input;
input << log;
pt::read_json(input, params);
} catch (const pt::json_parser::json_parser_error& e) {
return Status(1,
std::string("JSON deserialization exception: ") + e.what());
}
params.put<std::string>("log_type", log_type);
std::ostringstream output;
try {
pt::write_json(output, params, false);
} catch (const pt::json_parser::json_parser_error& e) {
return Status(1, std::string("JSON serialization exception: ") + e.what());
}
log = output.str();
// Get rid of newline
if (!log.empty()) {
log.pop_back();
}
return Status(0, "OK");
}
}

View File

@ -171,4 +171,16 @@ Status makeAWSClient(std::shared_ptr<Client>& client, bool sts = true) {
std::make_shared<OsqueryAWSCredentialsProviderChain>(sts), client_config);
return Status(0);
}
/**
* @brief Parses an input string as JSON, appends a "log_type" key to the
* dictionary, and serializes it, mutating the input
*
* @param log_type The type of log (as passed to a logger plugin's send
* function)
* @param log The input to be mutated with the appended "log_type" JSON key
*
* @return 0 if successful, 1 if there were issues
*/
Status appendLogTypeToJson(const std::string& log_type, std::string& log);
}

View File

@ -53,17 +53,18 @@ TEST_F(FirehoseTests, test_send) {
auto client = std::make_shared<StrictMock<MockFirehoseClient>>();
forwarder.client_ = client;
std::vector<std::string> logs{"foo"};
std::vector<std::string> logs{"{\"foo\":\"bar\"}"};
Aws::Firehose::Model::PutRecordBatchOutcome outcome;
outcome.GetResult().SetFailedPutCount(0);
EXPECT_CALL(*client,
PutRecordBatch(Property(
&Aws::Firehose::Model::PutRecordBatchRequest::GetRecords,
ElementsAre(MatchesEntry("foo\n")))))
ElementsAre(MatchesEntry(
"{\"foo\":\"bar\",\"log_type\":\"results\"}\n")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(0), forwarder.send(logs, "results"));
logs = {"bar", "foo"};
logs = {"{\"bar\":\"foo\"}", "{\"foo\":\"bar\"}"};
Aws::Firehose::Model::PutRecordBatchResponseEntry entry;
outcome.GetResult().AddRequestResponses(entry);
entry.SetErrorCode("foo");
@ -71,10 +72,13 @@ TEST_F(FirehoseTests, test_send) {
outcome.GetResult().SetFailedPutCount(1);
outcome.GetResult().AddRequestResponses(entry);
EXPECT_CALL(*client,
PutRecordBatch(Property(
&Aws::Firehose::Model::PutRecordBatchRequest::GetRecords,
ElementsAre(MatchesEntry("bar\n"), MatchesEntry("foo\n")))))
EXPECT_CALL(
*client,
PutRecordBatch(Property(
&Aws::Firehose::Model::PutRecordBatchRequest::GetRecords,
ElementsAre(
MatchesEntry("{\"bar\":\"foo\",\"log_type\":\"results\"}\n"),
MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}\n")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(1, "Foo error"), forwarder.send(logs, "results"));
}

View File

@ -53,17 +53,19 @@ TEST_F(KinesisTests, test_send) {
auto client = std::make_shared<StrictMock<MockKinesisClient>>();
forwarder.client_ = client;
std::vector<std::string> logs{"foo"};
std::vector<std::string> logs{"{\"foo\":\"bar\"}"};
Aws::Kinesis::Model::PutRecordsOutcome outcome;
outcome.GetResult().SetFailedRecordCount(0);
EXPECT_CALL(*client,
PutRecords(Property(
&Aws::Kinesis::Model::PutRecordsRequest::GetRecords,
ElementsAre(MatchesEntry("foo", "fake_partition_key")))))
EXPECT_CALL(
*client,
PutRecords(Property(
&Aws::Kinesis::Model::PutRecordsRequest::GetRecords,
ElementsAre(MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}",
"fake_partition_key")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(0), forwarder.send(logs, "results"));
logs = {"bar", "foo"};
logs = {"{\"bar\":\"foo\"}", "{\"foo\":\"bar\"}"};
Aws::Kinesis::Model::PutRecordsResultEntry entry;
outcome.GetResult().AddRecords(entry);
entry.SetErrorCode("foo");
@ -71,11 +73,14 @@ TEST_F(KinesisTests, test_send) {
outcome.GetResult().SetFailedRecordCount(2);
outcome.GetResult().AddRecords(entry);
EXPECT_CALL(*client,
PutRecords(Property(
&Aws::Kinesis::Model::PutRecordsRequest::GetRecords,
ElementsAre(MatchesEntry("bar", "fake_partition_key"),
MatchesEntry("foo", "fake_partition_key")))))
EXPECT_CALL(
*client,
PutRecords(Property(
&Aws::Kinesis::Model::PutRecordsRequest::GetRecords,
ElementsAre(MatchesEntry("{\"bar\":\"foo\",\"log_type\":\"results\"}",
"fake_partition_key"),
MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}",
"fake_partition_key")))))
.WillOnce(Return(outcome));
EXPECT_EQ(Status(1, "Foo error"), forwarder.send(logs, "results"));
}

View File

@ -158,4 +158,30 @@ TEST_F(AwsUtilTests, test_get_region) {
ASSERT_EQ(Status(0), getAWSRegion(region));
ASSERT_EQ(std::string(Aws::Region::US_WEST_2), region);
}
TEST_F(AwsUtilTests, test_append_log_type_to_json) {
Status status;
std::string output;
std::string null_json = "";
status = appendLogTypeToJson("result", null_json);
ASSERT_FALSE(status.ok());
ASSERT_EQ(status.getCode(), 1);
const std::string expected_empty = "{\"log_type\":\"result\"}";
std::string empty_json = "{}";
status = appendLogTypeToJson("result", empty_json);
ASSERT_TRUE(status.ok());
ASSERT_EQ(expected_empty, empty_json);
const std::string expected_full =
"{\"severity\":\"0\",\"log_type\":\"status\"}";
std::string full_json = "{\"severity\":\"0\"}";
status = appendLogTypeToJson("status", full_json);
ASSERT_TRUE(status.ok());
ASSERT_EQ(expected_full, full_json);
}
}