osquery-1/osquery/config/config.cpp
Jesse Kornblum c7355b19aa Update osquery licensing wording (#5452)
Summary:
Pull Request resolved: https://github.com/facebook/osquery/pull/5452

As suggested in another diff, this diff updates the language we use to describe the osquery licensing terms. We are changing all instances of

//This source code is licensed as defined on the LICENSE file found in the root directory of this source tree.//

to

//This source code is licensed in accordance with the terms specified in the LICENSE file found in the root directory of this source tree.//

We accomplish this with a codemod:

  $ codemod -md xplat/osquery/oss --extensions cpp,h,in,py,sh,mm,ps1 "(.\s+)This source code is licensed as defined on the LICENSE file found in the(.*)root directory of this source tree\." "\1This source code is licensed in accordance with the terms specified in\2the LICENSE file found in the root directory of this source tree."

Reviewed By: fmanco

Differential Revision: D14131290

fbshipit-source-id: 52c90da342263e2a80f5a678ecd760c19cf7513e
2019-02-19 10:59:48 -08:00

1122 lines
35 KiB
C++

/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed in accordance with the terms specified in
* the LICENSE file found in the root directory of this source tree.
*/
#include <algorithm>
#include <chrono>
#include <functional>
#include <map>
#include <string>
#include <vector>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/iterator/filter_iterator.hpp>
#include <osquery/config/config.h>
#include <osquery/database.h>
#include <osquery/events.h>
#include <osquery/flagalias.h>
#include <osquery/flags.h>
#include <osquery/hashing/hashing.h>
#include <osquery/killswitch.h>
#include <osquery/logger.h>
#include <osquery/packs.h>
#include <osquery/registry.h>
#include <osquery/system.h>
#include <osquery/tables.h>
#include <osquery/utils/conversions/split.h>
#include <osquery/utils/conversions/tryto.h>
#include <osquery/utils/system/time.h>
namespace rj = rapidjson;
namespace osquery {
namespace {
/// Prefix to persist config data
const std::string kConfigPersistencePrefix{"config_persistence."};
using ConfigMap = std::map<std::string, std::string>;
std::atomic<bool> is_first_time_refresh(true);
}; // namespace
/**
* @brief Config plugin registry.
*
* This creates an osquery registry for "config" which may implement
* ConfigPlugin. A ConfigPlugin's call API should make use of a genConfig
* after reading JSON data in the plugin implementation.
*/
CREATE_REGISTRY(ConfigPlugin, "config");
/**
* @brief ConfigParser plugin registry.
*
* This creates an osquery registry for "config_parser" which may implement
* ConfigParserPlugin. A ConfigParserPlugin should not export any call actions
* but rather have a simple property tree-accessor API through Config.
*/
CREATE_LAZY_REGISTRY(ConfigParserPlugin, "config_parser");
/// The config plugin must be known before reading options.
CLI_FLAG(string, config_plugin, "filesystem", "Config plugin name");
CLI_FLAG(bool,
config_check,
false,
"Check the format of an osquery config and exit");
CLI_FLAG(bool, config_dump, false, "Dump the contents of the configuration");
CLI_FLAG(uint64,
config_refresh,
0,
"Optional interval in seconds to re-read configuration");
FLAG_ALIAS(google::uint64, config_tls_refresh, config_refresh);
/// How long to wait when config update fails
CLI_FLAG(uint64,
config_accelerated_refresh,
300,
"Interval to wait if reading a configuration fails");
CLI_FLAG(bool,
config_enable_backup,
false,
"Backup config and use it when refresh fails");
FLAG_ALIAS(google::uint64,
config_tls_accelerated_refresh,
config_accelerated_refresh);
DECLARE_string(config_plugin);
DECLARE_string(pack_delimiter);
/**
* @brief The backing store key name for the executing query.
*
* The config maintains schedule statistics and tracks failed executions.
* On process or worker resume an initializer or config may check if the
* resume was the result of a failure during an executing query.
*/
const std::string kExecutingQuery{"executing_query"};
const std::string kFailedQueries{"failed_queries"};
/// The time osquery was started.
std::atomic<size_t> kStartTime;
// The config may be accessed and updated asynchronously; use mutexes.
Mutex config_hash_mutex_;
Mutex config_refresh_mutex_;
Mutex config_backup_mutex_;
/// Several config methods require enumeration via predicate lambdas.
RecursiveMutex config_schedule_mutex_;
RecursiveMutex config_files_mutex_;
RecursiveMutex config_performance_mutex_;
using PackRef = std::unique_ptr<Pack>;
/**
* The schedule is an iterable collection of Packs. When you iterate through
* a schedule, you only get the packs that should be running on the host that
* you're currently operating on.
*/
class Schedule : private boost::noncopyable {
public:
/// Under the hood, the schedule is just a list of the Pack objects
using container = std::vector<PackRef>;
/**
* @brief Create a schedule maintained by the configuration.
*
* This will check for previously executing queries. If any query was
* executing it is considered in a 'dirty' state and should generate logs.
* The schedule may also choose to blacklist this query.
*/
Schedule();
/**
* @brief This class' iteration function
*
* Our step operation will be called on each element in packs_. It is
* responsible for determining if that element should be returned as the
* next iterator element or skipped.
*/
struct Step {
bool operator()(const PackRef& pack) const;
};
/// Add a pack to the schedule
void add(PackRef pack);
/// Remove a pack, by name.
void remove(const std::string& pack);
/// Remove a pack by name and source.
void remove(const std::string& pack, const std::string& source);
/// Remove all packs by source.
void removeAll(const std::string& source);
/// Boost gives us a nice template for maintaining the state of the iterator
using iterator = boost::filter_iterator<Step, container::iterator>;
iterator begin();
iterator end();
PackRef& last();
private:
/// Underlying storage for the packs
container packs_;
/**
* @brief The schedule will check and record previously executing queries.
*
* If a query is found on initialization, the name will be recorded, it is
* possible to skip previously failed queries.
*/
std::string failed_query_;
/**
* @brief List of blacklisted queries.
*
* A list of queries that are blacklisted from executing due to prior
* failures. If a query caused a worker to fail it will be recorded during
* the next execution and saved to the blacklist.
*/
std::map<std::string, size_t> blacklist_;
private:
friend class Config;
};
bool Schedule::Step::operator()(const PackRef& pack) const {
return pack->shouldPackExecute();
}
void Schedule::add(PackRef pack) {
remove(pack->getName(), pack->getSource());
packs_.push_back(std::move(pack));
}
void Schedule::remove(const std::string& pack) {
remove(pack, "");
}
void Schedule::remove(const std::string& pack, const std::string& source) {
auto new_end = std::remove_if(
packs_.begin(), packs_.end(), [pack, source](const PackRef& p) {
if (p->getName() == pack &&
(p->getSource() == source || source == "")) {
Config::get().removeFiles(source + FLAGS_pack_delimiter +
p->getName());
return true;
}
return false;
});
packs_.erase(new_end, packs_.end());
}
void Schedule::removeAll(const std::string& source) {
auto new_end =
std::remove_if(packs_.begin(), packs_.end(), [source](const PackRef& p) {
if (p->getSource() == source) {
Config::get().removeFiles(source + FLAGS_pack_delimiter +
p->getName());
return true;
}
return false;
});
packs_.erase(new_end, packs_.end());
}
Schedule::iterator Schedule::begin() {
return Schedule::iterator(packs_.begin(), packs_.end());
}
Schedule::iterator Schedule::end() {
return Schedule::iterator(packs_.end(), packs_.end());
}
PackRef& Schedule::last() {
return packs_.back();
}
/**
* @brief A thread that periodically reloads configuration state.
*
* This refresh runner thread can refresh any configuration plugin.
* It may accelerate the time between checks if the configuration fails to load.
* For configurations pulled from the network this assures that configuration
* is fresh when re-attaching.
*/
class ConfigRefreshRunner : public InternalRunnable {
public:
ConfigRefreshRunner() : InternalRunnable("ConfigRefreshRunner") {}
/// A simple wait/interruptible lock.
void start() override;
private:
/// The current refresh rate in seconds.
std::atomic<size_t> refresh_sec_{0};
private:
friend class Config;
};
void restoreScheduleBlacklist(std::map<std::string, size_t>& blacklist) {
std::string content;
getDatabaseValue(kPersistentSettings, kFailedQueries, content);
auto blacklist_pairs = osquery::split(content, ":");
if (blacklist_pairs.size() == 0 || blacklist_pairs.size() % 2 != 0) {
// Nothing in the blacklist, or malformed data.
return;
}
size_t current_time = getUnixTime();
for (size_t i = 0; i < blacklist_pairs.size() / 2; i++) {
// Fill in a mapping of query name to time the blacklist expires.
auto expire =
tryTo<long long>(blacklist_pairs[(i * 2) + 1], 10).takeOr(0ll);
if (expire > 0 && current_time < (size_t)expire) {
blacklist[blacklist_pairs[(i * 2)]] = (size_t)expire;
}
}
}
void saveScheduleBlacklist(const std::map<std::string, size_t>& blacklist) {
std::string content;
for (const auto& query : blacklist) {
if (!content.empty()) {
content += ":";
}
content += query.first + ":" + std::to_string(query.second);
}
setDatabaseValue(kPersistentSettings, kFailedQueries, content);
}
Schedule::Schedule() {
if (RegistryFactory::get().external()) {
// Extensions should not restore or save schedule details.
return;
}
// Parse the schedule's query blacklist from backing storage.
restoreScheduleBlacklist(blacklist_);
// Check if any queries were executing when the tool last stopped.
getDatabaseValue(kPersistentSettings, kExecutingQuery, failed_query_);
if (!failed_query_.empty()) {
LOG(WARNING) << "Scheduled query may have failed: " << failed_query_;
setDatabaseValue(kPersistentSettings, kExecutingQuery, "");
// Add this query name to the blacklist and save the blacklist.
blacklist_[failed_query_] = getUnixTime() + 86400;
saveScheduleBlacklist(blacklist_);
}
}
Config::Config()
: schedule_(std::make_unique<Schedule>()),
valid_(false),
refresh_runner_(std::make_shared<ConfigRefreshRunner>()) {}
Config& Config::get() {
static Config instance;
return instance;
}
void Config::addPack(const std::string& name,
const std::string& source,
const rj::Value& obj) {
assert(obj.IsObject());
auto addSinglePack = ([this, &source](const std::string pack_name,
const rj::Value& pack_obj) {
RecursiveLock wlock(config_schedule_mutex_);
try {
schedule_->add(std::make_unique<Pack>(pack_name, source, pack_obj));
if (schedule_->last()->shouldPackExecute()) {
applyParsers(source + FLAGS_pack_delimiter + pack_name, pack_obj, true);
}
} catch (const std::exception& e) {
LOG(WARNING) << "Error adding pack: " << pack_name << ": " << e.what();
}
});
if (name == "*") {
// This is a multi-pack, expect the config plugin to have generated a
// "name": {pack-content} response similar to embedded pack content
// within the configuration.
for (const auto& pack : obj.GetObject()) {
addSinglePack(pack.name.GetString(), pack.value);
}
} else {
addSinglePack(name, obj);
}
}
size_t Config::getStartTime() {
return kStartTime;
}
void Config::setStartTime(size_t st) {
kStartTime = st;
}
void Config::removePack(const std::string& pack) {
RecursiveLock wlock(config_schedule_mutex_);
return schedule_->remove(pack);
}
void Config::addFile(const std::string& source,
const std::string& category,
const std::string& path) {
RecursiveLock wlock(config_files_mutex_);
files_[source][category].push_back(path);
}
void Config::removeFiles(const std::string& source) {
RecursiveLock wlock(config_files_mutex_);
if (files_.count(source)) {
FileCategories().swap(files_[source]);
}
}
/**
* @brief Return true if the failed query is no longer blacklisted.
*
* There are two scenarios where a blacklisted query becomes 'unblacklisted'.
* The first is simple, the amount of time it was blacklisted for has expired.
* The second is more complex, the query failed but the schedule has requested
* that the query should not be blacklisted.
*
* @param blt The time the query was originally blacklisted.
* @param query The scheduled query and its options.
*/
static inline bool blacklistExpired(size_t blt, const ScheduledQuery& query) {
if (getUnixTime() > blt) {
return true;
}
auto blo = query.options.find("blacklist");
if (blo != query.options.end() && blo->second == false) {
// The schedule requested that we do not blacklist this query.
return true;
}
return false;
}
void Config::scheduledQueries(
std::function<void(std::string name, const ScheduledQuery& query)>
predicate,
bool blacklisted) const {
RecursiveLock lock(config_schedule_mutex_);
for (PackRef& pack : *schedule_) {
for (auto& it : pack->getSchedule()) {
std::string name = it.first;
// The query name may be synthetic.
if (pack->getName() != "main") {
name = "pack" + FLAGS_pack_delimiter + pack->getName() +
FLAGS_pack_delimiter + it.first;
}
// They query may have failed and been added to the schedule's blacklist.
auto blacklisted_query = schedule_->blacklist_.find(name);
if (blacklisted_query != schedule_->blacklist_.end()) {
if (blacklistExpired(blacklisted_query->second, it.second)) {
// The blacklisted query passed the expiration time (remove).
schedule_->blacklist_.erase(blacklisted_query);
saveScheduleBlacklist(schedule_->blacklist_);
it.second.blacklisted = false;
} else {
// The query is still blacklisted.
it.second.blacklisted = true;
if (!blacklisted) {
// The caller does not want blacklisted queries.
continue;
}
}
}
// Call the predicate.
predicate(std::move(name), it.second);
}
}
}
void Config::packs(std::function<void(const Pack& pack)> predicate) const {
RecursiveLock lock(config_schedule_mutex_);
for (PackRef& pack : schedule_->packs_) {
predicate(std::cref(*pack.get()));
}
}
Status Config::refresh() {
PluginResponse response;
auto status = Registry::call("config", {{"action", "genConfig"}}, response);
WriteLock lock(config_refresh_mutex_);
if (!status.ok()) {
if (FLAGS_config_refresh > 0 && getRefresh() == FLAGS_config_refresh) {
VLOG(1) << "Using accelerated configuration delay";
setRefresh(FLAGS_config_accelerated_refresh);
}
loaded_ = true;
if (Killswitch::get().isConfigBackupEnabled()) {
if (FLAGS_config_enable_backup && is_first_time_refresh.exchange(false)) {
const auto result = restoreConfigBackup();
if (!result) {
return Status::failure(result.getError().getMessage());
} else {
update(*result);
}
}
} else {
LOG(INFO) << "Config backup is disabled by the killswitch";
}
return status;
} else if (getRefresh() != FLAGS_config_refresh) {
VLOG(1) << "Normal configuration delay restored";
setRefresh(FLAGS_config_refresh);
}
// if there was a response, parse it and update internal state
valid_ = true;
if (response.size() > 0) {
if (FLAGS_config_dump) {
// If config checking is enabled, debug-write the raw config data.
for (const auto& content : response[0]) {
fprintf(stdout,
"{\"%s\": %s}\n",
content.first.c_str(),
content.second.c_str());
}
// Don't force because the config plugin may have started services.
Initializer::requestShutdown();
return Status();
}
status = update(response[0]);
}
is_first_time_refresh = false;
loaded_ = true;
return status;
}
void Config::setRefresh(size_t refresh_sec) {
refresh_runner_->refresh_sec_ = refresh_sec;
}
size_t Config::getRefresh() const {
return refresh_runner_->refresh_sec_;
}
Status Config::load() {
valid_ = false;
auto config_plugin = RegistryFactory::get().getActive("config");
if (!RegistryFactory::get().exists("config", config_plugin)) {
return Status(1, "Missing config plugin " + config_plugin);
}
// Set the initial and optional refresh value.
setRefresh(FLAGS_config_refresh);
/*
* If the initial configuration includes a non-0 refresh, start an
* additional service that sleeps and periodically regenerates the
* configuration.
*/
if (!FLAGS_config_check && !started_thread_ && getRefresh() > 0) {
Dispatcher::addService(refresh_runner_);
started_thread_ = true;
}
return refresh();
}
void stripConfigComments(std::string& json) {
std::string sink;
boost::replace_all(json, "\\\n", "");
for (auto& line : osquery::split(json, "\n")) {
boost::trim(line);
if (line.size() > 0 && line[0] == '#') {
continue;
}
if (line.size() > 1 && line[0] == '/' && line[1] == '/') {
continue;
}
sink += line + '\n';
}
json = sink;
}
Expected<ConfigMap, Config::RestoreConfigError> Config::restoreConfigBackup() {
LOG(INFO) << "Restoring backed up config from the database";
std::vector<std::string> keys;
ConfigMap config;
WriteLock lock(config_backup_mutex_);
scanDatabaseKeys(kPersistentSettings, keys, kConfigPersistencePrefix);
for (const auto& key : keys) {
std::string value;
Status status = getDatabaseValue(kPersistentSettings, key, value);
if (!status.ok()) {
LOG(ERROR)
<< "restoreConfigBackup database failed to retrieve config for key "
<< key;
return createError(Config::RestoreConfigError::DatabaseError,
"Could not retrieve value for the key: " + key);
}
config[key.substr(kConfigPersistencePrefix.length())] = std::move(value);
}
return config;
}
void Config::backupConfig(const ConfigMap& config) {
LOG(INFO) << "BackupConfig started";
std::vector<std::string> keys;
WriteLock lock(config_backup_mutex_);
scanDatabaseKeys(kPersistentSettings, keys, kConfigPersistencePrefix);
for (const auto& key : keys) {
if (config.find(key.substr(kConfigPersistencePrefix.length())) ==
config.end()) {
deleteDatabaseValue(kPersistentSettings, key);
}
}
for (const auto& source : config) {
setDatabaseValue(kPersistentSettings,
kConfigPersistencePrefix + source.first,
source.second);
}
}
Status Config::updateSource(const std::string& source,
const std::string& json) {
// Compute a 'synthesized' hash using the content before it is parsed.
if (!hashSource(source, json)) {
// This source did not change, the returned status allows the caller to
// choose to reconfigure if any sources had changed.
return Status(2);
}
{
RecursiveLock lock(config_schedule_mutex_);
// Remove all packs from this source.
schedule_->removeAll(source);
// Remove all files from this source.
removeFiles(source);
}
// load the config (source.second) into a JSON object.
auto doc = JSON::newObject();
auto clone = json;
stripConfigComments(clone);
if (!doc.fromString(clone) || !doc.doc().IsObject()) {
return Status(1, "Error parsing the config JSON");
}
// extract the "schedule" key and store it as the main pack
auto& rf = RegistryFactory::get();
if (doc.doc().HasMember("schedule") && !rf.external()) {
auto& schedule = doc.doc()["schedule"];
if (schedule.IsObject()) {
auto main_doc = JSON::newObject();
auto queries_obj = main_doc.getObject();
main_doc.copyFrom(schedule, queries_obj);
main_doc.add("queries", queries_obj);
addPack("main", source, main_doc.doc());
}
}
// extract the "packs" key into additional pack objects
if (doc.doc().HasMember("packs") && !rf.external()) {
auto& packs = doc.doc()["packs"];
if (packs.IsObject()) {
for (const auto& pack : packs.GetObject()) {
std::string pack_name = pack.name.GetString();
if (pack.value.IsObject()) {
// The pack is a JSON object, treat the content as pack data.
addPack(pack_name, source, pack.value);
} else if (pack.value.IsString()) {
genPack(pack_name, source, pack.value.GetString());
}
}
}
}
applyParsers(source, doc.doc(), false);
return Status();
}
Status Config::genPack(const std::string& name,
const std::string& source,
const std::string& target) {
// If the pack value is a string (and not a JSON object) then it is a
// resource to be handled by the config plugin.
PluginResponse response;
PluginRequest request = {
{"action", "genPack"}, {"name", name}, {"value", target}};
Registry::call("config", request, response);
if (response.size() == 0 || response[0].count(name) == 0) {
return Status(1, "Invalid plugin response");
}
auto clone = response[0][name];
if (clone.empty()) {
LOG(WARNING) << "Error reading the query pack named: " << name;
return Status();
}
stripConfigComments(clone);
auto doc = JSON::newObject();
if (!doc.fromString(clone) || !doc.doc().IsObject()) {
LOG(WARNING) << "Error parsing the \"" << name << "\" pack JSON";
} else {
addPack(name, source, doc.doc());
}
return Status();
}
void Config::applyParsers(const std::string& source,
const rj::Value& obj,
bool pack) {
assert(obj.IsObject());
// Iterate each parser.
RecursiveLock lock(config_schedule_mutex_);
for (const auto& plugin : RegistryFactory::get().plugins("config_parser")) {
std::shared_ptr<ConfigParserPlugin> parser = nullptr;
try {
parser = std::dynamic_pointer_cast<ConfigParserPlugin>(plugin.second);
} catch (const std::bad_cast& /* e */) {
LOG(ERROR) << "Error casting config parser plugin: " << plugin.first;
}
if (parser == nullptr || parser.get() == nullptr) {
continue;
}
// For each key requested by the parser, add a property tree reference.
std::map<std::string, JSON> parser_config;
for (const auto& key : parser->keys()) {
if (obj.HasMember(key) && !obj[key].IsNull()) {
if (!obj[key].IsArray() && !obj[key].IsObject()) {
LOG(WARNING) << "Error config " << key
<< " should be an array or object";
continue;
}
auto doc = JSON::newFromValue(obj[key]);
parser_config.emplace(std::make_pair(key, std::move(doc)));
}
}
// The config parser plugin will receive a copy of each property tree for
// each top-level-config key. The parser may choose to update the config's
// internal state
parser->update(source, parser_config);
}
}
Status Config::update(const ConfigMap& config) {
// A config plugin may call update from an extension. This will update
// the config instance within the extension process and the update must be
// reflected in the core.
if (RegistryFactory::get().external()) {
for (const auto& source : config) {
PluginRequest request = {
{"action", "update"},
{"source", source.first},
{"data", source.second},
};
// A "update" registry item within core should call the core's update
// method. The config plugin call action handling must also know to
// update.
auto status = Registry::call("config", "update", request);
if (!status.ok()) {
// If something goes wrong, do not go with update further
return status;
}
}
}
// Iterate though each source and overwrite config data.
// This will add/overwrite pack data, append to the schedule, change watched
// files, set options, etc.
// Before this occurs, take an opportunity to purge stale state.
purge();
bool needs_reconfigure = false;
for (const auto& source : config) {
auto status = updateSource(source.first, source.second);
if (status.getCode() == 2) {
// The source content did not change.
continue;
}
if (!status.ok()) {
LOG(ERROR) << "updateSource failed to parse config, of source: "
<< source.first << " and content: " << source.second;
return status;
}
// If a source was updated and the content has changed, then the registry
// should be reconfigured. File watches may have changed, etc.
needs_reconfigure = true;
}
if (loaded_ && needs_reconfigure) {
// The config has since been loaded.
// This update call is most likely a response to an async update request
// from a config plugin. This request should request all plugins to update.
for (const auto& registry : RegistryFactory::get().all()) {
if (registry.first == "event_publisher" ||
registry.first == "event_subscriber") {
continue;
}
registry.second->configure();
}
EventFactory::configUpdate();
}
// This cannot be under the previous if block because on extensions loaded_
// allways false.
if (needs_reconfigure) {
std::string loggers = RegistryFactory::get().getActive("logger");
for (const auto& logger : osquery::split(loggers, ",")) {
LOG(INFO) << "Calling configure for logger " << logger;
PluginRef plugin = Registry::get().plugin("logger", logger);
if (plugin) {
plugin->configure();
}
}
}
if (FLAGS_config_enable_backup) {
backupConfig(config);
}
return Status(0, "OK");
}
void Config::purge() {
// The first use of purge is removing expired query results.
std::vector<std::string> saved_queries;
scanDatabaseKeys(kQueries, saved_queries);
auto queryExists = [schedule = static_cast<const Schedule*>(schedule_.get())](
const std::string& query_name) {
for (const auto& pack : schedule->packs_) {
const auto& pack_queries = pack->getSchedule();
if (pack_queries.count(query_name)) {
return true;
}
}
return false;
};
RecursiveLock lock(config_schedule_mutex_);
// Iterate over each result set in the database.
for (const auto& saved_query : saved_queries) {
if (queryExists(saved_query)) {
continue;
}
std::string content;
getDatabaseValue(kPersistentSettings, "timestamp." + saved_query, content);
if (content.empty()) {
// No timestamp is set for this query, perhaps this is the first time
// query results expiration is applied.
setDatabaseValue(kPersistentSettings,
"timestamp." + saved_query,
std::to_string(getUnixTime()));
continue;
}
// Parse the timestamp and compare.
size_t last_executed = 0;
try {
last_executed = boost::lexical_cast<size_t>(content);
} catch (const boost::bad_lexical_cast& /* e */) {
// Erase the timestamp as is it potentially corrupt.
deleteDatabaseValue(kPersistentSettings, "timestamp." + saved_query);
continue;
}
if (last_executed < getUnixTime() - 592200) {
// Query has not run in the last week, expire results and interval.
deleteDatabaseValue(kQueries, saved_query);
deleteDatabaseValue(kQueries, saved_query + "epoch");
deleteDatabaseValue(kPersistentSettings, "interval." + saved_query);
deleteDatabaseValue(kPersistentSettings, "timestamp." + saved_query);
VLOG(1) << "Expiring results for scheduled query: " << saved_query;
}
}
}
void Config::reset() {
setStartTime(getUnixTime());
schedule_ = std::make_unique<Schedule>();
std::map<std::string, QueryPerformance>().swap(performance_);
std::map<std::string, FileCategories>().swap(files_);
std::map<std::string, std::string>().swap(hash_);
valid_ = false;
loaded_ = false;
is_first_time_refresh = true;
refresh_runner_ = std::make_shared<ConfigRefreshRunner>();
started_thread_ = false;
// Also request each parse to reset state.
for (const auto& plugin : RegistryFactory::get().plugins("config_parser")) {
std::shared_ptr<ConfigParserPlugin> parser = nullptr;
try {
parser = std::dynamic_pointer_cast<ConfigParserPlugin>(plugin.second);
} catch (const std::bad_cast& /* e */) {
continue;
}
if (parser == nullptr || parser.get() == nullptr) {
continue;
}
parser->reset();
}
}
void ConfigParserPlugin::reset() {
// Resets will clear all top-level keys from the parser's data store.
for (auto& category : data_.doc().GetObject()) {
auto obj = data_.getObject();
data_.add(category.name.GetString(), obj, data_.doc());
}
}
void Config::recordQueryPerformance(const std::string& name,
size_t delay,
const Row& r0,
const Row& r1) {
RecursiveLock lock(config_performance_mutex_);
if (performance_.count(name) == 0) {
performance_[name] = QueryPerformance();
}
// Grab access to the non-const schedule item.
auto& query = performance_.at(name);
if (!r1.at("user_time").empty() && !r0.at("user_time").empty()) {
auto ut1 = tryTo<long long>(r1.at("user_time"));
auto ut0 = tryTo<long long>(r0.at("user_time"));
auto diff = (ut1 && ut0) ? ut1.take() - ut0.take() : 0;
if (diff > 0) {
query.user_time += diff;
}
}
if (!r1.at("system_time").empty() && !r0.at("system_time").empty()) {
auto st1 = tryTo<long long>(r1.at("system_time"));
auto st0 = tryTo<long long>(r0.at("system_time"));
auto diff = (st1 && st0) ? st1.take() - st0.take() : 0;
if (diff > 0) {
query.system_time += diff;
}
}
if (!r1.at("resident_size").empty() && !r0.at("resident_size").empty()) {
auto rs1 = tryTo<long long>(r1.at("resident_size"));
auto rs0 = tryTo<long long>(r0.at("resident_size"));
auto diff = (rs1 && rs0) ? rs1.take() - rs0.take() : 0;
if (diff > 0) {
// Memory is stored as an average of RSS changes between query executions.
query.average_memory = (query.average_memory * query.executions) + diff;
query.average_memory = (query.average_memory / (query.executions + 1));
}
}
query.wall_time += delay;
query.executions += 1;
query.last_executed = getUnixTime();
// Clear the executing query (remove the dirty bit).
setDatabaseValue(kPersistentSettings, kExecutingQuery, "");
}
void Config::recordQueryStart(const std::string& name) {
// There should only ever be a single executing query in the schedule.
setDatabaseValue(kPersistentSettings, kExecutingQuery, name);
// Store the time this query name last executed for later results eviction.
// When configuration updates occur the previous schedule is searched for
// 'stale' query names, aka those that have week-old or longer last execute
// timestamps. Offending queries have their database results purged.
setDatabaseValue(
kPersistentSettings, "timestamp." + name, std::to_string(getUnixTime()));
}
void Config::getPerformanceStats(
const std::string& name,
std::function<void(const QueryPerformance& query)> predicate) const {
if (performance_.count(name) > 0) {
RecursiveLock lock(config_performance_mutex_);
predicate(performance_.at(name));
}
}
bool Config::hashSource(const std::string& source, const std::string& content) {
Hash hash(HASH_TYPE_SHA1);
hash.update(content.c_str(), content.size());
auto new_hash = hash.digest();
WriteLock wlock(config_hash_mutex_);
if (hash_[source] == new_hash) {
return false;
}
hash_[source] = new_hash;
return true;
}
Status Config::genHash(std::string& hash) const {
WriteLock lock(config_hash_mutex_);
if (!valid_) {
return Status(1, "Current config is not valid");
}
std::vector<char> buffer;
buffer.reserve(hash_.size() * 32);
auto add = [&buffer](const std::string& text) {
for (const auto& c : text) {
buffer.push_back(c);
}
};
for (const auto& it : hash_) {
add(it.second);
}
Hash new_hash(HASH_TYPE_SHA1);
new_hash.update(buffer.data(), buffer.size());
hash = new_hash.digest();
return Status(0, "OK");
}
std::string Config::getHash(const std::string& source) const {
WriteLock lock(config_hash_mutex_);
if (!hash_.count(source)) {
return std::string();
}
return hash_.at(source);
}
const std::shared_ptr<ConfigParserPlugin> Config::getParser(
const std::string& parser) {
if (!RegistryFactory::get().exists("config_parser", parser, true)) {
return nullptr;
}
auto plugin = RegistryFactory::get().plugin("config_parser", parser);
// This is an error, need to check for existence (and not nullptr).
return std::dynamic_pointer_cast<ConfigParserPlugin>(plugin);
}
void Config::files(std::function<void(const std::string& category,
const std::vector<std::string>& files)>
predicate) const {
RecursiveLock lock(config_files_mutex_);
for (const auto& it : files_) {
for (const auto& category : it.second) {
predicate(category.first, category.second);
}
}
}
Config::~Config() = default;
Status ConfigPlugin::genPack(const std::string& name,
const std::string& value,
std::string& pack) {
return Status(1, "Not implemented");
}
Status ConfigPlugin::call(const PluginRequest& request,
PluginResponse& response) {
auto action = request.find("action");
if (action == request.end()) {
return Status::failure("Config plugins require an action");
}
if (action->second == "genConfig") {
std::map<std::string, std::string> config;
auto stat = genConfig(config);
response.push_back(config);
return stat;
} else if (action->second == "genPack") {
auto name = request.find("name");
auto value = request.find("value");
if (name == request.end() || value == request.end()) {
return Status(1, "Missing name or value");
}
std::string pack;
auto stat = genPack(name->second, value->second, pack);
response.push_back({{name->second, pack}});
return stat;
} else if (action->second == "update") {
auto source = request.find("source");
auto data = request.find("data");
if (source == request.end() || data == request.end()) {
return Status(1, "Missing source or data");
}
return Config::get().update({{source->second, data->second}});
} else if (action->second == "option") {
auto name = request.find("name");
if (name == request.end()) {
return Status(1, "Missing option name");
}
response.push_back(
{{"name", name->second}, {"value", Flag::getValue(name->second)}});
return Status();
}
return Status(1, "Config plugin action unknown: " + action->second);
}
Status ConfigParserPlugin::setUp() {
for (const auto& key : keys()) {
auto obj = data_.getObject();
data_.add(key, obj);
}
return Status();
}
void ConfigRefreshRunner::start() {
while (!interrupted()) {
// Cool off and time wait the configured period.
// Apply this interruption initially as at t=0 the config was read.
pause(std::chrono::seconds(refresh_sec_));
// Since the pause occurs before the logic, we need to check for an
// interruption request.
if (interrupted()) {
return;
}
VLOG(1) << "Refreshing configuration state";
Config::get().refresh();
}
}
}