/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed as defined on the LICENSE file found in the * root directory of this source tree. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace osquery { CREATE_REGISTRY(EventPublisherPlugin, "event_publisher"); CREATE_REGISTRY(EventSubscriberPlugin, "event_subscriber"); /// Checkpoint interval to inspect max event buffering. #define EVENTS_CHECKPOINT 256 FLAG(bool, disable_events, false, "Disable osquery publish/subscribe system"); FLAG(bool, events_optimize, true, "Optimize subscriber select queries (scheduler only)"); // Access this flag through EventSubscriberPlugin::getEventsExpiry to allow for // overriding in subclasses FLAG(uint64, events_expiry, 3600, "Timeout to expire event subscriber results"); // Access this flag through EventSubscriberPlugin::getEventsMax to allow for // overriding in subclasses FLAG(uint64, events_max, 50000, "Maximum number of events per type to buffer"); static inline EventTime timeFromRecord(const std::string& record) { // Convert a stored index "as string bytes" to a time value. return static_cast(tryTo(record).takeOr(0ll)); } static inline std::string toIndex(size_t i) { auto str_index = std::to_string(i); if (str_index.size() < 10) { str_index.insert(str_index.begin(), 10 - str_index.size(), '0'); } return str_index; } static inline void getOptimizeData(EventTime& o_time, size_t& o_eid, std::string& query_name, const std::string& publisher) { // Read the optimization time for the current executing query. getDatabaseValue(kPersistentSettings, kExecutingQuery, query_name); if (query_name.empty()) { o_time = 0; o_eid = 0; return; } { std::string content; getDatabaseValue(kEvents, "optimize." + query_name, content); o_time = timeFromRecord(content); } { std::string content; getDatabaseValue(kEvents, "optimize_eid." + query_name, content); o_eid = tryTo(content).takeOr(std::size_t{0}); } } static inline void setOptimizeData(EventTime time, size_t eid, const std::string& publisher) { // Store the optimization time and eid. std::string query_name; getDatabaseValue(kPersistentSettings, kExecutingQuery, query_name); if (query_name.empty()) { return; } setDatabaseValue(kEvents, "optimize." + query_name, std::to_string(time)); setDatabaseValue(kEvents, "optimize_eid." + query_name, toIndex(eid)); } void EventSubscriberPlugin::genTable(RowYield& yield, QueryContext& context) { // Stop is an unsigned (-1), our end of time equivalent. EventTime start = 0, stop = 0; if (context.constraints["time"].getAll().size() > 0) { // Use the 'time' constraint to optimize backing-store lookups. for (const auto& constraint : context.constraints["time"].getAll()) { EventTime expr = timeFromRecord(constraint.expr); if (constraint.op == EQUALS) { stop = start = expr; break; } else if (constraint.op == GREATER_THAN) { start = std::max(start, expr + 1); } else if (constraint.op == GREATER_THAN_OR_EQUALS) { start = std::max(start, expr); } else if (constraint.op == LESS_THAN) { stop = std::min(stop, expr - 1); } else if (constraint.op == LESS_THAN_OR_EQUALS) { stop = std::min(stop, expr); } } } else if (Initializer::isDaemon() && FLAGS_events_optimize) { // If the daemon is querying a subscriber without a 'time' constraint and // allows optimization, only emit events since the last query. std::string query_name; getOptimizeData(optimize_time_, optimize_eid_, query_name, dbNamespace()); start = optimize_time_; optimize_time_ = getUnixTime() - 1; // Track the queries that have selected data. WriteLock lock(event_query_record_); if (!query_name.empty() && queries_.count(query_name) == 0) { queries_.insert(query_name); } } get(yield, start, stop); } EventContextID EventPublisherPlugin::numEvents() const { return next_ec_id_.load(); } size_t EventPublisherPlugin::numSubscriptions() { ReadLock lock(subscription_lock_); return subscriptions_.size(); } void EventPublisherPlugin::fire(const EventContextRef& ec, EventTime time) { if (isEnding()) { // Cannot emit/fire while ending return; } EventContextID ec_id = 0; ec_id = next_ec_id_.fetch_add(1); // Fill in EventContext ID and time if needed. if (ec != nullptr) { ec->id = ec_id; if (ec->time == 0) { if (time == 0) { time = getUnixTime(); } ec->time = time; } } ReadLock lock(subscription_lock_); for (const auto& subscription : subscriptions_) { auto es = EventFactory::getEventSubscriber(subscription->subscriber_name); if (es != nullptr && es->state() == EventState::EVENT_RUNNING) { fireCallback(subscription, ec); } } } std::vector EventSubscriberPlugin::getIndexes(EventTime start, EventTime stop, bool sort) { auto index_key = "indexes." + dbNamespace(); std::vector indexes; EventTime l_start = (start > 0) ? start / 60 : 0; EventTime r_stop = (stop > 0) ? stop / 60 + 1 : 0; std::string content; getDatabaseValue(kEvents, index_key + ".60", content); if (content.empty()) { return indexes; } std::vector bins, expirations; boost::split(bins, content, boost::is_any_of(",")); for (const auto& bin : bins) { auto step = timeFromRecord(bin); auto step_start = step * 60; auto step_stop = (step + 1) * 60; if (step_stop <= expire_time_) { expirations.push_back(bin); } else if (step_start < expire_time_ && sort) { expireRecords("60", bin, false); } if (step >= l_start && (r_stop == 0 || step < r_stop) && sort) { indexes.push_back("60." + bin); } } // Rewrite the index lists and delete each expired item. if (!expirations.empty()) { expireIndexes("60", bins, expirations); } // Return indexes in binning order. if (sort) { std::sort(indexes.begin(), indexes.end(), [](const std::string& left, const std::string& right) { auto n1 = timeFromRecord(left.substr(left.find('.') + 1)); auto n2 = timeFromRecord(right.substr(right.find('.') + 1)); return n1 < n2; }); } // Update the new time that events expire to now - expiry. return indexes; } void EventSubscriberPlugin::expireRecords(const std::string& list_type, const std::string& index, bool all) { if (!executedAllQueries()) { return; } auto record_key = "records." + dbNamespace(); auto data_key = "data." + dbNamespace(); // If the expirations is not removing all records, rewrite the persisting. std::vector persisting_records; // Request all records within this list-size + bin offset. auto expired_records = getRecords({list_type + '.' + index}, false); if (all && expired_records.size() > 1) { deleteDatabaseRange(kEvents, data_key + '.' + expired_records.begin()->first, data_key + '.' + expired_records.rbegin()->first); } else { for (const auto& record : expired_records) { if (record.second <= expire_time_) { deleteDatabaseValue(kEvents, data_key + '.' + record.first); } else { persisting_records.push_back(record.first + ':' + std::to_string(record.second)); } } } // Either drop or overwrite the record list. if (all) { deleteDatabaseValue(kEvents, record_key + "." + list_type + "." + index); } else if (persisting_records.size() < expired_records.size()) { auto new_records = boost::algorithm::join(persisting_records, ","); setDatabaseValue( kEvents, record_key + "." + list_type + "." + index, new_records); } } void EventSubscriberPlugin::expireIndexes( const std::string& list_type, const std::vector& indexes, const std::vector& expirations) { if (!executedAllQueries()) { return; } auto index_key = "indexes." + dbNamespace(); // Construct a mutable list of persisting indexes to rewrite as records. std::vector persisting_indexes = indexes; // Remove the records using the list of expired indexes. for (const auto& bin : expirations) { expireRecords(list_type, bin, true); persisting_indexes.erase( std::remove(persisting_indexes.begin(), persisting_indexes.end(), bin), persisting_indexes.end()); } // Update the list of indexes with the non-expired indexes. auto new_indexes = boost::algorithm::join(persisting_indexes, ","); setDatabaseValue(kEvents, index_key + "." + list_type, new_indexes); } void EventSubscriberPlugin::expireCheck() { auto data_key = "data." + dbNamespace(); auto eid_key = "eid." + dbNamespace(); // Min key will be the last surviving key. size_t threshold_key = 0; { auto limit = getEventsMax(); std::vector keys; scanDatabaseKeys(kEvents, keys, data_key); if (keys.size() <= limit) { return; } // There is an overflow of events buffered for this subscriber. LOG(WARNING) << "Expiring events for subscriber: " << getName() << " (overflowed limit " << limit << ")"; VLOG(1) << "Subscriber events " << getName() << " exceeded limit " << limit << " by: " << keys.size() - limit; // Inspect the N-FLAGS_events_max -th event's value and expire before the // time within the content. std::string last_key; getDatabaseValue(kEvents, eid_key, last_key); // The EID is the next-index. // EID - events_max is the most last-recent event to keep. threshold_key = boost::lexical_cast(last_key) - getEventsMax(); // Scan each of the keys in keys, if their ID portion is < threshold. // Nix them, this requires lots of conversions, use with care. std::string max_key; std::string min_key; unsigned long min_key_value = 0; unsigned long max_key_value = 0; for (const auto& key : keys) { auto const key_value = tryTo(key.substr(key.rfind('.') + 1), 10) .takeOr(0ul); if (key_value < static_cast(threshold_key)) { min_key_value = (min_key_value == 0 || key_value < min_key_value) ? key_value : min_key_value; if (min_key_value == key_value) { min_key = key; } max_key_value = (key_value > max_key_value) ? key_value : max_key_value; if (max_key_value == key_value) { max_key = key; } } } if (!min_key.empty()) { if (max_key_value == min_key_value) { deleteDatabaseValue(kEvents, min_key); } else { deleteDatabaseRange(kEvents, min_key, max_key); } } } // Convert the key index into a time using the content. // The last-recent event is fetched and the corresponding time is used as // the expiration time for the subscriber. std::string content; getDatabaseValue(kEvents, data_key + "." + toIndex(threshold_key), content); // Decode the value into a row structure to extract the time. Row r; if (!deserializeRowJSON(content, r) || r.count("time") == 0) { return; } // The last time will become the implicit expiration time. size_t last_time = boost::lexical_cast(r.at("time")); if (last_time > 0) { expire_time_ = last_time - (last_time % 60); } // Finally, attempt an index query to trigger expirations. // In this case the result set is not used. getIndexes(expire_time_, 0, false); } bool EventSubscriberPlugin::executedAllQueries() const { ReadLock lock(event_query_record_); return queries_.size() >= query_count_; } std::vector EventSubscriberPlugin::getRecords( const std::vector& indexes, bool optimize) { auto record_key = "records." + dbNamespace(); std::vector records; for (const auto& index : indexes) { std::vector bin_records; { std::string record_value; getDatabaseValue(kEvents, record_key + "." + index, record_value); if (record_value.empty()) { // There are actually no events in this bin, interesting error case. continue; } // Each list is tokenized into a record=event_id:time. bin_records = split(record_value, ","); } // Iterate over every 2 items: EID:TIME. for (const auto& record : bin_records) { const auto vals = split(record, ":"); if (vals.size() != 2) { LOG(WARNING) << "Event records mismatch: " << record << " does not have a matching eid/event_time"; continue; } const auto eid = vals[0]; const EventTime et = timeFromRecord(vals[1]); if (FLAGS_events_optimize && optimize && et <= optimize_time_ + 1) { auto eidr = timeFromRecord(eid); if (eidr <= optimize_eid_) { continue; } } records.push_back(std::make_pair(eid, et)); } } return records; } Status EventSubscriberPlugin::recordEvents( const std::vector& event_id_list, EventTime event_time) { WriteLock lock(event_record_lock_); DatabaseStringValueList database_data; database_data.reserve(2U); // The list key includes the list type (bin size) and the list ID (bin). // The list_id is the MOST-Specific key ID, the bin for this list. // If the event time was 13 and the time_list is 5 seconds, lid = 2. auto list_id = boost::lexical_cast(event_time / 60); std::string time_value = boost::lexical_cast(event_time); // The record is identified by the event type then module name. // Append the record (eid, unix_time) to the list bin. auto database_key = "records." + dbNamespace() + ".60." + list_id; auto index_key = "indexes." + dbNamespace() + ".60"; std::string record_value; getDatabaseValue(kEvents, database_key, record_value); for (const auto& eid : event_id_list) { if (record_value.length() == 0) { // This is a new list_id for list_key, append the ID to the indirect // lookup for this list_key. std::string index_value; getDatabaseValue(kEvents, index_key, index_value); if (index_value.length() == 0) { // A new index. index_value = list_id; } else { index_value += "," + list_id; } database_data.push_back(std::make_pair(index_key, index_value)); record_value = eid + ":" + time_value; } else { // Tokenize a record using ',' and the EID/time using ':'. record_value += "," + eid + ":" + time_value; } } database_data.push_back(std::make_pair(database_key, record_value)); auto status = setDatabaseBatch(kEvents, database_data); if (!status.ok()) { LOG(ERROR) << "Could not put Event Records"; } return status; } size_t EventSubscriberPlugin::getEventsExpiry() { return FLAGS_events_expiry; } size_t EventSubscriberPlugin::getEventsMax() { return FLAGS_events_max; } const std::string EventSubscriberPlugin::getEventID() { Status status; // First get an event ID from the meta key. std::string eid_key = "eid." + dbNamespace(); { WriteLock lock(event_id_lock_); if (last_eid_ == 0) { std::string last_eid_value; status = getDatabaseValue(kEvents, eid_key, last_eid_value); if (!status.ok() || last_eid_value.empty()) { last_eid_value = "0"; } last_eid_ = boost::lexical_cast(last_eid_value); } if (last_eid_ % 10 == 0) { status = setDatabaseValue(kEvents, eid_key, toIndex(last_eid_ + 10)); } last_eid_++; } return toIndex(last_eid_); } void EventSubscriberPlugin::get(RowYield& yield, EventTime start, EventTime stop) { // Get the records for this time range. auto indexes = getIndexes(start, stop); auto records = getRecords(indexes); std::string events_key = "data." + dbNamespace(); std::vector mapped_records; for (const auto& record : records) { if (record.second >= start && (record.second <= stop || stop == 0)) { mapped_records.push_back(events_key + "." + record.first); } } if (FLAGS_events_optimize && !records.empty()) { // If records were returned save the ordered-last as the optimization EID. auto const eidr_exp = tryTo(records.back().first, 10); if (eidr_exp.isValue()) { optimize_eid_ = static_cast(eidr_exp.get()); } } // Select mapped_records using event_ids as keys. std::string data_value; for (const auto& record : mapped_records) { Row r; auto status = getDatabaseValue(kEvents, record, data_value); if (data_value.length() == 0) { // There is no record here, interesting error case. continue; } status = deserializeRowJSON(data_value, r); data_value.clear(); if (status.ok()) { yield(TableRowHolder(new DynamicTableRow(std::move(r)))); } } auto expiry = getEventsExpiry(); if (expiry > 0) { // Make sure the configured expiration is at least the minimum needed to // execute all related queries in the schedule. if (expiry < min_expiration_) { expiry = min_expiration_; } // Set the expire time to NOW - "configured lifetime". // Index retrieval will apply the constraints checking and auto-expire. expire_time_ = getUnixTime() - expiry; expire_time_ = expire_time_ - (expire_time_ % 60); } if (FLAGS_events_optimize) { setOptimizeData(optimize_time_, optimize_eid_, dbNamespace()); } } Status EventSubscriberPlugin::add(const Row& r) { std::vector batch = {r}; return addBatch(batch, getUnixTime()); } Status EventSubscriberPlugin::addBatch(std::vector& row_list) { return addBatch(row_list, getUnixTime()); } Status EventSubscriberPlugin::addBatch(std::vector& row_list, EventTime custom_event_time) { DatabaseStringValueList database_data; database_data.reserve(row_list.size()); std::vector event_id_list; event_id_list.reserve(row_list.size()); auto event_time = custom_event_time != 0 ? custom_event_time : getUnixTime(); auto event_time_str = std::to_string(event_time); for (auto& row : row_list) { row["time"] = event_time_str; row["eid"] = getEventID(); // Serialize and store the row data, for query-time retrieval. std::string serialized_row; auto status = serializeRowJSON(row, serialized_row); if (!status.ok()) { VLOG(1) << status.getMessage(); continue; } // Then remove the newline. if (serialized_row.size() > 0 && serialized_row.back() == '\n') { serialized_row.pop_back(); } // Logger plugins may request events to be forwarded directly. // If no active logger is marked 'usesLogEvent' then this is a no-op. EventFactory::forwardEvent(serialized_row); // Store the event data in the batch database_data.push_back(std::make_pair( "data." + dbNamespace() + "." + row["eid"], serialized_row)); event_id_list.push_back(std::move(row["eid"])); event_count_++; // Use the last EventID and a checkpoint bucket size to periodically apply // buffer eviction. Eviction occurs if the total count exceeds events_max. if (last_eid_ % EVENTS_CHECKPOINT == 0) { expireCheck(); } } if (database_data.empty()) { return Status(1, "Failed to process the rows"); } // Save the batched data inside the database auto status = setDatabaseBatch(kEvents, database_data); if (!status.ok()) { return status; } return recordEvents(event_id_list, event_time); } EventPublisherRef EventSubscriberPlugin::getPublisher() const { return EventFactory::getEventPublisher(getType()); } void EventSubscriberPlugin::removeSubscriptions() { subscription_count_ = 0; auto publisher = getPublisher(); if (publisher == nullptr) { LOG(WARNING) << "Cannot remove subscriptions from: " << getName(); return; } getPublisher()->removeSubscriptions(getName()); } void EventFactory::delay() { // Caller may disable event publisher threads. if (FLAGS_disable_events) { return; } // Create a thread for each event publisher. auto& ef = EventFactory::getInstance(); for (const auto& publisher : EventFactory::getInstance().event_pubs_) { // Publishers that did not set up correctly are put into an ending state. if (!publisher.second->isEnding()) { auto thread_ = std::make_shared( std::bind(&EventFactory::run, publisher.first)); ef.threads_.push_back(thread_); } } } Status EventPublisherPlugin::addSubscription( const SubscriptionRef& subscription) { // The publisher threads may be running and if they fire events the list of // subscriptions will be walked. WriteLock lock(subscription_lock_); subscriptions_.push_back(subscription); return Status(0); } void EventPublisherPlugin::removeSubscriptions(const std::string& subscriber) { // See addSubscription for details on the critical section. WriteLock lock(subscription_lock_); auto end = std::remove_if(subscriptions_.begin(), subscriptions_.end(), [&subscriber](const SubscriptionRef& subscription) { return (subscription->subscriber_name == subscriber); }); subscriptions_.erase(end, subscriptions_.end()); } void EventFactory::addForwarder(const std::string& logger) { getInstance().loggers_.push_back(logger); } void EventFactory::forwardEvent(const std::string& event) { for (const auto& logger : getInstance().loggers_) { Registry::call("logger", logger, {{"event", event}}); } } void EventFactory::configUpdate() { // Scan the schedule for queries that touch "_events" tables. // We will count the queries std::map subscriber_details; Config::get().scheduledQueries( [&subscriber_details](std::string name, const ScheduledQuery& query) { std::vector tables; // Convert query string into a list of virtual tables effected. if (!getQueryTables(query.query, tables)) { VLOG(1) << "Cannot get tables from query: " << name; return; } // Remove duplicates and select only the subscriber tables. std::set subscribers; for (const auto& table : tables) { if (Registry::get().exists("event_subscriber", table)) { subscribers.insert(table); } } for (const auto& subscriber : subscribers) { auto& details = subscriber_details[subscriber]; details.max_interval = (query.interval > details.max_interval) ? query.interval : details.max_interval; details.query_count++; } }); auto& ef = EventFactory::getInstance(); for (const auto& details : subscriber_details) { if (!ef.exists(details.first)) { continue; } WriteLock lock(ef.factory_lock_); auto subscriber = ef.getEventSubscriber(details.first); subscriber->min_expiration_ = details.second.max_interval * 3; subscriber->min_expiration_ += (60 - (subscriber->min_expiration_ % 60)); // Emit a warning for each subscriber affected by the small expiration. auto expiry = subscriber->getEventsExpiry(); if (expiry > 0 && subscriber->min_expiration_ > expiry) { LOG(INFO) << "Subscriber expiration is too low: " << subscriber->getName(); } subscriber->query_count_ = details.second.query_count; WriteLock subscriber_lock(subscriber->event_query_record_); subscriber->queries_.clear(); } // If events are enabled configure the subscribers before publishers. if (!FLAGS_disable_events) { RegistryFactory::get().registry("event_subscriber")->configure(); RegistryFactory::get().registry("event_publisher")->configure(); } } Status EventFactory::run(const std::string& type_id) { if (FLAGS_disable_events) { return Status(0, "Events disabled"); } // An interesting take on an event dispatched entrypoint. // There is little introspection into the event type. // Assume it can either make use of an entrypoint poller/selector or // take care of async callback registrations in setUp/configure/run // only once and handle event queuing/firing in callbacks. EventPublisherRef publisher = nullptr; { auto& ef = EventFactory::getInstance(); WriteLock lock(ef.factory_lock_); publisher = ef.getEventPublisher(type_id); } if (publisher == nullptr) { return Status(1, "Event publisher is missing"); } else if (publisher->hasStarted()) { return Status(1, "Cannot restart an event publisher"); } setThreadName(publisher->name()); VLOG(1) << "Starting event publisher run loop: " + type_id; publisher->hasStarted(true); auto status = Status(0, "OK"); while (!publisher->isEnding()) { // Can optionally implement a global cooloff latency here. status = publisher->run(); if (!status.ok()) { break; } publisher->restart_count_++; // This is a 'default' cool-off implemented in InterruptableRunnable. // If a publisher fails to perform some sort of interruption point, this // prevents the thread from thrashing through exiting checks. publisher->pause(std::chrono::milliseconds(200)); } if (!status.ok()) { // The runloop status is not reflective of the event type's. VLOG(1) << "Event publisher " << publisher->type() << " run loop terminated for reason: " << status.getMessage(); // Publishers auto tear down when their run loop stops. } publisher->tearDown(); publisher->state(EventState::EVENT_NONE); // Do not remove the publisher from the event factory. // If the event factory's `end` method was called these publishers will be // cleaned up after their thread context is removed; otherwise, a removed // thread context and failed publisher will remain available for stats. return Status(0, "OK"); } // There's no reason for the event factory to keep multiple instances. EventFactory& EventFactory::getInstance() { static EventFactory ef; return ef; } Status EventFactory::registerEventPublisher(const PluginRef& pub) { // Try to downcast the plugin to an event publisher. EventPublisherRef specialized_pub; try { auto base_pub = std::dynamic_pointer_cast(pub); specialized_pub = std::static_pointer_cast(base_pub); } catch (const std::bad_cast& /* e */) { return Status(1, "Incorrect plugin"); } if (specialized_pub == nullptr || specialized_pub.get() == nullptr) { return Status(0, "Invalid publisher"); } auto type_id = specialized_pub->type(); if (type_id.empty()) { // This subscriber did not override its name. return Status(1, "Publishers must have a type"); } auto& ef = EventFactory::getInstance(); { WriteLock lock(getInstance().factory_lock_); if (ef.event_pubs_.count(type_id) != 0) { // This is a duplicate event publisher. return Status(1, "Duplicate publisher type"); } ef.event_pubs_[type_id] = specialized_pub; } // Do not set up event publisher if events are disabled. if (!FLAGS_disable_events) { if (specialized_pub->state() != EventState::EVENT_NONE) { specialized_pub->tearDown(); } auto status = specialized_pub->setUp(); specialized_pub->state(EventState::EVENT_SETUP); if (!status.ok()) { // Only start event loop if setUp succeeds. LOG(INFO) << "Event publisher not enabled: " << type_id << ": " << status.what(); specialized_pub->isEnding(true); return status; } } return Status(0, "OK"); } Status EventFactory::registerEventSubscriber(const PluginRef& sub) { // Try to downcast the plugin to an event subscriber. EventSubscriberRef specialized_sub; try { auto base_sub = std::dynamic_pointer_cast(sub); specialized_sub = std::static_pointer_cast(base_sub); } catch (const std::bad_cast& /* e */) { return Status(1, "Incorrect plugin"); } if (specialized_sub == nullptr || specialized_sub.get() == nullptr) { return Status(1, "Invalid subscriber"); } // The config may use an "events" key to explicitly enabled or disable // event subscribers. See EventSubscriber::disable. auto name = specialized_sub->getName(); if (name.empty()) { // This subscriber did not override its name. return Status(1, "Subscribers must have set a name"); } auto plugin = Config::get().getParser("events"); if (plugin != nullptr && plugin.get() != nullptr) { const auto& data = plugin->getData().doc(); // First perform explicit enabling. if (data["events"].HasMember("enable_subscribers")) { for (const auto& item : data["events"]["enable_subscribers"].GetArray()) { if (item.GetString() == name) { VLOG(1) << "Enabling event subscriber: " << name; specialized_sub->disabled = false; } } } // Then use explicit disabling as an ultimate override. if (data["events"].HasMember("disable_subscribers")) { for (const auto& item : data["events"]["disable_subscribers"].GetArray()) { if (item.GetString() == name) { VLOG(1) << "Disabling event subscriber: " << name; specialized_sub->disabled = true; } } } } if (specialized_sub->state() != EventState::EVENT_NONE) { specialized_sub->tearDown(); } // Allow subscribers a configure-time setup to determine if they should run. auto status = specialized_sub->setUp(); if (!status) { specialized_sub->disabled = true; } specialized_sub->state(EventState::EVENT_SETUP); // Let the subscriber initialize any Subscriptions. if (!FLAGS_disable_events && !specialized_sub->disabled) { specialized_sub->expireCheck(); status = specialized_sub->init(); specialized_sub->state(EventState::EVENT_RUNNING); } else { specialized_sub->state(EventState::EVENT_PAUSED); } auto& ef = EventFactory::getInstance(); { WriteLock lock(getInstance().factory_lock_); ef.event_subs_[name] = specialized_sub; } // Set state of subscriber. if (!status.ok()) { specialized_sub->state(EventState::EVENT_FAILED); return Status(1, status.getMessage()); } else { return Status(0, "OK"); } } Status EventFactory::addSubscription(const std::string& type_id, const std::string& name_id, const SubscriptionContextRef& mc, EventCallback cb) { auto subscription = Subscription::create(name_id, mc, cb); return EventFactory::addSubscription(type_id, subscription); } Status EventFactory::addSubscription(const std::string& type_id, const SubscriptionRef& subscription) { EventPublisherRef publisher = getInstance().getEventPublisher(type_id); if (publisher == nullptr) { return Status(1, "Unknown event publisher"); } // The event factory is responsible for configuring the event types. return publisher->addSubscription(subscription); } size_t EventFactory::numSubscriptions(const std::string& type_id) { EventPublisherRef publisher; try { publisher = EventFactory::getInstance().getEventPublisher(type_id); } catch (std::out_of_range& /* e */) { return 0; } if (publisher == nullptr) { return 0; } return publisher->numSubscriptions(); } EventPublisherRef EventFactory::getEventPublisher(const std::string& type_id) { if (getInstance().event_pubs_.count(type_id) == 0) { LOG(ERROR) << "Requested unknown/failed event publisher: " + type_id; return nullptr; } return getInstance().event_pubs_.at(type_id); } EventSubscriberRef EventFactory::getEventSubscriber( const std::string& name_id) { if (!exists(name_id)) { LOG(ERROR) << "Requested unknown event subscriber: " + name_id; return nullptr; } return getInstance().event_subs_.at(name_id); } bool EventFactory::exists(const std::string& name_id) { return (getInstance().event_subs_.count(name_id) > 0); } Status EventFactory::deregisterEventPublisher(const EventPublisherRef& pub) { return EventFactory::deregisterEventPublisher(pub->type()); } Status EventFactory::deregisterEventPublisher(const std::string& type_id) { auto& ef = EventFactory::getInstance(); WriteLock lock(ef.factory_lock_); EventPublisherRef publisher = ef.getEventPublisher(type_id); if (publisher == nullptr) { return Status(1, "No event publisher to deregister"); } if (!FLAGS_disable_events) { publisher->isEnding(true); if (!publisher->hasStarted()) { // If a publisher's run loop was not started, call tearDown since // the setUp happened at publisher registration time. publisher->tearDown(); publisher->state(EventState::EVENT_NONE); // If the run loop did run the tear down and erase will happen in the // event thread wrapper when isEnding is next checked. ef.event_pubs_.erase(type_id); } else { publisher->stop(); } } return Status(0, "OK"); } Status EventFactory::deregisterEventSubscriber(const std::string& sub) { auto& ef = EventFactory::getInstance(); WriteLock lock(ef.factory_lock_); if (ef.event_subs_.count(sub) == 0) { return Status(1, "Event subscriber is missing"); } auto& subscriber = ef.event_subs_.at(sub); subscriber->state(EventState::EVENT_NONE); subscriber->tearDown(); ef.event_subs_.erase(sub); return Status(0); } std::vector EventFactory::publisherTypes() { WriteLock lock(getInstance().factory_lock_); std::vector types; for (const auto& publisher : getInstance().event_pubs_) { types.push_back(publisher.first); } return types; } std::vector EventFactory::subscriberNames() { WriteLock lock(getInstance().factory_lock_); std::vector names; for (const auto& subscriber : getInstance().event_subs_) { names.push_back(subscriber.first); } return names; } void EventFactory::end(bool join) { auto& ef = EventFactory::getInstance(); // Call deregister on each publisher. for (const auto& publisher : ef.publisherTypes()) { deregisterEventPublisher(publisher); } // Stop handling exceptions for the publisher threads. for (const auto& thread : ef.threads_) { if (join) { thread->join(); } else { thread->detach(); } } { WriteLock lock(getInstance().factory_lock_); // A small cool off helps OS API event publisher flushing. if (!FLAGS_disable_events) { ef.threads_.clear(); } // Threads may still be executing, when they finish, release publishers. ef.event_pubs_.clear(); ef.event_subs_.clear(); } } void attachEvents() { const auto& publishers = RegistryFactory::get().plugins("event_publisher"); for (const auto& publisher : publishers) { EventFactory::registerEventPublisher(publisher.second); } const auto& subscribers = RegistryFactory::get().plugins("event_subscriber"); for (const auto& subscriber : subscribers) { if (!boost::ends_with(subscriber.first, "_events")) { LOG(ERROR) << "Error registering subscriber: " << subscriber.first << ": Must use a '_events' suffix"; continue; } auto status = EventFactory::registerEventSubscriber(subscriber.second); if (!status.ok()) { VLOG(1) << "Error registering subscriber: " << subscriber.first << ": " << status.getMessage(); } } // Configure the event publishers and subscribers. EventFactory::configUpdate(); } } // namespace osquery