/** * Module dependencies. */ var Receiver = require('./receiver') /** * Expose `Channel`. */ module.exports = Channel /** * Constants. */ var CLOSED_ERROR_MSG = 'Cannot add to closed channel' /** * Initialize a `Channel`. * * @param {Function|Object} [empty=Object] * @api private */ function Channel(bufferSize) { this.pendingAdds = [] this.pendingGets = [] this.items = [] this.bufferSize = parseInt(bufferSize, 10) || 0 this.isClosed = false this.isDone = false this.empty = {} } /** * Static reference to the most recently called callback */ Channel.lastCalled = null /** * Get an item with `cb`. * * @param {Function} cb * @api private */ Channel.prototype.get = function (cb){ if (this.done()) { this.callEmpty(cb) } else if (this.items.length > 0 || this.pendingAdds.length > 0) { this.call(cb, this.nextItem()) } else { this.pendingGets.push(cb) } } /** * Remove `cb` from the queue. * * @param {Function} cb * @api private */ Channel.prototype.removeGet = function (cb) { var idx = this.pendingGets.indexOf(cb) if (idx > -1) { this.pendingGets.splice(idx, 1) } } /** * Get the next item and pull from pendingAdds to fill the buffer. * * @return {Mixed} * @api private */ Channel.prototype.nextItem = function () { if (this.pendingAdds.length > 0) { this.items.push(this.pendingAdds.shift().add()) } return this.items.shift() } /** * Add `val` to the channel. * * @param {Mixed} val * @return {Function} thunk * @api private */ Channel.prototype.add = function (val){ var receiver = new Receiver(val) if (this.isClosed) { receiver.error(Error(CLOSED_ERROR_MSG)) } else if (this.pendingGets.length > 0) { this.call(this.pendingGets.shift(), receiver.add()) } else if (this.items.length < this.bufferSize) { this.items.push(receiver.add()) } else { this.pendingAdds.push(receiver) } return function (cb) { receiver.callback(cb) } } /** * Invoke `cb` with `val` facilitate both * `chan(value)` and the `chan(error, value)` * use-cases. * * @param {Function} cb * @param {Mixed} val * @api private */ Channel.prototype.call = function (cb, val) { Channel.lastCalled = this.func if (val instanceof Error) { cb(val) } else { cb(null, val) } this.done() } /** * Invoke `cb` callback with the empty value. * * @param {Function} cb * @api private */ Channel.prototype.callEmpty = function (cb) { this.call(cb, this.empty) } /** * Prevennt future values from being added to * the channel. * * @return {Boolean} * @api public */ Channel.prototype.close = function () { this.isClosed = true var receiver while (receiver = this.pendingAdds.shift()) { receiver.error(Error(CLOSED_ERROR_MSG)) } return this.done() } /** * Check to see if the channel is done and * call pending callbacks if necessary. * * @return {Boolean} * @api private */ Channel.prototype.done = function () { if (!this.isDone && this.isClosed && this.items.length === 0) { this.isDone = true // call each pending callback with the empty value this.pendingGets.forEach(function (cb) { this.callEmpty(cb) }, this) } return this.isDone }