import assert from 'nanoassert'
import Deferred from './Deferred.mjs'
import asyncWrap from './asyncWrap.mjs'
import CancelledError from './CancelledError.mjs'
import delay from './delay.mjs'
/**
* A class representing a queue.
*
* Tasks added to the queue are processed in parallel (up to the concurrency limit).
* If all slots of the queue are occupied, the task is queued until one becomes available.
* When a slot is freed, the pending task with higher priority is executed. If multiple pending tasks have the same
* priority the first that was scheduled is executed.
*
* Once a task is completed, its corresponding promise is terminated accordingly.
*
* @example
* import { Queue, asyncRoot, sleep } from 'modern-async'
*
* asyncRoot(async () => {
* const queue = new Queue(3) // create a queue with concurrency 3
*
* const array = Array.from(Array(100).keys()) // an array of 100 numbers from 0 to 99
*
* const promises = []
* for (const i of array) {
* promises.push(queue.exec(async () => {
* console.log(`Starting task ${i}`)
* await sleep(Math.random() * 10) // waits a random amount of time between 0ms and 10ms
* console.log(`Ending task ${i}`)
* return i;
* }))
* }
* const results = await Promise.all(promises)
* // all the scheduled tasks will perform with a maximum concurrency of 3 and log when they start and stop
*
* console.log(results) // will display an array with the result of the execution of each separate task
* })
*/
class Queue {
/**
* Constructs a queue with the given concurrency
*
* @param {number} concurrency The concurrency of the queue, must be an integer greater than 0 or
* `Number.POSITIVE_INFINITY`.
*/
constructor (concurrency) {
assert(Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY,
'concurrency must be an integer or positive infinity')
assert(concurrency > 0, 'concurrency must be greater than 0')
if (concurrency !== Number.POSITIVE_INFINITY) {
this._queue = new _InternalQueuePriority(concurrency)
} else {
this._queue = new _InternalInfinityQueue()
}
}
/**
* (Read-only) The concurrency of the queue.
*
* @member {number}
*
* @returns {number} ignore
*/
get concurrency () {
return this._queue.concurrency
}
/**
* (Read-only) The current number of tasks that are processing.
*
* @member {number}
*
* @returns {number} ignore
*/
get running () {
return this._queue.running
}
/**
* (Read-only) The number of pending tasks.
*
* @member {number}
*
* @returns {number} ignore
*/
get pending () {
return this._queue.pending
}
/**
* Puts a task at the end of the queue. When the task is executed and completes the returned promise will be terminated
* accordingly.
*
* @param {Function} fct An asynchronous functions representing the task. It will be executed when the queue has
* available slots and its result will be propagated to the promise returned by exec().
* @param {number} priority (Optional) The priority of the task. The higher the priority is, the sooner the task will be
* executed regarding the priority of other pending tasks. Defaults to 0.
* @returns {Promise} A promise that will be resolved or rejected once the task has completed. Its state will be the same
* than the promise returned by the call to `fct`.
*/
async exec (fct, priority = 0) {
return this._queue.exec(fct, priority)
}
/**
* Puts a task at the end of the queue. When the task is executed and completes the returned promise will be terminated
* accordingly.
*
* This function returns both a promise and a cancel function. The cancel function allows to cancel the pending task,
* but only if it wasn't started yet. Calling the cancel function on a task that it already running has no effect.
* When a task is cancelled its corresponding promise will be rejected with a `CancelledError`.
*
* @param {Function} fct An asynchronous functions representing the task. It will be executed when the queue has
* available slots and its result will be propagated to the promise returned by exec().
* @param {number} priority (Optional) The priority of the task. The higher the priority is, the sooner the task will be
* executed regarding the priority of other pending tasks. Defaults to 0.
* @returns {Array} A tuple with two parameters:
* * `promise`: A promise that will be resolved or rejected once the task has completed. Its state will be the same
* than the promise returned by the call to `fct`.
* * `cancel`: A cancel function. When called it will cancel the task if it is still pending. It has no effect is the
* task has already started or already terminated. When a task is cancelled its corresponding promise will be
* rejected with a `CancelledError`. If will return `true` if the task was effectively pending and was cancelled,
* `false` in any other case.
*/
execCancellable (fct, priority = 0) {
return this._queue.execCancellable(fct, priority)
}
/**
* Cancels all pending tasks. Their corresponding promises will be rejected with a `CancelledError`. This method will
* not alter tasks that are already running.
*
* @returns {number} The number of pending tasks that were effectively cancelled.
*/
cancelAllPending () {
return this._queue.cancelAllPending()
}
}
export default Queue
/**
* @ignore
*/
class _InternalQueuePriority {
/**
* @ignore
*
* @param {number} concurrency ignore
*/
constructor (concurrency) {
this._concurrency = concurrency
this._iqueue = []
this._running = 0
}
/**
* @ignore
* @returns {number} ignore
*/
get concurrency () {
return this._concurrency
}
/**
* @ignore
* @returns {number} ignore
*/
get running () {
return this._running
}
/**
* @ignore
* @returns {number} ignore
*/
get pending () {
return this._iqueue.length - this.running
}
/**
* @ignore
*
* @param {*} fct ignored
* @param {*} priority ignored
* @returns {*} ignored
*/
async exec (fct, priority) {
return this.execCancellable(fct, priority)[0]
}
/**
* @ignore
* @param {*} fct ignore
* @param {*} priority ignore
* @returns {*} ignore
*/
execCancellable (fct, priority) {
assert(typeof fct === 'function', 'fct must be a function')
assert(typeof priority === 'number', 'priority must be a number')
const deferred = new Deferred()
let i = this._iqueue.length
while (i >= 1) {
const t = this._iqueue[i - 1]
if (t.priority >= priority) {
break
}
i -= 1
}
const task = {
asyncFct: asyncWrap(fct),
deferred,
running: false,
priority
}
this._iqueue.splice(i, 0, task)
this._checkQueue()
return [deferred.promise, () => {
if (task.running) {
return false
} else {
const filtered = this._iqueue.filter((v) => v !== task)
if (filtered.length < this._iqueue.length) {
this._iqueue = filtered
deferred.reject(new CancelledError())
return true
} else {
return false
}
}
}]
}
/**
* @ignore
*/
_checkQueue () {
while (true) {
assert(this.running >= 0, 'invalid state')
assert(this.running <= this.concurrency, 'invalid state')
if (this.running === this.concurrency) {
return
}
const task = this._iqueue.find((v) => !v.running)
if (task === undefined) {
return
}
task.running = true
this._running += 1
task.asyncFct().finally(() => {
this._running -= 1
this._iqueue = this._iqueue.filter((v) => v !== task)
// the following check is delayed to give the opportunity to components
// listening for promises to cancel pending tasks before they are
// started
delay().then(() => {
this._checkQueue()
})
}).then(task.deferred.resolve, task.deferred.reject)
}
}
/**
* @ignore
* @returns {*} ignore
*/
cancelAllPending () {
const toCancel = this._iqueue.filter((task) => !task.running)
this._iqueue = this._iqueue.filter((task) => task.running)
toCancel.forEach((task) => {
task.deferred.reject(new CancelledError())
})
return toCancel.length
}
}
/**
* @ignore
*/
class _InternalInfinityQueue {
/**
* @ignore
*/
constructor () {
this._running = 0
}
/**
* @ignore
* @returns {number} ignore
*/
get concurrency () {
return Number.POSITIVE_INFINITY
}
/**
* @ignore
* @returns {number} ignore
*/
get running () {
return this._running
}
/**
* @ignore
* @returns {number} ignore
*/
get pending () {
return 0
}
/**
* @ignore
*
* @param {Function} fct ignore
* @returns {Promise} ignore
*/
async exec (fct) {
return this.execCancellable(fct)[0]
}
/**
* @ignore
*
* @param {*} fct ignore
* @returns {*} ignore
*/
execCancellable (fct) {
this._running += 1
const asyncFct = asyncWrap(fct)
const p = asyncFct()
return [p.finally(() => {
this._running -= 1
}), () => false]
}
/**
* @ignore
* @returns {*} ignore
*/
cancelAllPending () {
return 0
}
}