event_handler.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const redis_errors_1 = require("redis-errors");
  4. const command_1 = require("../command");
  5. const errors_1 = require("../errors");
  6. const utils_1 = require("../utils");
  7. const DataHandler_1 = require("../DataHandler");
  8. const debug = utils_1.Debug("connection");
  9. function connectHandler(self) {
  10. return function () {
  11. self.setStatus("connect");
  12. self.resetCommandQueue();
  13. // AUTH command should be processed before any other commands
  14. let flushed = false;
  15. const { connectionEpoch } = self;
  16. if (self.condition.auth) {
  17. self.auth(self.condition.auth, function (err) {
  18. if (connectionEpoch !== self.connectionEpoch) {
  19. return;
  20. }
  21. if (err) {
  22. if (err.message.indexOf("no password is set") !== -1) {
  23. console.warn("[WARN] Redis server does not require a password, but a password was supplied.");
  24. }
  25. else if (err.message.indexOf("without any password configured for the default user") !== -1) {
  26. console.warn("[WARN] This Redis server's `default` user does not require a password, but a password was supplied");
  27. }
  28. else if (err.message.indexOf("wrong number of arguments for 'auth' command") !== -1) {
  29. console.warn(`[ERROR] The server returned "wrong number of arguments for 'auth' command". You are probably passing both username and password to Redis version 5 or below. You should only pass the 'password' option for Redis version 5 and under.`);
  30. }
  31. else {
  32. flushed = true;
  33. self.recoverFromFatalError(err, err);
  34. }
  35. }
  36. });
  37. }
  38. if (self.condition.select) {
  39. self.select(self.condition.select).catch((err) => {
  40. // If the node is in cluster mode, select is disallowed.
  41. // In this case, reconnect won't help.
  42. self.silentEmit("error", err);
  43. });
  44. }
  45. if (!self.options.enableReadyCheck) {
  46. exports.readyHandler(self)();
  47. }
  48. /*
  49. No need to keep the reference of DataHandler here
  50. because we don't need to do the cleanup.
  51. `Stream#end()` will remove all listeners for us.
  52. */
  53. new DataHandler_1.default(self, {
  54. stringNumbers: self.options.stringNumbers,
  55. dropBufferSupport: self.options.dropBufferSupport,
  56. });
  57. if (self.options.enableReadyCheck) {
  58. self._readyCheck(function (err, info) {
  59. if (connectionEpoch !== self.connectionEpoch) {
  60. return;
  61. }
  62. if (err) {
  63. if (!flushed) {
  64. self.recoverFromFatalError(new Error("Ready check failed: " + err.message), err);
  65. }
  66. }
  67. else {
  68. self.serverInfo = info;
  69. if (self.connector.check(info)) {
  70. exports.readyHandler(self)();
  71. }
  72. else {
  73. self.disconnect(true);
  74. }
  75. }
  76. });
  77. }
  78. };
  79. }
  80. exports.connectHandler = connectHandler;
  /**
   * Build the error used to reject a command that is aborted because the
   * connection closed.
   *
   * @param {object} command - Command being aborted; its `name` and `args` are read.
   * @returns {Error} An AbortError carrying `command: { name, args }`.
   */
  function abortError(command) {
      const error = new redis_errors_1.AbortError("Command aborted due to connection close");
      const { name, args } = command;
      return Object.assign(error, { command: { name, args } });
  }
  /**
   * Reject and remove pipelined commands whose pipeline did not start at index 0.
   *
   * A contiguous run of pipeline commands that starts at index zero can be
   * safely reattempted. A run that starts at index 1 or more means a partial
   * response was received before the connection closed, so those commands must
   * be aborted. For example, a queue whose pipeline indexes are
   * [2, 3, 4, 0, 1, 2] is reduced to [0, 1, 2].
   *
   * @param {object} commandQueue - Queue exposing `length`, `peekAt(i)` and
   *   `remove(i, count)`; each item has a `command`. Mutated in place.
   */
  function abortIncompletePipelines(commandQueue) {
      let expectedIndex = 0;
      let position = 0;
      while (position < commandQueue.length) {
          const { command } = commandQueue.peekAt(position);
          const { pipelineIndex } = command;
          const isPipelined = pipelineIndex !== undefined;
          // A non-pipelined command or the head of a pipeline restarts the count.
          if (!isPipelined || pipelineIndex === 0) {
              expectedIndex = 0;
          }
          if (!isPipelined) {
              position += 1;
              continue;
          }
          // The expected index advances for every pipelined command, kept or not.
          const inSequence = pipelineIndex === expectedIndex;
          expectedIndex += 1;
          if (inSequence) {
              position += 1;
              continue;
          }
          // Removal shifts the next item into `position`, so the cursor stays put.
          commandQueue.remove(position, 1);
          command.reject(abortError(command));
      }
  }
  /**
   * Reject and remove the leading pieces of a half-finished transaction.
   *
   * If only a partial transaction result was received before the connection
   * closed, any transaction fragments that ended up in the offline queue have
   * to be aborted. Scanning stops at the first "multi" command (left in place)
   * or right after the first "exec" command (which is aborted as well).
   *
   * @param {object} commandQueue - Queue exposing `length`, `peekAt(i)` and
   *   `remove(i, count)`; each item has a `command`. Mutated in place.
   */
  function abortTransactionFragments(commandQueue) {
      let cursor = 0;
      while (cursor < commandQueue.length) {
          const { command } = commandQueue.peekAt(cursor);
          if (command.name === "multi") {
              return;
          }
          const endsTransaction = command.name === "exec";
          if (!endsTransaction && !command.inTransaction) {
              cursor += 1;
              continue;
          }
          // Removal shifts the next item into `cursor`, so the cursor stays put.
          commandQueue.remove(cursor, 1);
          command.reject(abortError(command));
          if (endsTransaction) {
              return;
          }
      }
  }
  134. function closeHandler(self) {
  135. return function () {
  136. self.setStatus("close");
  137. if (!self.prevCondition) {
  138. self.prevCondition = self.condition;
  139. }
  140. if (self.commandQueue.length) {
  141. abortIncompletePipelines(self.commandQueue);
  142. self.prevCommandQueue = self.commandQueue;
  143. }
  144. if (self.offlineQueue.length) {
  145. abortTransactionFragments(self.offlineQueue);
  146. }
  147. self.clearAddedScriptHashesCleanInterval();
  148. if (self.manuallyClosing) {
  149. self.manuallyClosing = false;
  150. debug("skip reconnecting since the connection is manually closed.");
  151. return close();
  152. }
  153. if (typeof self.options.retryStrategy !== "function") {
  154. debug("skip reconnecting because `retryStrategy` is not a function");
  155. return close();
  156. }
  157. const retryDelay = self.options.retryStrategy(++self.retryAttempts);
  158. if (typeof retryDelay !== "number") {
  159. debug("skip reconnecting because `retryStrategy` doesn't return a number");
  160. return close();
  161. }
  162. debug("reconnect in %sms", retryDelay);
  163. self.setStatus("reconnecting", retryDelay);
  164. self.reconnectTimeout = setTimeout(function () {
  165. self.reconnectTimeout = null;
  166. self.connect().catch(utils_1.noop);
  167. }, retryDelay);
  168. const { maxRetriesPerRequest } = self.options;
  169. if (typeof maxRetriesPerRequest === "number") {
  170. if (maxRetriesPerRequest < 0) {
  171. debug("maxRetriesPerRequest is negative, ignoring...");
  172. }
  173. else {
  174. const remainder = self.retryAttempts % (maxRetriesPerRequest + 1);
  175. if (remainder === 0) {
  176. debug("reach maxRetriesPerRequest limitation, flushing command queue...");
  177. self.flushQueue(new errors_1.MaxRetriesPerRequestError(maxRetriesPerRequest));
  178. }
  179. }
  180. }
  181. };
  182. function close() {
  183. self.setStatus("end");
  184. self.flushQueue(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  185. }
  186. }
  187. exports.closeHandler = closeHandler;
  188. function errorHandler(self) {
  189. return function (error) {
  190. debug("error: %s", error);
  191. self.silentEmit("error", error);
  192. };
  193. }
  194. exports.errorHandler = errorHandler;
  195. function readyHandler(self) {
  196. return function () {
  197. self.setStatus("ready");
  198. self.retryAttempts = 0;
  199. if (self.options.monitor) {
  200. self.call("monitor");
  201. const { sendCommand } = self;
  202. self.sendCommand = function (command) {
  203. if (command_1.default.checkFlag("VALID_IN_MONITOR_MODE", command.name)) {
  204. return sendCommand.call(self, command);
  205. }
  206. command.reject(new Error("Connection is in monitoring mode, can't process commands."));
  207. return command.promise;
  208. };
  209. self.once("close", function () {
  210. delete self.sendCommand;
  211. });
  212. self.setStatus("monitoring");
  213. return;
  214. }
  215. const finalSelect = self.prevCondition
  216. ? self.prevCondition.select
  217. : self.condition.select;
  218. if (self.options.connectionName) {
  219. debug("set the connection name [%s]", self.options.connectionName);
  220. self.client("setname", self.options.connectionName).catch(utils_1.noop);
  221. }
  222. if (self.options.readOnly) {
  223. debug("set the connection to readonly mode");
  224. self.readonly().catch(utils_1.noop);
  225. }
  226. if (self.prevCondition) {
  227. const condition = self.prevCondition;
  228. self.prevCondition = null;
  229. if (condition.subscriber && self.options.autoResubscribe) {
  230. // We re-select the previous db first since
  231. // `SELECT` command is not valid in sub mode.
  232. if (self.condition.select !== finalSelect) {
  233. debug("connect to db [%d]", finalSelect);
  234. self.select(finalSelect);
  235. }
  236. const subscribeChannels = condition.subscriber.channels("subscribe");
  237. if (subscribeChannels.length) {
  238. debug("subscribe %d channels", subscribeChannels.length);
  239. self.subscribe(subscribeChannels);
  240. }
  241. const psubscribeChannels = condition.subscriber.channels("psubscribe");
  242. if (psubscribeChannels.length) {
  243. debug("psubscribe %d channels", psubscribeChannels.length);
  244. self.psubscribe(psubscribeChannels);
  245. }
  246. }
  247. }
  248. if (self.prevCommandQueue) {
  249. if (self.options.autoResendUnfulfilledCommands) {
  250. debug("resend %d unfulfilled commands", self.prevCommandQueue.length);
  251. while (self.prevCommandQueue.length > 0) {
  252. const item = self.prevCommandQueue.shift();
  253. if (item.select !== self.condition.select &&
  254. item.command.name !== "select") {
  255. self.select(item.select);
  256. }
  257. self.sendCommand(item.command, item.stream);
  258. }
  259. }
  260. else {
  261. self.prevCommandQueue = null;
  262. }
  263. }
  264. if (self.offlineQueue.length) {
  265. debug("send %d commands in offline queue", self.offlineQueue.length);
  266. const offlineQueue = self.offlineQueue;
  267. self.resetOfflineQueue();
  268. while (offlineQueue.length > 0) {
  269. const item = offlineQueue.shift();
  270. if (item.select !== self.condition.select &&
  271. item.command.name !== "select") {
  272. self.select(item.select);
  273. }
  274. self.sendCommand(item.command, item.stream);
  275. }
  276. }
  277. if (self.condition.select !== finalSelect) {
  278. debug("connect to db [%d]", finalSelect);
  279. self.select(finalSelect);
  280. }
  281. };
  282. }
  283. exports.readyHandler = readyHandler;