#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

#include <boost/circular_buffer.hpp>
#include <boost/range/adaptors.hpp>
#include <glog/logging.h>

#include "event.h"
#include "event_loop_id.h"
#include "event_listener.h"
namespace event {
// Callback signature for event subscribers: invoked with the dispatched event.
using event_handler_t = std::function<void(const EventPtr&)>;
// Owning handle to a subscriber callback.
using event_handler_shared_ptr = std::shared_ptr<event_handler_t>;
// Non-owning handle to a subscriber callback; this is what EventLoop stores.
using event_handler_weak_ptr = std::weak_ptr<event_handler_t>;
class EventLoop {
constexpr static size_t kEventLoopCapacity = 1000;
mutable std::mutex events_mu_;
boost::circular_buffer<EventPtr> events_;
mutable std::mutex mu_;
std::vector<event_handler_weak_ptr> event_handlers_;
std::thread loop_thread_;
std::atomic_bool run_loop_ = false;
public:
EventLoop()
: events_(kEventLoopCapacity)
{}
~EventLoop() {
run_loop_ = false;
if(loop_thread_.joinable())
loop_thread_.join();
for (auto e : events_) {
delete e;
}
}
// ! PushEvent takes ownership of EventPtr e
void TakeAndPushEvent(EventPtr e) {
std::lock_guard<std::mutex> lock(events_mu_);
LOG_IF(ERROR, events_.size() == events_.capacity())
<< "event: buffer is full, some events are lost.";
events_.push_back(e);
}
bool HasNextEvent() const {
std::lock_guard<std::mutex> lock(events_mu_);
return !events_.empty();
}
void ProcessEvent() {
using boost::adaptors::filtered;
EventPtr e = [this] {
std::lock_guard<std::mutex> lock(events_mu_);
auto e = std::move(events_.front());
events_.pop_front();
return e;
}();
std::lock_guard<std::mutex> lock(mu_);
for (auto iter = event_handlers_.begin(); iter != event_handlers_.end(); ) {
if(auto handler = iter->lock()) {
handler->operator()(e);
++iter;
} else {
// if handler if invalid, erase it
iter = event_handlers_.erase(iter);
}
}
LOG(INFO) << "event: " << e->expr() << " processed";
}
void AddEventHandler(const event_handler_weak_ptr& handler_weak_ptr) {
std::lock_guard<std::mutex> lock(mu_);
event_handlers_.push_back(handler_weak_ptr);
}
// start a thread to handle events(it's also possible to call HasNextEvent/ProcessEvent without thread)
// thread is auto joined when object is destructed.
void StartLoopThread() {
// check `loop_thread` is empty
assert(loop_thread_.get_id() == std::thread().get_id());
loop_thread_ = std::thread([&]
{
run_loop_ = true;
while (run_loop_) {
if (HasNextEvent()) {
ProcessEvent();
}
}
});
}
void StopLoopThread() {
run_loop_ = false;
}
};
}