diff --git a/docs/wiki/deployment/remote.md b/docs/wiki/deployment/remote.md index 56de0bab..653bdc4a 100644 --- a/docs/wiki/deployment/remote.md +++ b/docs/wiki/deployment/remote.md @@ -83,6 +83,61 @@ The posted logger data is exactly the same as logged to disk by the **filesystem {} ``` +**Distributed** read request POST body: + +```json +{ + "node_key": "..." +} +``` + +**Distributed** read responde POST body: + +```json +{ + "queries": { + "id1": "select * from osquery_info;", + "id2": "select * from osquery_schedule;" + } +} +``` + +**Distributed** write request POST body: + +```json +{ + "node_key": "...", + "queries": { + "id1": [ + { + "column1": "value1", + "column2": "value2" + }, + { + "column1": "value1", + "column2": "value2" + } + ], + "id2": [ + { + "column1": "value1", + "column2": "value2" + }, + { + "column1": "value1", + "column2": "value2" + } + ] + } +} +``` + +**Distributed** write request POST body: + +```json +{} +``` + **Customizations** There are several unlisted flags to further control the remote settings. These controls are helpful if using a somewhat opaque API. diff --git a/docs/wiki/installation/cli-flags.md b/docs/wiki/installation/cli-flags.md index 8756c975..83587a9c 100644 --- a/docs/wiki/installation/cli-flags.md +++ b/docs/wiki/installation/cli-flags.md @@ -15,6 +15,7 @@ Include line-delimited switches to be interpreted and used as CLI-flags: ``` --config_plugin=custom_plugin --logger_plugin=custom_plugin +--distributed_plugin=custom_plugin --watchlog_level=2 ``` @@ -107,9 +108,9 @@ Extensions are loaded as processes. They are expected to start a thrift service Optional path to a list of autoloaded library module-based extensions. Modules are similar to extensions but are loaded as shared libraries. They are less flexible and should be built using the same GCC runtime and developer dependency library versions as osqueryd. See the extensions [deployment](../deployment/extensions.md) page for more details on extension module autoloading. -### Remote settings (optional for config/logger) flags +### Remote settings (optional for config/logger/distributed) flags -When using non-default [remote](../deployment/remote.md) plugins such as the **tls** config and logger plugins, there are process-wide settings applied to every plugin. +When using non-default [remote](../deployment/remote.md) plugins such as the **tls** config, logger and distributed plugins, there are process-wide settings applied to every plugin. `--tls_hostname=""` @@ -143,6 +144,11 @@ The **tls** endpoint path, e.g.: **/api/v1/config** when using the **tls** confi The configuration **tls** endpoint refresh interval. By default a configuration is fetched only at osquery load. If the configuration should be auto-updated set a "refresh" time to a value in seconds. This option enforces a minimum of 10 seconds. If the configuration endpoint cannot be reached during run, during an attempted refresh, the normal retry approach is applied. + +`--config_tls_max_attempts=3` + +The total number of attempts that will be made to the remote config server if a request fails. + `--logger_tls_endpoint=""` The **tls** endpoint path, e.g.: **/api/v1/logger** when using the **tls** logger plugin. See the other **tls_** related CLI flags. @@ -155,6 +161,18 @@ See the **tls**/[remote](../deployment/remote.md) plugin documentation. An enrol See the **tls**/[remote](../deployment/remote.md) plugin documentation. This is a number of seconds before checking for buffered logs. Results are sent to the TLS endpoint in intervals, not on demand (unless the period=0). +`--distributed_tls_read_endpoint=/foobar` + +The URI path which will be used, in conjunction with `tls_hostname`, to create the remote URI for retrieving distributed queries when using the **tls** distributed plugin. + +`--distributed_tls_write_endpoint=/foobar` + +The URI path which will be used, in conjunction with `tls_hostname`, to create the remote URI for submitting the results of distributed queries when using the **tls** distributed plugin. + +`--distributed_tls_max_attempts=3` + +The total number of attempts that will be made to the remote distributed query server if a request fails when using the **tls** distributed plugin. + ## Runtime flags `--read_max=52428800` (50MB) @@ -204,10 +222,6 @@ Number of work dispatch threads. Limit the schedule, 0 for no limit. Optionally limit the osqueryd's life by adding a schedule limit in seconds. This should only be used for testing. -`--distributed_retries=3` - -(Unsupported) Times to retry retrieving distributed queries. - `--disable_tables=table_name1,table_name2` Comma-delimited list of table names to be disabled. @@ -263,6 +277,20 @@ Directory path for ERROR/WARN/INFO and results logging. Maximum returned row value size. +## Distributed Flags + +`--distributed_plugin=tls` + +Distributed plugin name. The default distributed plugin is not set. You must set `--distributed_enabled=true --distributed_plugin=tls` (or whatever plugin you'd rather use instead of TLS) to use the distributed feature. + +`--distributed_enabled=false` + +Main killswitch for distributed queries functionality. By default, this is turned off. + +`--distributed_poll_interval=60` + +In seconds, the amount of time that osqueryd will wait between periodically checking in with a distributed query server to see if there are any queries to execute. + ## Shell-only flags Most of the shell flags are self-explanatory and are adapted from the SQLite shell. Refer to the shell's ".help" command for details and explanations. diff --git a/include/osquery/config.h b/include/osquery/config.h index 91a3a166..1671b07e 100644 --- a/include/osquery/config.h +++ b/include/osquery/config.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -93,10 +94,6 @@ class Config { private: Config() : schedule_(Schedule()), valid_(false){}; - protected: - typedef boost::unique_lock WriteLock; - typedef boost::shared_lock ReadLock; - public: /// Get a singleton instance of the Config class static Config& getInstance() { diff --git a/include/osquery/core.h b/include/osquery/core.h index 40f8abb4..fd9c9d9d 100644 --- a/include/osquery/core.h +++ b/include/osquery/core.h @@ -13,6 +13,8 @@ #include #include +#include + #include // clang-format off @@ -64,6 +66,9 @@ enum ToolType { OSQUERY_EXTENSION, }; +typedef boost::unique_lock WriteLock; +typedef boost::shared_lock ReadLock; + /// The osquery tool type for runtime decisions. extern ToolType kToolType; diff --git a/include/osquery/database.h b/include/osquery/database.h index ea4a0ba9..069b2b9d 100644 --- a/include/osquery/database.h +++ b/include/osquery/database.h @@ -387,6 +387,142 @@ Status serializeQueryLogItemAsEvents(const QueryLogItem& item, Status serializeQueryLogItemAsEventsJSON(const QueryLogItem& i, std::string& json); +///////////////////////////////////////////////////////////////////////////// +// DistributedQueryRequest +///////////////////////////////////////////////////////////////////////////// + +/** + * @brief Small struct containing the query and ID information for a + * distributed query + */ +struct DistributedQueryRequest { + public: + explicit DistributedQueryRequest() {} + explicit DistributedQueryRequest(const std::string& q, const std::string& i) + : query(q), id(i) {} + + /// equals operator + bool operator==(const DistributedQueryRequest& comp) const { + return (comp.query == query) && (comp.id == id); + } + + std::string query; + std::string id; +}; + +/** + * @brief Serialize a DistributedQueryRequest into a property tree + * + * @param r the DistributedQueryRequest to serialize + * @param tree the output property tree + * + * @return Status indicating the success or failure of the operation + */ +Status serializeDistributedQueryRequest(const DistributedQueryRequest& r, + boost::property_tree::ptree& tree); + +/** + * @brief Serialize a DistributedQueryRequest object into a JSON string + * + * @param r the DistributedQueryRequest to serialize + * @param json the output JSON string + * + * @return Status indicating the success or failure of the operation + */ +Status serializeDistributedQueryRequestJSON(const DistributedQueryRequest& r, + std::string& json); + +/** + * @brief Deserialize a DistributedQueryRequest object from a property tree + * + * @param tree the input property tree + * @param r the output DistributedQueryRequest structure + * + * @return Status indicating the success or failure of the operation + */ +Status deserializeDistributedQueryRequest( + const boost::property_tree::ptree& tree, DistributedQueryRequest& r); + +/** + * @brief Deserialize a DistributedQueryRequest object from a JSON string + * + * @param json the input JSON string + * @param r the output DistributedQueryRequest structure + * + * @return Status indicating the success or failure of the operation + */ +Status deserializeDistributedQueryRequestJSON(const std::string& json, + DistributedQueryRequest& r); + +///////////////////////////////////////////////////////////////////////////// +// DistributedQueryResult +///////////////////////////////////////////////////////////////////////////// + +/** + * @brief Small struct containing the results of a distributed query + */ +struct DistributedQueryResult { + public: + explicit DistributedQueryResult() {} + explicit DistributedQueryResult(const DistributedQueryRequest& req, + const QueryData& res) + : request(req), results(res) {} + + /// equals operator + bool operator==(const DistributedQueryResult& comp) const { + return (comp.request == request) && (comp.results == results); + } + + DistributedQueryRequest request; + QueryData results; +}; + +/** + * @brief Serialize a DistributedQueryResult into a property tree + * + * @param r the DistributedQueryResult to serialize + * @param tree the output property tree + * + * @return Status indicating the success or failure of the operation + */ +Status serializeDistributedQueryResult(const DistributedQueryResult& r, + boost::property_tree::ptree& tree); + +/** + * @brief Serialize a DistributedQueryResult object into a JSON string + * + * @param r the DistributedQueryResult to serialize + * @param json the output JSON string + * + * @return Status indicating the success or failure of the operation + */ +Status serializeDistributedQueryResultJSON(const DistributedQueryResult& r, + std::string& json); + +/** + * @brief Deserialize a DistributedQueryResult object from a property tree + * + * @param tree the input property tree + * @param r the output DistributedQueryResult structure + * + * @return Status indicating the success or failure of the operation + */ +Status deserializeDistributedQueryResult( + const boost::property_tree::ptree& tree, DistributedQueryResult& r); + +/** + * @brief Deserialize a DistributedQueryResult object from a JSON string + * + * @param json the input JSON string + * @param r the output DistributedQueryResult structure + * + * @return Status indicating the success or failure of the operation + */ +Status deserializeDistributedQueryResultJSON(const std::string& json, + DistributedQueryResult& r); + +///////////////////////////////////////////////////////////////////////////// + /** * @brief An osquery backing storage (database) type that persists executions. * diff --git a/include/osquery/distributed.h b/include/osquery/distributed.h new file mode 100644 index 00000000..a90fcdbb --- /dev/null +++ b/include/osquery/distributed.h @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2014, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include + +namespace osquery { + +class DistributedPlugin : public Plugin { + public: + /** + * @brief Get the queries to be executed + * + * Consider the following example JSON which represents the expected format + * + * @code{.json} + * { + * "queries": { + * "id1": "select * from osquery_info", + * "id2": "select * from osquery_schedule" + * } + * } + * @endcode + * + * @param json is the string to populate the queries data structure with + * @return a Status indicating the success or failure of the operation + */ + virtual Status getQueries(std::string& json) = 0; + + /** + * @brief Write the results that were executed + * + * Consider the following JSON which represents the format that will be used: + * + * @code{.json} + * { + * "queries": { + * "id1": [ + * { + * "col1": "val1", + * "col2": "val2" + * }, + * { + * "col1": "val1", + * "col2": "val2" + * } + * ], + * "id2": [ + * { + * "col1": "val1", + * "col2": "val2" + * } + * ] + * } + * } + * @endcode + * + * @param json is the results data to write + * @return a Status indicating the success or failure of the operation + */ + virtual Status writeResults(const std::string& json) = 0; + + /// Main entrypoint for distirbuted plugin requests + Status call(const PluginRequest& request, PluginResponse& response); +}; + +CREATE_REGISTRY(DistributedPlugin, "distributed"); + +/** + * @brief Class for managing the set of distributed queries to execute + * + * Consider the following workflow example, without any error handling + * + * @code{.cpp} + * auto dist = Distributed(); + * while (true) { + * dist.pullUpdates(); + * if (dist.getPendingQueryCount() > 0) { + * dist.runQueries(); + * } + * } + * @endcode + */ +class Distributed { + public: + /// Default constructor + Distributed(){}; + + /// Retrieve queued queries from a remote server + Status pullUpdates(); + + /// Get the number of queries which are waiting to be executed + size_t getPendingQueryCount(); + + /// Get the number of results which are waiting to be flushed + size_t getCompletedCount(); + + /// Serialize result data into a JSON string and clear the results + Status serializeResults(std::string& json); + + /// Process and execute queued queries + Status runQueries(); + + protected: + /** + * @brief Process several queries from a distributed plugin + * + * Given a response from a distributed plugin, parse the results and enqueue + * them in the internal state of the class + * + * @param work is the string from DistributedPlugin::getQueries + * @return a Status indicating the success or failure of the operation + */ + Status acceptWork(const std::string& work); + + /** + * @brief Pop a request object off of the queries_ member + * + * @return a DistributedQueryRequest object which needs to be executed + */ + DistributedQueryRequest popRequest(); + + /** + * @brief Queue a result to be batch sent to the server + * + * @param result is a DistributedQueryResult object to be sent to the server + */ + void addResult(const DistributedQueryResult& result); + + /** + * @brief Flush all of the collected results to the server + */ + Status flushCompleted(); + + protected: + std::vector queries_; + std::vector results_; + + private: + friend class DistributedTests; + FRIEND_TEST(DistributedTests, test_workflow); +}; +} diff --git a/include/osquery/sql.h b/include/osquery/sql.h index b8d0f197..c6c8af91 100644 --- a/include/osquery/sql.h +++ b/include/osquery/sql.h @@ -54,7 +54,7 @@ class SQL { * * @return A QueryData object of the query results */ - const QueryData& rows(); + const QueryData& rows() const; /** * @brief Accessor to switch off of when checking the success of a query @@ -68,7 +68,7 @@ class SQL { * * @return The query status */ - Status getStatus(); + const Status& getStatus() const; /** * @brief Accessor for the message string indicating the status of the query @@ -77,15 +77,6 @@ class SQL { */ std::string getMessageString(); - /** - * @brief Add host info columns onto existing QueryData - * - * Use this to add columns providing host info to the query results. - * Distributed queries use this to add host information before returning - * results to the aggregator. - */ - void annotateHostInfo(); - /** * @brief Accessor for the list of queryable tables * diff --git a/osquery/config/plugins/tls.cpp b/osquery/config/plugins/tls.cpp index 6bd27c68..062c4292 100644 --- a/osquery/config/plugins/tls.cpp +++ b/osquery/config/plugins/tls.cpp @@ -20,15 +20,15 @@ #include "osquery/dispatcher/dispatcher.h" #include "osquery/remote/requests.h" -#include "osquery/remote/transports/tls.h" #include "osquery/remote/serializers/json.h" - -#define CONFIG_TLS_MAX_ATTEMPTS 3 +#include "osquery/remote/utility.h" namespace pt = boost::property_tree; namespace osquery { +FLAG(uint64, config_tls_max_attempts, 3, "Number of times to attempt a request") + /// Config retrieval TLS endpoint (path) using TLS hostname. CLI_FLAG(string, config_tls_endpoint, @@ -49,6 +49,9 @@ class TLSConfigPlugin : public ConfigPlugin { public: Status setUp(); Status genConfig(std::map& config); + + protected: + std::string uri_; }; class TLSConfigRefreshRunner : public InternalRunnable { @@ -67,85 +70,20 @@ Status TLSConfigPlugin::setUp() { if (FLAGS_config_tls_refresh >= 10) { Dispatcher::addService(std::make_shared()); } - return Status(0, "OK"); -} -Status makeTLSConfigRequest(const std::string& uri, - const std::string& node_key, - pt::ptree& output) { - // Make a request to the config endpoint, providing the node secret. - pt::ptree params; - - // If using a GET request, append the node_key to the URI variables. - std::string uri_suffix; - if (FLAGS_tls_node_api) { - uri_suffix = "&node_key=" + node_key; - } else { - params.put("node_key", node_key); - } - - // Again check for GET to call with/without parameters. - auto request = Request(uri + uri_suffix); - auto status = (FLAGS_tls_node_api) ? request.call() : request.call(params); - - if (!status.ok()) { - return status; - } - - // The call succeeded, store the enrolled key. - status = request.getResponse(output); - if (!status.ok()) { - return status; - } - - // Receive config or key rejection - if (output.count("node_invalid") > 0 || output.count("error") > 0) { - return Status(1, "Config retrieval failed: Invalid node key"); - } + uri_ = TLSRequestHelper::makeURI(FLAGS_config_tls_endpoint); return Status(0, "OK"); } Status TLSConfigPlugin::genConfig(std::map& config) { - auto node_key = getNodeKey("tls"); - auto uri = "https://" + FLAGS_tls_hostname; - if (FLAGS_tls_node_api) { - // The TLS API should treat clients as nodes. - // In this case the node_key acts as an identifier (node) and the endpoints - // (if provided) are treated as edges from the nodes. - uri += "/" + node_key; + std::string json; + + auto s = TLSRequestHelper::go( + uri_, json, FLAGS_config_tls_max_attempts); + if (s.ok()) { + config["tls_plugin"] = json; } - uri += FLAGS_config_tls_endpoint; - - // Some APIs may require persistent identification. - if (FLAGS_tls_secret_always) { - uri += ((uri.find("?") != std::string::npos) ? "&" : "?") + - FLAGS_tls_enroll_override + "=" + getEnrollSecret(); - } - - pt::ptree recv; - for (size_t i = 1; i <= CONFIG_TLS_MAX_ATTEMPTS; i++) { - auto status = makeTLSConfigRequest(uri, node_key, recv); - if (status.ok()) { - std::stringstream ss; - try { - pt::write_json(ss, recv, false); - } catch (const pt::json_parser::json_parser_error& e) { - // The response content could not be represented as JSON. - continue; - } - - config["tls_plugin"] = ss.str(); - return Status(0, "OK"); - } else if (i == CONFIG_TLS_MAX_ATTEMPTS) { - break; - } - - LOG(WARNING) << "Failed config retrieval from " << uri << " (" - << status.what() << ") retrying..."; - ::sleep(i * i); - } - - return Status(1, "TLSConfigPlugin failed"); + return s; } void TLSConfigRefreshRunner::start() { diff --git a/osquery/core/init.cpp b/osquery/core/init.cpp index ce2f1e3b..12edd71e 100644 --- a/osquery/core/init.cpp +++ b/osquery/core/init.cpp @@ -98,6 +98,9 @@ CLI_FLAG(bool, CLI_FLAG(bool, daemonize, false, "Run as daemon (osqueryd only)"); #endif +DECLARE_string(distributed_plugin); +DECLARE_bool(distributed_enabled); + ToolType kToolType = OSQUERY_TOOL_UNKNOWN; void printUsage(const std::string& binary, int tool) { @@ -374,6 +377,13 @@ void Initializer::start() { initActivePlugin("logger", FLAGS_logger_plugin); initLogger(binary_); + // Initialize the distributed plugin, if necessary + if (FLAGS_distributed_enabled) { + if (Registry::exists("distributed", FLAGS_distributed_plugin)) { + initActivePlugin("distributed", FLAGS_distributed_plugin); + } + } + // Start event threads. osquery::attachEvents(); EventFactory::delay(); diff --git a/osquery/core/test_util.cpp b/osquery/core/test_util.cpp index a33ca378..f99f2358 100644 --- a/osquery/core/test_util.cpp +++ b/osquery/core/test_util.cpp @@ -8,10 +8,12 @@ * */ -#include -#include #include +#include +#include +#include +#include #include #include @@ -329,4 +331,26 @@ void createMockFileStructure() { void tearDownMockFileStructure() { boost::filesystem::remove_all(kFakeDirectory); } + +void TLSServerRunner::start() { + auto& self = instance(); + if (self.server_ != 0) { + return; + } + + // Fork then exec a shell. + self.server_ = fork(); + if (self.server_ == 0) { + // Start a python TLS/HTTPS or HTTP server. + auto script = kTestDataPath + "/test_http_server.py --tls " + self.port_; + execlp("sh", "sh", "-c", script.c_str(), nullptr); + ::exit(0); + } + ::sleep(1); +} + +void TLSServerRunner::stop() { + auto& self = instance(); + kill(self.server_, SIGTERM); +} } diff --git a/osquery/core/test_util.h b/osquery/core/test_util.h index 6f55eace..5224e121 100644 --- a/osquery/core/test_util.h +++ b/osquery/core/test_util.h @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -108,4 +109,31 @@ std::vector generateSplitStringTestData(); void createMockFileStructure(); // remove the small directory structure used for testing void tearDownMockFileStructure(); + +class TLSServerRunner : private boost::noncopyable { + public: + /// Create a singleton TLS server runner. + static TLSServerRunner& instance() { + static TLSServerRunner instance; + return instance; + } + + /// TCP port accessor. + static const std::string& port() { return instance().port_; } + /// Start the server if it hasn't started already. + static void start(); + /// Stop the service when the process exits. + static void stop(); + + private: + TLSServerRunner() + : server_(0), port_(std::to_string(rand() % 10000 + 20000)){}; + TLSServerRunner(TLSServerRunner const&); + void operator=(TLSServerRunner const&); + virtual ~TLSServerRunner() { stop(); } + + private: + pid_t server_; + std::string port_; +}; } diff --git a/osquery/database/database.cpp b/osquery/database/database.cpp index 96e9ec63..00ae443a 100644 --- a/osquery/database/database.cpp +++ b/osquery/database/database.cpp @@ -390,6 +390,133 @@ Status serializeQueryLogItemAsEventsJSON(const QueryLogItem& i, return Status(0, "OK"); } +///////////////////////////////////////////////////////////////////////////// +// DistributedQueryRequest - small struct containing the query and ID +// information for a distributed query +///////////////////////////////////////////////////////////////////////////// + +Status serializeDistributedQueryRequest(const DistributedQueryRequest& r, + pt::ptree& tree) { + tree.put("query", r.query); + tree.put("id", r.id); + return Status(0, "OK"); +} + +Status serializeDistributedQueryRequestJSON(const DistributedQueryRequest& r, + std::string& json) { + pt::ptree tree; + auto s = serializeDistributedQueryRequest(r, tree); + if (!s.ok()) { + return s; + } + std::stringstream ss; + try { + pt::write_json(ss, tree, false); + } catch (const pt::ptree_error& e) { + return Status(1, "Error serializing JSON: " + std::string(e.what())); + } + json = ss.str(); + + return Status(0, "OK"); +} + +Status deserializeDistributedQueryRequest(const pt::ptree& tree, + DistributedQueryRequest& r) { + r.query = tree.get("query", ""); + r.id = tree.get("id", ""); + return Status(0, "OK"); +} + +Status deserializeDistributedQueryRequestJSON(const std::string& json, + DistributedQueryRequest& r) { + std::stringstream ss(json); + pt::ptree tree; + try { + pt::read_json(ss, tree); + } catch (const pt::ptree_error& e) { + return Status(1, "Error serializing JSON: " + std::string(e.what())); + } + return deserializeDistributedQueryRequest(tree, r); +} + +///////////////////////////////////////////////////////////////////////////// +// DistributedQueryResult - small struct containing the results of a +// distributed query +///////////////////////////////////////////////////////////////////////////// + +Status serializeDistributedQueryResult(const DistributedQueryResult& r, + pt::ptree& tree) { + pt::ptree request; + auto s = serializeDistributedQueryRequest(r.request, request); + if (!s.ok()) { + return s; + } + + pt::ptree results; + s = serializeQueryData(r.results, results); + if (!s.ok()) { + return s; + } + + tree.add_child("request", request); + tree.add_child("results", results); + + return Status(0, "OK"); +} + +Status serializeDistributedQueryResultJSON(const DistributedQueryResult& r, + std::string& json) { + pt::ptree tree; + auto s = serializeDistributedQueryResult(r, tree); + if (!s.ok()) { + return s; + } + std::stringstream ss; + try { + pt::write_json(ss, tree, false); + } catch (const pt::ptree_error& e) { + return Status(1, "Error serializing JSON: " + std::string(e.what())); + } + json = ss.str(); + + return Status(0, "OK"); +} + +Status deserializeDistributedQueryResult(const pt::ptree& tree, + DistributedQueryResult& r) { + DistributedQueryRequest request; + auto s = + deserializeDistributedQueryRequest(tree.get_child("request"), request); + if (!s.ok()) { + return s; + } + + QueryData results; + s = deserializeQueryData(tree.get_child("results"), results); + if (!s.ok()) { + return s; + } + + r.request = request; + r.results = results; + + return Status(0, "OK"); +} + +Status deserializeDistributedQueryResultJSON(const std::string& json, + DistributedQueryResult& r) { + std::stringstream ss(json); + pt::ptree tree; + try { + pt::read_json(ss, tree); + } catch (const pt::ptree_error& e) { + return Status(1, "Error serializing JSON: " + std::string(e.what())); + } + return deserializeDistributedQueryResult(tree, r); +} + +///////////////////////////////////////////////////////////////////////////// + bool addUniqueRowToQueryData(QueryData& q, const Row& r) { if (std::find(q.begin(), q.end(), r) != q.end()) { return false; diff --git a/osquery/database/tests/results_tests.cpp b/osquery/database/tests/results_tests.cpp index 2c16d010..a7423759 100644 --- a/osquery/database/tests/results_tests.cpp +++ b/osquery/database/tests/results_tests.cpp @@ -137,6 +137,111 @@ TEST_F(ResultsTests, test_deserialize_query_log_item_json) { EXPECT_EQ(output, results.second); } +TEST_F(ResultsTests, test_serialize_distributed_query_request) { + DistributedQueryRequest r; + r.query = "foo"; + r.id = "bar"; + + pt::ptree tree; + auto s = serializeDistributedQueryRequest(r, tree); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(tree.get("query"), "foo"); + EXPECT_EQ(tree.get("id"), "bar"); +} + +TEST_F(ResultsTests, test_deserialize_distributed_query_request) { + pt::ptree tree; + tree.put("query", "foo"); + tree.put("id", "bar"); + + DistributedQueryRequest r; + auto s = deserializeDistributedQueryRequest(tree, r); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(r.query, "foo"); + EXPECT_EQ(r.id, "bar"); +} + +TEST_F(ResultsTests, test_deserialize_distributed_query_request_json) { + auto json = + "{" + " \"query\": \"foo\"," + " \"id\": \"bar\"" + "}"; + + DistributedQueryRequest r; + auto s = deserializeDistributedQueryRequestJSON(json, r); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(r.query, "foo"); + EXPECT_EQ(r.id, "bar"); +} + +TEST_F(ResultsTests, test_serialize_distributed_query_result) { + DistributedQueryResult r; + r.request.query = "foo"; + r.request.id = "bar"; + + Row r1; + r1["foo"] = "bar"; + r.results = {r1}; + + pt::ptree tree; + auto s = serializeDistributedQueryResult(r, tree); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(tree.get("request.query"), "foo"); + EXPECT_EQ(tree.get("request.id"), "bar"); + auto& results = tree.get_child("results"); + for (const auto& q : results) { + for (const auto& row : q.second) { + EXPECT_EQ(row.first, "foo"); + EXPECT_EQ(q.second.get(row.first), "bar"); + } + } +} + +TEST_F(ResultsTests, test_deserialize_distributed_query_result) { + pt::ptree request; + request.put("id", "foo"); + request.put("query", "bar"); + + pt::ptree row; + row.put("foo", "bar"); + pt::ptree results; + results.push_back(std::make_pair("", row)); + + pt::ptree query_result; + query_result.put_child("request", request); + query_result.put_child("results", results); + + DistributedQueryResult r; + auto s = deserializeDistributedQueryResult(query_result, r); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(r.request.id, "foo"); + EXPECT_EQ(r.request.query, "bar"); + EXPECT_EQ(r.results[0]["foo"], "bar"); +} + +TEST_F(ResultsTests, test_deserialize_distributed_query_result_json) { + auto json = + "{" + " \"request\": {" + " \"id\": \"foo\"," + " \"query\": \"bar\"" + " }," + " \"results\": [" + " {" + " \"foo\": \"bar\"" + " }" + " ]" + "}"; + + DistributedQueryResult r; + auto s = deserializeDistributedQueryResultJSON(json, r); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(r.request.id, "foo"); + EXPECT_EQ(r.request.query, "bar"); + EXPECT_EQ(r.results[0]["foo"], "bar"); +} + TEST_F(ResultsTests, test_unicode_to_ascii_conversion) { EXPECT_EQ(escapeNonPrintableBytes("しかたがない"), "\\xE3\\x81\\x97\\xE3\\x81\\x8B\\xE3\\x81\\x9F\\xE3\\x81\\x8C\\xE3" diff --git a/osquery/dispatcher/CMakeLists.txt b/osquery/dispatcher/CMakeLists.txt index 66770d67..8b4c10b8 100644 --- a/osquery/dispatcher/CMakeLists.txt +++ b/osquery/dispatcher/CMakeLists.txt @@ -2,8 +2,9 @@ ADD_OSQUERY_LIBRARY(TRUE osquery_dispatcher dispatcher.cpp ) -ADD_OSQUERY_LIBRARY(FALSE osquery_dispatch_scheduler +ADD_OSQUERY_LIBRARY(FALSE osquery_dispatcher_runners scheduler.cpp + distributed.cpp ) # Note: if there are ever scheduler tests this needs to set the diff --git a/osquery/dispatcher/dispatcher.h b/osquery/dispatcher/dispatcher.h index b5db333d..a5900172 100644 --- a/osquery/dispatcher/dispatcher.h +++ b/osquery/dispatcher/dispatcher.h @@ -24,6 +24,18 @@ // paths for their includes. Unfortunately, changing include paths is not // possible in every build system. // clang-format off +#ifndef OSQUERY_THRIFT_LIB +#define OSQUERY_THRIFT_LIB thrift +#endif + +#ifndef OSQUERY_THRIFT_SERVER_LIB +#define OSQUERY_THRIFT_SERVER_LIB thrift/server +#endif + +#ifndef OSQUERY_THRIFT_POINTER +#define OSQUERY_THRIFT_POINTER boost +#endif + #include CONCAT(OSQUERY_THRIFT_LIB,/concurrency/Thread.h) #include CONCAT(OSQUERY_THRIFT_LIB,/concurrency/ThreadManager.h) #include CONCAT(OSQUERY_THRIFT_LIB,/concurrency/PosixThreadFactory.h) diff --git a/osquery/dispatcher/distributed.cpp b/osquery/dispatcher/distributed.cpp new file mode 100644 index 00000000..391b7af7 --- /dev/null +++ b/osquery/dispatcher/distributed.cpp @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2014, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#include "osquery/dispatcher/distributed.h" +#include "osquery/distributed.h" + +namespace osquery { + +FLAG(uint64, + distributed_poll_interval, + 60, + "Seconds in between polling the server for new queries (default 60)") + +DECLARE_bool(distributed_enabled); +DECLARE_string(distributed_plugin); + +void DistributedRunner::start() { + auto dist = Distributed(); + while (true) { + dist.pullUpdates(); + if (dist.getPendingQueryCount() > 0) { + dist.runQueries(); + } + ::sleep(FLAGS_distributed_poll_interval); + } +} + +Status startDistributed() { + if (FLAGS_distributed_enabled && !FLAGS_distributed_plugin.empty() && + Registry::getActive("distributed") == FLAGS_distributed_plugin) { + Dispatcher::addService(std::make_shared()); + return Status(0, "OK"); + } else { + return Status(1, "Distributed query service not enabled."); + } +} +} diff --git a/osquery/dispatcher/distributed.h b/osquery/dispatcher/distributed.h new file mode 100644 index 00000000..ba715a4a --- /dev/null +++ b/osquery/dispatcher/distributed.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2014, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#pragma once + +#include "osquery/dispatcher/dispatcher.h" + +namespace osquery { + +/// A Dispatcher service thread that implements the distributed query service +class DistributedRunner : public InternalRunnable { + public: + virtual ~DistributedRunner() {} + DistributedRunner() {} + + public: + /// The Dispatcher thread entry point. + void start(); +}; + +Status startDistributed(); +} diff --git a/osquery/distributed/CMakeLists.txt b/osquery/distributed/CMakeLists.txt index 083c6154..b8083207 100644 --- a/osquery/distributed/CMakeLists.txt +++ b/osquery/distributed/CMakeLists.txt @@ -1,4 +1,10 @@ -ADD_OSQUERY_LIBRARY(TRUE osquery_distributed distributed.cpp) +ADD_OSQUERY_LIBRARY(TRUE osquery_distributed + distributed.cpp +) + +ADD_OSQUERY_LIBRARY(FALSE osquery_distributed_plugins + plugins/tls.cpp +) file(GLOB OSQUERY_DISTRIBUTED_TESTS "tests/*.cpp") ADD_OSQUERY_TEST(FALSE ${OSQUERY_DISTRIBUTED_TESTS}) diff --git a/osquery/distributed/distributed.cpp b/osquery/distributed/distributed.cpp index 5647b1f8..d6735abb 100644 --- a/osquery/distributed/distributed.cpp +++ b/osquery/distributed/distributed.cpp @@ -9,154 +9,181 @@ */ #include +#include #include #include +#include #include +#include -#include "osquery/distributed/distributed.h" namespace pt = boost::property_tree; namespace osquery { -FLAG(int32, - distributed_retries, - 3, - "Times to retry reading/writing distributed queries"); +CLI_FLAG(string, distributed_plugin, "", "Distributed plugin name") -Status MockDistributedProvider::getQueriesJSON(std::string& query_json) { - query_json = queriesJSON_; - return Status(); +FLAG(bool, + distributed_enabled, + false, + "Main killswitch for distributed queries. (default false"); + +boost::shared_mutex distributed_queries_mutex_; +boost::shared_mutex distributed_results_mutex_; + +Status DistributedPlugin::call(const PluginRequest& request, + PluginResponse& response) { + if (request.count("action") == 0) { + return Status(1, "Distributed plugins require an action in PluginRequest"); + } + + if (request.at("action") == "getQueries") { + std::string queries; + getQueries(queries); + response.push_back({{"results", queries}}); + return Status(0, "OK"); + } else if (request.at("action") == "writeResults") { + if (request.count("results") == 0) { + return Status(1, "Missing results field"); + } + return writeResults(request.at("results")); + } + + return Status(1, + "Distributed plugin action unknown: " + request.at("action")); } -Status MockDistributedProvider::writeResultsJSON(const std::string& results) { - resultsJSON_ = results; - return Status(); +Status Distributed::pullUpdates() { + auto& distributed_plugin = Registry::getActive("distributed"); + if (!Registry::exists("distributed", distributed_plugin)) { + return Status(1, "Missing distributed plugin " + distributed_plugin); + } + + PluginResponse response; + auto status = + Registry::call("distributed", {{"action", "getQueries"}}, response); + if (!status.ok()) { + return status; + } + + if (response.size() > 0 && response[0].count("results") > 0) { + return acceptWork(response[0]["results"]); + } + + return Status(0, "OK"); } -Status DistributedQueryHandler::parseQueriesJSON( - const std::string& query_json, - std::vector& requests) { - // Parse the JSON into a ptree +size_t Distributed::getPendingQueryCount() { + ReadLock rlock(distributed_queries_mutex_); + return queries_.size(); +} + +size_t Distributed::getCompletedCount() { + ReadLock rlock(distributed_results_mutex_); + return results_.size(); +} + +Status Distributed::serializeResults(std::string& json) { + WriteLock wlock(distributed_results_mutex_); + pt::ptree tree; - try { - std::stringstream query_stream(query_json); - pt::read_json(query_stream, tree); - } catch (const pt::json_parser::json_parser_error& e) { - return Status(1, std::string("Error loading query JSON: ") + e.what()); - } - - // Parse the ptree into DistributedQueryRequests - std::vector results; - for (const auto& node : tree) { - const auto& request_tree = node.second; - DistributedQueryRequest request; - try { - request.query = request_tree.get_child("query").get_value(); - request.id = request_tree.get_child("id").get_value(); - } catch (const std::exception& e) { - return Status(1, std::string("Error parsing queries: ") + e.what()); + for (const auto& result : results_) { + pt::ptree qd; + auto s = serializeQueryData(result.results, qd); + if (!s.ok()) { + return s; } - results.push_back(request); + tree.add_child(result.request.id, qd); } - requests = std::move(results); + pt::ptree results; + results.add_child("queries", tree); - return Status(); -} - -SQL DistributedQueryHandler::handleQuery(const std::string& query_string) { - SQL query = SQL(query_string); - query.annotateHostInfo(); - return query; -} - -Status DistributedQueryHandler::serializeResults( - const std::vector >& results, - pt::ptree& tree) { + std::stringstream ss; try { - pt::ptree& res_tree = tree.put_child("results", pt::ptree()); - for (const auto& result : results) { - DistributedQueryRequest request = result.first; - SQL sql = result.second; - pt::ptree& child = res_tree.put_child(request.id, pt::ptree()); - child.put("status", sql.getStatus().getCode()); - pt::ptree& rows_child = child.put_child("rows", pt::ptree()); - Status s = serializeQueryData(sql.rows(), rows_child); - if (!s.ok()) { - return s; - } - } - } catch (const std::exception& e) { - return Status(1, std::string("Error serializing results: ") + e.what()); + pt::write_json(ss, results, false); + } catch (const pt::ptree_error& e) { + return Status(1, "Error writing JSON: " + std::string(e.what())); } - return Status(); + json = ss.str(); + + return Status(0, "OK"); } -Status DistributedQueryHandler::doQueries() { - // Get and parse the queries - Status status; - std::string query_json; - int retries = 0; - do { - status = provider_->getQueriesJSON(query_json); - ++retries; - } while (!status.ok() && retries <= FLAGS_distributed_retries); - if (!status.ok()) { - return status; - } +void Distributed::addResult(const DistributedQueryResult& result) { + WriteLock wlock_results(distributed_results_mutex_); + results_.push_back(result); +} - std::vector requests; - status = parseQueriesJSON(query_json, requests); - if (!status.ok()) { - return status; - } +Status Distributed::runQueries() { + while (getPendingQueryCount() > 0) { + auto query = popRequest(); - // Run the queries - std::vector > query_results; - std::set successful_query_ids; - for (const auto& request : requests) { - if (executedRequestIds_.find(request.id) != executedRequestIds_.end()) { - // We've already successfully returned results for this request, don't - // process it again. + auto sql = SQL(query.query); + if (!sql.getStatus().ok()) { + LOG(ERROR) << "Error running distributed query[" << query.id + << "]: " << query.query; continue; } - SQL query_result = handleQuery(request.query); - if (query_result.ok()) { - successful_query_ids.insert(request.id); - } - query_results.push_back({request, query_result}); + + DistributedQueryResult result(std::move(query), std::move(sql.rows())); + addResult(result); + } + return flushCompleted(); +} + +Status Distributed::flushCompleted() { + if (getCompletedCount() == 0) { + return Status(0, "OK"); } - // Serialize the results - pt::ptree serialized_results; - serializeResults(query_results, serialized_results); - std::string json; + auto& distributed_plugin = Registry::getActive("distributed"); + if (!Registry::exists("distributed", distributed_plugin)) { + return Status(1, "Missing distributed plugin " + distributed_plugin); + } + + std::string results; + auto s = serializeResults(results); + if (!s.ok()) { + return s; + } + + PluginResponse response; + return Registry::call("distributed", + {{"action", "writeResults"}, {"results", results}}, + response); +} + +Status Distributed::acceptWork(const std::string& work) { + pt::ptree tree; + std::stringstream ss(work); try { - std::ostringstream ss; - pt::write_json(ss, serialized_results, false); - json = ss.str(); - } catch (const pt::json_parser::json_parser_error& e) { - return Status(1, e.what()); + pt::read_json(ss, tree); + } catch (const pt::ptree_error& e) { + return Status(1, "Error parsing JSON: " + std::string(e.what())); } - // Write the results - retries = 0; - do { - status = provider_->writeResultsJSON(json); - ++retries; - } while (!status.ok() && retries <= FLAGS_distributed_retries); - if (!status.ok()) { - return status; + auto& queries = tree.get_child("queries"); + for (const auto& node : queries) { + DistributedQueryRequest request; + request.id = node.first; + request.query = queries.get(node.first, ""); + if (request.query.empty() || request.id.empty()) { + return Status(1, "Distributed query does not have complete attributes."); + } + WriteLock wlock(distributed_queries_mutex_); + queries_.push_back(request); } - // Only note that the queries were successfully completed if we were actually - // able to write the results. - executedRequestIds_.insert(successful_query_ids.begin(), - successful_query_ids.end()); + return Status(0, "OK"); +} - return status; +DistributedQueryRequest Distributed::popRequest() { + WriteLock wlock_queries(distributed_queries_mutex_); + auto q = queries_[0]; + queries_.erase(queries_.begin()); + return std::move(q); } } diff --git a/osquery/distributed/distributed.h b/osquery/distributed/distributed.h deleted file mode 100644 index fc53f012..00000000 --- a/osquery/distributed/distributed.h +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2014, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - * - */ - -#pragma once - -#include -#include - -#include - -#include - -namespace osquery { - -/** - * @brief This is an interface for distributed query "providers" - * - * Providers implement the communication between the distributed query master - * and the individual host. A provider may utilize any communications strategy - * that supports reading and writing JSON (i.e. HTTPS requests, reading from a - * file, querying a message queue, etc.) - */ -class IDistributedProvider { -public: - virtual ~IDistributedProvider() {} - - /* - * @brief Get the JSON string containing the queries to be executed - * - * @param query_json A string to fill with the retrieved JSON - * - * @return osquery::Status indicating success or failure of the operation - */ - virtual Status getQueriesJSON(std::string& query_json) = 0; - - /* - * @brief Write the results JSON back to the master - * - * @param results A string containing the results JSON - * - * @return osquery::Status indicating success or failure of the operation - */ - virtual Status writeResultsJSON(const std::string& results) = 0; -}; - -/** - * @brief A mocked implementation of IDistributedProvider - * - * This implementation is useful for writing unit tests of the - * DistributedQueryHandler functionality. - */ -class MockDistributedProvider : public IDistributedProvider { -public: - // These methods just read/write the corresponding public members - Status getQueriesJSON(std::string& query_json) override; - Status writeResultsJSON(const std::string& results) override; - - std::string queriesJSON_; - std::string resultsJSON_; -}; - -/** - * @brief Small struct containing the query and ID information for a - * distributed query - */ -struct DistributedQueryRequest { -public: - explicit DistributedQueryRequest() {} - explicit DistributedQueryRequest(const std::string& q, const std::string& i) - : query(q), id(i) {} - std::string query; - std::string id; -}; - -/** - * @brief The main handler class for distributed queries - * - * This class is responsible for implementing the core functionality of - * distributed queries. It manages state, uses the provider to read/write from - * the master, and executes queries. - */ -class DistributedQueryHandler { -public: - /** - * @brief Construct a new handler with the given provider - * - * @param provider The provider used retrieving queries and writing results - */ - explicit DistributedQueryHandler( - std::unique_ptr provider) - : provider_(std::move(provider)) {} - - /** - * @brief Retrieve queries, run them, and write results - * - * This is the core method of DistributedQueryHandler, tying together all the - * other components to read the requests from the provider, execute the - * queries, and write the results back to the provider. - * - * @return osquery::Status indicating success or failure of the operation - */ - Status doQueries(); - - /** - * @brief Run and annotate an individual query - * - * @param query_string A string containing the query to be executed - * - * @return A SQL object containing the (annotated) query results - */ - static SQL handleQuery(const std::string& query_string); - - /** - * @brief Serialize the results of all requests into a ptree - * - * @param results The vector of requests and results - * @param tree The tree to serialize results into - * - * @return osquery::Status indicating success or failure of the operation - */ - static Status serializeResults( - const std::vector >& results, - boost::property_tree::ptree& tree); - - /** - * @brief Parse the query JSON into the individual query objects - * - * @param query_json The JSON string containing the queries - * @param requests A vector to fill with the query objects - * - * @return osquery::Status indicating success or failure of the parsing - */ - static Status parseQueriesJSON(const std::string& query_json, - std::vector& requests); - -private: - // The provider used to read and write queries and results - std::unique_ptr provider_; - - // Used to store already executed queries to avoid duplication. (Some master - // configurations may asynchronously process the results of requests, so a - // request might be seen by the host after it has already been executed.) - std::set executedRequestIds_; -}; - -} // namespace osquery diff --git a/osquery/distributed/plugins/tls.cpp b/osquery/distributed/plugins/tls.cpp new file mode 100644 index 00000000..47376c09 --- /dev/null +++ b/osquery/distributed/plugins/tls.cpp @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2014, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#include +#include + +#include +#include + +#include +#include +#include +#include + +#include "osquery/remote/requests.h" +#include "osquery/remote/transports/tls.h" +#include "osquery/remote/serializers/json.h" +#include "osquery/remote/utility.h" + +namespace pt = boost::property_tree; + +namespace osquery { + +FLAG(string, + distributed_tls_read_endpoint, + "", + "TLS/HTTPS endpoint for distributed query retrieval"); + +FLAG(string, + distributed_tls_write_endpoint, + "", + "TLS/HTTPS endpoint for distributed query results"); + +FLAG(uint64, + distributed_tls_max_attempts, + 3, + "Number of times to attempt a request") + +class TLSDistributedPlugin : public DistributedPlugin { + public: + Status setUp(); + Status getQueries(std::string& json); + Status writeResults(const std::string& json); + + protected: + std::string read_uri_; + std::string write_uri_; +}; + +REGISTER(TLSDistributedPlugin, "distributed", "tls"); + +Status TLSDistributedPlugin::setUp() { + read_uri_ = TLSRequestHelper::makeURI(FLAGS_distributed_tls_read_endpoint); + write_uri_ = TLSRequestHelper::makeURI(FLAGS_distributed_tls_write_endpoint); + return Status(0, "OK"); +} + +Status TLSDistributedPlugin::getQueries(std::string& json) { + return TLSRequestHelper::go( + read_uri_, json, FLAGS_distributed_tls_max_attempts); +} + +Status TLSDistributedPlugin::writeResults(const std::string& json) { + pt::ptree params; + std::stringstream ss(json); + std::string response; + try { + pt::read_json(ss, params); + } catch (const pt::ptree_error& e) { + return Status(1, "Error parsing JSON: " + std::string(e.what())); + } + + return TLSRequestHelper::go( + write_uri_, params, response, FLAGS_distributed_tls_max_attempts); +} +} diff --git a/osquery/distributed/tests/distributed_tests.cpp b/osquery/distributed/tests/distributed_tests.cpp index 09c74fc8..925d5ac0 100644 --- a/osquery/distributed/tests/distributed_tests.cpp +++ b/osquery/distributed/tests/distributed_tests.cpp @@ -15,209 +15,117 @@ #include #include +#include #include -#include "osquery/distributed/distributed.h" +#include "osquery/core/test_util.h" +#include "osquery/remote/requests.h" +#include "osquery/remote/serializers/json.h" +#include "osquery/remote/transports/tls.h" + #include "osquery/sql/sqlite_util.h" namespace pt = boost::property_tree; namespace osquery { -// Distributed tests expect an SQL implementation for queries. -REGISTER_INTERNAL(SQLiteSQLPlugin, "sql", "sql"); +class TestDistributedPlugin : public DistributedPlugin { + public: + Status setUp() { + TLSServerRunner::start(); + host = "https://localhost:" + TLSServerRunner::port(); + return Status(0, "OK"); + } + + void tearDown() { TLSServerRunner::stop(); } + + Status getQueries(std::string& json) { + auto t = std::make_shared(); + t->disableVerifyPeer(); + auto url = host + "/distributed_read"; + auto r = Request(url, t); + + pt::ptree params; + params.put("node_key", "this_is_a_node_secret"); + auto s = r.call(params); + if (!s.ok()) { + throw std::runtime_error(s.toString()); + } + + pt::ptree recv; + s = r.getResponse(recv); + if (!s.ok()) { + throw std::runtime_error(s.toString()); + } + + auto serial = JSONSerializer(); + return serial.serialize(recv, json); + } + + Status writeResults(const std::string& json) { + pt::ptree tree; + std::stringstream ss(json); + pt::read_json(ss, tree); + + auto& queries = tree.get_child("queries"); + for (const auto& result : queries) { + if (result.first.empty()) { + throw std::runtime_error("result ID is empty"); + } + + QueryData qd; + auto s = deserializeQueryData(result.second, qd); + if (!s.ok()) { + throw std::runtime_error(s.toString()); + } + writeCount++; + } + + auto t = std::make_shared(); + t->disableVerifyPeer(); + auto url = host + "/distributed_write"; + auto r = Request(url, t); + + tree.put("node_key", "this_is_a_node_secret"); + auto s = r.call(tree); + if (!s.ok()) { + throw std::runtime_error(s.toString()); + } + + pt::ptree recv; + s = r.getResponse(recv); + if (!s.ok()) { + throw std::runtime_error(s.toString()); + } + + return Status(0, "OK"); + } + + int writeCount; + std::string host; +}; class DistributedTests : public testing::Test {}; -TEST_F(DistributedTests, test_test_distributed_provider) { - MockDistributedProvider p; - std::string query_string = "['foo']"; - std::string result_string = "['bar']"; +TEST_F(DistributedTests, test_workflow) { + Registry::add("distributed", "test"); + Registry::setActive("distributed", "test"); - p.queriesJSON_ = query_string; - std::string query_json; - Status s = p.getQueriesJSON(query_json); - ASSERT_EQ(Status(), s); - EXPECT_EQ(query_string, query_json); - - s = p.writeResultsJSON(result_string); + auto dist = Distributed(); + auto s = dist.pullUpdates(); EXPECT_TRUE(s.ok()); - EXPECT_EQ(result_string, p.resultsJSON_); -} + EXPECT_EQ(s.toString(), "OK"); -TEST_F(DistributedTests, test_parse_query_json) { - std::string request_json = "[{\"query\": \"foo\", \"id\": \"bar\"}]"; - std::vector requests; - Status s = DistributedQueryHandler::parseQueriesJSON(request_json, requests); - ASSERT_EQ(Status(), s); - EXPECT_EQ(1, requests.size()); - EXPECT_EQ("foo", requests[0].query); - EXPECT_EQ("bar", requests[0].id); + EXPECT_EQ(dist.getPendingQueryCount(), 2); + EXPECT_EQ(dist.results_.size(), 0); + s = dist.runQueries(); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(s.toString(), "OK"); + EXPECT_EQ(dist.getPendingQueryCount(), 0); + EXPECT_EQ(dist.results_.size(), 2); - std::string bad_json = - "[{\"query\": \"foo\", \"id\": \"bar\"}, {\"query\": \"b\"}]"; - requests.clear(); - s = DistributedQueryHandler::parseQueriesJSON(bad_json, requests); - ASSERT_FALSE(s.ok()); - EXPECT_EQ(0, requests.size()); -} - -TEST_F(DistributedTests, test_handle_query) { - // Access to the internal SQL implementation is only available in core. - SQL query = DistributedQueryHandler::handleQuery("SELECT hour from time"); - ASSERT_TRUE(query.ok()); - QueryData rows = query.rows(); - ASSERT_EQ(1, rows.size()); - EXPECT_EQ(rows[0]["_source_host"], getHostname()); - - query = DistributedQueryHandler::handleQuery("bad query"); - ASSERT_FALSE(query.ok()); - rows = query.rows(); - ASSERT_EQ(0, rows.size()); -} - -TEST_F(DistributedTests, test_serialize_results_empty) { - DistributedQueryRequest r0("foo", "foo_id"); - MockSQL q0 = MockSQL(); - pt::ptree tree; - - DistributedQueryHandler::serializeResults({{r0, q0}}, tree); - - EXPECT_EQ(0, tree.get("results.foo_id.status")); - EXPECT_TRUE(tree.get_child("results.foo_id.rows").empty()); -} - -TEST_F(DistributedTests, test_serialize_results_basic) { - DistributedQueryRequest r0("foo", "foo_id"); - QueryData rows0 = { - {{"foo0", "foo0_val"}, {"bar0", "bar0_val"}}, - {{"foo1", "foo1_val"}, {"bar1", "bar1_val"}}, - }; - MockSQL q0 = MockSQL(rows0); - pt::ptree tree; - - DistributedQueryHandler::serializeResults({{r0, q0}}, tree); - - EXPECT_EQ(0, tree.get("results.foo_id.status")); - - const pt::ptree& tree_rows = tree.get_child("results.foo_id.rows"); - EXPECT_EQ(2, tree_rows.size()); - - auto row = tree_rows.begin(); - EXPECT_EQ("foo0_val", row->second.get("foo0")); - EXPECT_EQ("bar0_val", row->second.get("bar0")); - ++row; - EXPECT_EQ("foo1_val", row->second.get("foo1")); - EXPECT_EQ("bar1_val", row->second.get("bar1")); -} - -TEST_F(DistributedTests, test_serialize_results_multiple) { - DistributedQueryRequest r0("foo", "foo_id"); - QueryData rows0 = { - {{"foo0", "foo0_val"}, {"bar0", "bar0_val"}}, - {{"foo1", "foo1_val"}, {"bar1", "bar1_val"}}, - }; - MockSQL q0 = MockSQL(rows0); - - DistributedQueryRequest r1("bar", "bar_id"); - MockSQL q1 = MockSQL({}, Status(1, "Fail")); - - pt::ptree tree; - - DistributedQueryHandler::serializeResults({{r0, q0}, {r1, q1}}, tree); - - EXPECT_EQ(0, tree.get("results.foo_id.status")); - const pt::ptree& tree_rows = tree.get_child("results.foo_id.rows"); - EXPECT_EQ(2, tree_rows.size()); - auto row = tree_rows.begin(); - EXPECT_EQ("foo0_val", row->second.get("foo0")); - EXPECT_EQ("bar0_val", row->second.get("bar0")); - ++row; - EXPECT_EQ("foo1_val", row->second.get("foo1")); - EXPECT_EQ("bar1_val", row->second.get("bar1")); - - EXPECT_EQ(1, tree.get("results.bar_id.status")); - const pt::ptree& fail_rows = tree.get_child("results.bar_id.rows"); - EXPECT_EQ(0, fail_rows.size()); -} - -TEST_F(DistributedTests, test_do_queries) { - // Access to the internal SQL implementation is only available in core. - auto provider_raw = new MockDistributedProvider(); - provider_raw->queriesJSON_ = - "[ \ - {\"query\": \"SELECT hour FROM time\", \"id\": \"hour\"},\ - {\"query\": \"bad\", \"id\": \"bad\"},\ - {\"query\": \"SELECT minutes FROM time\", \"id\": \"minutes\"}\ - ]"; - std::unique_ptr - provider(provider_raw); - DistributedQueryHandler handler(std::move(provider)); - - Status s = handler.doQueries(); - ASSERT_EQ(Status(), s); - - pt::ptree tree; - std::istringstream json_stream(provider_raw->resultsJSON_); - ASSERT_NO_THROW(pt::read_json(json_stream, tree)); - - { - EXPECT_EQ(0, tree.get("results.hour.status")); - const pt::ptree& tree_rows = tree.get_child("results.hour.rows"); - EXPECT_EQ(1, tree_rows.size()); - auto row = tree_rows.begin(); - EXPECT_GE(row->second.get("hour"), 0); - EXPECT_LE(row->second.get("hour"), 24); - EXPECT_EQ(getHostname(), row->second.get("_source_host")); - } - - { - // this query should have failed - EXPECT_EQ(1, tree.get("results.bad.status")); - const pt::ptree& tree_rows = tree.get_child("results.bad.rows"); - EXPECT_EQ(0, tree_rows.size()); - } - - { - EXPECT_EQ(0, tree.get("results.minutes.status")); - const pt::ptree& tree_rows = tree.get_child("results.minutes.rows"); - EXPECT_EQ(1, tree_rows.size()); - auto row = tree_rows.begin(); - EXPECT_GE(row->second.get("minutes"), 0); - EXPECT_LE(row->second.get("minutes"), 60); - EXPECT_EQ(getHostname(), row->second.get("_source_host")); - } -} - -TEST_F(DistributedTests, test_duplicate_request) { - // Access to the internal SQL implementation is only available in core. - auto provider_raw = new MockDistributedProvider(); - provider_raw->queriesJSON_ = - "[{\"query\": \"SELECT hour FROM time\", \"id\": \"hour\"}]"; - std::unique_ptr - provider(provider_raw); - DistributedQueryHandler handler(std::move(provider)); - - Status s = handler.doQueries(); - ASSERT_EQ(Status(), s); - - pt::ptree tree; - std::istringstream json_stream(provider_raw->resultsJSON_); - ASSERT_NO_THROW(pt::read_json(json_stream, tree)); - - EXPECT_EQ(0, tree.get("results.hour.status")); - const pt::ptree& tree_rows = tree.get_child("results.hour.rows"); - EXPECT_EQ(1, tree_rows.size()); - auto row = tree_rows.begin(); - EXPECT_GE(row->second.get("hour"), 0); - EXPECT_LE(row->second.get("hour"), 24); - EXPECT_EQ(getHostname(), row->second.get("_source_host")); - - // The second time, 'hour' should not be executed again - s = handler.doQueries(); - ASSERT_EQ(Status(), s); - json_stream.str(provider_raw->resultsJSON_); - ASSERT_NO_THROW(pt::read_json(json_stream, tree)); - EXPECT_EQ(0, tree.get_child("results").size()); + const auto& plugin = std::dynamic_pointer_cast( + Registry::get("distributed", "test")); + EXPECT_EQ(plugin->writeCount, 2); } } diff --git a/osquery/logger/plugins/tls.cpp b/osquery/logger/plugins/tls.cpp index cea2715d..cbf333f4 100644 --- a/osquery/logger/plugins/tls.cpp +++ b/osquery/logger/plugins/tls.cpp @@ -24,6 +24,7 @@ #include "osquery/remote/requests.h" #include "osquery/remote/transports/tls.h" #include "osquery/remote/serializers/json.h" +#include "osquery/remote/utility.h" namespace pt = boost::property_tree; @@ -112,19 +113,22 @@ bool TLSLoggerPlugin::stop_buffering = false; class TLSLogForwarderRunner : public InternalRunnable { public: explicit TLSLogForwarderRunner(const std::string& node_key) - : node_key_(node_key) {} + : node_key_(node_key) { + uri_ = TLSRequestHelper::makeURI(FLAGS_logger_tls_endpoint); + } /// A simple wait lock, and flush based on settings. void start(); - private: + protected: /// Send labeled result logs. - Status send(const std::string& uri, - const std::string& log_data, - const std::string& log_type); + Status send(const std::string& log_data, const std::string& log_type); /// Receive an enrollment/node key from the backing store cache. std::string node_key_; + + /// Endpoint URI + std::string uri_; }; REGISTER(TLSLoggerPlugin, "logger", "tls"); @@ -194,8 +198,7 @@ Status TLSLoggerPlugin::init(const std::string& name, return logStatus(log); } -Status TLSLogForwarderRunner::send(const std::string& uri, - const std::string& log_data, +Status TLSLogForwarderRunner::send(const std::string& log_data, const std::string& log_type) { pt::ptree params; params.put("node_key", node_key_); @@ -217,7 +220,7 @@ Status TLSLogForwarderRunner::send(const std::string& uri, params.put("data", log_data); } - auto request = Request(uri); + auto request = Request(uri_); return request.call(params); } @@ -232,21 +235,6 @@ inline void clearLogs(bool results, const std::vector& indexes) { } void TLSLogForwarderRunner::start() { - auto uri = "https://" + FLAGS_tls_hostname; - if (FLAGS_tls_node_api) { - // The TLS API should treat clients as nodes. - // In this case the node_key acts as an identifier (node) and the endpoints - // (if provided) are treated as edges from the nodes. - uri += "/" + node_key_; - } - uri += FLAGS_logger_tls_endpoint; - - // Some APIs may require persistent identification. - if (FLAGS_tls_secret_always) { - uri += ((uri.find("?") != std::string::npos) ? "&" : "?") + - FLAGS_tls_enroll_override + "=" + getEnrollSecret(); - } - while (true) { // Get a list of all the buffered log items. std::vector indexes; @@ -271,16 +259,16 @@ void TLSLogForwarderRunner::start() { // If any results/statuses were found in the flushed buffer, send. if (results.size() > 0) { - if (!send(uri, results, "result")) { - VLOG(1) << "Could not send results to logger URI: " << uri; + if (!send(results, "result")) { + VLOG(1) << "Could not send results to logger URI: " << uri_; } else { // Clear the results logs once they were sent. clearLogs(true, indexes); } } if (statuses.size() > 0) { - if (!send(uri, statuses, "status")) { - VLOG(1) << "Could not send status logs to logger URI: " << uri; + if (!send(statuses, "status")) { + VLOG(1) << "Could not send status logs to logger URI: " << uri_; } else { // Clear the status logs once they were sent. clearLogs(false, indexes); diff --git a/osquery/main/daemon.cpp b/osquery/main/daemon.cpp index e87439d8..52b80b2a 100644 --- a/osquery/main/daemon.cpp +++ b/osquery/main/daemon.cpp @@ -10,9 +10,11 @@ #include +#include #include #include +#include "osquery/dispatcher/distributed.h" #include "osquery/dispatcher/scheduler.h" const std::string kWatcherWorkerName = "osqueryd: worker"; @@ -31,8 +33,14 @@ int main(int argc, char* argv[]) { // Start osquery work. runner.start(); + // Conditionally begin the distributed query service + auto s = osquery::startDistributed(); + if (!s.ok()) { + VLOG(1) << "Not starting the distributed query service: " << s.toString(); + } + // Begin the schedule runloop. - auto s = osquery::startScheduler(); + s = osquery::startScheduler(); if (!s.ok()) { LOG(ERROR) << "Error starting scheduler: " << s.toString(); } diff --git a/osquery/remote/requests.h b/osquery/remote/requests.h index 62af9160..49dffed5 100644 --- a/osquery/remote/requests.h +++ b/osquery/remote/requests.h @@ -281,5 +281,7 @@ class Request { FRIEND_TEST(TLSTransportsTests, test_call_verify_peer); FRIEND_TEST(TLSTransportsTests, test_call_server_cert_pinning); FRIEND_TEST(TLSTransportsTests, test_call_client_auth); + + friend class TestDistributedPlugin; }; } diff --git a/osquery/remote/transports/tests/tls_transports_tests.cpp b/osquery/remote/transports/tests/tls_transports_tests.cpp index 6bc72bc2..05e04361 100644 --- a/osquery/remote/transports/tests/tls_transports_tests.cpp +++ b/osquery/remote/transports/tests/tls_transports_tests.cpp @@ -8,15 +8,10 @@ * */ -#include #include -#include - #include -#include - #include #include "osquery/remote/requests.h" @@ -29,55 +24,6 @@ namespace pt = boost::property_tree; namespace osquery { -class TLSServerRunner : private boost::noncopyable { - public: - /// Create a singleton TLS server runner. - static TLSServerRunner& instance() { - static TLSServerRunner instance; - return instance; - } - - /// TCP port accessor. - static const std::string& port() { return instance().port_; } - /// Start the server if it hasn't started already. - static void start(); - /// Stop the service when the process exits. - static void stop(); - - private: - TLSServerRunner() - : server_(0), port_(std::to_string(rand() % 10000 + 20000)){}; - TLSServerRunner(TLSServerRunner const&); - void operator=(TLSServerRunner const&); - virtual ~TLSServerRunner() { stop(); } - - private: - pid_t server_; - std::string port_; -}; - -void TLSServerRunner::start() { - auto& self = instance(); - if (self.server_ != 0) { - return; - } - - // Fork then exec a shell. - self.server_ = fork(); - if (self.server_ == 0) { - // Start a python TLS/HTTPS or HTTP server. - auto script = kTestDataPath + "/test_http_server.py --tls " + self.port_; - execlp("sh", "sh", "-c", script.c_str(), nullptr); - ::exit(0); - } - ::sleep(1); -} - -void TLSServerRunner::stop() { - auto& self = instance(); - kill(self.server_, SIGTERM); -} - class TLSTransportsTests : public testing::Test { public: bool verify(const Status &status) { diff --git a/osquery/remote/transports/tls.h b/osquery/remote/transports/tls.h index 1e6c6b01..441546c5 100644 --- a/osquery/remote/transports/tls.h +++ b/osquery/remote/transports/tls.h @@ -129,5 +129,7 @@ class TLSTransport : public Transport { FRIEND_TEST(TLSTransportsTests, test_call_server_cert_pinning); FRIEND_TEST(TLSTransportsTests, test_call_client_auth); FRIEND_TEST(TLSTransportsTests, test_call_http); + + friend class TestDistributedPlugin; }; } diff --git a/osquery/remote/utility.h b/osquery/remote/utility.h new file mode 100644 index 00000000..875001e1 --- /dev/null +++ b/osquery/remote/utility.h @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2014, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#pragma once + +#include +#include + +#include "osquery/remote/requests.h" +#include "osquery/remote/transports/tls.h" + +namespace osquery { + +DECLARE_string(tls_enroll_override); +DECLARE_string(tls_hostname); +DECLARE_bool(tls_node_api); +DECLARE_bool(tls_secret_always); + +/** + * @brief Helper class for allowing TLS plugins to easily kick off requests + * + * There are many static functions in this class that have very similar + * behaviour, which allow them to be used in many context. Some methods accept + * parameters, some don't require them. Some have built-in retry logic, some + * don't. Some return results in a ptree, some return results in JSON, etc. + */ +class TLSRequestHelper { + public: + /** + * @brief Using the `tls_hostname` flag and an endpoint, construct a URI + * + * @param endpoint is the URI endpoint to be combined with `tls_hostname` + * @return a string representing the uri + */ + static std::string makeURI(const std::string& endpoint) { + auto node_key = getNodeKey("tls"); + auto uri = "https://" + FLAGS_tls_hostname; + if (FLAGS_tls_node_api) { + // The TLS API should treat clients as nodes. + // In this case the node_key acts as an identifier (node) and the + // endpoints + // (if provided) are treated as edges from the nodes. + uri += "/" + node_key; + } + uri += endpoint; + + // Some APIs may require persistent identification. + if (FLAGS_tls_secret_always) { + uri += ((uri.find("?") != std::string::npos) ? "&" : "?") + + FLAGS_tls_enroll_override + "=" + getEnrollSecret(); + } + return std::move(uri); + } + + /** + * @brief Send a TLS request + * + * @param uri is the URI to send the request to + * @param params is a ptree of the params to send to the server. This isn't + * const because it will be modified to include node_key. + * @param output is the ptree which will be populated with the deserialized + * results + * + * @return a Status object indicating the success or failure of the operation + */ + template + static Status go(const std::string& uri, + boost::property_tree::ptree& params, + boost::property_tree::ptree& output) { + auto node_key = getNodeKey("tls"); + + // If using a GET request, append the node_key to the URI variables. + std::string uri_suffix; + if (FLAGS_tls_node_api) { + uri_suffix = "&node_key=" + node_key; + } else { + params.put("node_key", node_key); + } + + // Again check for GET to call with/without parameters. + auto request = Request(uri + uri_suffix); + auto status = (FLAGS_tls_node_api) ? request.call() : request.call(params); + + if (!status.ok()) { + return status; + } + + // The call succeeded, store the enrolled key. + status = request.getResponse(output); + if (!status.ok()) { + return status; + } + + // Receive config or key rejection + if (output.count("node_invalid") > 0 || output.count("error") > 0) { + return Status(1, "Request failed: Invalid node key"); + } + return Status(0, "OK"); + } + + /** + * @brief Send a TLS request + * + * @param uri is the URI to send the request to + * @param params is a ptree of the params to send to the server. This isn't + * const because it will be modified to include node_key. + * + * @return a Status object indicating the success or failure of the operation + */ + template + static Status go(const std::string& uri, + boost::property_tree::ptree& output) { + boost::property_tree::ptree params; + return TLSRequestHelper::go(uri, params, output); + } + + /** + * @brief Send a TLS request + * + * @param uri is the URI to send the request to + * @param params is a ptree of the params to send to the server. This isn't + * const because it will be modified to include node_key. + * @param output is the string which will be populated with the deserialized + * results + * + * @return a Status object indicating the success or failure of the operation + */ + template + static Status go(const std::string& uri, + boost::property_tree::ptree& params, + std::string& output) { + boost::property_tree::ptree recv; + auto s = TLSRequestHelper::go(uri, params, recv); + if (s.ok()) { + auto serializer = TSerializer(); + return serializer.serialize(recv, output); + } + return s; + } + + /** + * @brief Send a TLS request + * + * @param uri is the URI to send the request to + * @param output is the string which will be populated with the deserialized + * results + * + * @return a Status object indicating the success or failure of the operation + */ + template + static Status go(const std::string& uri, std::string& output) { + boost::property_tree::ptree params; + return TLSRequestHelper::go(uri, params, output); + } + + /** + * @brief Send a TLS request + * + * @param uri is the URI to send the request to + * @param params is a ptree of the params to send to the server. This isn't + * const because it will be modified to include node_key. + * @param output is the string which will be populated with the deserialized + * results + * @param attempts is the number of attempts to make if the request fails + * + * @return a Status object indicating the success or failure of the operation + */ + template + static Status go(const std::string& uri, + boost::property_tree::ptree& params, + std::string& output, + const size_t attempts) { + Status s; + for (size_t i = 1; i <= attempts; i++) { + s = TLSRequestHelper::go(uri, params, output); + if (s.ok()) { + return s; + } + if (i == attempts) { + break; + } + ::sleep(i * i); + } + return s; + } + + /** + * @brief Send a TLS request + * + * @param uri is the URI to send the request to + * @param output is the string which will be populated with the deserialized + * results + * @param attempts is the number of attempts to make if the request fails + * + * @return a Status object indicating the success or failure of the operation + */ + template + static Status go(const std::string& uri, + std::string& output, + const size_t attempts) { + boost::property_tree::ptree params; + return TLSRequestHelper::go(uri, params, output, attempts); + } +}; +} diff --git a/osquery/sql/sql.cpp b/osquery/sql/sql.cpp index f7b5f870..b9a13473 100644 --- a/osquery/sql/sql.cpp +++ b/osquery/sql/sql.cpp @@ -30,22 +30,14 @@ const std::map kSQLOperatorRepr = { SQL::SQL(const std::string& q) { status_ = query(q, results_); } -const QueryData& SQL::rows() { return results_; } +const QueryData& SQL::rows() const { return results_; } bool SQL::ok() { return status_.ok(); } -Status SQL::getStatus() { return status_; } +const Status& SQL::getStatus() const { return status_; } std::string SQL::getMessageString() { return status_.toString(); } -const std::string SQL::kHostColumnName = "_source_host"; -void SQL::annotateHostInfo() { - std::string hostname = getHostname(); - for (Row& row : results_) { - row[kHostColumnName] = hostname; - } -} - std::vector SQL::getTableNames() { std::vector results; for (const auto& name : Registry::names("table")) { diff --git a/tools/tests/test_http_server.py b/tools/tests/test_http_server.py index 9949a4dd..b643cf45 100755 --- a/tools/tests/test_http_server.py +++ b/tools/tests/test_http_server.py @@ -4,7 +4,7 @@ # All rights reserved. # # This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. An additional grant +# LICENSE file in the root directory of this source tree. An additional grant # of patent rights can be found in the PATENTS file in the same directory. from __future__ import absolute_import @@ -29,6 +29,13 @@ EXAMPLE_CONFIG = { } } +EXAMPLE_DISTRIBUTED = { + "queries": { + "info": "select * from osquery_info", + "flags": "select * from osquery_flags", + } +} + TEST_RESPONSE = { "foo": "bar", } @@ -54,16 +61,16 @@ class RealSimpleHandler(BaseHTTPRequestHandler): self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() - + def do_GET(self): debug("RealSimpleHandler::get %s" % self.path) self._set_headers() self._reply(TEST_RESPONSE) - + def do_HEAD(self): debug("RealSimpleHandler::head %s" % self.path) self._set_headers() - + def do_POST(self): debug("RealSimpleHandler::post %s" % self.path) self._set_headers() @@ -77,6 +84,10 @@ class RealSimpleHandler(BaseHTTPRequestHandler): self.config(request) elif self.path == '/log': self.log(request) + elif self.path == '/distributed_read': + self.distributed_read(request) + elif self.path == '/distributed_write': + self.distributed_write(request) else: self._reply(TEST_RESPONSE) @@ -114,6 +125,17 @@ class RealSimpleHandler(BaseHTTPRequestHandler): return self._reply(EXAMPLE_CONFIG) + def distributed_read(self, request): + '''A basic distributed read endpoint''' + if "node_key" not in request or request["node_key"] not in NODE_KEYS: + self._reply(FAILED_ENROLL_RESPONSE) + return + self._reply(EXAMPLE_DISTRIBUTED) + + def distributed_write(self, request): + '''A basic distributed write endpoint''' + self._reply({}) + def log(self, request): self._reply({}) @@ -141,7 +163,7 @@ if __name__ == '__main__': help="Wrap the HTTP server socket in TLS." ) parser.add_argument( - "--timeout", default=5, type=int, + "--timeout", default=10, type=int, help="If not persisting, exit after a number of seconds" )