123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- /**
- * Module dependencies.
- */
- var Receiver = require('./receiver')
- /**
- * Expose `Channel`.
- */
- module.exports = Channel
- /**
- * Constants.
- */
- var CLOSED_ERROR_MSG = 'Cannot add to closed channel'
- /**
- * Initialize a `Channel`.
- *
- * @param {Function|Object} [empty=Object]
- * @api private
- */
- function Channel(bufferSize) {
- this.pendingAdds = []
- this.pendingGets = []
- this.items = []
- this.bufferSize = parseInt(bufferSize, 10) || 0
- this.isClosed = false
- this.isDone = false
- this.empty = {}
- }
- /**
- * Static reference to the most recently called callback
- */
- Channel.lastCalled = null
- /**
- * Get an item with `cb`.
- *
- * @param {Function} cb
- * @api private
- */
- Channel.prototype.get = function (cb){
- if (this.done()) {
- this.callEmpty(cb)
- } else if (this.items.length > 0 || this.pendingAdds.length > 0) {
- this.call(cb, this.nextItem())
- } else {
- this.pendingGets.push(cb)
- }
- }
- /**
- * Remove `cb` from the queue.
- *
- * @param {Function} cb
- * @api private
- */
- Channel.prototype.removeGet = function (cb) {
- var idx = this.pendingGets.indexOf(cb)
- if (idx > -1) {
- this.pendingGets.splice(idx, 1)
- }
- }
- /**
- * Get the next item and pull from pendingAdds to fill the buffer.
- *
- * @return {Mixed}
- * @api private
- */
- Channel.prototype.nextItem = function () {
- if (this.pendingAdds.length > 0) {
- this.items.push(this.pendingAdds.shift().add())
- }
- return this.items.shift()
- }
- /**
- * Add `val` to the channel.
- *
- * @param {Mixed} val
- * @return {Function} thunk
- * @api private
- */
- Channel.prototype.add = function (val){
- var receiver = new Receiver(val)
- if (this.isClosed) {
- receiver.error(Error(CLOSED_ERROR_MSG))
- } else if (this.pendingGets.length > 0) {
- this.call(this.pendingGets.shift(), receiver.add())
- } else if (this.items.length < this.bufferSize) {
- this.items.push(receiver.add())
- } else {
- this.pendingAdds.push(receiver)
- }
- return function (cb) {
- receiver.callback(cb)
- }
- }
- /**
- * Invoke `cb` with `val` facilitate both
- * `chan(value)` and the `chan(error, value)`
- * use-cases.
- *
- * @param {Function} cb
- * @param {Mixed} val
- * @api private
- */
- Channel.prototype.call = function (cb, val) {
- Channel.lastCalled = this.func
- if (val instanceof Error) {
- cb(val)
- } else {
- cb(null, val)
- }
- this.done()
- }
- /**
- * Invoke `cb` callback with the empty value.
- *
- * @param {Function} cb
- * @api private
- */
- Channel.prototype.callEmpty = function (cb) {
- this.call(cb, this.empty)
- }
- /**
- * Prevennt future values from being added to
- * the channel.
- *
- * @return {Boolean}
- * @api public
- */
- Channel.prototype.close = function () {
- this.isClosed = true
- var receiver
- while (receiver = this.pendingAdds.shift()) {
- receiver.error(Error(CLOSED_ERROR_MSG))
- }
- return this.done()
- }
- /**
- * Check to see if the channel is done and
- * call pending callbacks if necessary.
- *
- * @return {Boolean}
- * @api private
- */
- Channel.prototype.done = function () {
- if (!this.isDone && this.isClosed && this.items.length === 0) {
- this.isDone = true
- // call each pending callback with the empty value
- this.pendingGets.forEach(function (cb) { this.callEmpty(cb) }, this)
- }
- return this.isDone
- }
|