Merge pull request #710 from zwass/distributed

POC for client side of distributed queries.
This commit is contained in:
Zachary Wasserman 2015-02-13 14:25:52 -08:00
commit 1f450fb1ef
8 changed files with 598 additions and 7 deletions

View File

@ -3,7 +3,7 @@
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
@ -50,7 +50,7 @@ class SQL {
*
* @return A QueryData object of the query results
*/
QueryData rows();
const QueryData& rows();
/**
* @brief Accessor to switch off of when checking the success of a query
@ -59,6 +59,13 @@ class SQL {
*/
bool ok();
/**
* @brief Get the status returned by the query
*
* @return The query status
*/
Status getStatus();
/**
* @brief Accessor for the message string indicating the status of the query
*
@ -66,6 +73,15 @@ class SQL {
*/
std::string getMessageString();
/**
* @brief Add host info columns onto existing QueryData
*
* Use this to add columns providing host info to the query results.
* Distributed queries use this to add host information before returning
* results to the aggregator.
*/
void annotateHostInfo();
/**
* @brief Accessor for the list of queryable tables
*
@ -96,7 +112,7 @@ class SQL {
tables::ConstraintOperator op,
const std::string& expr);
private:
protected:
/**
* @brief Private default constructor
*
@ -104,7 +120,9 @@ class SQL {
*/
SQL(){};
private:
// The key used to store hostname for annotateHostInfo
static const std::string kHostColumnName;
/// the internal member which holds the results of the query
QueryData results_;
@ -152,4 +170,18 @@ Status query(const std::string& query, QueryData& results);
* @return status indicating success or failure of the operation
*/
Status getQueryColumns(const std::string& q, tables::TableColumns& columns);
/*
* @brief A mocked subclass of SQL useful for testing
*/
class MockSQL : public SQL {
public:
explicit MockSQL() : MockSQL({}) {}
explicit MockSQL(const QueryData& results) : MockSQL(results, Status()) {}
explicit MockSQL(const QueryData& results, const Status& status) {
results_ = results;
status_ = status;
}
};
}

View File

@ -58,6 +58,7 @@ add_subdirectory(registry)
add_subdirectory(scheduler)
add_subdirectory(sql)
add_subdirectory(tables)
add_subdirectory(distributed)
# Utility tables are table specs that are ALWAYS built into osquery core.
GENERATE_UTILITY("osquery_info")

View File

