wesleybliss
1/3/2018 - 7:57 PM

Example queue in JS with max parallel option

Example queue in JS with max parallel option

'use strict'

const http = require('http')
const https = require('https')


const randomNumber = (from, to) => Math.floor(Math.random() * (to - from + 1) + from)

const longTask = id => () => new Promise((resolve, reject) => {
    setTimeout(() => { resolve(`Finished ${id}`) }, randomNumber(500, 7000))
})

class Queue {
    
    constructor(items, maxParallel = 3) {
        
        if (!items || !items.length)
            throw new Error('Items must be an array')
        
        items.forEach((item, index) => {
            if (!(typeof item === 'function') || !(item instanceof Function))
                throw new Error(`Item at index ${index} must be a function`)
        })
        
        this.queue = {}
        this.items = items
        this.maxParallel = maxParallel
        
        console.info('Queue initialized with', this.items.length, 'items')
        
    }
    
    createMonad(id, index, func) {
        
        return async () => {
            
            try {
                
                const val = await func()
                
                delete this.queue[id]
                
                this.onItemComplete(id, index, val,
                    (this.items.length + Object.keys(this.queue).length))
                
                this.processQueue()
                
            }
            catch (e) {
                
                this.onError(e, id, index)
                
            }
            
        }
        
    }
    
    processQueue() {
        
        if (this.items.length < 1 && Object.keys(this.queue).length < 1)
            return this.onFinished()
        
        while (
            (this.items.length > 0) &&
            (Object.keys(this.queue).length < this.maxParallel)) {
            
            const id = Math.random() * Date.now()
            this.queue[id.toString()] = this.createMonad(id, 0, this.items.shift())()
            
        }
        
    }
    
    start(
        onError = (err, id, index) => {},
        onItemComplete = (id, index, val, remainingCount) => {},
        onFinished = () => {}) {
        
        this.onError = onError
        this.onItemComplete = onItemComplete
        this.onFinished = onFinished
        
        this.processQueue()
        
    }
    
}

const items = []
for (let i = 0; i < 20; i++) {
    items.push(longTask(i + 1))
}

const onError = (err, id, index) => {
    console.error(`ID ${id} at index ${index} failed with ${err}`)
}

const onItemComplete = (id, index, val, remainingCount) => {
    console.info(`ID ${id} at index ${index} / ${remainingCount} says ${val}`)
}

const onFinished = () => {
    console.info('ALL COMPLETE')
}

const tasks = new Queue(items)
tasks.start(onError, onItemComplete, onFinished)