autoPipelining.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const PromiseContainer = require("./promiseContainer");
  4. const lodash_1 = require("./utils/lodash");
  5. const calculateSlot = require("cluster-key-slot");
  6. const standard_as_callback_1 = require("standard-as-callback");
  // Symbol key stored on each auto-pipeline. executeWithAutoPipelining sets it to
  // false when the pipeline is created and to true once a flush has been scheduled.
  7. exports.kExec = Symbol("exec");
  // Symbol key stored on each auto-pipeline holding the array of per-command result
  // callbacks; executeAutoPipeline reads the array and then nulls the property.
  8. exports.kCallbacks = Symbol("callbacks");
  // Command names for which shouldUseAutoPipelining always answers "no".
  9. exports.notAllowedAutoPipelineCommands = [
  10. "auth",
  11. "info",
  12. "script",
  13. "quit",
  14. "cluster",
  15. "pipeline",
  16. "multi",
  17. "subscribe",
  18. "psubscribe",
  19. "unsubscribe",
  20. "unpsubscribe",
  21. ];
  /**
   * Flush the auto-pipeline currently queued under `slotKey`.
   *
   * The pipeline is detached from `client._autoPipelines` before it is executed,
   * so commands issued while it is in flight are collected on a fresh pipeline.
   * Once the execution completes, every registered callback is invoked on its own
   * `process.nextTick`, and a pipeline queued in the meantime is flushed right away.
   *
   * @param {object} client - Client owning the `_autoPipelines` map and the
   *   `_runningAutoPipelines` set.
   * @param {string} slotKey - Key identifying the pipeline to flush.
   * @returns {void}
   */
  function executeAutoPipeline(client, slotKey) {
    /*
      If a pipeline is already executing, keep queueing up commands
      since ioredis won't serve two pipelines at the same time
    */
    if (client._runningAutoPipelines.has(slotKey)) {
      return;
    }
    /*
      Rare edge case: something has already removed this pipeline, e.g. through an
      immediate re-entrant call to executeAutoPipeline. Nothing is left to flush.
    */
    if (!client._autoPipelines.has(slotKey)) {
      return;
    }
    client._runningAutoPipelines.add(slotKey);
    // Get the pipeline and immediately delete it so that new commands are queued on a new pipeline
    const pipeline = client._autoPipelines.get(slotKey);
    client._autoPipelines.delete(slotKey);
    const callbacks = pipeline[exports.kCallbacks];
    // Drop the pipeline's reference to the callbacks so the GC can reclaim whatever
    // they capture as soon as this function is done with them.
    pipeline[exports.kCallbacks] = null;
    // Perform the call
    pipeline.exec(function (err, results) {
      client._runningAutoPipelines.delete(slotKey);
      /*
        Invoke all callbacks in nextTick so the stack is cleared
        and callbacks can throw errors without affecting other callbacks.
      */
      for (let i = 0; i < callbacks.length; i++) {
        if (err) {
          process.nextTick(callbacks[i], err);
          continue;
        }
        const result = results == null ? undefined : results[i];
        if (result == null) {
          // Spreading a missing entry would throw here, which would leave every
          // remaining callback uninvoked and skip the follow-up flush below.
          // Report the gap to the affected command instead.
          process.nextTick(callbacks[i], new Error("Auto-pipelining: no result was returned for a queued command"));
          continue;
        }
        process.nextTick(callbacks[i], ...result);
      }
      // If there is another pipeline on the same node, immediately execute it without waiting for nextTick
      if (client._autoPipelines.has(slotKey)) {
        executeAutoPipeline(client, slotKey);
      }
    });
  }
  /**
   * Decide whether a command should be routed through an auto-pipeline.
   *
   * Auto-pipelining applies only when the command was invoked through a named
   * function, the client has `enableAutoPipelining` switched on, the client is not
   * itself a pipeline, and the command appears neither in
   * `notAllowedAutoPipelineCommands` nor in the client's
   * `autoPipeliningIgnoredCommands` option.
   *
   * @param {object} client - Client whose `options` and `isPipeline` flag are inspected.
   * @param {string} functionName - Name of the invoked client function; falsy disables auto-pipelining.
   * @param {string} commandName - Name of the command to check against both exclusion lists.
   * @returns {boolean} Always a strict boolean, never a leaked operand of the condition.
   */
  function shouldUseAutoPipelining(client, functionName, commandName) {
    if (!functionName || !client.options.enableAutoPipelining || client.isPipeline) {
      return false;
    }
    if (exports.notAllowedAutoPipelineCommands.includes(commandName)) {
      return false;
    }
    // A client configured without an ignore list simply has nothing extra to exclude.
    const ignoredCommands = client.options.autoPipeliningIgnoredCommands;
    return ignoredCommands ? !ignoredCommands.includes(commandName) : true;
  }
  78. exports.shouldUseAutoPipelining = shouldUseAutoPipelining;
  /**
   * Return the first value the argument list would yield after one level of
   * flattening, or `undefined` when no argument yields one.
   *
   * Strings are returned as-is. For arrays and `arguments` objects the first
   * element is returned, and empty ones are skipped. Anything else is passed
   * through `lodash_1.flatten` and the first flattened element is returned.
   *
   * @private
   */
  function getFirstValueInFlattenedArray(args) {
    for (let index = 0; index < args.length; index += 1) {
      const candidate = args[index];
      if (typeof candidate === "string") {
        return candidate;
      }
      const isListLike = Array.isArray(candidate) || lodash_1.isArguments(candidate);
      if (isListLike) {
        if (candidate.length !== 0) {
          return candidate[0];
        }
        // Empty list: nothing to take from it, look at the next argument.
        continue;
      }
      const spread = lodash_1.flatten([candidate]);
      if (spread.length > 0) {
        return spread[0];
      }
    }
    return undefined;
  }
  101. exports.getFirstValueInFlattenedArray = getFirstValueInFlattenedArray;
  /**
   * Queue a command on the auto-pipeline for its target and return a promise for
   * that command's result.
   *
   * On a cluster client with no slot information yet, the call is deferred until
   * `client.delayUntilReady` reports readiness and is then retried. Otherwise the
   * command is appended to the pipeline stored under its slot key (`"main"` for
   * non-cluster clients), and that pipeline is scheduled for flushing with
   * `setImmediate` the first time a command lands on it.
   *
   * @param {object} client - Client owning the `_autoPipelines` map.
   * @param {string} functionName - Name of the pipeline method used to queue the command.
   * @param {string} commandName - Name of the command; forwarded unchanged on retry.
   * @param {Array} args - Arguments passed to `pipeline[functionName]`.
   * @param {Function|null} callback - Optional node-style callback attached to the returned promise.
   * @returns {Promise} Settles with the command's result or error.
   */
  function executeWithAutoPipelining(client, functionName, commandName, args, callback) {
    const CustomPromise = PromiseContainer.get();
    // On cluster mode let's wait for slots to be available
    if (client.isCluster && !client.slots.length) {
      if (client.status === "wait") {
        client.connect().catch(lodash_1.noop);
      }
      return standard_as_callback_1.default(new CustomPromise(function (resolve, reject) {
        client.delayUntilReady((err) => {
          if (err) {
            reject(err);
            return;
          }
          executeWithAutoPipelining(client, functionName, commandName, args, null).then(resolve, reject);
        });
      }), callback);
    }
    // If we have slot information, we can improve routing by grouping slots served by the same subset of nodes
    // Note that the first value in args may be a (possibly empty) array.
    // ioredis will only flatten one level of the array, in the Command constructor.
    const prefix = client.options.keyPrefix || "";
    const slotKey = client.isCluster
      ? client.slots[calculateSlot(`${prefix}${getFirstValueInFlattenedArray(args)}`)].join(",")
      : "main";
    if (!client._autoPipelines.has(slotKey)) {
      const freshPipeline = client.pipeline();
      freshPipeline[exports.kExec] = false;
      freshPipeline[exports.kCallbacks] = [];
      client._autoPipelines.set(slotKey, freshPipeline);
    }
    const pipeline = client._autoPipelines.get(slotKey);
    /*
      Mark the pipeline as scheduled.
      The symbol will make sure that the pipeline is only scheduled once per tick.
      New commands are appended to an already scheduled pipeline.
    */
    if (!pipeline[exports.kExec]) {
      pipeline[exports.kExec] = true;
      /*
        Deferring with setImmediate so we have a chance to capture multiple
        commands that can be scheduled by I/O events already in the event loop queue.
      */
      setImmediate(executeAutoPipeline, client, slotKey);
    }
    // Create the promise which will execute the command in the pipeline.
    const autoPipelinePromise = new CustomPromise(function (resolve, reject) {
      // Queue the command BEFORE registering its callback. Callbacks are matched
      // to results purely by position, so if queuing throws (which rejects this
      // promise) no orphan callback may be left behind: it would shift every
      // later command on this pipeline onto the wrong result.
      pipeline[functionName](...args);
      pipeline[exports.kCallbacks].push(function (err, value) {
        if (err) {
          reject(err);
          return;
        }
        resolve(value);
      });
    });
    return standard_as_callback_1.default(autoPipelinePromise, callback);
  }
  158. exports.executeWithAutoPipelining = executeWithAutoPipelining;