channel.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /**
  2. * Module dependencies.
  3. */
  4. var Receiver = require('./receiver')
  5. /**
  6. * Expose `Channel`.
  7. */
  8. module.exports = Channel
  9. /**
  10. * Constants.
  11. */
  12. var CLOSED_ERROR_MSG = 'Cannot add to closed channel'
  13. /**
  14. * Initialize a `Channel`.
  15. *
  16. * @param {Function|Object} [empty=Object]
  17. * @api private
  18. */
  19. function Channel(bufferSize) {
  20. this.pendingAdds = []
  21. this.pendingGets = []
  22. this.items = []
  23. this.bufferSize = parseInt(bufferSize, 10) || 0
  24. this.isClosed = false
  25. this.isDone = false
  26. this.empty = {}
  27. }
  28. /**
  29. * Static reference to the most recently called callback
  30. */
  31. Channel.lastCalled = null
  32. /**
  33. * Get an item with `cb`.
  34. *
  35. * @param {Function} cb
  36. * @api private
  37. */
  38. Channel.prototype.get = function (cb){
  39. if (this.done()) {
  40. this.callEmpty(cb)
  41. } else if (this.items.length > 0 || this.pendingAdds.length > 0) {
  42. this.call(cb, this.nextItem())
  43. } else {
  44. this.pendingGets.push(cb)
  45. }
  46. }
  47. /**
  48. * Remove `cb` from the queue.
  49. *
  50. * @param {Function} cb
  51. * @api private
  52. */
  53. Channel.prototype.removeGet = function (cb) {
  54. var idx = this.pendingGets.indexOf(cb)
  55. if (idx > -1) {
  56. this.pendingGets.splice(idx, 1)
  57. }
  58. }
  59. /**
  60. * Get the next item and pull from pendingAdds to fill the buffer.
  61. *
  62. * @return {Mixed}
  63. * @api private
  64. */
  65. Channel.prototype.nextItem = function () {
  66. if (this.pendingAdds.length > 0) {
  67. this.items.push(this.pendingAdds.shift().add())
  68. }
  69. return this.items.shift()
  70. }
  71. /**
  72. * Add `val` to the channel.
  73. *
  74. * @param {Mixed} val
  75. * @return {Function} thunk
  76. * @api private
  77. */
  78. Channel.prototype.add = function (val){
  79. var receiver = new Receiver(val)
  80. if (this.isClosed) {
  81. receiver.error(Error(CLOSED_ERROR_MSG))
  82. } else if (this.pendingGets.length > 0) {
  83. this.call(this.pendingGets.shift(), receiver.add())
  84. } else if (this.items.length < this.bufferSize) {
  85. this.items.push(receiver.add())
  86. } else {
  87. this.pendingAdds.push(receiver)
  88. }
  89. return function (cb) {
  90. receiver.callback(cb)
  91. }
  92. }
  93. /**
  94. * Invoke `cb` with `val` facilitate both
  95. * `chan(value)` and the `chan(error, value)`
  96. * use-cases.
  97. *
  98. * @param {Function} cb
  99. * @param {Mixed} val
  100. * @api private
  101. */
  102. Channel.prototype.call = function (cb, val) {
  103. Channel.lastCalled = this.func
  104. if (val instanceof Error) {
  105. cb(val)
  106. } else {
  107. cb(null, val)
  108. }
  109. this.done()
  110. }
  111. /**
  112. * Invoke `cb` callback with the empty value.
  113. *
  114. * @param {Function} cb
  115. * @api private
  116. */
  117. Channel.prototype.callEmpty = function (cb) {
  118. this.call(cb, this.empty)
  119. }
  120. /**
  121. * Prevennt future values from being added to
  122. * the channel.
  123. *
  124. * @return {Boolean}
  125. * @api public
  126. */
  127. Channel.prototype.close = function () {
  128. this.isClosed = true
  129. var receiver
  130. while (receiver = this.pendingAdds.shift()) {
  131. receiver.error(Error(CLOSED_ERROR_MSG))
  132. }
  133. return this.done()
  134. }
  135. /**
  136. * Check to see if the channel is done and
  137. * call pending callbacks if necessary.
  138. *
  139. * @return {Boolean}
  140. * @api private
  141. */
  142. Channel.prototype.done = function () {
  143. if (!this.isDone && this.isClosed && this.items.length === 0) {
  144. this.isDone = true
  145. // call each pending callback with the empty value
  146. this.pendingGets.forEach(function (cb) { this.callEmpty(cb) }, this)
  147. }
  148. return this.isDone
  149. }