fbthrift: Second set of Apache and FbThrift refactors (#4312)

This commit is contained in:
Teddy Reed 2018-05-01 13:26:10 -04:00 committed by GitHub
parent cbb22c887f
commit 7919791637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 996 additions and 468 deletions

View File

@ -32,8 +32,8 @@ extern const size_t kExtensionInitializeLatency;
struct ExtensionInfo {
std::string name;
std::string version;
std::string min_sdk_version;
std::string sdk_version;
std::string min_sdk_version;
};
typedef std::map<RouteUUID, ExtensionInfo> ExtensionList;

View File

@ -28,8 +28,6 @@
#include "osquery/extensions/interface.h"
#include "osquery/filesystem/fileops.h"
using namespace osquery::extensions;
namespace fs = boost::filesystem;
namespace osquery {
@ -164,10 +162,9 @@ Status extensionPathActive(const std::string& path, bool use_timeout = false) {
return applyExtensionDelay(([path, &use_timeout](bool& stop) {
if (socketExists(path)) {
try {
ExtensionStatus status;
// Create a client with a 10-second receive timeout.
EXManagerClient client(path, 10 * 1000);
client.get()->API_PING(status);
ExtensionManagerClient client(path, 10 * 1000);
auto status = client.ping();
return Status(0, "OK");
} catch (const std::exception& /* e */) {
// Path might exist without a connected extension or extension manager.
@ -214,8 +211,8 @@ void ExtensionManagerWatcher::start() {
for (const auto& uuid : uuids) {
try {
auto path = getExtensionSocket(uuid);
EXClient client(path);
client.get()->API_SHUTDOWN();
ExtensionClient client(path);
client.shutdown();
} catch (const std::exception& /* e */) {
VLOG(1) << "Extension UUID " << uuid << " shutdown request failed";
continue;
@ -233,13 +230,13 @@ void ExtensionWatcher::exitFatal(int return_code) {
void ExtensionWatcher::watch() {
// Attempt to ping the extension core.
// This does NOT use pingExtension to avoid the latency checks applied.
ExtensionStatus status;
Status status;
bool core_sane = true;
if (socketExists(path_)) {
try {
EXManagerClient client(path_);
ExtensionManagerClient client(path_);
// Ping the extension manager until it goes down.
client.get()->API_PING(status);
status = client.ping();
} catch (const std::exception& /* e */) {
core_sane = false;
}
@ -253,7 +250,7 @@ void ExtensionWatcher::watch() {
exitFatal(0);
}
if (status.code != (int)ExtensionCode::EXT_SUCCESS && fatal_) {
if (status.getCode() != (int)ExtensionCode::EXT_SUCCESS && fatal_) {
// The core may be healthy but return a failed ping status.
exitFatal();
}
@ -264,7 +261,7 @@ void ExtensionManagerWatcher::watch() {
// will be deregistered.
const auto uuids = RegistryFactory::get().routeUUIDs();
ExtensionStatus status;
Status status;
for (const auto& uuid : uuids) {
auto path = getExtensionSocket(uuid);
auto exists = socketExists(path);
@ -281,9 +278,9 @@ void ExtensionManagerWatcher::watch() {
failures_[uuid] = 1;
if (exists.ok()) {
try {
EXClient client(path);
ExtensionClient client(path);
// Ping the extension until it goes down.
client.get()->API_PING(status);
status = client.ping();
} catch (const std::exception& /* e */) {
failures_[uuid] += 1;
continue;
@ -294,7 +291,7 @@ void ExtensionManagerWatcher::watch() {
continue;
}
if (status.code != (int)ExtensionCode::EXT_SUCCESS) {
if (status.getCode() != (int)ExtensionCode::EXT_SUCCESS) {
LOG(INFO) << "Extension UUID " << uuid << " ping failed";
failures_[uuid] += 1;
} else {
@ -471,34 +468,34 @@ Status startExtension(const std::string& manager_path,
// The Registry broadcast is used as the ExtensionRegistry.
auto broadcast = RegistryFactory::get().getBroadcast();
// The extension will register and provide name, version, sdk details.
InternalExtensionInfo info;
ExtensionInfo info;
info.name = name;
info.version = version;
info.sdk_version = sdk_version;
info.min_sdk_version = min_sdk_version;
// If registration is successful, we will also request the manager's options.
InternalOptionList options;
OptionList options;
// Register the extension's registry broadcast with the manager.
ExtensionStatus ext_status;
RouteUUID uuid = 0;
try {
EXManagerClient client(manager_path);
client.get()->API_REGISTER(ext_status, info, broadcast);
ExtensionManagerClient client(manager_path);
status = client.registerExtension(info, broadcast, uuid);
// The main reason for a failed registry is a duplicate extension name
// (the extension process is already running), or the extension broadcasts
// a duplicate registry item.
if (ext_status.code != (int)ExtensionCode::EXT_SUCCESS) {
return Status(ext_status.code, ext_status.message);
if (status.getCode() == (int)ExtensionCode::EXT_FAILED) {
return status;
}
// Request the core options, mainly to set the active registry plugins for
// logger and config.
client.get()->API_OPTIONS(options);
options = client.options();
} catch (const std::exception& e) {
return Status(1, "Extension register failed: " + std::string(e.what()));
}
// Now that the UUID is known, try to clean up stale socket paths.
auto extension_path = getExtensionSocket(ext_status.uuid, manager_path);
auto extension_path = getExtensionSocket(uuid, manager_path);
status = socketExists(extension_path, true);
if (!status) {
@ -515,11 +512,10 @@ Status startExtension(const std::string& manager_path,
rf.setUp();
// Start the extension's Thrift server
Dispatcher::addService(
std::make_shared<ExtensionRunner>(manager_path, ext_status.uuid));
VLOG(1) << "Extension (" << name << ", " << ext_status.uuid << ", " << version
<< ", " << sdk_version << ") registered";
return Status(0, std::to_string(ext_status.uuid));
Dispatcher::addService(std::make_shared<ExtensionRunner>(manager_path, uuid));
VLOG(1) << "Extension (" << name << ", " << uuid << ", " << version << ", "
<< sdk_version << ") registered";
return Status(0, std::to_string(uuid));
}
Status queryExternal(const std::string& manager_path,
@ -531,19 +527,14 @@ Status queryExternal(const std::string& manager_path,
return status;
}
ExtensionResponse response;
try {
EXManagerClient client(manager_path);
client.get()->API_QUERY(response, query);
ExtensionManagerClient client(manager_path);
status = client.query(query, results);
} catch (const std::exception& e) {
return Status(1, "Extension call failed: " + std::string(e.what()));
}
for (const auto& row : response.response) {
results.push_back(row);
}
return Status(response.status.code, response.status.message);
return status;
}
Status queryExternal(const std::string& query, QueryData& results) {
@ -559,23 +550,23 @@ Status getQueryColumnsExternal(const std::string& manager_path,
return status;
}
ExtensionResponse response;
QueryData qd;
try {
EXManagerClient client(manager_path);
client.get()->API_COLUMNS(response, query);
ExtensionManagerClient client(manager_path);
status = client.getQueryColumns(query, qd);
} catch (const std::exception& e) {
return Status(1, "Extension call failed: " + std::string(e.what()));
}
// Translate response map: {string: string} to a vector: pair(name, type).
for (const auto& column : response.response) {
for (const auto& column : qd) {
for (const auto& col : column) {
columns.push_back(std::make_tuple(
col.first, columnTypeName(col.second), ColumnOptions::DEFAULT));
}
}
return Status(response.status.code, response.status.message);
return status;
}
Status getQueryColumnsExternal(const std::string& query,
@ -594,15 +585,14 @@ Status pingExtension(const std::string& path) {
return status;
}
ExtensionStatus ext_status;
try {
EXClient client(path);
client.get()->API_PING(ext_status);
ExtensionClient client(path);
status = client.ping();
} catch (const std::exception& e) {
return Status(1, "Extension call failed: " + std::string(e.what()));
}
return Status(ext_status.code, ext_status.message);
return Status(0, status.getMessage());
}
Status getExtensions(ExtensionList& extensions) {
@ -620,10 +610,10 @@ Status getExtensions(const std::string& manager_path,
return status;
}
InternalExtensionList ext_list;
ExtensionList ext_list;
try {
EXManagerClient client(manager_path);
client.get()->API_EXTENSIONS(ext_list);
ExtensionManagerClient client(manager_path);
ext_list = client.extensions();
} catch (const std::exception& e) {
return Status(1, "Extension call failed: " + std::string(e.what()));
}
@ -665,21 +655,14 @@ Status callExtension(const std::string& extension_path,
return status;
}
ExtensionResponse ext_response;
try {
EXClient client(extension_path);
client.get()->API_CALL(ext_response, registry, item, request);
ExtensionClient client(extension_path);
status = client.call(registry, item, request, response);
} catch (const std::exception& e) {
return Status(1, "Extension call failed: " + std::string(e.what()));
}
// Convert from Thrift-internal list type to PluginResponse type.
if (ext_response.status.code == (int)ExtensionCode::EXT_SUCCESS) {
for (const auto& response_item : ext_response.response) {
response.push_back(response_item);
}
}
return Status(ext_response.status.code, ext_response.status.message);
return status;
}
Status startExtensionWatcher(const std::string& manager_path,

View File

@ -12,132 +12,418 @@
#include <osquery/filesystem.h>
#include <osquery/system.h>
#include "osquery/extensions/interface.h"
#include <thrift/lib/cpp/async/TAsyncSocket.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include <thrift/lib/cpp2/protocol/BinaryProtocol.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>
using namespace osquery::extensions;
// Include intermediate Thrift-generated interface definitions.
#include "osquery/gen-cpp2/Extension.h"
#include "osquery/gen-cpp2/ExtensionManager.h"
#include "osquery/extensions/interface.h"
namespace osquery {
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;
using namespace extensions;
typedef std::shared_ptr<AsyncProcessorFactory> TProcessorRef;
using TThreadedServerRef = std::shared_ptr<ThriftServer>;
class ExtensionHandler : virtual public extensions::ExtensionSvIf,
public ExtensionInterface {
public:
ExtensionHandler() : ExtensionInterface(0) {}
explicit ExtensionHandler(RouteUUID uuid) : ExtensionInterface(uuid) {}
typedef std::shared_ptr<ExtensionHandler> ExtensionHandlerRef;
typedef std::shared_ptr<ExtensionManagerHandler> ExtensionManagerHandlerRef;
public:
using ExtensionInterface::ping;
void ping(ExtensionStatus& _return) override;
struct ImpExtensionRunner {
TThreadedServerRef server_{nullptr};
TProcessorRef processor_{nullptr};
using ExtensionInterface::call;
void call(ExtensionResponse& _return,
const std::string& registry,
const std::string& item,
const ExtensionPluginRequest& request) override;
using ExtensionInterface::shutdown;
void shutdown() override;
protected:
/// UUID accessor.
RouteUUID getUUID() const;
};
struct ImpExtensionManagerServer {
folly::EventBase base_;
class ExtensionManagerHandler : virtual public extensions::ExtensionManagerSvIf,
public ExtensionManagerInterface,
public ExtensionHandler {
public:
ExtensionManagerHandler() = default;
public:
using ExtensionManagerInterface::extensions;
void extensions(InternalExtensionList& _return) override;
using ExtensionManagerInterface::options;
void options(InternalOptionList& _return) override;
using ExtensionManagerInterface::registerExtension;
void registerExtension(ExtensionStatus& _return,
const InternalExtensionInfo& info,
const ExtensionRegistry& registry) override;
using ExtensionManagerInterface::deregisterExtension;
void deregisterExtension(ExtensionStatus& _return,
const ExtensionRouteUUID uuid) override;
using ExtensionManagerInterface::query;
void query(ExtensionResponse& _return, const std::string& sql) override;
using ExtensionManagerInterface::getQueryColumns;
void getQueryColumns(ExtensionResponse& _return,
const std::string& sql) override;
};
ExtensionRunnerImpl::~ExtensionRunnerImpl() {
struct ImplExtensionRunner {
std::unique_ptr<apache::thrift::ThriftServer> server;
std::shared_ptr<ExtensionHandler> handler;
/// Raw socket descriptor.
int sd;
};
struct ImplExtensionClient {
std::shared_ptr<extensions::ExtensionAsyncClient> e;
std::shared_ptr<extensions::ExtensionManagerAsyncClient> em;
folly::EventBase base;
/// Raw socket descriptor.
int sd;
};
void ExtensionHandler::ping(ExtensionStatus& _return) {
auto s = ExtensionInterface::ping();
_return.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.uuid = s.getCode();
_return.message = s.getMessage();
}
void ExtensionHandler::call(ExtensionResponse& _return,
const std::string& registry,
const std::string& item,
const ExtensionPluginRequest& request) {
PluginRequest plugin_request;
for (const auto& request_item : request) {
// Create a PluginRequest from an ExtensionPluginRequest.
plugin_request[request_item.first] = request_item.second;
}
PluginResponse response;
auto s = ExtensionInterface::call(registry, item, plugin_request, response);
_return.status.code = s.getCode();
_return.status.message = s.getMessage();
_return.status.uuid = getUUID();
if (s.ok()) {
for (const auto& response_item : response) {
// Translate a PluginResponse to an ExtensionPluginResponse.
_return.response.push_back(response_item);
}
}
}
void ExtensionHandler::shutdown() {}
RouteUUID ExtensionHandler::getUUID() const {
return uuid_;
}
void ExtensionManagerHandler::extensions(InternalExtensionList& _return) {
//_return = ExtensionManagerInterface::extensions();
auto extensions = ExtensionManagerInterface::extensions();
for (const auto& extension : extensions) {
auto& ext = _return[extension.first];
ext.min_sdk_version = extension.second.min_sdk_version;
ext.version = extension.second.version;
ext.sdk_version = extension.second.sdk_version;
ext.name = extension.second.name;
}
}
void ExtensionManagerHandler::options(InternalOptionList& _return) {
auto options = ExtensionManagerInterface::options();
for (const auto& option : options) {
_return[option.first].value = option.second.value;
_return[option.first].default_value = option.second.default_value;
_return[option.first].type = option.second.type;
}
}
void ExtensionManagerHandler::registerExtension(
ExtensionStatus& _return,
const InternalExtensionInfo& info,
const ExtensionRegistry& registry,
RouteUUID& uuid) {
ExtensionRegistry er;
for (const auto& rt : registry) {
er[rt.first] = rt.second;
}
RouteUUID uuid;
auto s = ExtensionManagerInterface::registerExtension(
{info.name, info.version, info.sdk_version, info.min_sdk_version},
er,
uuid);
_return.message = s.getMessage();
if (s.ok()) {
_return.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.uuid = uuid;
} else {
_return.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
void ExtensionManagerHandler::deregisterExtension(
ExtensionStatus& _return, const ExtensionRouteUUID uuid) {
auto s = ExtensionManagerInterface::deregisterExtension(uuid);
_return.message = s.getMessage();
if (s.ok()) {
_return.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.uuid = getUUID();
} else {
_return.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
void ExtensionManagerHandler::query(ExtensionResponse& _return,
const std::string& sql) {
QueryData qd;
auto s = ExtensionManagerInterface::query(sql, qd);
for (auto& row : qd) {
_return.response.emplace_back(std::move(row));
}
_return.status.message = s.getMessage();
if (s.ok()) {
_return.status.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.status.uuid = getUUID();
} else {
_return.status.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
void ExtensionManagerHandler::getQueryColumns(ExtensionResponse& _return,
const std::string& sql) {
QueryData qd;
auto s = ExtensionManagerInterface::getQueryColumns(sql, qd);
for (auto& row : qd) {
_return.response.emplace_back(std::move(row));
}
_return.status.message = s.getMessage();
if (s.ok()) {
_return.status.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.status.uuid = getUUID();
} else {
_return.status.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
ExtensionRunnerInterface::~ExtensionRunnerInterface() {
removePath(path_);
if (raw_socket_ > 0) {
close(raw_socket_);
raw_socket_ = 0;
if (server_->sd > 0) {
close(server_->sd);
server_->sd = 0;
}
};
ExtensionRunnerImpl::ExtensionRunnerImpl()
: server{std::make_unique<ImpExtensionRunner>()} {}
ExtensionRunnerInterface::ExtensionRunnerInterface()
: server_{std::make_unique<ImplExtensionRunner>()} {}
void ExtensionRunnerImpl::serve() {
void ExtensionRunnerInterface::serve() {
// Start the Thrift server's run loop.
server->server_->serve();
server_->server->serve();
}
void ExtensionRunnerImpl::connect() {
server->server_ = TThreadedServerRef(new ThriftServer());
server->server_->setProcessorFactory(server->processor_);
void ExtensionRunnerInterface::connect() {
server_->server = std::make_unique<apache::thrift::ThriftServer>();
server_->server->setInterface(server_->handler);
raw_socket_ = socket(AF_UNIX, SOCK_STREAM, 0);
server_->sd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, path_.c_str(), sizeof(addr.sun_path) - 1);
if (bind(raw_socket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
if (::bind(server_->sd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
throw std::runtime_error("Cannot bind to socket");
}
server->server_->useExistingSocket(raw_socket_);
server_->server->useExistingSocket(server_->sd);
}
void ExtensionRunnerImpl::init(RouteUUID uuid) {
void ExtensionRunnerInterface::init(RouteUUID uuid, bool manager) {
manager_ = manager;
// Create the thrift instances.
auto handler = ExtensionHandlerRef(new ExtensionHandler(uuid));
server->processor_ =
std::make_shared<ThriftServerAsyncProcessorFactory<ExtensionHandler>>(
handler);
}
void ExtensionRunnerImpl::initManager() {
// Create the thrift instances.
auto handler = ExtensionManagerHandlerRef(new ExtensionManagerHandler());
server->processor_ = std::make_shared<
ThriftServerAsyncProcessorFactory<ExtensionManagerHandler>>(handler);
}
void ExtensionRunnerImpl::stopServer() {
// In most cases the service thread has started before the stop request.
if (server->server_ != nullptr) {
server->server_->stop();
if (!manager_) {
server_->handler = std::make_shared<ExtensionHandler>(uuid);
} else {
server_->handler = std::make_shared<ExtensionManagerHandler>();
}
}
void ExtensionRunnerImpl::stopServerManager() {
if (server->server_ != nullptr) {
void ExtensionRunnerInterface::stopServer() {
// In most cases the service thread has started before the stop request.
if (server_->server != nullptr) {
server_->server->stop();
}
}
void ExtensionRunnerInterface::stopServerManager() {
if (server_->server != nullptr) {
removeStalePaths(path_);
}
}
EXInternal::EXInternal(const std::string& path)
: path_(path), server{std::make_unique<ImpExtensionManagerServer>()} {
raw_socket_ = socket(AF_UNIX, SOCK_STREAM, 0);
void ExtensionClientCore::init(const std::string& path, bool manager) {
path_ = path;
manager_ = manager;
client_ = std::make_unique<ImplExtensionClient>();
client_->sd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, path_.c_str(), sizeof(addr.sun_path) - 1);
if (connect(raw_socket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
if (::connect(client_->sd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
throw std::runtime_error("Cannot connect to socket");
}
auto tsock(apache::thrift::async::TAsyncSocket::newSocket(&client_->base,
client_->sd));
auto channel(apache::thrift::HeaderClientChannel::newChannel(tsock));
channel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL);
channel->setClientType(THRIFT_UNFRAMED_DEPRECATED);
if (!manager_) {
client_->e = std::make_shared<ExtensionAsyncClient>(std::move(channel));
} else {
client_->em =
std::make_shared<ExtensionManagerAsyncClient>(std::move(channel));
}
}
EXInternal::~EXInternal() = default;
ExtensionClientCore::~ExtensionClientCore() = default;
void EXInternal::setTimeouts(size_t timeouts) {}
void ExtensionClientCore::setTimeouts(size_t /* timeouts */) {}
EXClient::EXClient(const std::string& path, size_t timeout) : EXInternal(path) {
bool ExtensionClientCore::manager() {
return manager_;
}
ExtensionClient::ExtensionClient(const std::string& path, size_t timeout) {
init(path, false);
setTimeouts(timeout);
// client_ = std::make_shared<_Client>(HeaderClientChannel::newChannel(
// async::TAsyncSocket::newSocket(&server->base_, raw_socket_)));
}
EXManagerClient::EXManagerClient(const std::string& manager_path,
size_t timeout)
: EXInternal(manager_path) {
ExtensionManagerClient::ExtensionManagerClient(const std::string& path,
size_t timeout) {
init(path, true);
setTimeouts(timeout);
// client_ = std::make_shared<_ManagerClient>(HeaderClientChannel::newChannel(
// async::TAsyncSocket::newSocket(&server->base_, raw_socket_)));
}
const std::shared_ptr<_Client>& EXClient::get() const {
return client_;
Status ExtensionClient::ping() {
ExtensionStatus status;
auto client = manager() ? client_->em : client_->e;
client->sync_ping(status);
if (status.code != (int)extensions::ExtensionCode::EXT_FAILED) {
return Status(0, status.message);
}
return Status(1);
}
const std::shared_ptr<_ManagerClient>& EXManagerClient::get() const {
return client_;
Status ExtensionClient::call(const std::string& registry,
const std::string& item,
const PluginRequest& request,
PluginResponse& response) {
ExtensionResponse er;
auto client = manager() ? client_->em : client_->e;
client->sync_call(er, registry, item, request);
for (const auto& r : er.response) {
response.push_back(r);
}
return Status(er.status.code, er.status.message);
}
} // namespace osquery
void ExtensionClient::shutdown() {
auto client = manager() ? client_->em : client_->e;
client->sync_shutdown();
}
ExtensionList ExtensionManagerClient::extensions() {
ExtensionList el;
InternalExtensionList iel;
client_->em->sync_extensions(iel);
for (const auto& extension : iel) {
auto& ext = el[extension.first];
ext.min_sdk_version = extension.second.min_sdk_version;
ext.version = extension.second.version;
ext.sdk_version = extension.second.sdk_version;
ext.name = extension.second.name;
}
return el;
}
OptionList ExtensionManagerClient::options() {
OptionList ol;
InternalOptionList iol;
client_->em->sync_options(iol);
for (const auto& option : iol) {
auto& opt = option.second;
ol[option.first] = {opt.value, opt.default_value, opt.type};
}
return ol;
}
Status ExtensionManagerClient::registerExtension(
const ExtensionInfo& info,
const ExtensionRegistry& registry,
RouteUUID& uuid) {
InternalExtensionInfo iei;
iei.name = info.name;
iei.version = info.version;
iei.sdk_version = info.sdk_version;
iei.min_sdk_version = info.min_sdk_version;
ExtensionStatus status;
client_->em->sync_registerExtension(status, iei, registry);
uuid = status.uuid;
return Status(status.code, status.message);
}
Status ExtensionManagerClient::query(const std::string& sql, QueryData& qd) {
ExtensionResponse er;
client_->em->sync_query(er, sql);
for (const auto& row : er.response) {
qd.push_back(row);
}
return Status();
}
Status ExtensionManagerClient::getQueryColumns(const std::string& sql,
QueryData& qd) {
ExtensionResponse er;
client_->em->sync_getQueryColumns(er, sql);
for (const auto& row : er.response) {
qd.push_back(row);
}
return Status(er.status.code, er.status.message);
}
Status ExtensionManagerClient::deregisterExtension(RouteUUID uuid) {
ExtensionStatus status;
client_->em->sync_deregisterExtension(status, uuid);
return Status(status.code, status.message);
}
ExtensionClient::~ExtensionClient() {}
ExtensionManagerClient::~ExtensionManagerClient() {}
} // namespace osquery

View File

@ -12,8 +12,6 @@
#include <osquery/filesystem.h>
#include <osquery/system.h>
#include "osquery/extensions/interface.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TThreadedServer.h>
@ -27,7 +25,10 @@
#include <thrift/transport/TSocket.h>
#endif
using namespace osquery::extensions;
#include "Extension.h"
#include "ExtensionManager.h"
#include "osquery/extensions/interface.h"
namespace osquery {
@ -37,130 +38,422 @@ using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;
#ifdef WIN32
typedef TPipe TPlatformSocket;
typedef TPipeServer TPlatformServerSocket;
typedef std::shared_ptr<TPipe> TPlatformSocketRef;
using TPlatformServerSocket = TPipeServer;
using TPlatformSocket = TPipe;
#else
typedef TSocket TPlatformSocket;
typedef TServerSocket TPlatformServerSocket;
typedef std::shared_ptr<TSocket> TPlatformSocketRef;
using TPlatformServerSocket = TServerSocket;
using TPlatformSocket = TSocket;
#endif
typedef std::shared_ptr<TTransport> TTransportRef;
typedef std::shared_ptr<TProtocol> TProtocolRef;
typedef std::shared_ptr<TServerTransport> TServerTransportRef;
typedef std::shared_ptr<TProcessor> TProcessorRef;
typedef std::shared_ptr<TTransportFactory> TTransportFactoryRef;
typedef std::shared_ptr<TProtocolFactory> TProtocolFactoryRef;
using TThreadedServerRef = std::shared_ptr<TThreadedServer>;
class ExtensionHandler : virtual public extensions::ExtensionIf,
public ExtensionInterface {
public:
ExtensionHandler() : ExtensionInterface(0) {}
explicit ExtensionHandler(RouteUUID uuid) : ExtensionInterface(uuid) {}
typedef std::shared_ptr<ExtensionHandler> ExtensionHandlerRef;
typedef std::shared_ptr<ExtensionManagerHandler> ExtensionManagerHandlerRef;
public:
using ExtensionInterface::ping;
void ping(extensions::ExtensionStatus& _return) override;
struct ImpExtensionRunner {
TServerTransportRef transport_{nullptr};
TThreadedServerRef server_{nullptr};
TProcessorRef processor_{nullptr};
using ExtensionInterface::call;
void call(extensions::ExtensionResponse& _return,
const std::string& registry,
const std::string& item,
const extensions::ExtensionPluginRequest& request) override;
using ExtensionInterface::shutdown;
void shutdown() override;
protected:
/// UUID accessor.
RouteUUID getUUID() const;
};
struct ImpExtensionManagerServer {
TPlatformSocketRef socket_;
TTransportRef transport_;
TProtocolRef protocol_;
#ifdef WIN32
#pragma warning(push, 3)
#pragma warning(disable : 4250)
#endif
class ExtensionManagerHandler : virtual public extensions::ExtensionManagerIf,
public ExtensionManagerInterface,
public ExtensionHandler {
public:
ExtensionManagerHandler() = default;
public:
using ExtensionManagerInterface::extensions;
void extensions(extensions::InternalExtensionList& _return) override;
using ExtensionManagerInterface::options;
void options(extensions::InternalOptionList& _return) override;
using ExtensionManagerInterface::registerExtension;
void registerExtension(
extensions::ExtensionStatus& _return,
const extensions::InternalExtensionInfo& info,
const extensions::ExtensionRegistry& registry) override;
using ExtensionManagerInterface::deregisterExtension;
void deregisterExtension(extensions::ExtensionStatus& _return,
const extensions::ExtensionRouteUUID uuid) override;
using ExtensionManagerInterface::query;
void query(extensions::ExtensionResponse& _return,
const std::string& sql) override;
using ExtensionManagerInterface::getQueryColumns;
void getQueryColumns(extensions::ExtensionResponse& _return,
const std::string& sql) override;
public:
using ExtensionHandler::call;
using ExtensionHandler::ping;
using ExtensionHandler::shutdown;
};
ExtensionRunnerImpl::~ExtensionRunnerImpl() {
if (pathExists(path_).ok()) {
removePath(path_);
}
#ifdef WIN32
#pragma warning(pop)
#endif
struct ImplExtensionRunner {
std::shared_ptr<TServerTransport> transport;
std::shared_ptr<TThreadedServer> server;
std::shared_ptr<TProcessor> processor;
};
ExtensionRunnerImpl::ExtensionRunnerImpl()
: server{std::make_unique<ImpExtensionRunner>()} {}
struct ImplExtensionClient {
std::shared_ptr<extensions::ExtensionClient> e;
std::shared_ptr<extensions::ExtensionManagerClient> em;
void ExtensionRunnerImpl::serve() {
// Start the Thrift server's run loop.
server->server_->serve();
std::shared_ptr<TBufferedTransport> transport;
std::shared_ptr<TPlatformSocket> socket;
};
void ExtensionHandler::ping(extensions::ExtensionStatus& _return) {
auto s = ExtensionInterface::ping();
_return.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.uuid = s.getCode();
_return.message = s.getMessage();
}
void ExtensionRunnerImpl::connect() {
server->transport_ = TServerTransportRef(new TPlatformServerSocket(path_));
void ExtensionHandler::call(extensions::ExtensionResponse& _return,
const std::string& registry,
const std::string& item,
const extensions::ExtensionPluginRequest& request) {
PluginRequest plugin_request;
for (const auto& request_item : request) {
// Create a PluginRequest from an ExtensionPluginRequest.
plugin_request[request_item.first] = request_item.second;
}
PluginResponse response;
auto s = ExtensionInterface::call(registry, item, plugin_request, response);
_return.status.code = s.getCode();
_return.status.message = s.getMessage();
_return.status.uuid = getUUID();
if (s.ok()) {
for (const auto& response_item : response) {
// Translate a PluginResponse to an ExtensionPluginResponse.
_return.response.push_back(response_item);
}
}
}
void ExtensionHandler::shutdown() {}
RouteUUID ExtensionHandler::getUUID() const {
return uuid_;
}
void ExtensionManagerHandler::extensions(
extensions::InternalExtensionList& _return) {
//_return = ExtensionManagerInterface::extensions();
auto extensions = ExtensionManagerInterface::extensions();
for (const auto& extension : extensions) {
auto& ext = _return[extension.first];
ext.min_sdk_version = extension.second.min_sdk_version;
ext.version = extension.second.version;
ext.sdk_version = extension.second.sdk_version;
ext.name = extension.second.name;
}
}
void ExtensionManagerHandler::options(extensions::InternalOptionList& _return) {
auto options = ExtensionManagerInterface::options();
for (const auto& option : options) {
_return[option.first].value = option.second.value;
_return[option.first].default_value = option.second.default_value;
_return[option.first].type = option.second.type;
}
}
void ExtensionManagerHandler::registerExtension(
extensions::ExtensionStatus& _return,
const extensions::InternalExtensionInfo& info,
const extensions::ExtensionRegistry& registry) {
extensions::ExtensionRegistry er;
for (const auto& rt : registry) {
er[rt.first] = rt.second;
}
RouteUUID uuid;
auto s = ExtensionManagerInterface::registerExtension(
{info.name, info.version, info.sdk_version, info.min_sdk_version},
er,
uuid);
_return.message = s.getMessage();
if (s.ok()) {
_return.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.uuid = uuid;
} else {
_return.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
void ExtensionManagerHandler::deregisterExtension(
extensions::ExtensionStatus& _return,
const extensions::ExtensionRouteUUID uuid) {
auto s = ExtensionManagerInterface::deregisterExtension(uuid);
_return.message = s.getMessage();
if (s.ok()) {
_return.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.uuid = getUUID();
} else {
_return.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
void ExtensionManagerHandler::query(extensions::ExtensionResponse& _return,
const std::string& sql) {
QueryData qd;
auto s = ExtensionManagerInterface::query(sql, qd);
for (auto& row : qd) {
_return.response.emplace_back(std::move(row));
}
_return.status.message = s.getMessage();
if (s.ok()) {
_return.status.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.status.uuid = getUUID();
} else {
_return.status.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
void ExtensionManagerHandler::getQueryColumns(
extensions::ExtensionResponse& _return, const std::string& sql) {
QueryData qd;
auto s = ExtensionManagerInterface::getQueryColumns(sql, qd);
for (auto& row : qd) {
_return.response.emplace_back(std::move(row));
}
_return.status.message = s.getMessage();
if (s.ok()) {
_return.status.code = (int)extensions::ExtensionCode::EXT_SUCCESS;
_return.status.uuid = getUUID();
} else {
_return.status.code = (int)extensions::ExtensionCode::EXT_FAILED;
}
}
ExtensionRunnerInterface::~ExtensionRunnerInterface() {
removePath(path_);
};
ExtensionRunnerInterface::ExtensionRunnerInterface()
: server_{std::make_unique<ImplExtensionRunner>()} {}
void ExtensionRunnerInterface::serve() {
// Start the Thrift server's run loop.
server_->server->serve();
}
void ExtensionRunnerInterface::connect() {
server_->transport = std::make_shared<TPlatformServerSocket>(path_);
// Construct the service's transport, protocol, thread pool.
auto transport_fac = TTransportFactoryRef(new TBufferedTransportFactory());
auto protocol_fac = TProtocolFactoryRef(new TBinaryProtocolFactory());
auto transport_fac = std::make_shared<TBufferedTransportFactory>();
auto protocol_fac = std::make_shared<TBinaryProtocolFactory>();
server->server_ = TThreadedServerRef(new TThreadedServer(
server->processor_, server->transport_, transport_fac, protocol_fac));
server_->server = std::make_shared<TThreadedServer>(
server_->processor, server_->transport, transport_fac, protocol_fac);
}
void ExtensionRunnerImpl::init(RouteUUID uuid) {
void ExtensionRunnerInterface::init(RouteUUID uuid, bool manager) {
manager_ = manager;
// Create the thrift instances.
auto handler = ExtensionHandlerRef(new ExtensionHandler(uuid));
server->processor_ = TProcessorRef(new ExtensionProcessor(handler));
}
void ExtensionRunnerImpl::initManager() {
// Create the thrift instances.
auto handler = ExtensionManagerHandlerRef(new ExtensionManagerHandler());
server->processor_ = TProcessorRef(new ExtensionManagerProcessor(handler));
}
void ExtensionRunnerImpl::stopServer() {
// In most cases the service thread has started before the stop request.
if (server->server_ != nullptr) {
server->server_->stop();
if (!manager_) {
auto handler = std::make_shared<ExtensionHandler>(uuid);
server_->processor =
std::make_shared<extensions::ExtensionProcessor>(handler);
} else {
auto handler = std::make_shared<ExtensionManagerHandler>();
server_->processor =
std::make_shared<extensions::ExtensionManagerProcessor>(handler);
}
}
void ExtensionRunnerImpl::stopServerManager() {
if (server->server_ != nullptr) {
void ExtensionRunnerInterface::stopServer() {
// In most cases the service thread has started before the stop request.
if (server_->server != nullptr) {
server_->server->stop();
}
}
void ExtensionRunnerInterface::stopServerManager() {
if (server_->server != nullptr) {
removeStalePaths(path_);
}
}
EXInternal::EXInternal(const std::string& path)
: path_(path), server{std::make_unique<ImpExtensionManagerServer>()} {
server->socket_ = std::make_shared<TPlatformSocket>(path);
server->transport_ = std::make_shared<TBufferedTransport>(server->socket_);
server->protocol_ = std::make_shared<TBinaryProtocol>(server->transport_);
void ExtensionClientCore::init(const std::string& path, bool manager) {
path_ = path;
manager_ = manager;
client_ = std::make_unique<ImplExtensionClient>();
client_->socket = std::make_shared<TPlatformSocket>(path);
client_->transport = std::make_shared<TBufferedTransport>(client_->socket);
auto protocol = std::make_shared<TBinaryProtocol>(client_->transport);
if (!manager_) {
client_->e = std::make_shared<extensions::ExtensionClient>(protocol);
} else {
client_->em =
std::make_shared<extensions::ExtensionManagerClient>(protocol);
}
(void)client_->transport->open();
}
EXInternal::~EXInternal() {
ExtensionClientCore::~ExtensionClientCore() {
try {
server->transport_->close();
client_->transport->close();
} catch (const std::exception& /* e */) {
// The transport/socket may have exited.
}
}
void EXInternal::setTimeouts(size_t timeouts) {
void ExtensionClientCore::setTimeouts(size_t timeouts) {
#if !defined(WIN32)
// Windows TPipe does not support timeouts.
server->socket_->setRecvTimeout(timeouts);
server->socket_->setSendTimeout(timeouts);
client_->socket->setRecvTimeout(timeouts);
client_->socket->setSendTimeout(timeouts);
#endif
}
EXClient::EXClient(const std::string& path, size_t timeout) : EXInternal(path) {
bool ExtensionClientCore::manager() {
return manager_;
}
ExtensionClient::ExtensionClient(const std::string& path, size_t timeout) {
init(path, false);
setTimeouts(timeout);
client_ = std::make_shared<_Client>(server->protocol_);
(void)server->transport_->open();
}
EXManagerClient::EXManagerClient(const std::string& manager_path,
size_t timeout)
: EXInternal(manager_path) {
ExtensionManagerClient::ExtensionManagerClient(const std::string& path,
size_t timeout) {
init(path, true);
setTimeouts(timeout);
client_ = std::make_shared<_ManagerClient>(server->protocol_);
(void)server->transport_->open();
}
const std::shared_ptr<_Client>& EXClient::get() const {
return client_;
Status ExtensionClient::ping() {
extensions::ExtensionStatus status;
auto client = manager() ? client_->em : client_->e;
client->ping(status);
if (status.code != (int)extensions::ExtensionCode::EXT_FAILED) {
return Status(0, status.message);
}
return Status(1);
}
const std::shared_ptr<_ManagerClient>& EXManagerClient::get() const {
return client_;
Status ExtensionClient::call(const std::string& registry,
const std::string& item,
const PluginRequest& request,
PluginResponse& response) {
extensions::ExtensionResponse er;
auto client = manager() ? client_->em : client_->e;
client->call(er, registry, item, request);
for (const auto& r : er.response) {
response.push_back(r);
}
return Status(er.status.code, er.status.message);
}
} // namespace osquery
void ExtensionClient::shutdown() {
auto client = manager() ? client_->em : client_->e;
client->shutdown();
}
ExtensionList ExtensionManagerClient::extensions() {
ExtensionList el;
extensions::InternalExtensionList iel;
client_->em->extensions(iel);
for (const auto& extension : iel) {
auto& ext = el[extension.first];
ext.min_sdk_version = extension.second.min_sdk_version;
ext.version = extension.second.version;
ext.sdk_version = extension.second.sdk_version;
ext.name = extension.second.name;
}
return el;
}
OptionList ExtensionManagerClient::options() {
OptionList ol;
extensions::InternalOptionList iol;
client_->em->options(iol);
for (const auto& option : iol) {
auto& opt = option.second;
ol[option.first] = {opt.value, opt.default_value, opt.type};
}
return ol;
}
Status ExtensionManagerClient::registerExtension(
const ExtensionInfo& info,
const extensions::ExtensionRegistry& registry,
RouteUUID& uuid) {
extensions::InternalExtensionInfo iei;
iei.name = info.name;
iei.version = info.version;
iei.sdk_version = info.sdk_version;
iei.min_sdk_version = info.min_sdk_version;
extensions::ExtensionStatus status;
client_->em->registerExtension(status, iei, registry);
uuid = status.uuid;
return Status(status.code, status.message);
}
Status ExtensionManagerClient::query(const std::string& sql, QueryData& qd) {
extensions::ExtensionResponse er;
client_->em->query(er, sql);
for (const auto& row : er.response) {
qd.push_back(row);
}
return Status();
}
Status ExtensionManagerClient::getQueryColumns(const std::string& sql,
QueryData& qd) {
extensions::ExtensionResponse er;
client_->em->getQueryColumns(er, sql);
for (const auto& row : er.response) {
qd.push_back(row);
}
return Status(er.status.code, er.status.message);
}
Status ExtensionManagerClient::deregisterExtension(RouteUUID uuid) {
extensions::ExtensionStatus status;
client_->em->deregisterExtension(status, uuid);
return Status(status.code, status.message);
}
ExtensionClient::~ExtensionClient() {}
ExtensionManagerClient::~ExtensionManagerClient() {}
} // namespace osquery

View File

@ -20,27 +20,24 @@
#include "osquery/extensions/interface.h"
using namespace osquery::extensions;
using chrono_clock = std::chrono::high_resolution_clock;
namespace osquery {
namespace extensions {
const std::vector<std::string> kSDKVersionChanges = {
{"1.7.7"},
};
void ExtensionHandler::ping(ExtensionStatus& _return) {
_return.code = (int)ExtensionCode::EXT_SUCCESS;
_return.message = "pong";
_return.uuid = uuid_;
Status ExtensionInterface::ping() {
// Need to translate return code into 0 and extract the UUID.
assert(uuid_ < INT_MAX);
return Status(static_cast<int>(uuid_), "pong");
}
void ExtensionHandler::call(ExtensionResponse& _return,
const std::string& registry,
const std::string& item,
const ExtensionPluginRequest& request) {
Status ExtensionInterface::call(const std::string& registry,
const std::string& item,
const PluginRequest& request,
PluginResponse& response) {
// Call will receive an extension or core's request to call the other's
// internal registry call. It is the ONLY actor that resolves registry
// item aliases.
@ -50,74 +47,41 @@ void ExtensionHandler::call(ExtensionResponse& _return,
local_item = RegistryFactory::get().getActive(registry);
}
PluginResponse response;
PluginRequest plugin_request;
for (const auto& request_item : request) {
// Create a PluginRequest from an ExtensionPluginRequest.
plugin_request[request_item.first] = request_item.second;
}
auto status =
RegistryFactory::call(registry, local_item, plugin_request, response);
_return.status.code = status.getCode();
_return.status.message = status.getMessage();
_return.status.uuid = uuid_;
if (status.ok()) {
for (const auto& response_item : response) {
// Translate a PluginResponse to an ExtensionPluginResponse.
_return.response.push_back(response_item);
}
}
return RegistryFactory::call(registry, local_item, request, response);
}
void ExtensionHandler::shutdown() {
void ExtensionInterface::shutdown() {
// Request a graceful shutdown of the Thrift listener.
VLOG(1) << "Extension " << uuid_ << " requested shutdown";
Initializer::requestShutdown(EXIT_SUCCESS);
}
/**
* @brief Updates the Thrift server output to be VLOG
*
* On Windows, the thrift server will output to stdout, which displays
* messages to the user on exiting the client. This function is used
* instead of the default output for thrift.
*
* @param msg The text to be logged
*/
void thriftLoggingOutput(const char* msg) {
VLOG(1) << "Thrift message: " << msg;
}
ExtensionManagerHandler::ExtensionManagerHandler() {
apache::thrift::GlobalOutput.setOutputFunction(thriftLoggingOutput);
}
void ExtensionManagerHandler::extensions(InternalExtensionList& _return) {
ExtensionList ExtensionManagerInterface::extensions() {
refresh();
ReadLock lock(extensions_mutex_);
_return = extensions_;
return extensions_;
}
void ExtensionManagerHandler::options(InternalOptionList& _return) {
OptionList ExtensionManagerInterface::options() {
OptionList options;
auto flags = Flag::flags();
for (const auto& flag : flags) {
_return[flag.first].value = flag.second.value;
_return[flag.first].default_value = flag.second.default_value;
_return[flag.first].type = flag.second.type;
options[flag.first].value = flag.second.value;
options[flag.first].default_value = flag.second.default_value;
options[flag.first].type = flag.second.type;
}
return options;
}
void ExtensionManagerHandler::registerExtension(
ExtensionStatus& _return,
const InternalExtensionInfo& info,
const ExtensionRegistry& registry) {
Status ExtensionManagerInterface::registerExtension(
const ExtensionInfo& info,
const ExtensionRegistry& registry,
RouteUUID& uuid) {
if (exists(info.name)) {
LOG(WARNING) << "Refusing to register duplicate extension " << info.name;
_return.code = (int)ExtensionCode::EXT_FAILED;
_return.message = "Duplicate extension registered";
return;
return Status((int)ExtensionCode::EXT_FAILED,
"Duplicate extension registered");
}
// Enforce API change requirements.
@ -125,9 +89,8 @@ void ExtensionManagerHandler::registerExtension(
if (!versionAtLeast(change, info.sdk_version)) {
LOG(WARNING) << "Could not add extension " << info.name
<< ": incompatible extension SDK " << info.sdk_version;
_return.code = (int)ExtensionCode::EXT_FAILED;
_return.message = "Incompatible extension SDK version";
return;
return Status((int)ExtensionCode::EXT_FAILED,
"Incompatible extension SDK version");
}
}
@ -137,7 +100,7 @@ void ExtensionManagerHandler::registerExtension(
chrono_clock::now().time_since_epoch().count()));
}
// Every call to registerExtension is assigned a new RouteUUID.
RouteUUID uuid = static_cast<uint16_t>(rand());
uuid = static_cast<uint16_t>(rand());
VLOG(1) << "Registering extension (" << info.name << ", " << uuid
<< ", version=" << info.version << ", sdk=" << info.sdk_version
<< ")";
@ -146,27 +109,24 @@ void ExtensionManagerHandler::registerExtension(
if (!status.ok()) {
LOG(WARNING) << "Could not add extension " << info.name << ": "
<< status.getMessage();
_return.code = (int)ExtensionCode::EXT_FAILED;
_return.message = "Failed adding registry: " + status.getMessage();
return;
return Status((int)ExtensionCode::EXT_FAILED,
"Failed adding registry: " + status.getMessage());
}
WriteLock lock(extensions_mutex_);
extensions_[uuid] = info;
_return.code = (int)ExtensionCode::EXT_SUCCESS;
_return.message = "OK";
_return.uuid = uuid;
return Status();
}
void ExtensionManagerHandler::deregisterExtension(
ExtensionStatus& _return, const ExtensionRouteUUID uuid) {
Status ExtensionManagerInterface::query(const std::string& sql, QueryData& qd) {
return osquery::query(sql, qd);
}
Status ExtensionManagerInterface::deregisterExtension(RouteUUID uuid) {
{
ReadLock lock(extensions_mutex_);
if (extensions_.count(uuid) == 0) {
_return.code = (int)ExtensionCode::EXT_FAILED;
_return.message = "No extension UUID registered";
_return.uuid = 0;
return;
return Status((int)ExtensionCode::EXT_FAILED, "No extension UUID found");
}
}
@ -175,42 +135,22 @@ void ExtensionManagerHandler::deregisterExtension(
WriteLock lock(extensions_mutex_);
extensions_.erase(uuid);
_return.code = (int)ExtensionCode::EXT_SUCCESS;
_return.uuid = uuid;
return Status();
}
void ExtensionManagerHandler::query(ExtensionResponse& _return,
const std::string& sql) {
QueryData results;
auto status = osquery::query(sql, results);
_return.status.code = status.getCode();
_return.status.message = status.getMessage();
_return.status.uuid = uuid_;
if (status.ok()) {
for (const auto& row : results) {
_return.response.push_back(row);
}
}
}
void ExtensionManagerHandler::getQueryColumns(ExtensionResponse& _return,
const std::string& sql) {
Status ExtensionManagerInterface::getQueryColumns(const std::string& sql,
QueryData& qd) {
TableColumns columns;
auto status = osquery::getQueryColumns(sql, columns);
_return.status.code = status.getCode();
_return.status.message = status.getMessage();
_return.status.uuid = uuid_;
if (status.ok()) {
for (const auto& col : columns) {
_return.response.push_back(
{{std::get<0>(col), columnTypeName(std::get<1>(col))}});
qd.push_back({{std::get<0>(col), columnTypeName(std::get<1>(col))}});
}
}
return status;
}
void ExtensionManagerHandler::refresh() {
void ExtensionManagerInterface::refresh() {
std::vector<RouteUUID> removed_routes;
const auto uuids = RegistryFactory::get().routeUUIDs();
@ -228,7 +168,7 @@ void ExtensionManagerHandler::refresh() {
}
}
bool ExtensionManagerHandler::exists(const std::string& name) {
bool ExtensionManagerInterface::exists(const std::string& name) {
refresh();
// Search the remaining extension list for duplicates.
@ -240,7 +180,6 @@ bool ExtensionManagerHandler::exists(const std::string& name) {
}
return false;
}
} // namespace extensions
void removeStalePaths(const std::string& manager) {
std::vector<std::string> paths;
@ -254,7 +193,7 @@ void removeStalePaths(const std::string& manager) {
ExtensionRunnerCore::~ExtensionRunnerCore() = default;
ExtensionRunnerCore::ExtensionRunnerCore(const std::string& path)
: InternalRunnable("ExtensionRunnerCore"), ExtensionRunnerImpl() {
: InternalRunnable("ExtensionRunnerCore"), ExtensionRunnerInterface() {
path_ = path;
}
@ -319,16 +258,14 @@ ExtensionManagerRunner::~ExtensionManagerRunner() {
}
void ExtensionManagerRunner::start() {
initManager();
init(0, true);
VLOG(1) << "Extension manager service starting: " << path_;
try {
startServer(/*processor*/);
startServer();
} catch (const std::exception& e) {
LOG(WARNING) << "Extensions disabled: cannot start extension manager ("
<< path_ << ") (" << e.what() << ")";
}
}
} // namespace osquery

View File

@ -12,59 +12,71 @@
#include <osquery/dispatcher.h>
#include <osquery/extensions.h>
#ifdef WIN32
#pragma warning(push, 3)
/*
* MSVC complains that ExtensionManagerHandler inherits the call() function from
* ExtensionHandler via dominance. This is because ExtensionManagerHandler
* implements ExtensionManagerIf and ExtensionHandler who both implement
* ExtensionIf. ExtensionIf declares a virtual call() function that
* ExtensionHandler defines. This _shouldn't_ cause any issues.
*/
#pragma warning(disable : 4250)
#endif
// Include intermediate Thrift-generated interface definitions.
#include "Extension.h"
#include "ExtensionManager.h"
#ifdef FBTHRIFT
#define API_PING sync_ping
#define API_CALL sync_call
#define API_QUERY sync_query
#define API_COLUMNS sync_getQueryColumns
#define API_REGISTER sync_registerExtension
#define API_OPTIONS sync_options
#define API_EXTENSIONS sync_extensions
#define API_SHUTDOWN sync_shutdown
#else
#define API_PING ping
#define API_CALL call
#define API_QUERY query
#define API_COLUMNS getQueryColumns
#define API_REGISTER registerExtension
#define API_OPTIONS options
#define API_EXTENSIONS extensions
#define API_SHUTDOWN shutdown
#endif
#include <osquery/query.h>
namespace osquery {
namespace extensions {
#ifdef FBTHRIFT
using namespace cpp2;
using _ExtensionIf = ExtensionSvIf;
using _ExtensionManagerIf = ExtensionManagerSvIf;
using _Client = extensions::cpp2::ExtensionAsyncClient;
using _ManagerClient = extensions::cpp2::ExtensionManagerAsyncClient;
#else
using _ExtensionIf = ExtensionIf;
using _ExtensionManagerIf = ExtensionManagerIf;
using _Client = extensions::ExtensionClient;
using _ManagerClient = extensions::ExtensionManagerClient;
#endif
/**
* An option is a 'basic' flag, the only important information is value.
*/
struct Option {
/// Current flag value.
std::string value;
/// Initial flag value.
std::string default_value;
/// String representation of type (unused).
std::string type;
};
/// This is replicated from the Thrift IDL.
enum class ExtensionCode {
EXT_SUCCESS = 0,
EXT_FAILED = 1,
EXT_FATAL = 2,
};
using OptionList = std::map<std::string, Option>;
using ExtensionRouteTable = std::map<std::string, PluginResponse>;
using ExtensionRegistry = std::map<std::string, ExtensionRouteTable>;
/**
* @brief The basic API functions that our Thrift server and client implements.
*
* We include this abstract to force the server (interface) and clients to
* include the required APIs.
*
* For each interface, a child must implement the actual Thrift endpoints and
* call the methods included here, which contain the logic. This is a little
* bit of overhead that was already a sunk cost for osquery-- meaning we
* were already translating Thrift structures to library structures.
*/
class ExtensionAPI {
public:
virtual ~ExtensionAPI() = default;
public:
virtual Status ping() = 0;
virtual Status call(const std::string& registry,
const std::string& item,
const PluginRequest& request,
PluginResponse& response) = 0;
virtual void shutdown() = 0;
};
class ExtensionManagerAPI {
public:
virtual ~ExtensionManagerAPI() = default;
public:
virtual ExtensionList extensions() = 0;
virtual OptionList options() = 0;
virtual Status registerExtension(const ExtensionInfo& info,
const ExtensionRegistry& registry,
RouteUUID& uuid) = 0;
virtual Status deregisterExtension(RouteUUID uuid) = 0;
virtual Status query(const std::string& sql, QueryData& qd) = 0;
virtual Status getQueryColumns(const std::string& sql, QueryData& qd) = 0;
};
/**
* @brief The Thrift API server used by an osquery Extension process.
@ -74,28 +86,17 @@ using _ManagerClient = extensions::ExtensionManagerClient;
* It implements all the Extension API handlers.
*
*/
class ExtensionHandler : virtual public _ExtensionIf {
class ExtensionInterface : public ExtensionAPI {
public:
ExtensionHandler() : uuid_(0) {}
explicit ExtensionHandler(RouteUUID uuid) : uuid_(uuid) {}
ExtensionInterface() : ExtensionInterface(0) {}
explicit ExtensionInterface(RouteUUID uuid) : uuid_(uuid) {}
/// Ping an Extension for status and metrics.
void ping(ExtensionStatus& _return) override;
/**
* @brief The Thrift API used by Registry::call for an extension route.
*
* @param _return The return response (combo Status and PluginResponse).
* @param registry The name of the Extension registry.
* @param item The Extension plugin name.
* @param request The plugin request.
*/
void call(ExtensionResponse& _return,
const std::string& registry,
const std::string& item,
const ExtensionPluginRequest& request) override;
/// Request an extension to shutdown.
public:
virtual Status ping() override;
virtual Status call(const std::string& registry,
const std::string& item,
const PluginRequest& request,
PluginResponse& response) override;
virtual void shutdown() override;
protected:
@ -114,13 +115,11 @@ class ExtensionHandler : virtual public _ExtensionIf {
* It implements all the ExtensionManager API handlers.
*
*/
class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
public ExtensionHandler {
class ExtensionManagerInterface : public ExtensionInterface,
public ExtensionManagerAPI {
public:
ExtensionManagerHandler();
/// Return a list of Route UUIDs and extension metadata.
void extensions(InternalExtensionList& _return) override;
virtual ExtensionList extensions() override;
/**
* @brief Return a map of osquery options (Flags, bootstrap CLI flags).
@ -135,7 +134,7 @@ class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
* of the current options. The best example is the `config_plugin` bootstrap
* flag.
*/
void options(InternalOptionList& _return) override;
virtual OptionList options() override;
/**
* @brief Request a Route UUID and advertise a set of Registry routes.
@ -146,13 +145,13 @@ class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
* compatibility checks. On success the Extension is returned a Route UUID and
* begins to serve the ExtensionHandler Thrift API.
*
* @param _return The output Status and optional assigned RouteUUID.
* @return The output Status and optional assigned RouteUUID.
* @param info The osquery Thrift-internal Extension metadata container.
* @param registry The Extension's Registry::getBroadcast information.
*/
void registerExtension(ExtensionStatus& _return,
const InternalExtensionInfo& info,
const ExtensionRegistry& registry) override;
virtual Status registerExtension(const ExtensionInfo& info,
const ExtensionRegistry& registry,
RouteUUID& uuid) override;
/**
* @brief Request an Extension removal and removal of Registry routes.
@ -165,8 +164,7 @@ class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
* @param _return The output Status.
* @param uuid The assigned Route UUID to deregister.
*/
void deregisterExtension(ExtensionStatus& _return,
const ExtensionRouteUUID uuid) override;
virtual Status deregisterExtension(RouteUUID uuid) override;
/**
* @brief Execute an SQL statement in osquery core.
@ -178,7 +176,7 @@ class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
* @param _return The output Status and QueryData (as response).
* @param sql The sql statement.
*/
void query(ExtensionResponse& _return, const std::string& sql) override;
virtual Status query(const std::string& sql, QueryData& qd) override;
/**
* @brief Get SQL column information for SQL statements in osquery core.
@ -190,12 +188,8 @@ class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
* @param _return The output Status and TableColumns (as response).
* @param sql The sql statement.
*/
void getQueryColumns(ExtensionResponse& _return,
const std::string& sql) override;
protected:
/// A shutdown request does not apply to ExtensionManagers.
void shutdown() override {}
virtual Status getQueryColumns(const std::string& sql,
QueryData& qd) override;
private:
/// Check if an extension exists by the name it registered.
@ -206,32 +200,37 @@ class ExtensionManagerHandler : virtual public _ExtensionManagerIf,
void refresh();
/// Maintain a map of extension UUID to metadata for tracking deregistration.
InternalExtensionList extensions_;
ExtensionList extensions_;
/// Mutex for extensions accessors.
Mutex extensions_mutex_;
};
}
struct ImpExtensionRunner;
struct ImpExtensionManagerServer;
struct ImplExtensionRunner;
struct ImplExtensionClient;
class ExtensionRunnerImpl {
/**
* This implements a small API around setting up and running Thrift
* Servers. The implementation details and members are private and stored in
* the PIMPL structures defined above.
*
* An implementation will exist for Apache Thrift and for FBThrift.
*/
class ExtensionRunnerInterface {
public:
virtual ~ExtensionRunnerImpl();
ExtensionRunnerImpl();
virtual ~ExtensionRunnerInterface();
ExtensionRunnerInterface();
/// Call serve.
/**
* Call into the Thrift server's server implementation.
*/
void serve();
/// Set up structures.
void connect();
/// Create processor.
void init(RouteUUID uuid);
/// Create manager processor.
void initManager();
/// Create handler/processor.
void init(RouteUUID uuid, bool manager = false);
/// Stop server.
void stopServer();
@ -243,15 +242,15 @@ class ExtensionRunnerImpl {
/// The UNIX domain socket used for requests from the ExtensionManager.
std::string path_;
/// Thrift server implementation.
std::unique_ptr<ImpExtensionRunner> server;
/// True if the extension is an extension manager.
bool manager_;
/// Raw socket (optional)
int raw_socket_{0};
/// Thrift server implementation.
std::unique_ptr<ImplExtensionRunner> server_;
};
class ExtensionRunnerCore : public InternalRunnable,
public ExtensionRunnerImpl {
public ExtensionRunnerInterface {
public:
virtual ~ExtensionRunnerCore();
explicit ExtensionRunnerCore(const std::string& path);
@ -311,28 +310,38 @@ class ExtensionManagerRunner : public ExtensionRunnerCore {
};
/// Internal accessor for extension clients.
class EXInternal : private boost::noncopyable {
class ExtensionClientCore : private boost::noncopyable {
public:
explicit EXInternal(const std::string& path);
virtual ~ExtensionClientCore();
// Set the receive and send timeout.
public:
/**
* @brief Initialize the UNIX socket from a string pathname.
*
* A very basic client can just store the string.
* More complex clients can create the client structure.
*/
void init(const std::string& path, bool manager = false);
/// Set the receive and send timeout.
void setTimeouts(size_t timeout);
virtual ~EXInternal();
/// Check if the client is an extension manager.
bool manager();
protected:
/// Raw socket (optional)
int raw_socket_{0};
/// Path to extension server socket.
std::string path_;
/// Thrift server implementation.
std::unique_ptr<ImpExtensionManagerServer> server;
/// True if the client is an extension manager client.
bool manager_;
/// Thrift client implementation.
std::unique_ptr<ImplExtensionClient> client_;
};
/// Internal accessor for a client to an extension (from an extension manager).
class EXClient : public EXInternal {
class ExtensionClient : public ExtensionClientCore, public ExtensionAPI {
public:
/**
* @brief Create a client to a client extension.
@ -342,16 +351,29 @@ class EXClient : public EXInternal {
* @param path This is the socket path for the client communication.
* @param timeout [optional] time in milliseconds to wait for input.
*/
explicit EXClient(const std::string& path, size_t timeout = 5000 * 60);
explicit ExtensionClient(const std::string& path, size_t timeout = 5000 * 60);
~ExtensionClient();
const std::shared_ptr<extensions::_Client>& get() const;
protected:
ExtensionClient() = default;
private:
std::shared_ptr<extensions::_Client> client_;
public:
/// Ping a server and have it fill in the extension's UUID as the code.
Status ping() override;
/// Call an extension's plugin.
Status call(const std::string& registry,
const std::string& item,
const PluginRequest& request,
PluginResponse& response) override;
/// Request that the extension stop.
void shutdown() override;
};
/// Internal accessor for a client to an extension manager (from an extension).
class EXManagerClient : public EXInternal {
class ExtensionManagerClient : public ExtensionClient,
public ExtensionManagerAPI {
public:
/**
* @brief Create a client to a manager extension.
@ -359,19 +381,32 @@ class EXManagerClient : public EXInternal {
* @param path This is the socket path for the manager communication.
* @param timeout [optional] time in milliseconds to wait for input.
*/
explicit EXManagerClient(const std::string& manager_path,
size_t timeout = 5000 * 60);
explicit ExtensionManagerClient(const std::string& path,
size_t timeout = 5000 * 60);
~ExtensionManagerClient();
const std::shared_ptr<extensions::_ManagerClient>& get() const;
public:
/// List all osquery extensions.
ExtensionList extensions() override;
private:
std::shared_ptr<extensions::_ManagerClient> client_;
/// List all osquery options (gflags).
OptionList options() override;
/// Regiester yourself as a new extension.
Status registerExtension(const ExtensionInfo& info,
const ExtensionRegistry& registry,
RouteUUID& uuid) override;
/// Remove an extension.
Status deregisterExtension(RouteUUID uuid) override;
/// Issue a query.
Status query(const std::string& sql, QueryData& qd) override;
/// Get column information from a query.
Status getQueryColumns(const std::string& sql, QueryData& qd) override;
};
/// Attempt to remove all stale extension sockets.
void removeStalePaths(const std::string& manager);
}
#ifdef WIN32
#pragma warning(pop)
#endif
} // namespace osquery

View File

@ -25,8 +25,6 @@
#include "osquery/filesystem/fileops.h"
#include "osquery/tests/test_util.h"
using namespace osquery::extensions;
namespace osquery {
const int kDelay = 20;
@ -62,12 +60,12 @@ class ExtensionsTest : public testing::Test {
bool ping(int attempts = 3) {
// Calling open will except if the socket does not exist.
ExtensionStatus status;
for (int i = 0; i < attempts; ++i) {
try {
EXManagerClient client(socket_path);
client.get()->API_PING(status);
return (status.code == (int)ExtensionCode::EXT_SUCCESS);
ExtensionManagerClient client(socket_path);
auto status = client.ping();
return (status.getCode() == (int)ExtensionCode::EXT_SUCCESS);
} catch (const std::exception& /* e */) {
sleepFor(kDelay);
}
@ -78,21 +76,17 @@ class ExtensionsTest : public testing::Test {
QueryData query(const std::string& sql, int attempts = 3) {
// Calling open will except if the socket does not exist.
ExtensionResponse response;
QueryData qd;
for (int i = 0; i < attempts; ++i) {
try {
EXManagerClient client(socket_path);
client.get()->API_QUERY(response, sql);
ExtensionManagerClient client(socket_path);
client.query(sql, qd);
} catch (const std::exception& /* e */) {
sleepFor(kDelay);
}
}
QueryData qd;
for (const auto& row : response.response) {
qd.push_back(row);
}
return qd;
}
@ -151,9 +145,9 @@ TEST_F(ExtensionsTest, test_extension_start) {
// Now allow duplicates (for testing, since EM/E are the same).
rf.allowDuplicates(true);
status = startExtension(socket_path, "test", "0.1", "0.0.0", "9.9.9");
// This will not be false since we are allowing deplicate items.
// This will not be false since we are allowing duplicate items.
// Otherwise, starting an extension and extensionManager would fatal.
ASSERT_TRUE(status.ok());
ASSERT_NE(status.getCode(), (int)ExtensionCode::EXT_FAILED);
// Checks for version comparisons (also used by packs).
ASSERT_FALSE(versionAtLeast("1.1.1", "0.0.1"));