@ -110,13 +110,14 @@ Status checkStalePid(const std::string& content) {
if (status != ESRCH) {
// The pid is running, check if it is an osqueryd process by name.
std::stringstream query_text;
query_text << "SELECT name FROM processes WHERE pid = " << pid << ";";
query_text << "SELECT name FROM processes WHERE pid = " << pid
<< " AND name = 'osqueryd';";
auto q = SQL(query_text.str());
if (!q.ok()) {
return Status(1, "Error querying processes: " + q.getMessageString());
}
if (q.rows().size() >= 1 && q.rows().front()["name"] == "osqueryd") {
if (q.rows().size() > 0) {
// If the process really is osqueryd, return an "error" status.
if (FLAGS_force) {
// The caller may choose to abort the existing daemon with --force.

View File

@ -0,0 +1,3 @@
ADD_OSQUERY_LIBRARY(TRUE osquery_distributed distributed.cpp)
ADD_OSQUERY_TEST(TRUE distributed_tests distributed_tests.cpp)

View File

@ -0,0 +1,171 @@
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <sstream>
#include <boost/property_tree/json_parser.hpp>
#include <osquery/core.h>
#include <osquery/database.h>
#include <osquery/logger.h>
#include "osquery/distributed/distributed.h"
namespace pt = boost::property_tree;
namespace osquery {
// Retry budget for fetching queries from the provider. doQueries() always
// makes one attempt and then repeats while the attempt counter is <= this
// value, so the flag counts *additional* attempts after the first one.
DEFINE_osquery_flag(int32,
distributed_get_queries_retries,
3,
"Times to retry retrieving distributed queries");
// Retry budget for writing results back through the provider. Same counting
// as above: one initial attempt plus up to this many further attempts.
DEFINE_osquery_flag(int32,
distributed_write_results_retries,
3,
"Times to retry writing distributed query results");
// Mock "fetch": copies whatever the test stored in queriesJSON_ into the
// caller's string and reports a default-constructed Status.
Status MockDistributedProvider::getQueriesJSON(std::string& query_json) {
  query_json.assign(queriesJSON_);
  return Status();
}
// Mock "write": remembers the most recent payload in resultsJSON_ so a test
// can inspect it, and reports a default-constructed Status.
Status MockDistributedProvider::writeResultsJSON(const std::string& results) {
  resultsJSON_.assign(results);
  return Status();
}
// Turns the provider's JSON text into a list of query requests.
//
// Every child of the top-level JSON node must carry a "query" and an "id"
// entry. Any parse error or missing entry yields a Status with code 1, and
// in that case `requests` is left exactly as the caller passed it in.
Status DistributedQueryHandler::parseQueriesJSON(
    const std::string& query_json,
    std::vector<DistributedQueryRequest>& requests) {
  // Step 1: load the raw JSON text into a property tree.
  pt::ptree root;
  try {
    std::istringstream input(query_json);
    pt::read_json(input, root);
  } catch (const std::exception& e) {
    return Status(1, std::string("Error loading query JSON: ") + e.what());
  }

  // Step 2: convert each child node into a request. Work on a scratch vector
  // so the output parameter is only touched once everything has parsed.
  std::vector<DistributedQueryRequest> parsed;
  for (const auto& entry : root) {
    const pt::ptree& fields = entry.second;
    DistributedQueryRequest request;
    try {
      request.query = fields.get<std::string>("query");
      request.id = fields.get<std::string>("id");
    } catch (const std::exception& e) {
      return Status(1, std::string("Error parsing queries: ") + e.what());
    }
    parsed.push_back(request);
  }

  requests = std::move(parsed);
  return Status();
}
// Executes one query string and tags its rows with host information before
// returning the SQL object (rows plus status) to the caller.
SQL DistributedQueryHandler::handleQuery(const std::string& query_string) {
  SQL annotated(query_string);
  annotated.annotateHostInfo();
  return annotated;
}
// Serializes every (request, result) pair into `tree`.
//
// Resulting layout:
//   results
//     <request id>
//       status : integer status code of the query
//       rows   : the serialized query rows
//
// Returns the first failing Status from serializeQueryData, a Status with
// code 1 if the property tree throws, and a default Status otherwise.
Status DistributedQueryHandler::serializeResults(
    const std::vector<std::pair<DistributedQueryRequest, SQL> >& results,
    pt::ptree& tree) {
  try {
    pt::ptree& res_tree = tree.put_child("results", pt::ptree());
    for (const auto& result : results) {
      // Only the id is read from the request, so a reference is sufficient.
      const DistributedQueryRequest& request = result.first;
      // The SQL accessors used below are not const-qualified, so a mutable
      // copy is required to call them on an element of the const vector.
      SQL sql = result.second;

      // Request ids are opaque strings. With the default '.' separator an id
      // such as "a.b" would be split into nested nodes; use a separator that
      // is not expected inside an id so the id always maps to a single key.
      const pt::ptree::path_type id_path(request.id, '\0');
      pt::ptree& child = res_tree.put_child(id_path, pt::ptree());

      child.put("status", sql.getStatus().getCode());
      pt::ptree& rows_child = child.put_child("rows", pt::ptree());
      Status s = serializeQueryData(sql.rows(), rows_child);
      if (!s.ok()) {
        return s;
      }
    }
  } catch (const std::exception& e) {
    return Status(1, std::string("Error serializing results: ") + e.what());
  }
  return Status();
}
// Fetches pending requests from the provider, executes the ones that have not
// been answered yet, and writes the serialized results back.
//
// Reading and writing are each attempted once and then repeated while they
// fail, up to the number of extra attempts given by the corresponding
// *_retries flag. A request id is remembered as executed only when its query
// succeeded AND the results were written, so anything else is picked up again
// on a later call.
//
// Returns the first failing Status (fetch, parse, serialize or write), or the
// Status of the successful write.
Status DistributedQueryHandler::doQueries() {
  // A handler constructed with a null provider has nothing to talk to.
  if (!provider_) {
    return Status(1, "No distributed provider available");
  }

  // Get and parse the queries
  Status status;
  std::string query_json;
  int retries = 0;
  do {
    status = provider_->getQueriesJSON(query_json);
    ++retries;
  } while (!status.ok() && retries <= FLAGS_distributed_get_queries_retries);
  if (!status.ok()) {
    return status;
  }

  std::vector<DistributedQueryRequest> requests;
  status = parseQueriesJSON(query_json, requests);
  if (!status.ok()) {
    return status;
  }

  // Run the queries
  std::vector<std::pair<DistributedQueryRequest, SQL> > query_results;
  std::set<std::string> successful_query_ids;
  for (const auto& request : requests) {
    if (executedRequestIds_.find(request.id) != executedRequestIds_.end()) {
      // We've already successfully returned results for this request, don't
      // process it again.
      continue;
    }
    SQL query_result = handleQuery(request.query);
    if (query_result.ok()) {
      successful_query_ids.insert(request.id);
    }
    // Failed queries are reported too (with their status); the result is not
    // used again in this iteration, so move it instead of copying the rows.
    query_results.emplace_back(request, std::move(query_result));
  }

  // Serialize the results. Bail out on failure: writing a partially filled
  // tree and then marking the ids as executed would silently lose results.
  pt::ptree serialized_results;
  status = serializeResults(query_results, serialized_results);
  if (!status.ok()) {
    return status;
  }

  std::string json;
  try {
    std::ostringstream ss;
    pt::write_json(ss, serialized_results, false);
    json = ss.str();
  } catch (const std::exception& e) {
    return Status(1, e.what());
  }

  // Write the results
  retries = 0;
  do {
    status = provider_->writeResultsJSON(json);
    ++retries;
  } while (!status.ok() && retries <= FLAGS_distributed_write_results_retries);
  if (!status.ok()) {
    return status;
  }

  // Only note that the queries were successfully completed if we were actually
  // able to write the results.
  executedRequestIds_.insert(successful_query_ids.begin(),
                             successful_query_ids.end());
  return status;
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <boost/property_tree/ptree.hpp>
#include <osquery/database/results.h>
#include <osquery/sql.h>
namespace osquery {
/**
 * @brief This is an interface for distributed query "providers"
 *
 * Providers implement the communication between the distributed query master
 * and the individual host. A provider may utilize any communications strategy
 * that supports reading and writing JSON (i.e. HTTPS requests, reading from a
 * file, querying a message queue, etc.)
 */
class IDistributedProvider {
 public:
  /// Virtual so implementations can be destroyed through this interface.
  virtual ~IDistributedProvider() = default;

  /**
   * @brief Get the JSON string containing the queries to be executed
   *
   * @param query_json A string to fill with the retrieved JSON
   *
   * @return osquery::Status indicating success or failure of the operation
   */
  virtual Status getQueriesJSON(std::string& query_json) = 0;

  /**
   * @brief Write the results JSON back to the master
   *
   * @param results A string containing the results JSON
   *
   * @return osquery::Status indicating success or failure of the operation
   */
  virtual Status writeResultsJSON(const std::string& results) = 0;
};
/**
* @brief A mocked implementation of IDistributedProvider
*
* This implementation is useful for writing unit tests of the
* DistributedQueryHandler functionality. Nothing is sent anywhere: reads are
* served from queriesJSON_ and writes are captured in resultsJSON_.
*/
class MockDistributedProvider : public IDistributedProvider {
public:
// These methods just read/write the corresponding public members
Status getQueriesJSON(std::string& query_json) override;
Status writeResultsJSON(const std::string& results) override;
// The JSON handed back by getQueriesJSON(); a test sets this up front
std::string queriesJSON_;
// The most recent JSON passed to writeResultsJSON(); a test reads this back
std::string resultsJSON_;
};
/**
 * @brief Small struct containing the query and ID information for a
 * distributed query
 */
struct DistributedQueryRequest {
  /// Creates a request whose query and id are both empty strings.
  explicit DistributedQueryRequest() = default;

  /// Creates a request for query text @p q identified by @p i.
  explicit DistributedQueryRequest(const std::string& q, const std::string& i)
      : query(q), id(i) {}

  /// The query text to execute
  std::string query;

  /// The identifier the results of this query are reported under
  std::string id;
};
/**
* @brief The main handler class for distributed queries
*
* This class is responsible for implementing the core functionality of
* distributed queries. It manages state, uses the provider to read/write from
* the master, and executes queries.
*
* The only state kept between calls is the set of request ids that have
* already been answered; parsing, running and serializing are exposed as
* static helpers.
*/
class DistributedQueryHandler {
public:
/**
* @brief Construct a new handler with the given provider
*
* The handler takes ownership of the provider.
*
* @param provider The provider used for retrieving queries and writing
* results
*/
explicit DistributedQueryHandler(
std::unique_ptr<IDistributedProvider> provider)
: provider_(std::move(provider)) {}
/**
* @brief Retrieve queries, run them, and write results
*
* This is the core method of DistributedQueryHandler, tying together all the
* other components to read the requests from the provider, execute the
* queries, and write the results back to the provider.
*
* Requests whose id has already been answered successfully are skipped. An
* id is only recorded as answered after its query succeeded and the results
* were written back through the provider.
*
* @return osquery::Status indicating success or failure of the operation
*/
Status doQueries();
/**
* @brief Run and annotate an individual query
*
* @param query_string A string containing the query to be executed
*
* @return A SQL object containing the (annotated) query results
*/
static SQL handleQuery(const std::string& query_string);
/**
* @brief Serialize the results of all requests into a ptree
*
* The tree receives a "results" child holding one node per request id, each
* with a "status" code and the serialized "rows".
*
* @param results The vector of requests and results
* @param tree The tree to serialize results into
*
* @return osquery::Status indicating success or failure of the operation
*/
static Status serializeResults(
const std::vector<std::pair<DistributedQueryRequest, SQL> >& results,
boost::property_tree::ptree& tree);
/**
* @brief Parse the query JSON into the individual query objects
*
* Each entry of the JSON must provide a "query" and an "id". On failure the
* requests vector is left unmodified.
*
* @param query_json The JSON string containing the queries
* @param requests A vector to fill with the query objects
*
* @return osquery::Status indicating success or failure of the parsing
*/
static Status parseQueriesJSON(const std::string& query_json,
std::vector<DistributedQueryRequest>& requests);
private:
// The provider used to read and write queries and results (owned)
std::unique_ptr<IDistributedProvider> provider_;
// Used to store already executed queries to avoid duplication. (Some master
// configurations may asynchronously process the results of requests, so a
// request might be seen by the host after it has already been executed.)
std::set<std::string> executedRequestIds_;
};
} // namespace osquery

View File

@ -0,0 +1,219 @@
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <iostream>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <gtest/gtest.h>
#include <osquery/core.h>
#include <osquery/sql.h>
#include "osquery/distributed/distributed.h"
namespace pt = boost::property_tree;
namespace osquery {
// Empty fixture: it adds no setup or state and only groups the distributed
// query tests under one name.
class DistributedTests : public testing::Test {};
// The mock provider must hand back the queries it was primed with and must
// remember whatever results are written to it.
TEST_F(DistributedTests, test_test_distributed_provider) {
  const std::string queries = "['foo']";
  const std::string results = "['bar']";

  MockDistributedProvider provider;
  provider.queriesJSON_ = queries;

  // Reading returns the primed payload unchanged.
  std::string fetched;
  Status status = provider.getQueriesJSON(fetched);
  ASSERT_EQ(Status(), status);
  EXPECT_EQ(queries, fetched);

  // Writing stores the payload where the test can see it.
  status = provider.writeResultsJSON(results);
  EXPECT_TRUE(status.ok());
  EXPECT_EQ(results, provider.resultsJSON_);
}
// parseQueriesJSON must accept a well-formed request list and must reject a
// list containing an entry without an "id", leaving the output untouched.
TEST_F(DistributedTests, test_parse_query_json) {
  std::string request_json = R"([{"query": "foo", "id": "bar"}])";
  std::vector<DistributedQueryRequest> requests;
  Status s = DistributedQueryHandler::parseQueriesJSON(request_json, requests);
  ASSERT_EQ(Status(), s);
  // Fatal check: requests[0] below would be out of bounds on an empty vector.
  ASSERT_EQ(1U, requests.size());
  EXPECT_EQ("foo", requests[0].query);
  EXPECT_EQ("bar", requests[0].id);

  // The second entry has no "id", so the whole payload must be rejected.
  std::string bad_json = R"([{"query": "foo", "id": "bar"}, {"query": "b"}])";
  requests.clear();
  s = DistributedQueryHandler::parseQueriesJSON(bad_json, requests);
  ASSERT_FALSE(s.ok());
  EXPECT_EQ(0U, requests.size());
}
// handleQuery must annotate successful results with the local hostname and
// must report failure (with no rows) for an invalid query.
TEST_F(DistributedTests, test_handle_query) {
  SQL result = DistributedQueryHandler::handleQuery("SELECT hour from time");
  ASSERT_TRUE(result.ok());
  QueryData data = result.rows();
  ASSERT_EQ(1, data.size());
  EXPECT_EQ(data.front()["_source_host"], getHostname());

  result = DistributedQueryHandler::handleQuery("bad query");
  ASSERT_FALSE(result.ok());
  data = result.rows();
  ASSERT_EQ(0, data.size());
}
// A result without rows still produces a node for its request id, carrying
// status 0 and an empty "rows" child.
TEST_F(DistributedTests, test_serialize_results_empty) {
  DistributedQueryRequest request("foo", "foo_id");
  MockSQL empty_sql = MockSQL();

  pt::ptree serialized;
  DistributedQueryHandler::serializeResults({{request, empty_sql}},
                                            serialized);

  EXPECT_EQ(0, serialized.get<int>("results.foo_id.status"));
  EXPECT_TRUE(serialized.get_child("results.foo_id.rows").empty());
}
// A single successful result with two rows must be serialized row by row, in
// order, under its request id.
TEST_F(DistributedTests, test_serialize_results_basic) {
  DistributedQueryRequest r0("foo", "foo_id");
  QueryData rows0 = {{{"foo0", "foo0_val"}, {"bar0", "bar0_val"}},
                     {{"foo1", "foo1_val"}, {"bar1", "bar1_val"}}, };
  MockSQL q0 = MockSQL(rows0);

  pt::ptree tree;
  DistributedQueryHandler::serializeResults({{r0, q0}}, tree);

  EXPECT_EQ(0, tree.get<int>("results.foo_id.status"));
  const pt::ptree& tree_rows = tree.get_child("results.foo_id.rows");
  // Fatal check: the iterator below is dereferenced twice, which is only
  // valid when both rows are actually present.
  ASSERT_EQ(2U, tree_rows.size());

  auto row = tree_rows.begin();
  EXPECT_EQ("foo0_val", row->second.get<std::string>("foo0"));
  EXPECT_EQ("bar0_val", row->second.get<std::string>("bar0"));
  ++row;
  EXPECT_EQ("foo1_val", row->second.get<std::string>("foo1"));
  EXPECT_EQ("bar1_val", row->second.get<std::string>("bar1"));
}
// One successful and one failed result must each get their own node: the
// successful one with its rows, the failed one with status 1 and no rows.
TEST_F(DistributedTests, test_serialize_results_multiple) {
  DistributedQueryRequest r0("foo", "foo_id");
  QueryData rows0 = {{{"foo0", "foo0_val"}, {"bar0", "bar0_val"}},
                     {{"foo1", "foo1_val"}, {"bar1", "bar1_val"}}, };
  MockSQL q0 = MockSQL(rows0);
  DistributedQueryRequest r1("bar", "bar_id");
  MockSQL q1 = MockSQL({}, Status(1, "Fail"));

  pt::ptree tree;
  DistributedQueryHandler::serializeResults({{r0, q0}, {r1, q1}}, tree);

  EXPECT_EQ(0, tree.get<int>("results.foo_id.status"));
  const pt::ptree& tree_rows = tree.get_child("results.foo_id.rows");
  // Fatal check: the iterator below is dereferenced twice, which is only
  // valid when both rows are actually present.
  ASSERT_EQ(2U, tree_rows.size());

  auto row = tree_rows.begin();
  EXPECT_EQ("foo0_val", row->second.get<std::string>("foo0"));
  EXPECT_EQ("bar0_val", row->second.get<std::string>("bar0"));
  ++row;
  EXPECT_EQ("foo1_val", row->second.get<std::string>("foo1"));
  EXPECT_EQ("bar1_val", row->second.get<std::string>("bar1"));

  EXPECT_EQ(1, tree.get<int>("results.bar_id.status"));
  const pt::ptree& fail_rows = tree.get_child("results.bar_id.rows");
  EXPECT_EQ(0U, fail_rows.size());
}
// End-to-end run through a mock provider: two valid queries and one invalid
// query are fetched, executed, and written back as JSON.
TEST_F(DistributedTests, test_do_queries) {
  // Own the provider from the start, and keep a non-owning pointer so the
  // written results can still be inspected once ownership has moved into the
  // handler (the handler outlives every use of provider_raw below).
  std::unique_ptr<MockDistributedProvider> provider(
      new MockDistributedProvider());
  MockDistributedProvider* provider_raw = provider.get();
  provider_raw->queriesJSON_ =
      R"([
{"query": "SELECT hour FROM time", "id": "hour"},
{"query": "bad", "id": "bad"},
{"query": "SELECT minutes FROM time", "id": "minutes"}
])";

  DistributedQueryHandler handler(std::move(provider));
  Status s = handler.doQueries();
  ASSERT_EQ(Status(), s);

  pt::ptree tree;
  std::istringstream json_stream(provider_raw->resultsJSON_);
  ASSERT_NO_THROW(pt::read_json(json_stream, tree));

  {
    EXPECT_EQ(0, tree.get<int>("results.hour.status"));
    const pt::ptree& tree_rows = tree.get_child("results.hour.rows");
    // Fatal check: begin() is dereferenced below.
    ASSERT_EQ(1U, tree_rows.size());
    auto row = tree_rows.begin();
    EXPECT_GE(row->second.get<int>("hour"), 0);
    EXPECT_LE(row->second.get<int>("hour"), 24);
    EXPECT_EQ(getHostname(), row->second.get<std::string>("_source_host"));
  }
  {
    // this query should have failed
    EXPECT_EQ(1, tree.get<int>("results.bad.status"));
    const pt::ptree& tree_rows = tree.get_child("results.bad.rows");
    EXPECT_EQ(0U, tree_rows.size());
  }
  {
    EXPECT_EQ(0, tree.get<int>("results.minutes.status"));
    const pt::ptree& tree_rows = tree.get_child("results.minutes.rows");
    // Fatal check: begin() is dereferenced below.
    ASSERT_EQ(1U, tree_rows.size());
    auto row = tree_rows.begin();
    EXPECT_GE(row->second.get<int>("minutes"), 0);
    EXPECT_LE(row->second.get<int>("minutes"), 60);
    EXPECT_EQ(getHostname(), row->second.get<std::string>("_source_host"));
  }
}
// A request that was answered successfully must not be executed again when
// the provider offers it a second time.
TEST_F(DistributedTests, test_duplicate_request) {
  // Own the provider from the start, and keep a non-owning pointer so the
  // written results can still be inspected once ownership has moved into the
  // handler (the handler outlives every use of provider_raw below).
  std::unique_ptr<MockDistributedProvider> provider(
      new MockDistributedProvider());
  MockDistributedProvider* provider_raw = provider.get();
  provider_raw->queriesJSON_ =
      R"([
{"query": "SELECT hour FROM time", "id": "hour"}
])";

  DistributedQueryHandler handler(std::move(provider));
  Status s = handler.doQueries();
  ASSERT_EQ(Status(), s);

  pt::ptree tree;
  std::istringstream json_stream(provider_raw->resultsJSON_);
  ASSERT_NO_THROW(pt::read_json(json_stream, tree));

  EXPECT_EQ(0, tree.get<int>("results.hour.status"));
  const pt::ptree& tree_rows = tree.get_child("results.hour.rows");
  // Fatal check: begin() is dereferenced below.
  ASSERT_EQ(1U, tree_rows.size());
  auto row = tree_rows.begin();
  EXPECT_GE(row->second.get<int>("hour"), 0);
  EXPECT_LE(row->second.get<int>("hour"), 24);
  EXPECT_EQ(getHostname(), row->second.get<std::string>("_source_host"));

  // The second time, 'hour' should not be executed again
  s = handler.doQueries();
  ASSERT_EQ(Status(), s);
  json_stream.str(provider_raw->resultsJSON_);
  ASSERT_NO_THROW(pt::read_json(json_stream, tree));
  EXPECT_EQ(0U, tree.get_child("results").size());
}
}
// Test entry point: hands the command line to GoogleTest first, then to
// osquery::initOsquery, and finally runs every registered test. The value
// returned by RUN_ALL_TESTS() becomes the process exit code.
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
osquery::initOsquery(argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -32,12 +32,22 @@ const std::map<tables::ConstraintOperator, std::string> kSQLOperatorRepr = {
// Runs q immediately through query(): the returned status is kept in status_
// and results_ is passed along to receive the rows.
SQL::SQL(const std::string& q) { status_ = query(q, results_); }
QueryData SQL::rows() { return results_; }
const QueryData& SQL::rows() { return results_; }
// True when the stored query status reports success.
bool SQL::ok() { return status_.ok(); }
// Returns a copy of the stored query status.
Status SQL::getStatus() { return status_; }
// Returns the stored status rendered through Status::toString().
std::string SQL::getMessageString() { return status_.toString(); }
const std::string SQL::kHostColumnName = "_source_host";
void SQL::annotateHostInfo() {
std::string hostname = getHostname();
for (Row& row : results_) {
row[kHostColumnName] = hostname;
}
}
std::vector<std::string> SQL::getTableNames() {
std::vector<std::string> results;
for (const auto& name : Registry::names("table")) {