Add expiration to events

This commit is contained in:
Teddy Reed 2014-12-06 02:18:29 -07:00
parent 7b16e45f55
commit 19695d40aa
3 changed files with 102 additions and 35 deletions

View File

@ -503,7 +503,7 @@ class EventSubscriber {
*
* @return List of EventID, EventTime%s
*/
std::vector<EventRecord> getRecords(EventTime start, EventTime stop);
std::vector<EventRecord> getRecords(const std::vector<std::string>& indexes);
private:
/**
@ -521,7 +521,7 @@ class EventSubscriber {
*/
EventID getEventID();
/*
/**
* @brief Plan the best set of indexes for event record access.
*
* @param start an inclusive time to begin searching.
@ -534,7 +534,20 @@ class EventSubscriber {
EventTime stop,
int list_key = 0);
/*
/**
* @brief Expire indexes and eventually records.
*
* @param list_type the string representation of list binning type.
* @param indexes complete set of 'index.step' indexes for the list_type.
* @param expirations of the indexes, the set to expire.
*
* @return status if the indexes and records were removed.
*/
Status expireIndexes(const std::string& list_type,
const std::vector<std::string>& indexes,
const std::vector<std::string>& expirations);
/**
* @brief Add an EventID, EventTime pair to all matching list types.
*
* The list types are defined by time size. Based on the EventTime this pair
@ -558,7 +571,10 @@ class EventSubscriber {
* EventPublisher instances will have run `setUp` and initialized their run
* loops.
*/
EventSubscriber() { expire_events_ = true; }
EventSubscriber() {
expire_events_ = true;
expire_time_ = 0;
}
virtual ~EventSubscriber() {}
/// Backing storage indexing namespace definition methods.

View File

@ -128,20 +128,23 @@ std::vector<std::string> EventSubscriber::getIndexes(EventTime start,
// the first bin's start time and the last bin's stop time.
// (3) The last iteration's range includes relaxed bounds outside the
// requested start to stop range.
std::vector<std::string> all_bins, bins;
std::vector<std::string> all_bins, bins, expirations;
boost::split(all_bins, time_list, boost::is_any_of(","));
for (const auto& bin : all_bins) {
// Bins are identified by the binning size step.
auto step = boost::lexical_cast<int>(bin);
auto step = boost::lexical_cast<EventTime>(bin);
// Check if size * step -> size * (step + 1) is within a range.
int bin_start = size * step, bin_stop = size * (step + 1);
if (bin_start >= start && bin_stop <= start_max) {
if (expire_events_ && step * size < expire_time_) {
expirations.push_back(list_type + "." + bin);
} else if (bin_start >= start && bin_stop <= start_max) {
bins.push_back(bin);
} else if ((bin_start >= stop_min && bin_stop <= stop) || stop == 0) {
bins.push_back(bin);
}
}
expireIndexes(list_type, all_bins, expirations);
if (bins.size() != 0) {
// If more percision was acheived though this list's binning.
local_start = boost::lexical_cast<EventTime>(bins.front()) * size;
@ -162,14 +165,49 @@ std::vector<std::string> EventSubscriber::getIndexes(EventTime start,
return indexes;
}
std::vector<EventRecord> EventSubscriber::getRecords(EventTime start,
EventTime stop) {
Status EventSubscriber::expireIndexes(
const std::string& list_type,
const std::vector<std::string>& indexes,
const std::vector<std::string>& expirations) {
auto db = DBHandle::getInstance();
auto index_key = "indexes." + dbNamespace();
auto record_key = "records." + dbNamespace();
auto data_key = "data." + dbNamespace();
// Get the records list for the soon-to-be expired records.
std::vector<std::string> record_indexes;
for (const auto& bin : expirations) {
record_indexes.push_back(list_type + "." + bin);
}
auto expired_records = getRecords(record_indexes);
// Remove the records using the list of expired indexes.
std::vector<std::string> persisting_indexes = indexes;
for (const auto& bin : expirations) {
db->Delete(kEvents, record_key + "." + list_type + "." + bin);
persisting_indexes.erase(
std::remove(persisting_indexes.begin(), persisting_indexes.end(), bin),
persisting_indexes.end());
}
// Update the list of indexes with the non-expired indexes.
auto new_indexes = boost::algorithm::join(persisting_indexes, ",");
db->Put(kEvents, index_key + "." + list_type, new_indexes);
// Delete record events.
for (const auto& record : expired_records) {
db->Delete(kEvents, data_key + "." + record.first);
}
return Status(0, "OK");
}
std::vector<EventRecord> EventSubscriber::getRecords(
const std::vector<std::string>& indexes) {
auto db = DBHandle::getInstance();
auto record_key = "records." + dbNamespace();
std::vector<EventRecord> records;
auto indexes = getIndexes(start, stop);
for (const auto& index : indexes) {
std::string record_value;
if (!db->Get(kEvents, record_key + "." + index, record_value).ok()) {
@ -192,15 +230,7 @@ std::vector<EventRecord> EventSubscriber::getRecords(EventTime start,
}
}
// Now all the event_ids/event_times within the binned range exist.
// Select further on the EXACT time range.
std::vector<EventRecord> mapped_records;
for (const auto& record : records) {
if (record.second >= start && (record.second <= stop || stop == 0)) {
mapped_records.push_back(record);
}
}
return mapped_records;
return records;
}
Status EventSubscriber::recordEvent(EventID eid, EventTime time) {
@ -292,13 +322,21 @@ QueryData EventSubscriber::get(EventTime start, EventTime stop) {
auto db = DBHandle::getInstance();
// Get the records for this time range.
auto records = getRecords(start, stop);
auto indexes = getIndexes(start, stop);
auto records = getRecords(indexes);
std::vector<EventRecord> mapped_records;
for (const auto& record : records) {
if (record.second >= start && (record.second <= stop || stop == 0)) {
mapped_records.push_back(record);
}
}
std::string events_key = "data." + dbNamespace();
// Select records using event_ids as keys.
// Select mapped_records using event_ids as keys.
std::string data_value;
for (const auto& record : records) {
for (const auto& record : mapped_records) {
Row r;
status = db->Get(kEvents, events_key + "." + record.first, data_value);
if (data_value.length() == 0) {

View File

@ -110,29 +110,42 @@ TEST_F(EventsDatabaseTests, test_record_indexing) {
TEST_F(EventsDatabaseTests, test_record_range) {
auto fake_event_module = FakeEventSubscriber::getInstance();
// Search within a specific record range.
auto records = fake_event_module->getRecords(0, 10);
EXPECT_EQ(records.size(), 2); // 2, 11
auto indexes = fake_event_module->getIndexes(0, 10);
auto records = fake_event_module->getRecords(indexes);
EXPECT_EQ(records.size(), 2); // 1, 2
// Search within a large bound.
records = fake_event_module->getRecords(3, 3601);
EXPECT_EQ(records.size(), 3); // 11, 61, 3601
indexes = fake_event_module->getIndexes(3, 3601);
// This will include the 0-10 bucket meaning 1, 2 will show up.
records = fake_event_module->getRecords(indexes);
EXPECT_EQ(records.size(), 5); // 1, 2, 11, 61, 3601
// Get all of the records.
records = fake_event_module->getRecords(0, 3 * 3600);
indexes = fake_event_module->getIndexes(0, 3 * 3600);
records = fake_event_module->getRecords(indexes);
EXPECT_EQ(records.size(), 8); // 1, 2, 11, 61, 3601, 7201, 7211, 7261
// stop = 0 is an alias for everything.
records = fake_event_module->getRecords(0, 0);
indexes = fake_event_module->getIndexes(0, 0);
records = fake_event_module->getRecords(indexes);
EXPECT_EQ(records.size(), 8);
}
TEST_F(EventsDatabaseTests, test_record_expiration) {
auto fake_event_module = FakeEventSubscriber::getInstance();
auto status = fake_event_module->testAdd(1);
EXPECT_TRUE(status.ok());
// No expiration
auto indexes = fake_event_module->getIndexes(0, 60);
auto records = fake_event_module->getRecords(indexes);
EXPECT_EQ(records.size(), 3); // 1, 2, 11
fake_event_module->expire_time_ = 10;
indexes = fake_event_module->getIndexes(0, 60);
records = fake_event_module->getRecords(indexes);
EXPECT_EQ(records.size(), 1); // 11
}
}
int main(int argc, char* argv[]) {