Sawtaytoes
3/17/2019 - 10:53 AM

CSP Alternative: Redux-Observable

const { delay, filter, map, mapTo, mergeAll, mergeMap, scan, switchMap, tap } = require('rxjs/operators')
const { of, Subject } = require('rxjs')
const { ofType } = require('redux-observable')

// Action types understood by the queue reducer and the epic in this file.

// Adds the action's `item` to the end of the queue.
const ADD_TO_QUEUE = 'ADD_TO_QUEUE'
// Emitted by the epic once an item is done; also makes the epic pick up the
// current head of the queue.
const FINISHED_PROCESSING_ITEM = 'FINISHED_PROCESSING_ITEM'
// Takes the action's `item` out of the queue.
const REMOVE_FROM_QUEUE = 'REMOVE_FROM_QUEUE'
// Makes the epic start processing whatever is at the head of the queue.
const START_PROCESSING_QUEUE = 'START_PROCESSING_QUEUE'

/**
 * Reducer holding the list of items waiting to be processed.
 *
 * - `ADD_TO_QUEUE` appends `item` to the end of the queue.
 * - `REMOVE_FROM_QUEUE` removes the first entry strictly equal to `item`.
 * - Any other action type returns the queue untouched.
 *
 * The incoming array is never mutated.
 *
 * @param {Array} queuedItems - Current queue, oldest item first.
 * @param {{ item?: *, type: string }} action - Action to apply.
 * @returns {Array} The next queue state.
 */
const queueReducer = (
  queuedItems,
  {
    item,
    type,
  },
) => {
  if (type === ADD_TO_QUEUE) {
    // Spread instead of `concat(item)`: `concat` flattens array arguments,
    // which would turn one array-valued item into several queue entries.
    return [
      ...queuedItems,
      item,
    ]
  }

  if (type === REMOVE_FROM_QUEUE) {
    // Only the first match is dropped so that duplicate entries queued behind
    // it are still processed later. When nothing matches, the index is -1 and
    // the filter simply yields a copy of the queue.
    const firstMatchIndex = (
      queuedItems
      .indexOf(item)
    )

    return (
      queuedItems
      .filter((
        queuedItem,
        index,
      ) => (
        index !== firstMatchIndex
      ))
    )
  }

  return queuedItems
}

/**
 * Epic that processes the head of the queue whenever a
 * `START_PROCESSING_QUEUE` or `FINISHED_PROCESSING_ITEM` action arrives.
 *
 * Pipeline:
 * 1. Log the type of every action seen on `action$`.
 * 2. Fold `ADD_TO_QUEUE` / `REMOVE_FROM_QUEUE` actions into the queue with
 *    `queueReducer`, starting from an empty array.
 * 3. For each new queue state, switch to listening for a start/finished
 *    trigger and emit that queue snapshot when one arrives.
 * 4. Log the head of the snapshot, then skip the trigger if the queue is empty.
 * 5. For the head item, emit `REMOVE_FROM_QUEUE` right away and
 *    `FINISHED_PROCESSING_ITEM` after a 4000 ms delay.
 *
 * @param {Observable} action$ - Stream of actions.
 * @returns {Observable} Stream of `REMOVE_FROM_QUEUE` and
 *   `FINISHED_PROCESSING_ITEM` actions.
 */
const queuedItemsEpic = (
  action$,
) => (
  action$
  .pipe(
    tap(({
      type,
    }) => {
      console
      .info(
        '[Action]',
        type,
      )
    }),
    ofType(
      ADD_TO_QUEUE,
      REMOVE_FROM_QUEUE,
    ),
    scan(
      queueReducer,
      [],
    ),
    // `switchMap` drops the listener bound to the previous queue state, so a
    // trigger is always answered with the latest queue.
    switchMap((
      queuedItems,
    ) => (
      action$
      .pipe(
        ofType(
          FINISHED_PROCESSING_ITEM,
          START_PROCESSING_QUEUE,
        ),
        // Pass the whole snapshot along (not just its head) so emptiness can
        // be checked by length further down instead of by truthiness.
        map(() => (
          queuedItems
        )),
      )
    )),
    tap((
      queuedItems,
    ) => {
      console
      .info(
        'Processing value:',
        queuedItems[0],
      )
    }),
    // An explicit length check rather than `filter(Boolean)` on the head:
    // falsy items such as 0 or '' are valid queue entries and would otherwise
    // never be processed or removed, stalling the queue.
    filter((
      queuedItems,
    ) => (
      queuedItems.length > 0
    )),
    map((
      queuedItems,
    ) => (
      queuedItems[0]
    )),
    // The returned array makes `mergeMap` emit the two inner observables
    // themselves; `mergeAll` below subscribes to both.
    mergeMap((
      item,
    ) => ([
      // Dequeue the item as soon as work on it starts.
      of({
        item,
        type: REMOVE_FROM_QUEUE,
      }),
      // The simulated work: wait, log completion, then report it.
      (
        of(item)
        .pipe(
          delay(4000),
          map((
            value,
          ) => (
            `'${value}' is done`
          )),
          tap(console.info),
          map(() => ({
            type: FINISHED_PROCESSING_ITEM,
          })),
        )
      ),
    ])),
    mergeAll(),
  )
)

// One Subject acts as both the action stream handed to the epic and the
// observer of the epic's output, so every action the epic emits is fed back
// into the stream the epic listens to.
const action$ = new Subject()

queuedItemsEpic(action$)
.subscribe(action$)

// Queue the demo items first. Action types come from the shared constants
// rather than re-typed string literals so they cannot drift apart.
for (const item of ['A', 'B', 'C']) {
  action$
  .next({
    item,
    type: ADD_TO_QUEUE,
  })
}

// Start processing from the head of the queue.
action$
.next({
  type: START_PROCESSING_QUEUE,
})