Migrate fsevents to events 2.0

This commit is contained in:
Teddy Reed 2014-12-14 22:17:38 -08:00
parent d927495209
commit fbd56663d9
3 changed files with 67 additions and 55 deletions

View File

@ -73,7 +73,7 @@ typedef std::shared_ptr<EventSubscriber<BaseEventPublisher>> EventSubscriberRef;
/// Use a single placeholder for the EventContextRef passed to EventCallback.
using std::placeholders::_1;
using SubscriberCallback = Status(*)(EventContextRef);
using SubscriberCallback = Status (*)(EventContextRef);
typedef std::function<Status(EventContextRef)> EventCallback;
/// An EventPublisher must track every subscription added.
@ -216,7 +216,7 @@ class EventPublisherCore {
virtual EventPublisherID type() { return "publisher"; }
protected:
virtual void fireCallback(const SubscriptionRef sub,
virtual void fireCallback(const SubscriptionRef sub,
const EventContextRef ec) = 0;
/// The EventPublisher will keep track of Subscription%s that contain callins.
@ -269,7 +269,7 @@ class EventPublisherCore {
* (thus event) matches.
*/
template <typename SC, typename EC>
class EventPublisher: public EventPublisherCore {
class EventPublisher : public EventPublisherCore {
public:
typedef typename std::shared_ptr<SC> SCRef;
typedef typename std::shared_ptr<EC> ECRef;
@ -287,6 +287,12 @@ class EventPublisher: public EventPublisherCore {
static ECRef createEventContext() { return std::make_shared<EC>(); }
static SCRef createSubscriptionContext() { return std::make_shared<SC>(); }
template <class PUB>
static EventPublisherID getType() {
auto pub = std::make_shared<PUB>();
return pub->type();
}
protected:
void fireCallback(const SubscriptionRef sub, const EventContextRef ec) {
auto pub_sc = getSubscriptionContext(sub->context);
@ -329,6 +335,13 @@ class EventFactory {
/// Access to the EventFactory instance.
static EventFactory& getInstance();
template <class PUB>
static EventPublisherRef createEventPublisher() {
auto pub = std::make_shared<PUB>();
auto base_pub = reinterpret_cast<EventPublisherRef&>(pub);
return base_pub;
}
/**
* @brief Add an EventPublisher to the factory.
*
@ -337,7 +350,7 @@ class EventFactory {
template <class T>
static Status registerEventPublisher() {
auto pub = std::make_shared<T>();
return registerEventPublisher<T>(pub);
return registerEventPublisher(pub);
}
/**
@ -367,7 +380,7 @@ class EventFactory {
template <class T>
static Status registerEventSubscriber() {
auto sub = std::make_shared<T>();
return EventFactory::registerEventSubscriber(sub);
return registerEventSubscriber(sub);
}
/**
@ -411,10 +424,10 @@ class EventFactory {
/// Add a Subscription by templating the EventPublisher, using a
/// SubscriptionContext.
template <typename T>
template <typename PUB>
static Status addSubscription(const SubscriptionContextRef mc,
EventCallback cb = 0) {
return addSubscription(T::type(), mc, cb);
return addSubscription(BaseEventPublisher::getType<PUB>(), mc, cb);
}
/// Add a Subscription using a caller Subscription instance.
@ -460,9 +473,9 @@ class EventFactory {
public:
/// If a static EventPublisher callback wants to fire
template <typename T>
template <typename PUB>
static void fire(const EventContextRef ec) {
auto event_pub = getEventPublisher(T::type());
auto event_pub = getEventPublisher(BaseEventPublisher::getType<PUB>());
event_pub->fire(ec);
}
@ -681,26 +694,19 @@ class EventSubscriber: public EventSubscriberCore {
*/
virtual void init() {}
protected:
template<class T, typename C>
SCRef createSubscriptionContext() { return PUB::createSubscriptionContext(); }
template <class T, typename C>
void subscribe(Status (T::*entry)(const std::shared_ptr<C>),
const SubscriptionContextRef sc) {
auto self = dynamic_cast<T*>(this);
auto base_entry = reinterpret_cast<Status(T::*)(const EventContextRef)>(entry);
auto base_entry =
reinterpret_cast<Status (T::*)(const EventContextRef)>(entry);
auto cb = std::bind(base_entry, self, _1);
EventFactory::addSubscription(type(), sc, cb);
}
SCRef createSubscriptionContext() {
return PUB::createSubscriptionContext();
}
EventPublisherID type() {
auto pub = new PUB();
EventPublisherID type = pub->type();
delete pub;
return type;
}
EventPublisherID type() { return BaseEventPublisher::getType<PUB>(); }
private:
FRIEND_TEST(EventsTests, test_event_sub);
@ -712,8 +718,8 @@ class EventSubscriber: public EventSubscriberCore {
/// Expose a Plugin-like Registry for EventPublisher instances.
DECLARE_REGISTRY(EventPublishers, std::string, EventPublisherRef);
#define REGISTERED_EVENTPUBLISHERS REGISTRY(EventPublishers)
#define REGISTER_EVENTPUBLISHER(decorator) \
REGISTER(EventPublishers, #decorator, std::make_shared<decorator>());
#define REGISTER_EVENTPUBLISHER(PUB) \
REGISTER(EventPublishers, #PUB, EventFactory::createEventPublisher<PUB>());
/**
* @brief Expose a Plugin-link Registry for EventSubscriber instances.
@ -723,8 +729,8 @@ DECLARE_REGISTRY(EventPublishers, std::string, EventPublisherRef);
*/
DECLARE_REGISTRY(EventSubscribers, std::string, EventSubscriberRef);
#define REGISTERED_EVENTSUBSCRIBERS REGISTRY(EventSubscribers)
#define REGISTER_EVENTSUBSCRIBER(decorator) \
REGISTER(EventSubscribers, #decorator, std::make_shared<decorator>());
#define REGISTER_EVENTSUBSCRIBER(SUB) \
REGISTER(EventSubscribers, #SUB, std::make_shared<SUB>());
namespace osquery {
namespace registries {

View File

@ -56,10 +56,9 @@ typedef std::shared_ptr<FSEventsSubscriptionContext>
* preferred implementation of FSEvents handling.
*
*/
class FSEventsEventPublisher : public EventPublisher {
DECLARE_EVENTPUBLISHER(FSEventsEventPublisher,
FSEventsSubscriptionContext,
FSEventsEventContext)
class FSEventsEventPublisher
: public EventPublisher<FSEventsSubscriptionContext, FSEventsEventContext> {
DECLARE_PUBLISHER("FSEventsEventPublisher");
public:
void configure();

View File

@ -35,14 +35,6 @@ class FSEventsTests : public testing::Test {
temp_thread_ = boost::thread(EventFactory::run, "FSEventsEventPublisher");
}
void SubscriptionAction(uint32_t mask = 0, EventCallback ec = 0) {
auto mc = std::make_shared<FSEventsSubscriptionContext>();
mc->path = kRealTestPath;
mc->mask = mask;
EventFactory::addSubscription("FSEventsEventPublisher", mc, ec);
}
void WaitForStream(int max) {
int delay = 0;
while (delay < max * 1000) {
@ -171,10 +163,9 @@ TEST_F(FSEventsTests, test_fsevents_run) {
EventFactory::end(false);
}
class TestFSEventsEventSubscriber : public EventSubscriber {
DECLARE_EVENTSUBSCRIBER(TestFSEventsEventSubscriber, FSEventsEventPublisher);
DECLARE_CALLBACK(SimpleCallback, FSEventsEventContext);
DECLARE_CALLBACK(Callback, FSEventsEventContext);
class TestFSEventsEventSubscriber
: public EventSubscriber<FSEventsEventPublisher> {
DECLARE_SUBSCRIBER("TestFSEventsEventSubscriber");
public:
void init() { callback_count_ = 0; }
@ -183,6 +174,13 @@ class TestFSEventsEventSubscriber : public EventSubscriber {
return Status(0, "OK");
}
SCRef GetSubscription(uint32_t mask = 0) {
auto sc = createSubscriptionContext();
sc->path = kRealTestPath;
sc->mask = mask;
return sc;
}
Status Callback(const FSEventsEventContextRef ec) {
Row r;
r["action"] = ec->action;
@ -194,10 +192,10 @@ class TestFSEventsEventSubscriber : public EventSubscriber {
return Status(0, "OK");
}
static void WaitForEvents(int max) {
void WaitForEvents(int max) {
int delay = 0;
while (delay < max * 1000) {
if (getInstance()->callback_count_ > 0) {
if (callback_count_ > 0) {
return;
}
::usleep(50);
@ -213,18 +211,23 @@ class TestFSEventsEventSubscriber : public EventSubscriber {
TEST_F(FSEventsTests, test_fsevents_fire_event) {
// Assume event type is registered.
StartEventLoop();
TestFSEventsEventSubscriber::getInstance()->init();
// Simulate registering an event subscriber.
auto sub = std::make_shared<TestFSEventsEventSubscriber>();
sub->init();
// Create a subscriptioning context, note the added Event to the symbol
SubscriptionAction(0, TestFSEventsEventSubscriber::EventSimpleCallback);
auto sc = sub->GetSubscription(0);
sub->subscribe(&TestFSEventsEventSubscriber::SimpleCallback, sc);
// SubscriptionAction(0, TestFSEventsEventSubscriber::EventSimpleCallback);
CreateEvents();
// This time wait for the callback.
TestFSEventsEventSubscriber::WaitForEvents(kMaxEventLatency);
sub->WaitForEvents(kMaxEventLatency);
// Make sure our expected event fired (aka subscription callback was called).
EXPECT_TRUE(TestFSEventsEventSubscriber::getInstance()->callback_count_ > 0);
EXPECT_TRUE(sub->callback_count_ > 0);
// Cause the thread to tear down.
EndEventLoop();
@ -233,18 +236,22 @@ TEST_F(FSEventsTests, test_fsevents_fire_event) {
TEST_F(FSEventsTests, test_fsevents_event_action) {
// Assume event type is registered.
StartEventLoop();
TestFSEventsEventSubscriber::getInstance()->init();
SubscriptionAction(0, TestFSEventsEventSubscriber::EventCallback);
// Simulate registering an event subscriber.
auto sub = std::make_shared<TestFSEventsEventSubscriber>();
sub->init();
auto sc = sub->GetSubscription(0);
sub->subscribe(&TestFSEventsEventSubscriber::Callback, sc);
// SubscriptionAction(0, TestFSEventsEventSubscriber::EventCallback);
CreateEvents();
TestFSEventsEventSubscriber::WaitForEvents(kMaxEventLatency);
sub->WaitForEvents(kMaxEventLatency);
// Make sure the fsevents action was expected.
const auto& event_module = TestFSEventsEventSubscriber::getInstance();
EXPECT_TRUE(event_module->actions_.size() > 0);
if (event_module->actions_.size() > 1) {
EXPECT_EQ(event_module->actions_[0], "UPDATED");
EXPECT_TRUE(sub->actions_.size() > 0);
if (sub->actions_.size() > 1) {
EXPECT_EQ(sub->actions_[0], "UPDATED");
}
// Cause the thread to tear down.