Merge pull request #1791 from theopolis/safe_log_buffer

Improve TLS logger performance
This commit is contained in:
Teddy Reed 2016-01-21 11:37:37 -08:00
commit 7b5ed691ec
13 changed files with 467 additions and 211 deletions

View File

@ -582,7 +582,8 @@ class DatabasePlugin : public Plugin {
/// Key/index lookup method.
virtual Status scan(const std::string& domain,
std::vector<std::string>& results) const {
std::vector<std::string>& results,
size_t max = 0) const {
return Status(0, "Not used");
}
@ -627,7 +628,8 @@ Status deleteDatabaseValue(const std::string& domain, const std::string& key);
/// Get a list of keys for a given domain.
Status scanDatabaseKeys(const std::string& domain,
std::vector<std::string>& keys);
std::vector<std::string>& keys,
size_t max = 0);
/// Allow callers to scan each column family and print each value.
void dumpDatabase();

View File

@ -47,6 +47,7 @@ DECLARE_string(database_path);
DECLARE_string(extensions_socket);
DECLARE_string(modules_autoload);
DECLARE_string(extensions_autoload);
DECLARE_string(enroll_tls_endpoint);
DECLARE_bool(disable_logging);
typedef std::chrono::high_resolution_clock chrono_clock;
@ -385,6 +386,9 @@ void TLSServerRunner::start() {
return;
}
// Pick a port in an ephemeral range at random.
self.port_ = std::to_string(rand() % 10000 + 20000);
// Fork then exec a shell.
self.server_ = fork();
if (self.server_ == 0) {
@ -396,8 +400,34 @@ void TLSServerRunner::start() {
::sleep(1);
}
void TLSServerRunner::setClientConfig() {
auto& self = instance();
self.tls_hostname_ = Flag::getValue("tls_hostname");
Flag::updateValue("tls_hostname", "localhost:" + port());
self.enroll_tls_endpoint_ = Flag::getValue("enroll_tls_endpoint");
Flag::updateValue("enroll_tls_endpoint", "/enroll");
self.tls_server_certs_ = Flag::getValue("tls_server_certs");
Flag::updateValue("tls_server_certs", kTestDataPath + "/test_server_ca.pem");
self.enroll_secret_path_ = Flag::getValue("enroll_secret_path");
Flag::updateValue("enroll_secret_path",
kTestDataPath + "/test_enroll_secret.txt");
}
void TLSServerRunner::unsetClientConfig() {
auto& self = instance();
Flag::updateValue("tls_hostname", self.tls_hostname_);
Flag::updateValue("enroll_tls_endpoint", self.enroll_tls_endpoint_);
Flag::updateValue("tls_server_certs", self.tls_server_certs_);
Flag::updateValue("enroll_secret_path", self.enroll_secret_path_);
}
void TLSServerRunner::stop() {
auto& self = instance();
kill(self.server_, SIGTERM);
self.server_ = 0;
}
}

View File

@ -116,6 +116,7 @@ std::vector<SplitStringTestData> generateSplitStringTestData();
// generate a small directory structure for testing
void createMockFileStructure();
// remove the small directory structure used for testing
void tearDownMockFileStructure();
@ -127,22 +128,32 @@ class TLSServerRunner : private boost::noncopyable {
return instance;
}
/// Set associated flags for testing client TLS usage.
static void setClientConfig();
/// Unset or restore associated flags for testing client TLS usage.
static void unsetClientConfig();
/// TCP port accessor.
static const std::string& port() { return instance().port_; }
/// Start the server if it hasn't started already.
static void start();
/// Stop the service when the process exits.
static void stop();
private:
TLSServerRunner()
: server_(0), port_(std::to_string(rand() % 10000 + 20000)){};
TLSServerRunner(TLSServerRunner const&);
void operator=(TLSServerRunner const&);
virtual ~TLSServerRunner() { stop(); }
/// Current server PID.
pid_t server_{0};
/// Current server TLS port.
std::string port_;
private:
pid_t server_;
std::string port_;
std::string tls_hostname_;
std::string enroll_tls_endpoint_;
std::string tls_server_certs_;
std::string enroll_secret_path_;
};
}

View File

