Asynchronous Bounded Queue in JS/TS using async/await Asynchronous Bounded Queue in JS/TS using async/await typescript typescript

Asynchronous Bounded Queue in JS/TS using async/await


17/04/2019 Update: Long story short, there's a bug in the AsyncSemaphore implementation below, that was caught using property-based testing. You can read all about this "tale" here. Here's the fixed version:

class AsyncSemaphore {    private promises = Array<() => void>()    constructor(private permits: number) {}    signal() {        this.permits += 1        if (this.promises.length > 0) this.promises.pop()!()    }    async wait() {        this.permits -= 1        if (this.permits < 0 || this.promises.length > 0)            await new Promise(r => this.promises.unshift(r))    }}

Finally, after considerable effort, and inspired by @Titian answer, I think I solved this. The code is filled with debug messages, but it might serve pedagogical purposes regarding the flow of control:

class AsyncQueue<T> {    waitingEnqueue = new Array<() => void>()    waitingDequeue = new Array<() => void>()    enqueuePointer = 0    dequeuePointer = 0    queue = Array<T>()    maxSize = 1    trace = 0    async enqueue(x: T) {        this.trace += 1        const localTrace = this.trace        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {            console.debug(`[${localTrace}] Producer Waiting`)            this.dequeuePointer += 1            await new Promise(r => this.waitingDequeue.unshift(r))            this.waitingDequeue.pop()            console.debug(`[${localTrace}] Producer Ready`)        }        this.queue.unshift(x)        console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)        if (this.enqueuePointer > 0) {            console.debug(`[${localTrace}] Notify Consumer`)            this.waitingEnqueue[this.enqueuePointer-1]()            this.enqueuePointer -= 1        }    }    async dequeue() {        this.trace += 1        const localTrace = this.trace        console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {            console.debug(`[${localTrace}] Consumer Waiting`)            this.enqueuePointer += 1            await new Promise(r => this.waitingEnqueue.unshift(r))            this.waitingEnqueue.pop()            console.debug(`[${localTrace}] Consumer Ready`)        }        const x = this.queue.pop()!        console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)        if (this.dequeuePointer > 0) {            console.debug(`[${localTrace}] Notify Producer`)            this.waitingDequeue[this.dequeuePointer - 1]()            this.dequeuePointer -= 1        }        return x    }}

Update: Here's a clean version using an AsyncSemaphore, that really encapsulates the way things are usually done using concurrency primitives, but adapted to the asynchronous-CPS-single-threaded-event-loop™ style of JavaScript with async/await. You can see that the logic of AsyncQueue becomes much more intuitive, and the double synchronisation through Promises is delegated to the two semaphores:

class AsyncSemaphore {    private promises = Array<() => void>()    constructor(private permits: number) {}    signal() {        this.permits += 1        if (this.promises.length > 0) this.promises.pop()()    }    async wait() {        if (this.permits == 0 || this.promises.length > 0)            await new Promise(r => this.promises.unshift(r))        this.permits -= 1    }}class AsyncQueue<T> {    private queue = Array<T>()    private waitingEnqueue: AsyncSemaphore    private waitingDequeue: AsyncSemaphore    constructor(readonly maxSize: number) {        this.waitingEnqueue = new AsyncSemaphore(0)        this.waitingDequeue = new AsyncSemaphore(maxSize)    }    async enqueue(x: T) {        await this.waitingDequeue.wait()        this.queue.unshift(x)        this.waitingEnqueue.signal()    }    async dequeue() {        await this.waitingEnqueue.wait()        this.waitingDequeue.signal()        return this.queue.pop()!    }}

Update 2: There seemed to be a subtle bug hidden in the above code, that became evident when trying to use an AsyncQueue of size 0. The semantics do make sense: it is a queue without any buffer, where the publisher always awaits for an consumer to exist. The lines that were preventing it to work were:

await this.waitingEnqueue.wait()this.waitingDequeue.signal()

If you look closely, you'll see that dequeue() isn't perfectly symmetric to enqueue(). In fact, if one swaps the order of these two instructions:

this.waitingDequeue.signal()await this.waitingEnqueue.wait()

Then all works again; it seems intuitive to me that we signal that there's something interested in dequeuing() before actually waiting for an enqueuing to take place.

I'm still not sure this doesn't reintroduce subtle bugs, without extensive testing. I'll leave this as a challenge ;)