jskierbi
6/1/2017 - 1:20 PM

EventBus implementation using RxRelay

EventBus implementation using RxRelay

import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Observable

/**
 * EventBus implementation (pub-sub pattern). Enables posting and subscribing to any events.
 * http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
 *
 * [post] method is shorthand for posting events to a bus
 * [subscribe] method is shorthand for listening for specific type of events on main thread
 * [observable] property exposes raw observable with events to enable custom operator usage
 *
 * WARNING: remember to dispose after subscribing to avoid memory leaks!
 *
 * @author jskierbi
 */
object EventBus {

  /** Relay is implementation detail - hidden */
  private val relay = PublishRelay.create<Any>()

  /**
   * Raw observable emitting events posted to a bus.
   * Enables usage operators to cover complex use cases
   */
  val observable: Observable<Any> = relay

  /**
   * Posts an event to the bus. Events are emitted by [observable] and
   * delivered to any handler subscribed via [subscribe]
   */
  fun post(event: Any) = relay.accept(event)

  /**
   * Shorthand for subscribing for specific event types on main thread.
   * All events are delivered on main thread.
   */
  inline fun <reified T> subscribe(noinline eventHandler: (T) -> Unit) = observable
    .filter { it is T }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { eventHandler(it as T) }!!
}
data class Event(val msg: String)

// Subscribe to a specific event type on main thread
val eventDisposable = EventBus.subscribe<Event> {
  logd("Received event: $it")
}

// Post an event
EventBus.post(Event("Something happened!"))

// Rx operators integration (more complex use case)
// Event on bus triggers data refresh from API
val datasetRefreshDisposable = EventBus.observable
  .filter { it is Event }
  .flatMap { api.refreshData() }
  .subscribe (
    { result -> updateUi(result) },
    { error -> showError(error) }
  )

// Don't forget to dispose (as EventBus is a singleton)
eventDisposable.dispose()
datasetRefreshDisposable.dispose()
compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'