manfred-exz
1/9/2018 - 2:32 AM

design_patterns

#pragma once
#include <boost/any.hpp>
#include "common/algorithm_utils.h"

namespace event {
	// Polymorphic base type for events.
	// Every default-constructed Event obtains its id from the static
	// `event_id_inc_` incrementer shared by the whole class.
	class Event {
	public:
		Event()
			: id_(event_id_inc_.TakeAndIncrement())
		{}

		// Textual form of the event; the base implementation returns an empty string.
		virtual std::string expr() {
			return std::string();
		}

		// Id assigned to this event at construction.
		size_t id() const {
			return id_;
		}

		// Returns a copy of the source label (empty until set_source is called).
		std::string source() const {
			return source_;
		}

		// Stores `source` as the new source label; the argument is moved into place.
		void set_source(std::string source) {
			source_ = std::move(source);
		}

		virtual ~Event() = default;

	private:
		static common::Incrementer<size_t> event_id_inc_;
		size_t id_;
		std::string source_;
	};

	// Plain (raw) pointer to an Event.
	using EventPtr = Event*;
}
#pragma once
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include <boost/range/adaptors.hpp>
#include <boost/circular_buffer.hpp>
#include <glog/logging.h>
#include "event.h"
#include "event_loop_id.h"
#include "event_listener.h"

namespace event {
	// Callable that receives an event pointer by const reference.
	using event_handler_t = std::function<void(const EventPtr&)>;
	// Owning handle to a handler.
	using event_handler_shared_ptr = std::shared_ptr<event_handler_t>;
	// Non-owning handle to a handler; it expires when the last owning handle goes away.
	using event_handler_weak_ptr = std::weak_ptr<event_handler_t>;

	class EventLoop {
		constexpr static size_t kEventLoopCapacity = 1000;

		mutable std::mutex events_mu_;
		boost::circular_buffer<EventPtr> events_;

		mutable std::mutex mu_;
		std::vector<event_handler_weak_ptr> event_handlers_;
		std::thread loop_thread_;
		std::atomic_bool run_loop_ = false;

	public:
		EventLoop()
			: events_(kEventLoopCapacity)
		{}

		~EventLoop() {
			run_loop_ = false;
			if(loop_thread_.joinable())
				loop_thread_.join();

			for (auto e : events_) {
				delete e;
			}
		}

		// ! PushEvent takes ownership of EventPtr e
		void TakeAndPushEvent(EventPtr e) {
			std::lock_guard<std::mutex> lock(events_mu_);
			LOG_IF(ERROR, events_.size() == events_.capacity())
				<< "event: buffer is full, some events are lost.";
			events_.push_back(e);
		}

		bool HasNextEvent() const {
			std::lock_guard<std::mutex> lock(events_mu_);
			return !events_.empty();
		}

		void ProcessEvent() {
			using boost::adaptors::filtered;
			EventPtr e = [this] {
				std::lock_guard<std::mutex> lock(events_mu_);
				auto e = std::move(events_.front());
				events_.pop_front();
				return e;
			}();

			std::lock_guard<std::mutex> lock(mu_);
			for (auto iter = event_handlers_.begin(); iter != event_handlers_.end(); ) {
				if(auto handler = iter->lock()) {
					handler->operator()(e);
					++iter;
				} else {
					// if handler if invalid, erase it
					iter = event_handlers_.erase(iter);
				}
			}

			LOG(INFO) << "event: " << e->expr() << " processed";
		}

		void AddEventHandler(const event_handler_weak_ptr& handler_weak_ptr) {
			std::lock_guard<std::mutex> lock(mu_);
			event_handlers_.push_back(handler_weak_ptr);
		}

		// start a thread to handle events(it's also possible to call HasNextEvent/ProcessEvent without thread)
		// thread is auto joined when object is destructed.
		void StartLoopThread() {
			// check `loop_thread` is empty
			assert(loop_thread_.get_id() == std::thread().get_id());
			loop_thread_ = std::thread([&]
			{
				run_loop_ = true;
				while (run_loop_) {
					if (HasNextEvent()) {
						ProcessEvent();
					}
				}
			});
		}

		void StopLoopThread() {
			run_loop_ = false;
		}
	};
}