osquery-1/osquery/logger/plugins/aws_firehose.cpp
Zachary Wasserman 262833c86a Add AWS Kinesis and Firehose logger plugins (#2045)
This commit adds logger plugin implementations for the Amazon
Kinesis (https://aws.amazon.com/kinesis/) and Kinesis
Firehose (https://aws.amazon.com/kinesis/firehose/) services. To support
these plugins there are a number of utility classes and functions for
AWS authentication, configuration and API integration. The logger plugin
implementations take advantage of the BufferedLogForwarder base class
for reliable buffering and batch sending of logs. In their current
implementations, the logger plugins only support sending of result logs
to these AWS services.
2016-04-25 16:19:51 -07:00

125 lines
4.1 KiB
C++

/*
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <algorithm>
#include <boost/algorithm/string/join.hpp>
#include <aws/core/utils/Outcome.h>
#include <aws/firehose/model/ListDeliveryStreamsRequest.h>
#include <aws/firehose/model/ListDeliveryStreamsResult.h>
#include <aws/firehose/model/PutRecordBatchRequest.h>
#include <aws/firehose/model/PutRecordBatchResponseEntry.h>
#include <aws/firehose/model/PutRecordBatchResult.h>
#include <aws/firehose/model/Record.h>
#include <osquery/flags.h>
#include <osquery/registry.h>
#include "osquery/logger/plugins/aws_firehose.h"
#include "osquery/logger/plugins/aws_util.h"
namespace osquery {
// Register FirehoseLoggerPlugin in the "logger" registry under the name
// "aws_firehose".
REGISTER(FirehoseLoggerPlugin, "logger", "aws_firehose");
// Flush interval in seconds, per the flag's help text.
// NOTE(review): this flag is not read anywhere in this file; presumably it is
// consumed by the forwarder declared in aws_firehose.h -- confirm.
FLAG(uint64,
aws_firehose_period,
10,
"Seconds between flushing logs to Firehose (default 10)");
// Name of the delivery stream to log to. FirehoseLogForwarder::setUp() fails
// when this is left empty.
// NOTE(review): unlike the flag above, this invocation has no trailing
// semicolon; presumably the macro expansion tolerates that -- confirm.
FLAG(string, aws_firehose_stream, "", "Name of Firehose stream for logging")
// Maximum number of records per batch; this is the max per AWS docs.
// NOTE(review): not referenced in this file; presumably used as the batch
// size by the forwarder's base class -- confirm.
const size_t FirehoseLogForwarder::kFirehoseMaxRecords = 500;
// Size limit, in bytes, that send() compares each individual log line against.
// Max size of log + partition key is 1MB. Max size of partition key is 256B.
// NOTE(review): the partition-key allowance looks inherited from the Kinesis
// Streams limits; confirm whether it applies to Firehose records at all.
const size_t FirehoseLogForwarder::kFirehoseMaxLogBytes = 1000000 - 256;
/**
 * @brief Create the Firehose forwarder, initialize it, and hand it to the
 * Dispatcher as a service.
 *
 * @return Status(0) once the forwarder has been added to the Dispatcher;
 * otherwise the failing Status returned by the forwarder's own setUp(), which
 * is also logged as an error.
 */
Status FirehoseLoggerPlugin::setUp() {
  forwarder_ = std::make_shared<FirehoseLogForwarder>();

  Status init_status = forwarder_->setUp();
  if (init_status.ok()) {
    // Only a successfully initialized forwarder is registered as a service.
    Dispatcher::addService(forwarder_);
    return Status(0);
  }

  LOG(ERROR) << "Error initializing Firehose logger: "
             << init_status.getMessage();
  return init_status;
}
/**
 * @brief Pass a single log line on to the Firehose forwarder.
 *
 * @param s The log line to hand to the forwarder.
 * @return Whatever Status the forwarder's logString() reports.
 */
Status FirehoseLoggerPlugin::logString(const std::string& s) {
  Status forwarded = forwarder_->logString(s);
  return forwarded;
}
/**
 * @brief Send a batch of log lines to the configured Firehose delivery stream
 * with a single PutRecordBatch request.
 *
 * Log lines larger than kFirehoseMaxLogBytes are reported and left out of the
 * batch. If nothing remains to send, no request is made and success is
 * returned.
 *
 * @param log_data The log lines to send; each accepted line becomes one record.
 * @param log_type Unused by this implementation.
 * @return Status(0) when the request succeeded and no record was reported as
 * failed; Status(1, message) when the request itself failed or when Firehose
 * reported one or more failed records.
 */
Status FirehoseLogForwarder::send(std::vector<std::string>& log_data,
                                  const std::string& log_type) {
  std::vector<Aws::Firehose::Model::Record> records;
  records.reserve(log_data.size());
  for (const std::string& log : log_data) {
    if (log.size() > kFirehoseMaxLogBytes) {
      LOG(ERROR) << "Firehose log too big, discarding!";
      // Actually discard it: an oversized record must not reach the request.
      continue;
    }
    Aws::Firehose::Model::Record record;
    record.WithData(Aws::Utils::ByteBuffer(
        reinterpret_cast<const unsigned char*>(log.data()), log.length()));
    records.push_back(std::move(record));
  }

  if (records.empty()) {
    // Every line was discarded (or none were given); there is nothing to put.
    VLOG(1) << "No logs to send to Firehose.";
    return Status(0);
  }

  Aws::Firehose::Model::PutRecordBatchRequest request;
  request.WithDeliveryStreamName(FLAGS_aws_firehose_stream)
      .WithRecords(std::move(records));

  Aws::Firehose::Model::PutRecordBatchOutcome outcome =
      client_->PutRecordBatch(request);
  if (!outcome.IsSuccess()) {
    // The whole request failed (e.g. network or auth error). The result object
    // carries no per-record information in this case, so report the error
    // instead of treating an empty result as success.
    const auto& error_message = outcome.GetError().GetMessage();
    VLOG(1) << "Firehose write failed with error " << error_message;
    return Status(1, error_message);
  }

  const Aws::Firehose::Model::PutRecordBatchResult& result =
      outcome.GetResult();
  if (result.GetFailedPutCount() != 0) {
    // Report the first per-record error message that is available.
    for (const auto& record : result.GetRequestResponses()) {
      if (!record.GetErrorMessage().empty()) {
        VLOG(1) << "Firehose write for " << result.GetFailedPutCount() << " of "
                << result.GetRequestResponses().size()
                << " records failed with error " << record.GetErrorMessage();
        return Status(1, record.GetErrorMessage());
      }
    }
    // Failures were counted but none carried a message; still a failure.
    VLOG(1) << "Firehose write for " << result.GetFailedPutCount() << " of "
            << result.GetRequestResponses().size()
            << " records failed without an error message";
    return Status(1, "Firehose reported failed records without an error message");
  }

  VLOG(1) << "Successfully sent " << result.GetRequestResponses().size()
          << " logs to Firehose.";
  return Status(0);
}
/**
 * @brief Create the Firehose client and verify that the delivery stream named
 * by --aws_firehose_stream is visible to it.
 *
 * The stream listing is followed page by page, so a stream that is not on the
 * first page of results is still found.
 *
 * @return Status(0) when the client was created and the stream was found;
 * otherwise a failing Status describing whether client creation failed, the
 * flag was empty, the listing request failed, or the stream is absent.
 */
Status FirehoseLogForwarder::setUp() {
  // Set up client
  Status s = makeAWSClient<Aws::Firehose::FirehoseClient>(client_);
  if (!s.ok()) {
    return s;
  }

  if (FLAGS_aws_firehose_stream.empty()) {
    return Status(1,
                  "Stream name must be specified with --aws_firehose_stream");
  }

  // Make sure we can connect to designated stream
  Aws::Firehose::Model::ListDeliveryStreamsRequest r;
  bool stream_found = false;
  while (!stream_found) {
    auto outcome = client_->ListDeliveryStreams(r);
    if (!outcome.IsSuccess()) {
      // Distinguish "could not ask" from "asked and the stream is missing".
      return Status(1,
                    "Could not list Firehose delivery streams: " +
                        outcome.GetError().GetMessage());
    }

    const auto& result = outcome.GetResult();
    const auto& stream_names = result.GetDeliveryStreamNames();
    stream_found = std::find(stream_names.begin(),
                             stream_names.end(),
                             FLAGS_aws_firehose_stream) != stream_names.end();

    // Stop when there are no further pages. The empty() guard prevents an
    // endless loop should the service claim more results but return none.
    if (stream_found || !result.GetHasMoreDeliveryStreams() ||
        stream_names.empty()) {
      break;
    }
    // Continue the listing after the last name seen on this page.
    r.SetExclusiveStartDeliveryStreamName(stream_names.back());
  }

  if (!stream_found) {
    return Status(1,
                  "Could not find Firehose delivery stream: " +
                      FLAGS_aws_firehose_stream);
  }

  VLOG(1) << "Firehose logging initialized with stream: "
          << FLAGS_aws_firehose_stream;
  return Status(0);
}
}