// Example task queue in JavaScript with a configurable maximum number of parallel tasks
'use strict'
const http = require('http')
const https = require('https')
/**
 * Returns a pseudo-random number derived from Math.random(), floored.
 * For integer bounds with from <= to the result is an integer in the
 * inclusive range [from, to].
 *
 * @param {number} from - Lower bound.
 * @param {number} to - Upper bound.
 * @returns {number} The floored random value.
 */
const randomNumber = (from, to) => {
  // +1 so that the upper bound itself can be produced after flooring.
  const span = to - from + 1
  const scaled = Math.random() * span + from
  return Math.floor(scaled)
}
/**
 * Builds a simulated slow task. The returned function, when called, starts a
 * timer with a random delay obtained from randomNumber(500, 7000) and returns
 * a promise that resolves with `Finished <id>` once the timer fires.
 * The promise is never rejected.
 *
 * @param {*} id - Label interpolated into the resolved message.
 * @returns {function(): Promise<string>} The task function.
 */
const longTask = id => () => new Promise(resolve => {
  const finish = () => resolve(`Finished ${id}`)
  const delayMs = randomNumber(500, 7000)
  setTimeout(finish, delayMs)
})
/**
 * Runs a list of task functions while keeping at most `maxParallel` of them
 * in flight at any time.
 *
 * Each task is called with no arguments and may return a value or a promise.
 * Progress is reported through the callbacks passed to `start()`:
 *   - onItemComplete(id, index, val, remainingCount) after a task fulfils,
 *   - onError(err, id, index) after a task throws or rejects,
 *   - onFinished() once every task has settled (fulfilled or failed).
 */
class Queue {
  /**
   * @param {Array<Function>} items - Task functions to run, in start order.
   *   The array is copied; the caller's array is not modified.
   * @param {number} [maxParallel=3] - Maximum number of tasks in flight.
   * @throws {Error} If `items` is not a non-empty array, if any item is not a
   *   function, or if `maxParallel` is not at least 1.
   */
  constructor(items, maxParallel = 3) {
    if (!Array.isArray(items) || items.length === 0) {
      throw new Error('Items must be a non-empty array')
    }
    items.forEach((item, index) => {
      if (typeof item !== 'function') {
        throw new Error(`Item at index ${index} must be a function`)
      }
    })
    // A limit below 1 would never start anything and never finish.
    if (!(maxParallel >= 1)) {
      throw new Error('maxParallel must be a number greater than or equal to 1')
    }
    // In-flight tasks, keyed by their generated id.
    this.queue = {}
    // Work on a copy so shift() below does not empty the caller's array.
    this.items = [...items]
    this.maxParallel = maxParallel
    // Position (in the original array) of the next task to be started.
    this.nextIndex = 0
    // No-op callbacks until start() installs the real ones.
    this.onError = () => {}
    this.onItemComplete = () => {}
    this.onFinished = () => {}
    console.info('Queue initialized with', this.items.length, 'items')
  }

  /**
   * Wraps a task so that, once it settles, its slot is released, the matching
   * callback is invoked and the queue is asked to start further work.
   *
   * @param {number} id - Key under which the task is stored in `this.queue`.
   * @param {number} index - Position of the task in the original item list.
   * @param {Function} func - The task to run.
   * @returns {function(): Promise<void>} Runner that executes the task.
   */
  createMonad(id, index, func) {
    return async () => {
      try {
        const val = await func()
        delete this.queue[id]
        // remainingCount = tasks not yet started + tasks still in flight.
        this.onItemComplete(id, index, val,
          (this.items.length + Object.keys(this.queue).length))
      }
      catch (e) {
        // Release the slot on failure too; otherwise a failed task would
        // occupy it forever and onFinished could never fire.
        delete this.queue[id]
        this.onError(e, id, index)
      }
      finally {
        // Keep draining regardless of how this task ended.
        this.processQueue()
      }
    }
  }

  /**
   * Starts pending tasks until `maxParallel` are in flight, or calls
   * `onFinished` when nothing is pending and nothing is in flight.
   *
   * @returns {*} The return value of `onFinished()` when the queue is
   *   drained, otherwise undefined.
   */
  processQueue() {
    if (this.items.length < 1 && Object.keys(this.queue).length < 1) {
      return this.onFinished()
    }
    while (
      (this.items.length > 0) &&
      (Object.keys(this.queue).length < this.maxParallel)) {
      const id = Math.random() * Date.now()
      const index = this.nextIndex++
      this.queue[id.toString()] = this.createMonad(id, index, this.items.shift())()
    }
  }

  /**
   * Installs the callbacks and begins processing.
   *
   * @param {function(*, number, number): void} [onError] - Called with
   *   (err, id, index) when a task throws or rejects.
   * @param {function(number, number, *, number): void} [onItemComplete] -
   *   Called with (id, index, val, remainingCount) when a task fulfils.
   * @param {function(): void} [onFinished] - Called once all tasks settled.
   */
  start(
    onError = (err, id, index) => {},
    onItemComplete = (id, index, val, remainingCount) => {},
    onFinished = () => {}) {
    this.onError = onError
    this.onItemComplete = onItemComplete
    this.onFinished = onFinished
    this.processQueue()
  }
}
// Twenty simulated tasks, labelled 1 through 20.
const items = Array.from({ length: 20 }, (_, i) => longTask(i + 1))
/**
 * Reports a failed task on stderr.
 *
 * @param {*} err - The error or rejection reason.
 * @param {*} id - Identifier of the failed task.
 * @param {*} index - Index reported for the failed task.
 */
const onError = (err, id, index) => {
  const message = `ID ${id} at index ${index} failed with ${err}`
  console.error(message)
}
/**
 * Logs the result of a completed task.
 *
 * @param {*} id - Identifier of the completed task.
 * @param {*} index - Index reported for the completed task.
 * @param {*} val - Value the task produced.
 * @param {*} remainingCount - Number of tasks reported as still outstanding.
 */
const onItemComplete = (id, index, val, remainingCount) => {
  const line = `ID ${id} at index ${index} / ${remainingCount} says ${val}`
  console.info(line)
}
/** Logs a fixed message announcing that the whole queue is done. */
const onFinished = () => {
  const banner = 'ALL COMPLETE'
  console.info(banner)
}
// Create the queue with the default parallel limit and begin processing.
const tasks = new Queue(items)
tasks.start(
  onError,
  onItemComplete,
  onFinished
)