diff --git a/osquery/logger/plugins/aws_firehose.cpp b/osquery/logger/plugins/aws_firehose.cpp index 1c2fbdc4..4ac106f2 100644 --- a/osquery/logger/plugins/aws_firehose.cpp +++ b/osquery/logger/plugins/aws_firehose.cpp @@ -55,10 +55,31 @@ Status FirehoseLoggerPlugin::logString(const std::string& s) { return forwarder_->logString(s); } +Status FirehoseLoggerPlugin::logStatus(const std::vector& log) { + return forwarder_->logStatus(log); +} + +void FirehoseLoggerPlugin::init(const std::string& name, + const std::vector& log) { + google::ShutdownGoogleLogging(); + google::InitGoogleLogging(name.c_str()); + logStatus(log); +} + Status FirehoseLogForwarder::send(std::vector& log_data, const std::string& log_type) { std::vector records; - for (const std::string& log : log_data) { + for (std::string& log : log_data) { + Status status = appendLogTypeToJson(log_type, log); + if (!status.ok()) { + LOG(ERROR) + << "Failed to append log_type key to status log JSON in Firehose"; + + // To achieve behavior parity with TLS logger plugin, skip non-JSON + // content + continue; + } + if (log.size() + 1 > kFirehoseMaxLogBytes) { LOG(ERROR) << "Firehose log too big, discarding!"; } diff --git a/osquery/logger/plugins/aws_firehose.h b/osquery/logger/plugins/aws_firehose.h index d8720577..2a55d014 100644 --- a/osquery/logger/plugins/aws_firehose.h +++ b/osquery/logger/plugins/aws_firehose.h @@ -54,12 +54,19 @@ class FirehoseLoggerPlugin : public LoggerPlugin { Status setUp() override; + bool usesLogStatus() override { + return true; + } + protected: void init(const std::string& name, - const std::vector& log) override {} + const std::vector& log) override; Status logString(const std::string& s) override; + /// Log a status (ERROR/WARNING/INFO) message. + Status logStatus(const std::vector& log) override; + private: std::shared_ptr forwarder_{nullptr}; }; diff --git a/osquery/logger/plugins/aws_kinesis.cpp b/osquery/logger/plugins/aws_kinesis.cpp index f9fb106c..fb90a3e8 100644 --- a/osquery/logger/plugins/aws_kinesis.cpp +++ b/osquery/logger/plugins/aws_kinesis.cpp @@ -48,6 +48,9 @@ const size_t KinesisLogForwarder::kKinesisMaxRecords = 500; // Max size of log + partition key is 1MB. Max size of partition key is 256B. const size_t KinesisLogForwarder::kKinesisMaxLogBytes = 1000000 - 256; +const size_t KinesisLogForwarder::kKinesisMaxRetryCount = 100; +const size_t KinesisLogForwarder::kKinesisInitialRetryDelay = 3000; + Status KinesisLoggerPlugin::setUp() { initAwsSdk(); forwarder_ = std::make_shared(); @@ -64,15 +67,43 @@ Status KinesisLoggerPlugin::logString(const std::string& s) { return forwarder_->logString(s); } +Status KinesisLoggerPlugin::logStatus(const std::vector& log) { + return forwarder_->logStatus(log); +} + +void KinesisLoggerPlugin::init(const std::string& name, + const std::vector& log) { + google::ShutdownGoogleLogging(); + google::InitGoogleLogging(name.c_str()); + logStatus(log); +} + Status KinesisLogForwarder::send(std::vector& log_data, const std::string& log_type) { - size_t retry_count = 100; - size_t retry_delay = 3000; + size_t retry_count = kKinesisMaxRetryCount; + size_t retry_delay = kKinesisInitialRetryDelay; size_t original_data_size = log_data.size(); + // exit if we sent all the data while (log_data.size() > 0) { std::vector entries; - for (const std::string& log : log_data) { + std::vector valid_log_data_indices; + + size_t log_data_index = 0; + for (std::string& log : log_data) { + if (retry_count == kKinesisMaxRetryCount) { + // On first send attempt, append log_type to the JSON log content + // Other send attempts will be already have log_type appended + Status status = appendLogTypeToJson(log_type, log); + if (!status.ok()) { + LOG(ERROR) + << "Failed to append log_type key to status log JSON in Kinesis"; + + log_data_index++; + continue; + } + } + if (log.size() > kKinesisMaxLogBytes) { LOG(ERROR) << "Kinesis log too big, discarding!"; } @@ -90,6 +121,9 @@ Status KinesisLogForwarder::send(std::vector& log_data, .WithData(Aws::Utils::ByteBuffer((unsigned char*)log.c_str(), log.length())); entries.push_back(std::move(entry)); + valid_log_data_indices.push_back(log_data_index); + + log_data_index++; } Aws::Kinesis::Model::PutRecordsRequest request; @@ -105,13 +139,14 @@ Status KinesisLogForwarder::send(std::vector& log_data, if (result.GetFailedRecordCount() != 0) { std::vector resend; std::string error_msg = ""; - int i = 0; + log_data_index = 0; for (const auto& record : result.GetRecords()) { if (!record.GetErrorMessage().empty()) { - resend.push_back(log_data[i]); + size_t valid_log_data_index = valid_log_data_indices[log_data_index]; + resend.push_back(log_data[valid_log_data_index]); error_msg = record.GetErrorMessage(); } - i++; + log_data_index++; } // exit if we have tried too many times // exit if all uploads fail right off the bat diff --git a/osquery/logger/plugins/aws_kinesis.h b/osquery/logger/plugins/aws_kinesis.h index 529537eb..161110e9 100644 --- a/osquery/logger/plugins/aws_kinesis.h +++ b/osquery/logger/plugins/aws_kinesis.h @@ -30,6 +30,8 @@ class KinesisLogForwarder : public BufferedLogForwarder { private: static const size_t kKinesisMaxLogBytes; static const size_t kKinesisMaxRecords; + static const size_t kKinesisMaxRetryCount; + static const size_t kKinesisInitialRetryDelay; public: KinesisLogForwarder() @@ -55,12 +57,19 @@ class KinesisLoggerPlugin : public LoggerPlugin { Status setUp() override; + bool usesLogStatus() override { + return true; + } + private: void init(const std::string& name, - const std::vector& log) override {} + const std::vector& log) override; Status logString(const std::string& s) override; + /// Log a status (ERROR/WARNING/INFO) message. + Status logStatus(const std::vector& log) override; + private: std::shared_ptr forwarder_{nullptr}; }; diff --git a/osquery/logger/plugins/aws_util.cpp b/osquery/logger/plugins/aws_util.cpp index 2361adad..d24ec962 100644 --- a/osquery/logger/plugins/aws_util.cpp +++ b/osquery/logger/plugins/aws_util.cpp @@ -29,6 +29,7 @@ #include #include +#include "osquery/core/json.h" #include "osquery/logger/plugins/aws_util.h" #include "osquery/remote/transports/tls.h" @@ -356,4 +357,41 @@ Status getAWSRegion(std::string& region, bool sts) { VLOG(1) << "Using default AWS region: " << region; return Status(0); } + +Status appendLogTypeToJson(const std::string& log_type, std::string& log) { + if (log_type.empty()) { + return Status(1, "log_type is empty"); + } + + if (log.empty()) { + return Status(1, "original JSON is empty"); + } + + pt::ptree params; + try { + std::stringstream input; + input << log; + pt::read_json(input, params); + } catch (const pt::json_parser::json_parser_error& e) { + return Status(1, + std::string("JSON deserialization exception: ") + e.what()); + } + + params.put("log_type", log_type); + + std::ostringstream output; + try { + pt::write_json(output, params, false); + } catch (const pt::json_parser::json_parser_error& e) { + return Status(1, std::string("JSON serialization exception: ") + e.what()); + } + + log = output.str(); + + // Get rid of newline + if (!log.empty()) { + log.pop_back(); + } + return Status(0, "OK"); +} } diff --git a/osquery/logger/plugins/aws_util.h b/osquery/logger/plugins/aws_util.h index 43ab0d7a..0f483ee4 100644 --- a/osquery/logger/plugins/aws_util.h +++ b/osquery/logger/plugins/aws_util.h @@ -171,4 +171,16 @@ Status makeAWSClient(std::shared_ptr& client, bool sts = true) { std::make_shared(sts), client_config); return Status(0); } + +/** + * @brief Parses an input string as JSON, appends a "log_type" key to the + * dictionary, and serializes it, mutating the input + * + * @param log_type The type of log (as passed to a logger plugin's send + * function) + * @param log The input to be mutated with the appended "log_type" JSON key + * + * @return 0 if successful, 1 if there were issues + */ +Status appendLogTypeToJson(const std::string& log_type, std::string& log); } diff --git a/osquery/logger/plugins/tests/aws_firehose_tests.cpp b/osquery/logger/plugins/tests/aws_firehose_tests.cpp index 75875f0a..e88a094e 100644 --- a/osquery/logger/plugins/tests/aws_firehose_tests.cpp +++ b/osquery/logger/plugins/tests/aws_firehose_tests.cpp @@ -53,17 +53,18 @@ TEST_F(FirehoseTests, test_send) { auto client = std::make_shared>(); forwarder.client_ = client; - std::vector logs{"foo"}; + std::vector logs{"{\"foo\":\"bar\"}"}; Aws::Firehose::Model::PutRecordBatchOutcome outcome; outcome.GetResult().SetFailedPutCount(0); EXPECT_CALL(*client, PutRecordBatch(Property( &Aws::Firehose::Model::PutRecordBatchRequest::GetRecords, - ElementsAre(MatchesEntry("foo\n"))))) + ElementsAre(MatchesEntry( + "{\"foo\":\"bar\",\"log_type\":\"results\"}\n"))))) .WillOnce(Return(outcome)); EXPECT_EQ(Status(0), forwarder.send(logs, "results")); - logs = {"bar", "foo"}; + logs = {"{\"bar\":\"foo\"}", "{\"foo\":\"bar\"}"}; Aws::Firehose::Model::PutRecordBatchResponseEntry entry; outcome.GetResult().AddRequestResponses(entry); entry.SetErrorCode("foo"); @@ -71,10 +72,13 @@ TEST_F(FirehoseTests, test_send) { outcome.GetResult().SetFailedPutCount(1); outcome.GetResult().AddRequestResponses(entry); - EXPECT_CALL(*client, - PutRecordBatch(Property( - &Aws::Firehose::Model::PutRecordBatchRequest::GetRecords, - ElementsAre(MatchesEntry("bar\n"), MatchesEntry("foo\n"))))) + EXPECT_CALL( + *client, + PutRecordBatch(Property( + &Aws::Firehose::Model::PutRecordBatchRequest::GetRecords, + ElementsAre( + MatchesEntry("{\"bar\":\"foo\",\"log_type\":\"results\"}\n"), + MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}\n"))))) .WillOnce(Return(outcome)); EXPECT_EQ(Status(1, "Foo error"), forwarder.send(logs, "results")); } diff --git a/osquery/logger/plugins/tests/aws_kinesis_tests.cpp b/osquery/logger/plugins/tests/aws_kinesis_tests.cpp index be0dcd06..e58b4096 100644 --- a/osquery/logger/plugins/tests/aws_kinesis_tests.cpp +++ b/osquery/logger/plugins/tests/aws_kinesis_tests.cpp @@ -53,17 +53,19 @@ TEST_F(KinesisTests, test_send) { auto client = std::make_shared>(); forwarder.client_ = client; - std::vector logs{"foo"}; + std::vector logs{"{\"foo\":\"bar\"}"}; Aws::Kinesis::Model::PutRecordsOutcome outcome; outcome.GetResult().SetFailedRecordCount(0); - EXPECT_CALL(*client, - PutRecords(Property( - &Aws::Kinesis::Model::PutRecordsRequest::GetRecords, - ElementsAre(MatchesEntry("foo", "fake_partition_key"))))) + EXPECT_CALL( + *client, + PutRecords(Property( + &Aws::Kinesis::Model::PutRecordsRequest::GetRecords, + ElementsAre(MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}", + "fake_partition_key"))))) .WillOnce(Return(outcome)); EXPECT_EQ(Status(0), forwarder.send(logs, "results")); - logs = {"bar", "foo"}; + logs = {"{\"bar\":\"foo\"}", "{\"foo\":\"bar\"}"}; Aws::Kinesis::Model::PutRecordsResultEntry entry; outcome.GetResult().AddRecords(entry); entry.SetErrorCode("foo"); @@ -71,11 +73,14 @@ TEST_F(KinesisTests, test_send) { outcome.GetResult().SetFailedRecordCount(2); outcome.GetResult().AddRecords(entry); - EXPECT_CALL(*client, - PutRecords(Property( - &Aws::Kinesis::Model::PutRecordsRequest::GetRecords, - ElementsAre(MatchesEntry("bar", "fake_partition_key"), - MatchesEntry("foo", "fake_partition_key"))))) + EXPECT_CALL( + *client, + PutRecords(Property( + &Aws::Kinesis::Model::PutRecordsRequest::GetRecords, + ElementsAre(MatchesEntry("{\"bar\":\"foo\",\"log_type\":\"results\"}", + "fake_partition_key"), + MatchesEntry("{\"foo\":\"bar\",\"log_type\":\"results\"}", + "fake_partition_key"))))) .WillOnce(Return(outcome)); EXPECT_EQ(Status(1, "Foo error"), forwarder.send(logs, "results")); } diff --git a/osquery/logger/plugins/tests/aws_util_tests.cpp b/osquery/logger/plugins/tests/aws_util_tests.cpp index a79d23ee..451605ba 100644 --- a/osquery/logger/plugins/tests/aws_util_tests.cpp +++ b/osquery/logger/plugins/tests/aws_util_tests.cpp @@ -158,4 +158,30 @@ TEST_F(AwsUtilTests, test_get_region) { ASSERT_EQ(Status(0), getAWSRegion(region)); ASSERT_EQ(std::string(Aws::Region::US_WEST_2), region); } + +TEST_F(AwsUtilTests, test_append_log_type_to_json) { + Status status; + std::string output; + + std::string null_json = ""; + + status = appendLogTypeToJson("result", null_json); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(status.getCode(), 1); + + const std::string expected_empty = "{\"log_type\":\"result\"}"; + std::string empty_json = "{}"; + + status = appendLogTypeToJson("result", empty_json); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(expected_empty, empty_json); + + const std::string expected_full = + "{\"severity\":\"0\",\"log_type\":\"status\"}"; + std::string full_json = "{\"severity\":\"0\"}"; + + status = appendLogTypeToJson("status", full_json); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(expected_full, full_json); +} }