EventBus implementation using RxRelay
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Observable
/**
* EventBus implementation (pub-sub pattern). Enables posting and subscribing to any events.
* http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
*
* [post] method is shorthand for posting events to a bus
* [subscribe] method is shorthand for listening for specific type of events on main thread
* [observable] property exposes raw observable with events to enable custom operator usage
*
* WARNING: remember to dispose after subscribing to avoid memory leaks!
*
* @author jskierbi
*/
object EventBus {
/** Relay is implementation detail - hidden */
private val relay = PublishRelay.create<Any>()
/**
* Raw observable emitting events posted to a bus.
* Enables usage operators to cover complex use cases
*/
val observable: Observable<Any> = relay
/**
* Posts an event to the bus. Events are emitted by [observable] and
* delivered to any handler subscribed via [subscribe]
*/
fun post(event: Any) = relay.accept(event)
/**
* Shorthand for subscribing for specific event types on main thread.
* All events are delivered on main thread.
*/
inline fun <reified T> subscribe(noinline eventHandler: (T) -> Unit) = observable
.filter { it is T }
.observeOn(AndroidSchedulers.mainThread())
.subscribe { eventHandler(it as T) }!!
}
data class Event(val msg: String)
// Subscribe to a specific event type on main thread
val eventDisposable = EventBus.subscribe<Event> {
logd("Received event: $it")
}
// Post an event
EventBus.post(Event("Something happened!"))
// Rx operators integration (more complex use case)
// Event on bus triggers data refresh from API
val datasetRefreshDisposable = EventBus.observable
.filter { it is Event }
.flatMap { api.refreshData() }
.subscribe (
{ result -> updateUi(result) },
{ error -> showError(error) }
)
// Don't forget to dispose (as EventBus is a singleton)
eventDisposable.dispose()
datasetRefreshDisposable.dispose()
compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'