@ -519,8 +519,14 @@ Status DatabasePlugin::call(const PluginRequest& request,
} else if (request.at("action") == "remove") {
return this->remove(domain, key);
} else if (request.at("action") == "scan") {
// Accumulate scanned keys into a vector.
std::vector<std::string> keys;
auto status = this->scan(domain, keys);
// Optionally allow the caller to request a max number of keys.
size_t max = 0;
if (request.count("max") > 0) {
max = std::stoul(request.at("max"));
}
auto status = this->scan(domain, keys, max);
for (const auto& key : keys) {
response.push_back({{"k", key}});
}
@ -562,8 +568,10 @@ Status deleteDatabaseValue(const std::string& domain, const std::string& key) {
}
Status scanDatabaseKeys(const std::string& domain,
std::vector<std::string>& keys) {
PluginRequest request = {{"action", "scan"}, {"domain", domain}};
std::vector<std::string>& keys,
size_t max) {
PluginRequest request = {
{"action", "scan"}, {"domain", domain}, {"max", std::to_string(max)}};
PluginResponse response;
auto status = Registry::call("database", "rocks", request, response);

View File

@ -33,19 +33,20 @@ class RocksDatabasePlugin : public DatabasePlugin {
/// Data retrieval method.
Status get(const std::string& domain,
const std::string& key,
std::string& value) const;
std::string& value) const override;
/// Data storage method.
Status put(const std::string& domain,
const std::string& key,
const std::string& value);
const std::string& value) override;
/// Data removal method.
Status remove(const std::string& domain, const std::string& k);
Status remove(const std::string& domain, const std::string& k) override;
/// Key/index lookup method.
Status scan(const std::string& domain,
std::vector<std::string>& results) const;
std::vector<std::string>& results,
size_t max = 0) const override;
};
/// Backing-storage provider for osquery internal/core.
@ -334,7 +335,8 @@ Status DBHandle::Delete(const std::string& domain,
}
Status DBHandle::Scan(const std::string& domain,
std::vector<std::string>& results) const {
std::vector<std::string>& results,
size_t max) const {
if (getDB() == nullptr) {
return Status(1, "Database not opened");
}
@ -347,8 +349,13 @@ Status DBHandle::Scan(const std::string& domain,
if (it == nullptr) {
return Status(1, "Could not get iterator for " + domain);
}
size_t count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
results.push_back(it->key().ToString());
if (max > 0 && ++count >= max) {
break;
}
}
delete it;
return Status(0, "OK");
@ -372,7 +379,8 @@ Status RocksDatabasePlugin::remove(const std::string& domain,
}
Status RocksDatabasePlugin::scan(const std::string& domain,
std::vector<std::string>& results) const {
return DBHandle::getInstance()->Scan(domain, results);
std::vector<std::string>& results,
size_t max) const {
return DBHandle::getInstance()->Scan(domain, results, max);
}
}

View File

