Reduce restart times for event runloops

This commit is contained in:
Teddy Reed 2015-03-24 15:48:53 -07:00
parent 77022f9d52
commit eee5b7d462
5 changed files with 42 additions and 18 deletions

View File

@ -67,7 +67,9 @@ Status writeTextFile(const boost::filesystem::path& path,
int permissions = 0660,
bool force_permissions = false);
/// Check if a path is writable.
Status isWritable(const boost::filesystem::path& path);
/// Check if a path is readable.
Status isReadable(const boost::filesystem::path& path);
/**

View File

@ -384,6 +384,12 @@ Status EventSubscriberPlugin::add(const Row& r, EventTime time) {
}
void EventFactory::delay() {
// Caller may disable event publisher threads.
if (FLAGS_disable_events) {
return;
}
// Create a thread for each event publisher.
auto& ef = EventFactory::getInstance();
for (const auto& publisher : EventFactory::getInstance().event_pubs_) {
auto thread_ = std::make_shared<boost::thread>(
@ -592,6 +598,11 @@ void EventFactory::end(bool join) {
typedef std::shared_ptr<EventPublisherPlugin> EventPublisherPluginRef;
void attachEvents() {
// Caller may disable events, do not setup any publishers or subscribers.
if (FLAGS_disable_events) {
return;
}
const auto& publishers = Registry::all("event_publisher");
for (const auto& publisher : publishers) {
EventFactory::registerEventPublisher(publisher.second);

View File

@ -86,7 +86,7 @@ Status INotifyEventPublisher::run() {
FD_ZERO(&set);
FD_SET(getHandle(), &set);
struct timeval timeout = {0, kINotifyMLatency};
struct timeval timeout = {3, 3000};
int selector = ::select(getHandle() + 1, &set, nullptr, nullptr, &timeout);
if (selector == -1) {
LOG(ERROR) << "Could not read inotify handle";

View File

@ -53,7 +53,7 @@ Status UdevEventPublisher::run() {
FD_ZERO(&set);
FD_SET(fd, &set);
struct timeval timeout = {1, 1000};
struct timeval timeout = {3, 3000};
int selector = ::select(fd + 1, &set, nullptr, nullptr, &timeout);
if (selector == -1) {
LOG(ERROR) << "Could not read udev monitor";

View File

@ -123,6 +123,27 @@ void ExtensionManagerWatcher::watch() {
}
}
inline Status socketWritable(const fs::path& path) {
if (pathExists(path).ok()) {
if (!isWritable(path).ok()) {
return Status(1, "Cannot write extension socket: " + path.string());
}
if (!remove(path).ok()) {
return Status(1, "Cannot remove extension socket: " + path.string());
}
} else {
if (!pathExists(path.parent_path()).ok()) {
return Status(1, "Extension socket directory missing: " + path.string());
}
if (!isWritable(path.parent_path()).ok()) {
return Status(1, "Cannot write extension socket: " + path.string());
}
}
return Status(0, "OK");
}
void loadExtensions() {
// Optionally autoload extensions
auto status = loadExtensions(FLAGS_extensions_autoload);
@ -284,14 +305,9 @@ Status startExtension(const std::string& manager_path,
// Now that the uuid is known, try to clean up stale socket paths.
auto extension_path = getExtensionSocket(ext_status.uuid, manager_path);
if (pathExists(extension_path).ok()) {
if (!isWritable(extension_path).ok()) {
return Status(1, "Cannot write extension socket: " + extension_path);
}
if (!remove(extension_path).ok()) {
return Status(1, "Cannot remove extension socket: " + extension_path);
}
status = socketWritable(extension_path);
if (!status) {
return status;
}
// Set the active config and logger plugins. The core will arbitrate if the
@ -493,14 +509,9 @@ Status startExtensionManager() {
Status startExtensionManager(const std::string& manager_path) {
// Check if the socket location exists.
if (pathExists(manager_path).ok()) {
if (!isWritable(manager_path).ok()) {
return Status(1, "Cannot write extension socket: " + manager_path);
}
if (!remove(manager_path).ok()) {
return Status(1, "Cannot remove extension socket: " + manager_path);
}
auto status = socketWritable(manager_path);
if (!status.ok()) {
return status;
}
auto latency = atoi(FLAGS_extensions_interval.c_str()) * 1000;