/* * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. * */ #include #include #include #include #include #include #include #include #include #include #include "osquery/config/parsers/decorators.h" #include "osquery/logger/plugins/buffered.h" namespace pt = boost::property_tree; namespace osquery { FLAG(uint64, buffered_log_max, 1000000, "Maximum number of logs in buffered output plugins (0 = unlimited)"); const auto BufferedLogForwarder::kLogPeriod = std::chrono::seconds(4); const size_t BufferedLogForwarder::kMaxLogLines = 1024; Status BufferedLogForwarder::setUp() { // initialize buffer_count_ by scanning the DB std::vector indexes; auto status = scanDatabaseKeys(kLogs, indexes, index_name_, 0); if (!status.ok()) { return Status(1, "Error scanning for buffered log count"); } buffer_count_ = indexes.size(); return Status(0); } void BufferedLogForwarder::check() { // Get a list of all the buffered log items, with a max of 1024 lines. std::vector indexes; auto status = scanDatabaseKeys(kLogs, indexes, index_name_, max_log_lines_); // For each index, accumulate the log line into the result or status set. std::vector results, statuses; iterate(indexes, ([&results, &statuses, this](std::string& index) { std::string value; auto& target = isResultIndex(index) ? results : statuses; if (getDatabaseValue(kLogs, index, value)) { target.push_back(std::move(value)); } })); // If any results/statuses were found in the flushed buffer, send. if (results.size() > 0) { status = send(results, "result"); if (!status.ok()) { VLOG(1) << "Error sending results to logger: " << status.getMessage(); } else { // Clear the results logs once they were sent. iterate(indexes, ([this](std::string& index) { if (!isResultIndex(index)) { return; } deleteValueWithCount(kLogs, index); })); } } if (statuses.size() > 0) { status = send(statuses, "status"); if (!status.ok()) { VLOG(1) << "Error sending status to logger: " << status.getMessage(); } else { // Clear the status logs once they were sent. iterate(indexes, ([this](std::string& index) { if (!isStatusIndex(index)) { return; } deleteValueWithCount(kLogs, index); })); } } // Purge any logs exceeding the max after our send attempt if (FLAGS_buffered_log_max > 0) { purge(); } } void BufferedLogForwarder::purge() { if (buffer_count_ <= FLAGS_buffered_log_max) { return; } size_t purge_count = buffer_count_ - FLAGS_buffered_log_max; // Collect purge_count indexes of each type (result/status) before // partitioning to find the oldest. Note this assumes that the indexes are // returned in ascending lexicographic order (true for RocksDB). std::vector indexes; auto status = scanDatabaseKeys(kLogs, indexes, genIndexPrefix(true), purge_count); if (!status.ok()) { LOG(ERROR) << "Error scanning DB during buffered log purge"; return; } LOG(WARNING) << "Purging buffered logs limit (" << FLAGS_buffered_log_max << ") exceeded: " << buffer_count_; std::vector status_indexes; status = scanDatabaseKeys(kLogs, status_indexes, genIndexPrefix(false), purge_count); if (!status.ok()) { LOG(ERROR) << "Error scanning DB during buffered log purge"; return; } indexes.insert(indexes.end(), status_indexes.begin(), status_indexes.end()); if (indexes.size() < purge_count) { LOG(ERROR) << "Trying to purge " << purge_count << " logs but only found " << indexes.size(); return; } size_t prefix_size = genIndexPrefix(true).size(); // Partition the indexes so that the first purge_count elements are the // oldest indexes (the ones to be purged) std::nth_element(indexes.begin(), indexes.begin() + purge_count - 1, indexes.end(), [&](const std::string& a, const std::string& b) { // Skip the prefix when doing comparisons return a.compare(prefix_size, std::string::npos, b, prefix_size, std::string::npos) < 0; }); indexes.erase(indexes.begin() + purge_count, indexes.end()); // Now only indexes of logs to be deleted remain iterate(indexes, [this](const std::string& index) { if (!deleteValueWithCount(kLogs, index).ok()) { LOG(ERROR) << "Error deleting value during buffered log purge"; } }); } void BufferedLogForwarder::start() { while (!interrupted()) { check(); // Cool off and time wait the configured period. pauseMilli(log_period_); } } Status BufferedLogForwarder::logString(const std::string& s, size_t time) { std::string index = genResultIndex(time); return addValueWithCount(kLogs, index, s); } Status BufferedLogForwarder::logStatus(const std::vector& log, size_t time) { // Append decorations to status // Assemble a decorations tree to append to each status buffer line. pt::ptree dtree; std::map decorations; getDecorations(decorations); for (const auto& decoration : decorations) { dtree.put(decoration.first, decoration.second); } for (const auto& item : log) { // Convert the StatusLogLine into ptree format, to convert to JSON. pt::ptree buffer; buffer.put("severity", (google::LogSeverity)item.severity); buffer.put("filename", item.filename); buffer.put("line", item.line); buffer.put("message", item.message); buffer.put("version", kVersion); if (decorations.size() > 0) { buffer.put_child("decorations", dtree); } // Convert to JSON, for storing a string-representation in the database. std::string json; try { std::stringstream json_output; pt::write_json(json_output, buffer, false); json = json_output.str(); } catch (const pt::json_parser::json_parser_error& e) { // The log could not be represented as JSON. return Status(1, e.what()); } // Store the status line in a backing store. if (!json.empty()) { json.pop_back(); } std::string index = genStatusIndex(time); Status status = addValueWithCount(kLogs, index, json); if (!status.ok()) { // Do not continue if any line fails. return status; } } return Status(0); } bool BufferedLogForwarder::isIndex(const std::string& index, bool results) { size_t target = index_name_.size() + 1; return target < index.size() && index.at(target) == (results ? 'r' : 's'); } bool BufferedLogForwarder::isResultIndex(const std::string& index) { return isIndex(index, true); } bool BufferedLogForwarder::isStatusIndex(const std::string& index) { return isIndex(index, false); } std::string BufferedLogForwarder::genResultIndex(size_t time) { return genIndex(true, time); } std::string BufferedLogForwarder::genStatusIndex(size_t time) { return genIndex(false, time); } std::string BufferedLogForwarder::genIndexPrefix(bool results) { return index_name_ + "_" + ((results) ? "r" : "s") + "_"; } std::string BufferedLogForwarder::genIndex(bool results, size_t time) { if (time == 0) { time = getUnixTime(); } return genIndexPrefix(results) + std::to_string(time) + "_" + std::to_string(++log_index_); } Status BufferedLogForwarder::addValueWithCount(const std::string& domain, const std::string& key, const std::string& value) { Status status = setDatabaseValue(domain, key, value); if (status.ok()) { buffer_count_++; } return status; } Status BufferedLogForwarder::deleteValueWithCount(const std::string& domain, const std::string& key) { Status status = deleteDatabaseValue(domain, key); if (status.ok()) { buffer_count_--; } return status; } }