Pool.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. "use strict";
  2. const Deferred = require("./Deferred");
  /**
   * Generic object pool driven by a user supplied `factory`.
   *
   * The pool keeps three collections: acquire requests that are still waiting,
   * resources currently handed out, and idle resources (each stamped with the
   * time at which it becomes eligible for removal).
   *
   * @class
   * @param {Object} factory
   *   Factory used for creating, validating and destroying the pooled items.
   *   NOTE: the object is stored by reference and its defaults are filled in
   *   place.
   * @param {String} [factory.name]
   *   Name of the factory. Serves only logging purposes.
   * @param {Function} factory.create
   *   Called without arguments; must return a promise that resolves with the
   *   newly created item (or rejects when creation fails).
   * @param {Function} factory.destroy
   *   Called with an item that has just been removed from the pool. Should
   *   gently close any resources that the item is using.
   * @param {Function} factory.validate
   *   Called synchronously with an idle item right before it is handed out.
   *   Should return true if the item is still valid and false if it should be
   *   removed from the pool.
   * @param {Number} factory.max
   *   Maximum number of items that can exist at the same time. Must be a
   *   finite integer > 0. Any further acquire requests wait in the pending
   *   list.
   * @param {Number} factory.min
   *   Minimum number of items in the pool (including in-use). Must be a finite
   *   integer >= 0 and not larger than `factory.max`. The minimum is checked
   *   whenever `destroy()` is called; if the pool is not draining and the item
   *   count is below the minimum, new items are created.
   * @param {Number} [factory.idleTimeoutMillis=30000]
   *   Delay in milliseconds after which idle items become eligible for removal.
   *   This does not affect pending acquire requests.
   * @param {Number} [factory.acquireTimeoutMillis=30000]
   *   Timeout in milliseconds registered on every pending acquire request.
   * @param {Number} [factory.reapIntervalMillis=1000]
   *   Idle clean up is scheduled every `factory.reapIntervalMillis`
   *   milliseconds while idle items exist.
   * @param {Boolean|Function} [factory.log=false]
   *   Whether the pool should log activity. If a function is specified,
   *   that will be used instead. The function expects the arguments msg, loglevel
   * @throws {Error} When a required factory function is missing or `min`/`max`
   *   are not valid integers.
   */
  class Pool {
    constructor(factory) {
      if (!factory.create) {
        throw new Error("create function is required");
      }
      if (!factory.destroy) {
        throw new Error("destroy function is required");
      }
      if (!factory.validate) {
        throw new Error("validate function is required");
      }
      // Number.isInteger rejects non-numbers, NaN, fractions and +/-Infinity.
      if (!Number.isInteger(factory.min) || factory.min < 0) {
        throw new Error("min must be an integer >= 0");
      }
      if (!Number.isInteger(factory.max) || factory.max <= 0) {
        throw new Error("max must be an integer > 0");
      }
      if (factory.min > factory.max) {
        throw new Error("max is smaller than min");
      }

      // defaults (a falsy value, including 0, selects the default)
      factory.idleTimeoutMillis = factory.idleTimeoutMillis || 30000;
      factory.acquireTimeoutMillis = factory.acquireTimeoutMillis || 30000;
      factory.reapInterval = factory.reapIntervalMillis || 1000;
      // min/max are already validated integers; they are deliberately NOT
      // passed through parseInt, which would turn e.g. 1e21 into 1.
      factory.log = factory.log || false;

      this._factory = factory;
      // Number of existing items, including the ones still being created.
      this._count = 0;
      this._draining = false;

      // queues
      this._pendingAcquires = [];
      this._inUseObjects = [];
      this._availableObjects = [];

      // timing controls
      this._removeIdleTimer = null;
      this._removeIdleScheduled = false;
    }

    /** Number of items owned by the pool (idle, in use or being created). */
    get size() {
      return this._count;
    }

    /** Name given to the factory (may be undefined). */
    get name() {
      return this._factory.name;
    }

    /** Number of idle items. */
    get available() {
      return this._availableObjects.length;
    }

    /** Number of items currently handed out. */
    get using() {
      return this._inUseObjects.length;
    }

    /** Number of acquire requests still waiting for an item. */
    get waiting() {
      return this._pendingAcquires.length;
    }

    /** Configured maximum number of items. */
    get maxSize() {
      return this._factory.max;
    }

    /** Configured minimum number of items. */
    get minSize() {
      return this._factory.min;
    }

    /**
     * Logs to the user defined log function, or to the console when
     * `factory.log` is merely truthy. Does nothing when logging is disabled.
     * @private
     * @param {string} message
     * @param {string} level
     */
    _log(message, level) {
      const logger = this._factory.log;
      if (typeof logger === "function") {
        logger(message, level);
      } else if (logger) {
        console.log(`${level.toUpperCase()} pool ${this.name} - ${message}`);
      }
    }

    /**
     * Checks and removes the available (idle) items that have timed out,
     * never removing more than would bring the pool below `factory.min`.
     * Re-schedules itself while idle items remain.
     * @private
     */
    _removeIdle() {
      const now = Date.now();
      const maxRemovable = this._count - this._factory.min;
      const expired = [];
      this._removeIdleScheduled = false;

      // Collect first, destroy afterwards: destroy() replaces the idle list.
      for (const entry of this._availableObjects) {
        if (expired.length >= maxRemovable) {
          break;
        }
        if (now >= entry.timeout) {
          this._log(
            `removeIdle() destroying obj - now:${now} timeout:${entry.timeout}`,
            "verbose"
          );
          expired.push(entry.resource);
        }
      }
      expired.forEach(resource => this.destroy(resource));

      // Re-read the length: it may have changed after destroying items above.
      const remaining = this._availableObjects.length;
      if (remaining > 0) {
        this._log("this._availableObjects.length=" + remaining, "verbose");
        this._scheduleRemoveIdle();
      } else {
        this._log("removeIdle() all objects removed", "verbose");
      }
    }

    /**
     * Schedule removal of idle items in the pool.
     *
     * At most one removal can be scheduled at a time.
     * @private
     */
    _scheduleRemoveIdle() {
      if (this._removeIdleScheduled) {
        return;
      }
      this._removeIdleScheduled = true;
      this._removeIdleTimer = setTimeout(() => {
        this._removeIdle();
      }, this._factory.reapInterval);
    }

    /**
     * Try to serve the oldest pending acquire request.
     *
     * - If nobody is waiting, do nothing.
     * - If idle items exist, take the most recently added one (LIFO). Items
     *   failing `factory.validate` are destroyed and the next one is tried.
     * - Otherwise create a new item, unless that would exceed `factory.max`;
     *   in that case the request simply stays in the pending list.
     * @private
     */
    _dispense() {
      const waitingCount = this._pendingAcquires.length;
      this._log(
        `dispense() clients=${waitingCount} available=${this._availableObjects.length}`,
        "info"
      );
      if (waitingCount < 1) {
        return;
      }

      while (this._availableObjects.length > 0) {
        this._log("dispense() - reusing obj", "verbose");
        const candidate = this._availableObjects[
          this._availableObjects.length - 1
        ];
        if (!this._factory.validate(candidate.resource)) {
          // destroy() also drops the candidate from the idle list.
          this.destroy(candidate.resource);
          continue;
        }
        this._availableObjects.pop();
        this._inUseObjects.push(candidate.resource);
        const deferred = this._pendingAcquires.shift();
        return deferred.resolve(candidate.resource);
      }

      if (this._count < this._factory.max) {
        this._createResource();
      }
    }

    /**
     * Asks the factory for a new item. The item is handed to the oldest
     * pending acquire, or parked as idle when nobody is waiting. A failed
     * creation rejects the oldest pending acquire (if any) and retries
     * dispensing on the next tick.
     * @private
     */
    _createResource() {
      // Reserve the slot immediately so concurrent calls respect `max`.
      this._count += 1;
      this._log(
        `createResource() - creating obj - count=${this._count} min=${this._factory.min} max=${this._factory.max}`,
        "verbose"
      );
      this._factory
        .create()
        .then(resource => {
          const deferred = this._pendingAcquires.shift();
          if (deferred) {
            // Only an item that is actually handed out counts as in use.
            this._inUseObjects.push(resource);
            deferred.resolve(resource);
          } else {
            this._addResourceToAvailableObjects(resource);
          }
        })
        .catch(error => {
          const deferred = this._pendingAcquires.shift();
          // Give the reserved slot back.
          this._count -= 1;
          if (this._count < 0) this._count = 0;
          if (deferred) {
            deferred.reject(error);
          }
          process.nextTick(() => {
            this._dispense();
          });
        });
    }

    /**
     * Parks `resource` as idle with a fresh idle deadline, then tries to serve
     * a pending acquire and makes sure idle clean up is scheduled.
     * @private
     * @param {Object} resource
     */
    _addResourceToAvailableObjects(resource) {
      this._availableObjects.push({
        resource: resource,
        timeout: Date.now() + this._factory.idleTimeoutMillis
      });
      this._dispense();
      this._scheduleRemoveIdle();
    }

    /**
     * Creates as many items as needed to reach `factory.min`, unless the pool
     * is draining.
     * @private
     */
    _ensureMinimum() {
      if (this._draining || this._count >= this._factory.min) {
        return;
      }
      const missing = this._factory.min - this._count;
      for (let i = 0; i < missing; i++) {
        this._createResource();
      }
    }

    /**
     * Requests a resource. An idle resource is reused when available,
     * otherwise `factory.create` is called if the pool is below `factory.max`;
     * failing that the request waits for a release.
     *
     * The request is registered with a timeout of `acquireTimeoutMillis`;
     * when it fires the request is removed from the pending list (the
     * rejection itself is presumably performed by `Deferred` - see
     * ./Deferred).
     *
     * @returns {Promise<Object>} Rejected immediately when the pool is draining.
     */
    acquire() {
      if (this._draining) {
        return Promise.reject(
          new Error("pool is draining and cannot accept work")
        );
      }
      const deferred = new Deferred();
      deferred.registerTimeout(this._factory.acquireTimeoutMillis, () => {
        // timeout triggered, promise will be rejected
        // remove this object from pending list
        this._pendingAcquires = this._pendingAcquires.filter(
          pending => pending !== deferred
        );
      });
      this._pendingAcquires.push(deferred);
      this._dispense();
      return deferred.promise();
    }

    /**
     * Return the resource to the pool, in case it is no longer required.
     * Releasing a resource twice, or one that is not in use, is logged with
     * level "error" and otherwise ignored.
     *
     * @param {Object} resource The acquired object to be put back to the pool.
     *
     * @returns {void}
     */
    release(resource) {
      // check to see if this object has already been released
      // (i.e., is back in the pool of this._availableObjects)
      const alreadyIdle = this._availableObjects.some(
        entry => entry.resource === resource
      );
      if (alreadyIdle) {
        this._log(
          "release called twice for the same resource: " + new Error().stack,
          "error"
        );
        return;
      }

      // check to see if this object exists in the `in use` list and remove it
      const index = this._inUseObjects.indexOf(resource);
      if (index < 0) {
        this._log(
          "attempt to release an invalid resource: " + new Error().stack,
          "error"
        );
        return;
      }
      this._inUseObjects.splice(index, 1);
      this._addResourceToAvailableObjects(resource);
    }

    /**
     * Request the resource to be destroyed. If the pool knows the resource
     * (idle or in use) it is forgotten and the factory's destroy handler is
     * called with it. In every case the pool minimum is re-checked afterwards.
     *
     * This should be called within an acquire() block as an alternative to release().
     *
     * @param {Object} resource The acquired item to be destroyed.
     *
     * @returns {void}
     */
    destroy(resource) {
      const availableBefore = this._availableObjects.length;
      const usingBefore = this._inUseObjects.length;

      this._availableObjects = this._availableObjects.filter(
        entry => entry.resource !== resource
      );
      this._inUseObjects = this._inUseObjects.filter(
        inUse => inUse !== resource
      );

      const wasRemoved =
        availableBefore !== this._availableObjects.length ||
        usingBefore !== this._inUseObjects.length;

      // An unknown resource must not decrement _count nor reach the factory.
      if (wasRemoved) {
        this._count -= 1;
        if (this._count < 0) this._count = 0;
        this._factory.destroy(resource);
      }
      this._ensureMinimum();
    }

    /**
     * Disallow any new requests and let the request backlog dissipate.
     * Polls every 100 ms until no acquire is pending and every existing
     * item is idle.
     *
     * @returns {Promise} Resolves once the pool is fully idle.
     */
    drain() {
      this._log("draining", "info");
      // disable the ability to put more work on the queue.
      this._draining = true;

      const check = callback => {
        // wait until all client requests have been satisfied.
        if (this._pendingAcquires.length > 0) {
          // pool is draining so we wont accept new acquires but
          // we need to clear pending acquires
          this._dispense();
          return setTimeout(() => {
            check(callback);
          }, 100);
        }
        // wait until in use objects have been released.
        if (this._availableObjects.length !== this._count) {
          return setTimeout(() => {
            check(callback);
          }, 100);
        }
        callback();
      };

      // No error handling needed here.
      return new Promise(resolve => check(resolve));
    }

    /**
     * Forcibly destroys all idle items regardless of timeout and cancels the
     * scheduled idle clean up. Intended to be invoked as part of a drain.
     * Does not prevent the creation of new items as a result of subsequent
     * calls to acquire.
     *
     * Note that if factory.min > 0 and the pool is not draining, the pool will
     * destroy all idle resources, but replace them with newly created
     * resources up to the specified factory.min value. If this is not
     * desired, set factory.min to zero before calling destroyAllNow()
     *
     * @returns {Promise} Resolves after every idle item has been destroyed.
     */
    destroyAllNow() {
      this._log("force destroying all objects", "info");
      // Snapshot: destroy() replaces this._availableObjects on every call.
      const doomed = this._availableObjects.slice();
      this._removeIdleScheduled = false;
      clearTimeout(this._removeIdleTimer);

      // Destroying inside the executor turns a throwing factory.destroy into
      // a rejected promise instead of a synchronous exception.
      return new Promise(resolve => {
        doomed.forEach(entry => this.destroy(entry.resource));
        resolve();
      });
    }
  }
  418. exports.Pool = Pool;
  419. exports.default = Pool;
  420. exports.TimeoutError = require("./TimeoutError").TimeoutError;