@ -84,7 +84,7 @@ class DBHandle {
/// Allow DBHandle creations.
static void setAllowOpen(bool ao) { kDBHandleOptionAllowOpen = ao; }
private:
public:
/////////////////////////////////////////////////////////////////////////////
// Data access methods
/////////////////////////////////////////////////////////////////////////////
@ -144,7 +144,8 @@ class DBHandle {
* of the operation.
*/
Status Scan(const std::string& domain,
std::vector<std::string>& results) const;
std::vector<std::string>& results,
size_t max = 0) const;
private:
/**

View File

@ -0,0 +1,75 @@
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <gtest/gtest.h>
#include <osquery/database.h>
#include "osquery/core/test_util.h"
namespace osquery {
class DatabaseTests : public testing::Test {};
TEST_F(DatabaseTests, test_set_value) {
auto s = setDatabaseValue(kLogs, "i", "{}");
EXPECT_TRUE(s.ok());
}
TEST_F(DatabaseTests, test_get_value) {
std::string expected = "{}";
setDatabaseValue(kLogs, "i", expected);
std::string value;
auto s = getDatabaseValue(kLogs, "i", value);
EXPECT_TRUE(s.ok());
EXPECT_EQ(value, expected);
// Unknown keys return failed, but will return empty data.
value.clear();
s = getDatabaseValue(kLogs, "does_not_exist", value);
EXPECT_FALSE(s.ok());
EXPECT_TRUE(value.empty());
}
TEST_F(DatabaseTests, test_scan_values) {
setDatabaseValue(kLogs, "1", "0");
setDatabaseValue(kLogs, "2", "0");
setDatabaseValue(kLogs, "3", "0");
std::vector<std::string> keys;
auto s = scanDatabaseKeys(kLogs, keys);
EXPECT_TRUE(s.ok());
EXPECT_GT(keys.size(), 2U);
keys.clear();
s = scanDatabaseKeys(kLogs, keys, 2);
EXPECT_TRUE(s.ok());
EXPECT_EQ(keys.size(), 2U);
}
TEST_F(DatabaseTests, test_delete_values) {
setDatabaseValue(kLogs, "k", "0");
std::string value;
getDatabaseValue(kLogs, "k", value);
EXPECT_FALSE(value.empty());
auto s = deleteDatabaseValue(kLogs, "k");
EXPECT_TRUE(s.ok());
// Make sure the key has been deleted.
value.clear();
s = getDatabaseValue(kLogs, "k", value);
EXPECT_FALSE(s.ok());
EXPECT_TRUE(value.empty());
}
}

View File

@ -99,11 +99,24 @@ TEST_F(DBHandleTests, test_scan) {
auto s = db_->Scan(kQueries, keys);
EXPECT_TRUE(s.ok());
EXPECT_EQ(s.toString(), "OK");
EXPECT_EQ(keys.size(), 3U);
for (const auto& i : expected) {
EXPECT_NE(std::find(keys.begin(), keys.end(), i), keys.end());
}
}
TEST_F(DBHandleTests, test_scan_limit) {
db_->Put(kQueries, "test_scan_foo1", "baz");
db_->Put(kQueries, "test_scan_foo2", "baz");
db_->Put(kQueries, "test_scan_foo3", "baz");
std::vector<std::string> keys;
auto s = db_->Scan(kQueries, keys, 2);
EXPECT_TRUE(s.ok());
EXPECT_EQ(s.toString(), "OK");
EXPECT_EQ(keys.size(), 2U);
}
TEST_F(DBHandleTests, test_rocksdb_loglevel) {
// Make sure a log file was created.
EXPECT_FALSE(pathExists(path_ + "/LOG"));

View File

@ -12,6 +12,7 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <gtest/gtest.h>
#include <osquery/core.h>
@ -20,9 +21,6 @@
#include <osquery/sql.h>
#include "osquery/core/test_util.h"
#include "osquery/remote/requests.h"
#include "osquery/remote/serializers/json.h"
#include "osquery/remote/transports/tls.h"
#include "osquery/sql/sqlite_util.h"
@ -30,7 +28,6 @@ namespace pt = boost::property_tree;
DECLARE_string(distributed_tls_read_endpoint);
DECLARE_string(distributed_tls_write_endpoint);
DECLARE_string(enroll_tls_endpoint);
namespace osquery {
@ -38,14 +35,9 @@ class DistributedTests : public testing::Test {
protected:
void SetUp() {
TLSServerRunner::start();
TLSServerRunner::setClientConfig();
clearNodeKey();
tls_hostname_ = Flag::getValue("tls_hostname");
Flag::updateValue("tls_hostname", "localhost:" + TLSServerRunner::port());
enroll_tls_endpoint_ = Flag::getValue("enroll_tls_endpoint");
Flag::updateValue("enroll_tls_endpoint", "/enroll");
distributed_tls_read_endpoint_ =
Flag::getValue("distributed_tls_read_endpoint");
Flag::updateValue("distributed_tls_read_endpoint", "/distributed_read");
@ -54,36 +46,23 @@ class DistributedTests : public testing::Test {
Flag::getValue("distributed_tls_write_endpoint");
Flag::updateValue("distributed_tls_write_endpoint", "/distributed_write");
tls_server_certs_ = Flag::getValue("tls_server_certs");
Flag::updateValue("tls_server_certs",
kTestDataPath + "/test_server_ca.pem");
enroll_secret_path_ = Flag::getValue("enroll_secret_path");
Flag::updateValue("enroll_secret_path",
kTestDataPath + "/test_enroll_secret.txt");
Registry::setActive("distributed", "tls");
}
void TearDown() {
TLSServerRunner::stop();
TLSServerRunner::unsetClientConfig();
clearNodeKey();
Flag::updateValue("tls_hostname", tls_hostname_);
Flag::updateValue("enroll_tls_endpoint", enroll_tls_endpoint_);
Flag::updateValue("distributed_tls_read_endpoint",
distributed_tls_read_endpoint_);
Flag::updateValue("distributed_tls_write_endpoint",
distributed_tls_write_endpoint_);
Flag::updateValue("tls_server_certs", tls_server_certs_);
Flag::updateValue("enroll_secret_path", enroll_secret_path_);
}
std::string tls_hostname_;
std::string enroll_tls_endpoint_;
protected:
std::string distributed_tls_read_endpoint_;
std::string distributed_tls_write_endpoint_;
std::string tls_server_certs_;
std::string enroll_secret_path_;
};
TEST_F(DistributedTests, test_workflow) {

View File

@ -9,6 +9,9 @@ ADD_OSQUERY_LIBRARY(FALSE osquery_logger_plugins
plugins/syslog.cpp
)
# Kepp the logger testing in the additional to test filesystem logging.
# There is a significant difference between the Glog-backed filesystem plugin
# and other, which use a Glog sink. They must be tested in tandem.
file(GLOB OSQUERY_LOGGER_TESTS "tests/*.cpp")
ADD_OSQUERY_TEST(FALSE ${OSQUERY_LOGGER_TESTS})

View File

@ -0,0 +1,90 @@
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <gtest/gtest.h>
#include <osquery/logger.h>
#include <osquery/database.h>
#include "osquery/core/test_util.h"
#include "osquery/logger/plugins/tls.h"
namespace pt = boost::property_tree;
namespace osquery {
class TLSLoggerTests : public testing::Test {
public:
size_t getIndex(const std::shared_ptr<TLSLoggerPlugin>& plugin) {
return plugin->log_index_;
}
void runCheck(const std::shared_ptr<TLSLogForwarderRunner>& runner) {
runner->check();
}
};
TEST_F(TLSLoggerTests, test_log) {
auto plugin = std::make_shared<TLSLoggerPlugin>();
std::vector<StatusLogLine> status;
status.push_back({O_INFO, "test.cpp", 0, "test"});
auto s = plugin->logStatus(status);
EXPECT_TRUE(s.ok());
// A single status log should have advanced the index by 1.
EXPECT_EQ(getIndex(plugin), 1U);
s = plugin->logString("{\"json\": true}");
EXPECT_TRUE(s.ok());
// The index is shared between statuses and strings.
EXPECT_EQ(getIndex(plugin), 2U);
}
TEST_F(TLSLoggerTests, test_database) {
auto plugin = std::make_shared<TLSLoggerPlugin>();
std::string expected = "{\"new_json\": true}";
plugin->logString(expected);
std::vector<std::string> indexes;
scanDatabaseKeys(kLogs, indexes);
EXPECT_EQ(indexes.size(), 3U);
// Iterate using an unordered search, and search for the expected string
// that was just logged.
bool found_string = false;
for (const auto& index : indexes) {
std::string value;
getDatabaseValue(kLogs, index, value);
found_string = (found_string || value == expected);
}
EXPECT_TRUE(found_string);
}
TEST_F(TLSLoggerTests, test_send) {
auto plugin = std::make_shared<TLSLoggerPlugin>();
for (size_t i = 0; i < 20; i++) {
std::string expected = "{\"more_json\": true}";
plugin->logString(expected);
}
// Start a server.
TLSServerRunner::start();
TLSServerRunner::setClientConfig();
// The runner should be dispatched as an osquery service.
auto runner = std::make_shared<TLSLogForwarderRunner>("fake_key");
runCheck(runner);
// Stop the server.
TLSServerRunner::unsetClientConfig();
TLSServerRunner::stop();
}
}

View File

@ -16,20 +16,23 @@
#include <osquery/enroll.h>
#include <osquery/flags.h>
#include <osquery/logger.h>
#include <osquery/registry.h>
#include <osquery/database.h>
#include "osquery/dispatcher/dispatcher.h"
#include "osquery/remote/requests.h"
#include "osquery/remote/transports/tls.h"
#include "osquery/remote/serializers/json.h"
#include "osquery/remote/utility.h"
#include "osquery/database/db_handle.h"
#include "osquery/logger/plugins/tls.h"
namespace pt = boost::property_tree;
namespace osquery {
constexpr size_t kTLSMaxLogLines = 1024;
constexpr size_t kTLSMaxLogLineSize = 1 * 1024 * 1024;
FLAG(string, logger_tls_endpoint, "", "TLS/HTTPS endpoint for results logging");
FLAG(int32,
@ -41,102 +44,6 @@ DECLARE_bool(tls_secret_always);
DECLARE_string(tls_enroll_override);
DECLARE_bool(tls_node_api);
/**
* @brief Control the number of backing-store buffered logs.
*
* The TLSLogForwarderRunner run loop requests the set of log indexes before
* sending logs to a TLS handler. If the number of indexes exceeds
* kTLSLoggerBufferMax the thread will set TLSLoggerPlugin::stop_buffering.
* Then the logger plugin stops buffering, and new logs will drop.
*/
const size_t kTLSLoggerBufferMax = 1024 * 1024;
class TLSLogForwarderRunner;
class TLSLoggerPlugin : public LoggerPlugin {
public:
TLSLoggerPlugin() : log_index_(0) {}
/**
* @brief The osquery logger initialization method.
*
* LoggerPlugin::init is optionally used by logger plugins to receive a
* buffer of status logs generated between application start and logger
* initialization. TLSLoggerPlugin will further buffer these logs into the
* backing store. They will flush to a TLS endpoint under normal conditions
* in a supporting/asynchronous thread.
*/
Status init(const std::string& name, const std::vector<StatusLogLine>& log);
public:
/// Log a result string. This is the basic catch-all for snapshots and events.
Status logString(const std::string& s);
/// Log a status (ERROR/WARNING/INFO) message.
Status logStatus(const std::vector<StatusLogLine>& log);
private:
/**
* @brief Hold an auto-incrementing offset for buffered logs.
*
* Logs are buffered to a backing store until they can be flushed to a TLS
* endpoint (based on latency/retry/etc options). Buffering uses a UNIX time
* second precision for indexing and ordering. log_index_ helps prevent
* collisions by appending an auto-increment counter.
*/
unsigned long log_index_;
/**
* @brief Start dropping logs by preventing buffering.
*
* If the TLS endpoint goes down while running and the backing store of log
* buffers fills up (exceeds a maximum number of log lines) then logs will
* start dropping.
*/
static bool stop_buffering;
private:
/// Allow the TLSLogForwardRunner thread to disable log buffering.
friend class TLSLogForwarderRunner;
};
/// Initialize the buffering stop to false.
bool TLSLoggerPlugin::stop_buffering = false;
/**
* @brief A log forwarder thread flushing database-buffered logs.
*
* The TLSLogForwarderRunner flushes buffered result and status logs based
* on CLI/options settings. If an enrollment key is set (and checked) during
* startup, this Dispatcher service is started.
*/
class TLSLogForwarderRunner : public InternalRunnable {
public:
explicit TLSLogForwarderRunner(const std::string& node_key)
: node_key_(node_key) {
uri_ = TLSRequestHelper::makeURI(FLAGS_logger_tls_endpoint);
}
/// A simple wait lock, and flush based on settings.
void start();
protected:
/**
* @brief Send labeled result logs.
*
* The log_data provided to send must be mutable.
* To optimize for smaller memory, this will be moved into place within the
* constructed property tree before sending.
*/
Status send(std::vector<std::string>& log_data, const std::string& log_type);
/// Receive an enrollment/node key from the backing store cache.
std::string node_key_;
/// Endpoint URI
std::string uri_;
};
REGISTER(TLSLoggerPlugin, "logger", "tls");
static inline std::string genLogIndex(bool results, unsigned long& counter) {
@ -144,20 +51,32 @@ static inline std::string genLogIndex(bool results, unsigned long& counter) {
std::to_string(++counter);
}
Status TLSLoggerPlugin::logString(const std::string& s) {
if (stop_buffering) {
return Status(1, "Buffer is paused, dropping logs");
static inline void iterate(std::vector<std::string>& input,
std::function<void(std::string&)> predicate) {
// Since there are no 'multi-do' APIs, keep a count of consecutive actions.
// This count allows us to sleep the thread to prevent utilization thrash.
size_t count = 0;
for (auto& item : input) {
// The predicate is provided a mutable string.
// It may choose to clear/move the data.
predicate(item);
if (++count % 100 == 0) {
osquery::interruptableSleep(20);
}
}
}
TLSLogForwarderRunner::TLSLogForwarderRunner(const std::string& node_key)
: node_key_(node_key) {
uri_ = TLSRequestHelper::makeURI(FLAGS_logger_tls_endpoint);
}
Status TLSLoggerPlugin::logString(const std::string& s) {
auto index = genLogIndex(true, log_index_);
return setDatabaseValue(kLogs, index, s);
}
Status TLSLoggerPlugin::logStatus(const std::vector<StatusLogLine>& log) {
if (stop_buffering) {
return Status(1, "Buffer is paused, dropping logs");
}
for (const auto& item : log) {
// Convert the StatusLogLine into ptree format, to convert to JSON.
pt::ptree buffer;
@ -215,76 +134,90 @@ Status TLSLogForwarderRunner::send(std::vector<std::string>& log_data,
params.put<std::string>("node_key", node_key_);
params.put<std::string>("log_type", log_type);
// Read each logged line into JSON and populate a list of lines.
// The result list will use the 'data' key.
pt::ptree children;
for (auto& item : log_data) {
pt::ptree child;
try {
std::stringstream input;
input << item;
pt::read_json(input, child);
} catch (const pt::json_parser::json_parser_error& e) {
// The log line entered was not valid JSON, skip it.
}
children.push_back(std::make_pair("", std::move(child)));
{
// Read each logged line into JSON and populate a list of lines.
// The result list will use the 'data' key.
pt::ptree children;
iterate(log_data,
([&children](std::string& item) {
pt::ptree child;
try {
std::stringstream input;
input << item;
std::string().swap(item);
pt::read_json(input, child);
} catch (const pt::json_parser::json_parser_error& e) {
// The log line entered was not valid JSON, skip it.
}
children.push_back(std::make_pair("", std::move(child)));
}));
params.add_child("data", std::move(children));
}
params.add_child("data", std::move(children));
auto request = Request<TLSTransport, JSONSerializer>(uri_);
return request.call(params);
}
inline void clearLogs(bool results, const std::vector<std::string>& indexes) {
for (const auto& index : indexes) {
if (results && index.at(0) != 'r') {
continue;
void TLSLogForwarderRunner::check() {
// Instead of using the 'help' database API, prefer to interact with the
// DBHandle directly for additional performance.
auto handle = DBHandle::getInstance();
// Get a list of all the buffered log items, with a max of 1024 lines.
std::vector<std::string> indexes;
auto status = handle->Scan(kLogs, indexes, kTLSMaxLogLines);
// For each index, accumulate the log line into the result or status set.
std::vector<std::string> results, statuses;
iterate(indexes,
([&handle, &results, &statuses](std::string& index) {
std::string value;
auto& target = ((index.at(0) == 'r') ? results : statuses);
if (handle->Get(kLogs, index, value)) {
// Enforce a max log line size for TLS logging.
if (value.size() > kTLSMaxLogLineSize) {
LOG(WARNING) << "Line exceeds TLS logger max: " << value.size();
} else {
target.push_back(std::move(value));
}
}
}));
// If any results/statuses were found in the flushed buffer, send.
if (results.size() > 0) {
if (!send(results, "result")) {
VLOG(1) << "Could not send results to logger URI: " << uri_;
} else {
// Clear the results logs once they were sent.
iterate(indexes,
([&results](std::string& index) {
if (index.at(0) != 'r') {
return;
}
deleteDatabaseValue(kLogs, index);
}));
}
}
if (statuses.size() > 0) {
if (!send(statuses, "status")) {
VLOG(1) << "Could not send status logs to logger URI: " << uri_;
} else {
// Clear the status logs once they were sent.
iterate(indexes,
([&results](std::string& index) {
if (index.at(0) != 's') {
return;
}
deleteDatabaseValue(kLogs, index);
}));
}
// If the value was flushed, remove from the backing store.
deleteDatabaseValue(kLogs, index);
}
}
void TLSLogForwarderRunner::start() {
while (true) {
// Get a list of all the buffered log items.
std::vector<std::string> indexes;
auto status = scanDatabaseKeys(kLogs, indexes);
if (indexes.size() > kTLSLoggerBufferMax) {
// The log buffer is filled. Stop buffering and start dropping logs.
TLSLoggerPlugin::stop_buffering = true;
} else if (TLSLoggerPlugin::stop_buffering == true) {
// If the buffering was paused, resume.
TLSLoggerPlugin::stop_buffering = false;
}
std::vector<std::string> results, statuses;
for (const auto& index : indexes) {
std::string value;
auto& target = ((index.at(0) == 'r') ? results : statuses);
if (getDatabaseValue(kLogs, index, value)) {
// Resist failure, only append delimiters if the value get succeeded.
target.push_back(std::move(value));
}
}
// If any results/statuses were found in the flushed buffer, send.
if (results.size() > 0) {
if (!send(results, "result")) {
VLOG(1) << "Could not send results to logger URI: " << uri_;
} else {
// Clear the results logs once they were sent.
clearLogs(true, indexes);
}
}
if (statuses.size() > 0) {
if (!send(statuses, "status")) {
VLOG(1) << "Could not send status logs to logger URI: " << uri_;
} else {
// Clear the status logs once they were sent.
clearLogs(false, indexes);
}
}
check();
// Cool off and time wait the configured period.
osquery::interruptableSleep(FLAGS_logger_tls_period * 1000);

View File

@ -0,0 +1,103 @@
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <osquery/logger.h>
#include "osquery/dispatcher/dispatcher.h"
namespace osquery {
class TLSLogForwarderRunner;
/**
* @brief A log forwarder thread flushing database-buffered logs.
*
* The TLSLogForwarderRunner flushes buffered result and status logs based
* on CLI/options settings. If an enrollment key is set (and checked) during
* startup, this Dispatcher service is started.
*/
class TLSLogForwarderRunner : public InternalRunnable {
public:
explicit TLSLogForwarderRunner(const std::string& node_key);
/// A simple wait lock, and flush based on settings.
void start() override;
protected:
/**
* @brief Send labeled result logs.
*
* The log_data provided to send must be mutable.
* To optimize for smaller memory, this will be moved into place within the
* constructed property tree before sending.
*/
Status send(std::vector<std::string>& log_data, const std::string& log_type);
/**
* @brief Check for new logs and send.
*
* Scan the logs domain for up to 1024 log lines.
* Sort those lines into status and request types then forward (send) each
* set. On success, clear the data and indexes.
*/
void check();
/// Receive an enrollment/node key from the backing store cache.
std::string node_key_;
/// Endpoint URI
std::string uri_;
private:
friend class TLSLoggerTests;
};
class TLSLoggerPlugin : public LoggerPlugin {
public:
TLSLoggerPlugin() : log_index_(0) {}
/**
* @brief The osquery logger initialization method.
*
* LoggerPlugin::init is optionally used by logger plugins to receive a
* buffer of status logs generated between application start and logger
* initialization. TLSLoggerPlugin will further buffer these logs into the
* backing store. They will flush to a TLS endpoint under normal conditions
* in a supporting/asynchronous thread.
*/
Status init(const std::string& name,
const std::vector<StatusLogLine>& log) override;
public:
/// Log a result string. This is the basic catch-all for snapshots and events.
Status logString(const std::string& s) override;
/// Log a status (ERROR/WARNING/INFO) message.
Status logStatus(const std::vector<StatusLogLine>& log) override;
private:
/**
* @brief Hold an auto-incrementing offset for buffered logs.
*
* Logs are buffered to a backing store until they can be flushed to a TLS
* endpoint (based on latency/retry/etc options). Buffering uses a UNIX time
* second precision for indexing and ordering. log_index_ helps prevent
* collisions by appending an auto-increment counter.
*/
size_t log_index_;
private:
/// Allow the TLSLogForwardRunner thread to disable log buffering.
friend class TLSLogForwarderRunner;
friend class TLSLoggerTests;
};
}