index.js 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const events_1 = require("events");
  4. const ClusterAllFailedError_1 = require("../errors/ClusterAllFailedError");
  5. const utils_1 = require("../utils");
  6. const ConnectionPool_1 = require("./ConnectionPool");
  7. const util_1 = require("./util");
  8. const ClusterSubscriber_1 = require("./ClusterSubscriber");
  9. const DelayQueue_1 = require("./DelayQueue");
  10. const ScanStream_1 = require("../ScanStream");
  11. const redis_errors_1 = require("redis-errors");
  12. const standard_as_callback_1 = require("standard-as-callback");
  13. const PromiseContainer = require("../promiseContainer");
  14. const ClusterOptions_1 = require("./ClusterOptions");
  15. const utils_2 = require("../utils");
  16. const commands = require("redis-commands");
  17. const command_1 = require("../command");
  18. const redis_1 = require("../redis");
  19. const commander_1 = require("../commander");
  20. const Deque = require("denque");
  21. const debug = utils_1.Debug("cluster");
  22. /**
  23. * Client for the official Redis Cluster
  24. *
  25. * @class Cluster
  26. * @extends {EventEmitter}
  27. */
  28. class Cluster extends events_1.EventEmitter {
  29. /**
  30. * Creates an instance of Cluster.
  31. *
  32. * @param {((string | number | object)[])} startupNodes
  33. * @param {IClusterOptions} [options={}]
  34. * @memberof Cluster
  35. */
  36. constructor(startupNodes, options = {}) {
  37. super();
  38. this.slots = [];
  39. this.retryAttempts = 0;
  40. this.delayQueue = new DelayQueue_1.default();
  41. this.offlineQueue = new Deque();
  42. this.isRefreshing = false;
  43. this.isCluster = true;
  44. this._autoPipelines = new Map();
  45. this._groupsIds = {};
  46. this._groupsBySlot = Array(16384);
  47. this._runningAutoPipelines = new Set();
  48. this._readyDelayedCallbacks = [];
  49. this._addedScriptHashes = {};
  50. /**
  51. * Every time Cluster#connect() is called, this value will be
  52. * auto-incrementing. The purpose of this value is used for
  53. * discarding previous connect attampts when creating a new
  54. * connection.
  55. *
  56. * @private
  57. * @type {number}
  58. * @memberof Cluster
  59. */
  60. this.connectionEpoch = 0;
  61. commander_1.default.call(this);
  62. this.startupNodes = startupNodes;
  63. this.options = utils_1.defaults({}, options, ClusterOptions_1.DEFAULT_CLUSTER_OPTIONS, this.options);
  64. // validate options
  65. if (typeof this.options.scaleReads !== "function" &&
  66. ["all", "master", "slave"].indexOf(this.options.scaleReads) === -1) {
  67. throw new Error('Invalid option scaleReads "' +
  68. this.options.scaleReads +
  69. '". Expected "all", "master", "slave" or a custom function');
  70. }
  71. this.connectionPool = new ConnectionPool_1.default(this.options.redisOptions);
  72. this.connectionPool.on("-node", (redis, key) => {
  73. this.emit("-node", redis);
  74. });
  75. this.connectionPool.on("+node", (redis) => {
  76. this.emit("+node", redis);
  77. });
  78. this.connectionPool.on("drain", () => {
  79. this.setStatus("close");
  80. });
  81. this.connectionPool.on("nodeError", (error, key) => {
  82. this.emit("node error", error, key);
  83. });
  84. this.subscriber = new ClusterSubscriber_1.default(this.connectionPool, this);
  85. if (this.options.lazyConnect) {
  86. this.setStatus("wait");
  87. }
  88. else {
  89. this.connect().catch((err) => {
  90. debug("connecting failed: %s", err);
  91. });
  92. }
  93. }
  94. resetOfflineQueue() {
  95. this.offlineQueue = new Deque();
  96. }
  97. clearNodesRefreshInterval() {
  98. if (this.slotsTimer) {
  99. clearTimeout(this.slotsTimer);
  100. this.slotsTimer = null;
  101. }
  102. }
  103. clearAddedScriptHashesCleanInterval() {
  104. if (this._addedScriptHashesCleanInterval) {
  105. clearInterval(this._addedScriptHashesCleanInterval);
  106. this._addedScriptHashesCleanInterval = null;
  107. }
  108. }
  109. resetNodesRefreshInterval() {
  110. if (this.slotsTimer) {
  111. return;
  112. }
  113. const nextRound = () => {
  114. this.slotsTimer = setTimeout(() => {
  115. debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)');
  116. this.refreshSlotsCache(() => {
  117. nextRound();
  118. });
  119. }, this.options.slotsRefreshInterval);
  120. };
  121. nextRound();
  122. }
  123. /**
  124. * Connect to a cluster
  125. *
  126. * @returns {Promise<void>}
  127. * @memberof Cluster
  128. */
  129. connect() {
  130. const Promise = PromiseContainer.get();
  131. return new Promise((resolve, reject) => {
  132. if (this.status === "connecting" ||
  133. this.status === "connect" ||
  134. this.status === "ready") {
  135. reject(new Error("Redis is already connecting/connected"));
  136. return;
  137. }
  138. // Make sure only one timer is active at a time
  139. this.clearAddedScriptHashesCleanInterval();
  140. // Start the script cache cleaning
  141. this._addedScriptHashesCleanInterval = setInterval(() => {
  142. this._addedScriptHashes = {};
  143. }, this.options.maxScriptsCachingTime);
  144. const epoch = ++this.connectionEpoch;
  145. this.setStatus("connecting");
  146. this.resolveStartupNodeHostnames()
  147. .then((nodes) => {
  148. if (this.connectionEpoch !== epoch) {
  149. debug("discard connecting after resolving startup nodes because epoch not match: %d != %d", epoch, this.connectionEpoch);
  150. reject(new redis_errors_1.RedisError("Connection is discarded because a new connection is made"));
  151. return;
  152. }
  153. if (this.status !== "connecting") {
  154. debug("discard connecting after resolving startup nodes because the status changed to %s", this.status);
  155. reject(new redis_errors_1.RedisError("Connection is aborted"));
  156. return;
  157. }
  158. this.connectionPool.reset(nodes);
  159. function readyHandler() {
  160. this.setStatus("ready");
  161. this.retryAttempts = 0;
  162. this.executeOfflineCommands();
  163. this.resetNodesRefreshInterval();
  164. resolve();
  165. }
  166. let closeListener = undefined;
  167. const refreshListener = () => {
  168. this.invokeReadyDelayedCallbacks(undefined);
  169. this.removeListener("close", closeListener);
  170. this.manuallyClosing = false;
  171. this.setStatus("connect");
  172. if (this.options.enableReadyCheck) {
  173. this.readyCheck((err, fail) => {
  174. if (err || fail) {
  175. debug("Ready check failed (%s). Reconnecting...", err || fail);
  176. if (this.status === "connect") {
  177. this.disconnect(true);
  178. }
  179. }
  180. else {
  181. readyHandler.call(this);
  182. }
  183. });
  184. }
  185. else {
  186. readyHandler.call(this);
  187. }
  188. };
  189. closeListener = function () {
  190. const error = new Error("None of startup nodes is available");
  191. this.removeListener("refresh", refreshListener);
  192. this.invokeReadyDelayedCallbacks(error);
  193. reject(error);
  194. };
  195. this.once("refresh", refreshListener);
  196. this.once("close", closeListener);
  197. this.once("close", this.handleCloseEvent.bind(this));
  198. this.refreshSlotsCache(function (err) {
  199. if (err && err.message === "Failed to refresh slots cache.") {
  200. redis_1.default.prototype.silentEmit.call(this, "error", err);
  201. this.connectionPool.reset([]);
  202. }
  203. }.bind(this));
  204. this.subscriber.start();
  205. })
  206. .catch((err) => {
  207. this.setStatus("close");
  208. this.handleCloseEvent(err);
  209. this.invokeReadyDelayedCallbacks(err);
  210. reject(err);
  211. });
  212. });
  213. }
  214. /**
  215. * Called when closed to check whether a reconnection should be made
  216. *
  217. * @private
  218. * @memberof Cluster
  219. */
  220. handleCloseEvent(reason) {
  221. if (reason) {
  222. debug("closed because %s", reason);
  223. }
  224. this.clearAddedScriptHashesCleanInterval();
  225. let retryDelay;
  226. if (!this.manuallyClosing &&
  227. typeof this.options.clusterRetryStrategy === "function") {
  228. retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts, reason);
  229. }
  230. if (typeof retryDelay === "number") {
  231. this.setStatus("reconnecting");
  232. this.reconnectTimeout = setTimeout(function () {
  233. this.reconnectTimeout = null;
  234. debug("Cluster is disconnected. Retrying after %dms", retryDelay);
  235. this.connect().catch(function (err) {
  236. debug("Got error %s when reconnecting. Ignoring...", err);
  237. });
  238. }.bind(this), retryDelay);
  239. }
  240. else {
  241. this.setStatus("end");
  242. this.flushQueue(new Error("None of startup nodes is available"));
  243. }
  244. }
  245. /**
  246. * Disconnect from every node in the cluster.
  247. *
  248. * @param {boolean} [reconnect=false]
  249. * @memberof Cluster
  250. */
  251. disconnect(reconnect = false) {
  252. const status = this.status;
  253. this.setStatus("disconnecting");
  254. this.clearAddedScriptHashesCleanInterval();
  255. if (!reconnect) {
  256. this.manuallyClosing = true;
  257. }
  258. if (this.reconnectTimeout && !reconnect) {
  259. clearTimeout(this.reconnectTimeout);
  260. this.reconnectTimeout = null;
  261. debug("Canceled reconnecting attempts");
  262. }
  263. this.clearNodesRefreshInterval();
  264. this.subscriber.stop();
  265. if (status === "wait") {
  266. this.setStatus("close");
  267. this.handleCloseEvent();
  268. }
  269. else {
  270. this.connectionPool.reset([]);
  271. }
  272. }
  273. /**
  274. * Quit the cluster gracefully.
  275. *
  276. * @param {CallbackFunction<'OK'>} [callback]
  277. * @returns {Promise<'OK'>}
  278. * @memberof Cluster
  279. */
  280. quit(callback) {
  281. const status = this.status;
  282. this.setStatus("disconnecting");
  283. this.clearAddedScriptHashesCleanInterval();
  284. this.manuallyClosing = true;
  285. if (this.reconnectTimeout) {
  286. clearTimeout(this.reconnectTimeout);
  287. this.reconnectTimeout = null;
  288. }
  289. this.clearNodesRefreshInterval();
  290. this.subscriber.stop();
  291. const Promise = PromiseContainer.get();
  292. if (status === "wait") {
  293. const ret = standard_as_callback_1.default(Promise.resolve("OK"), callback);
  294. // use setImmediate to make sure "close" event
  295. // being emitted after quit() is returned
  296. setImmediate(function () {
  297. this.setStatus("close");
  298. this.handleCloseEvent();
  299. }.bind(this));
  300. return ret;
  301. }
  302. return standard_as_callback_1.default(Promise.all(this.nodes().map((node) => node.quit().catch((err) => {
  303. // Ignore the error caused by disconnecting since
  304. // we're disconnecting...
  305. if (err.message === utils_2.CONNECTION_CLOSED_ERROR_MSG) {
  306. return "OK";
  307. }
  308. throw err;
  309. }))).then(() => "OK"), callback);
  310. }
  311. /**
  312. * Create a new instance with the same startup nodes and options as the current one.
  313. *
  314. * @example
  315. * ```js
  316. * var cluster = new Redis.Cluster([{ host: "127.0.0.1", port: "30001" }]);
  317. * var anotherCluster = cluster.duplicate();
  318. * ```
  319. *
  320. * @public
  321. * @param {((string | number | object)[])} [overrideStartupNodes=[]]
  322. * @param {IClusterOptions} [overrideOptions={}]
  323. * @memberof Cluster
  324. */
  325. duplicate(overrideStartupNodes = [], overrideOptions = {}) {
  326. const startupNodes = overrideStartupNodes.length > 0
  327. ? overrideStartupNodes
  328. : this.startupNodes.slice(0);
  329. const options = Object.assign({}, this.options, overrideOptions);
  330. return new Cluster(startupNodes, options);
  331. }
  332. /**
  333. * Get nodes with the specified role
  334. *
  335. * @param {NodeRole} [role='all']
  336. * @returns {any[]}
  337. * @memberof Cluster
  338. */
  339. nodes(role = "all") {
  340. if (role !== "all" && role !== "master" && role !== "slave") {
  341. throw new Error('Invalid role "' + role + '". Expected "all", "master" or "slave"');
  342. }
  343. return this.connectionPool.getNodes(role);
  344. }
  345. // This is needed in order not to install a listener for each auto pipeline
  346. delayUntilReady(callback) {
  347. this._readyDelayedCallbacks.push(callback);
  348. }
  349. /**
  350. * Get the number of commands queued in automatic pipelines.
  351. *
  352. * This is not available (and returns 0) until the cluster is connected and slots information have been received.
  353. */
  354. get autoPipelineQueueSize() {
  355. let queued = 0;
  356. for (const pipeline of this._autoPipelines.values()) {
  357. queued += pipeline.length;
  358. }
  359. return queued;
  360. }
  361. /**
  362. * Change cluster instance's status
  363. *
  364. * @private
  365. * @param {ClusterStatus} status
  366. * @memberof Cluster
  367. */
  368. setStatus(status) {
  369. debug("status: %s -> %s", this.status || "[empty]", status);
  370. this.status = status;
  371. process.nextTick(() => {
  372. this.emit(status);
  373. });
  374. }
  375. /**
  376. * Refresh the slot cache
  377. *
  378. * @private
  379. * @param {CallbackFunction} [callback]
  380. * @memberof Cluster
  381. */
  382. refreshSlotsCache(callback) {
  383. if (this.isRefreshing) {
  384. if (typeof callback === "function") {
  385. process.nextTick(callback);
  386. }
  387. return;
  388. }
  389. this.isRefreshing = true;
  390. const _this = this;
  391. const wrapper = function (error) {
  392. _this.isRefreshing = false;
  393. if (typeof callback === "function") {
  394. callback(error);
  395. }
  396. };
  397. const nodes = utils_2.shuffle(this.connectionPool.getNodes());
  398. let lastNodeError = null;
  399. function tryNode(index) {
  400. if (index === nodes.length) {
  401. const error = new ClusterAllFailedError_1.default("Failed to refresh slots cache.", lastNodeError);
  402. return wrapper(error);
  403. }
  404. const node = nodes[index];
  405. const key = `${node.options.host}:${node.options.port}`;
  406. debug("getting slot cache from %s", key);
  407. _this.getInfoFromNode(node, function (err) {
  408. switch (_this.status) {
  409. case "close":
  410. case "end":
  411. return wrapper(new Error("Cluster is disconnected."));
  412. case "disconnecting":
  413. return wrapper(new Error("Cluster is disconnecting."));
  414. }
  415. if (err) {
  416. _this.emit("node error", err, key);
  417. lastNodeError = err;
  418. tryNode(index + 1);
  419. }
  420. else {
  421. _this.emit("refresh");
  422. wrapper();
  423. }
  424. });
  425. }
  426. tryNode(0);
  427. }
  428. /**
  429. * Flush offline queue with error.
  430. *
  431. * @param {Error} error
  432. * @memberof Cluster
  433. */
  434. flushQueue(error) {
  435. let item;
  436. while (this.offlineQueue.length > 0) {
  437. item = this.offlineQueue.shift();
  438. item.command.reject(error);
  439. }
  440. }
  441. executeOfflineCommands() {
  442. if (this.offlineQueue.length) {
  443. debug("send %d commands in offline queue", this.offlineQueue.length);
  444. const offlineQueue = this.offlineQueue;
  445. this.resetOfflineQueue();
  446. while (offlineQueue.length > 0) {
  447. const item = offlineQueue.shift();
  448. this.sendCommand(item.command, item.stream, item.node);
  449. }
  450. }
  451. }
  452. natMapper(nodeKey) {
  453. if (this.options.natMap && typeof this.options.natMap === "object") {
  454. const key = typeof nodeKey === "string"
  455. ? nodeKey
  456. : `${nodeKey.host}:${nodeKey.port}`;
  457. const mapped = this.options.natMap[key];
  458. if (mapped) {
  459. debug("NAT mapping %s -> %O", key, mapped);
  460. return Object.assign({}, mapped);
  461. }
  462. }
  463. return typeof nodeKey === "string"
  464. ? util_1.nodeKeyToRedisOptions(nodeKey)
  465. : nodeKey;
  466. }
  467. sendCommand(command, stream, node) {
  468. if (this.status === "wait") {
  469. this.connect().catch(utils_1.noop);
  470. }
  471. if (this.status === "end") {
  472. command.reject(new Error(utils_2.CONNECTION_CLOSED_ERROR_MSG));
  473. return command.promise;
  474. }
  475. let to = this.options.scaleReads;
  476. if (to !== "master") {
  477. const isCommandReadOnly = command.isReadOnly ||
  478. (commands.exists(command.name) &&
  479. commands.hasFlag(command.name, "readonly"));
  480. if (!isCommandReadOnly) {
  481. to = "master";
  482. }
  483. }
  484. let targetSlot = node ? node.slot : command.getSlot();
  485. const ttl = {};
  486. const _this = this;
  487. if (!node && !command.__is_reject_overwritten) {
  488. // eslint-disable-next-line @typescript-eslint/camelcase
  489. command.__is_reject_overwritten = true;
  490. const reject = command.reject;
  491. command.reject = function (err) {
  492. const partialTry = tryConnection.bind(null, true);
  493. _this.handleError(err, ttl, {
  494. moved: function (slot, key) {
  495. debug("command %s is moved to %s", command.name, key);
  496. targetSlot = Number(slot);
  497. if (_this.slots[slot]) {
  498. _this.slots[slot][0] = key;
  499. }
  500. else {
  501. _this.slots[slot] = [key];
  502. }
  503. _this._groupsBySlot[slot] =
  504. _this._groupsIds[_this.slots[slot].join(";")];
  505. _this.connectionPool.findOrCreate(_this.natMapper(key));
  506. tryConnection();
  507. debug("refreshing slot caches... (triggered by MOVED error)");
  508. _this.refreshSlotsCache();
  509. },
  510. ask: function (slot, key) {
  511. debug("command %s is required to ask %s:%s", command.name, key);
  512. const mapped = _this.natMapper(key);
  513. _this.connectionPool.findOrCreate(mapped);
  514. tryConnection(false, `${mapped.host}:${mapped.port}`);
  515. },
  516. tryagain: partialTry,
  517. clusterDown: partialTry,
  518. connectionClosed: partialTry,
  519. maxRedirections: function (redirectionError) {
  520. reject.call(command, redirectionError);
  521. },
  522. defaults: function () {
  523. reject.call(command, err);
  524. },
  525. });
  526. };
  527. }
  528. tryConnection();
  529. function tryConnection(random, asking) {
  530. if (_this.status === "end") {
  531. command.reject(new redis_errors_1.AbortError("Cluster is ended."));
  532. return;
  533. }
  534. let redis;
  535. if (_this.status === "ready" || command.name === "cluster") {
  536. if (node && node.redis) {
  537. redis = node.redis;
  538. }
  539. else if (command_1.default.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
  540. command_1.default.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)) {
  541. redis = _this.subscriber.getInstance();
  542. if (!redis) {
  543. command.reject(new redis_errors_1.AbortError("No subscriber for the cluster"));
  544. return;
  545. }
  546. }
  547. else {
  548. if (!random) {
  549. if (typeof targetSlot === "number" && _this.slots[targetSlot]) {
  550. const nodeKeys = _this.slots[targetSlot];
  551. if (typeof to === "function") {
  552. const nodes = nodeKeys.map(function (key) {
  553. return _this.connectionPool.getInstanceByKey(key);
  554. });
  555. redis = to(nodes, command);
  556. if (Array.isArray(redis)) {
  557. redis = utils_2.sample(redis);
  558. }
  559. if (!redis) {
  560. redis = nodes[0];
  561. }
  562. }
  563. else {
  564. let key;
  565. if (to === "all") {
  566. key = utils_2.sample(nodeKeys);
  567. }
  568. else if (to === "slave" && nodeKeys.length > 1) {
  569. key = utils_2.sample(nodeKeys, 1);
  570. }
  571. else {
  572. key = nodeKeys[0];
  573. }
  574. redis = _this.connectionPool.getInstanceByKey(key);
  575. }
  576. }
  577. if (asking) {
  578. redis = _this.connectionPool.getInstanceByKey(asking);
  579. redis.asking();
  580. }
  581. }
  582. if (!redis) {
  583. redis =
  584. (typeof to === "function"
  585. ? null
  586. : _this.connectionPool.getSampleInstance(to)) ||
  587. _this.connectionPool.getSampleInstance("all");
  588. }
  589. }
  590. if (node && !node.redis) {
  591. node.redis = redis;
  592. }
  593. }
  594. if (redis) {
  595. redis.sendCommand(command, stream);
  596. }
  597. else if (_this.options.enableOfflineQueue) {
  598. _this.offlineQueue.push({
  599. command: command,
  600. stream: stream,
  601. node: node,
  602. });
  603. }
  604. else {
  605. command.reject(new Error("Cluster isn't ready and enableOfflineQueue options is false"));
  606. }
  607. }
  608. return command.promise;
  609. }
  610. handleError(error, ttl, handlers) {
  611. if (typeof ttl.value === "undefined") {
  612. ttl.value = this.options.maxRedirections;
  613. }
  614. else {
  615. ttl.value -= 1;
  616. }
  617. if (ttl.value <= 0) {
  618. handlers.maxRedirections(new Error("Too many Cluster redirections. Last error: " + error));
  619. return;
  620. }
  621. const errv = error.message.split(" ");
  622. if (errv[0] === "MOVED") {
  623. const timeout = this.options.retryDelayOnMoved;
  624. if (timeout && typeof timeout === "number") {
  625. this.delayQueue.push("moved", handlers.moved.bind(null, errv[1], errv[2]), { timeout });
  626. }
  627. else {
  628. handlers.moved(errv[1], errv[2]);
  629. }
  630. }
  631. else if (errv[0] === "ASK") {
  632. handlers.ask(errv[1], errv[2]);
  633. }
  634. else if (errv[0] === "TRYAGAIN") {
  635. this.delayQueue.push("tryagain", handlers.tryagain, {
  636. timeout: this.options.retryDelayOnTryAgain,
  637. });
  638. }
  639. else if (errv[0] === "CLUSTERDOWN" &&
  640. this.options.retryDelayOnClusterDown > 0) {
  641. this.delayQueue.push("clusterdown", handlers.connectionClosed, {
  642. timeout: this.options.retryDelayOnClusterDown,
  643. callback: this.refreshSlotsCache.bind(this),
  644. });
  645. }
  646. else if (error.message === utils_2.CONNECTION_CLOSED_ERROR_MSG &&
  647. this.options.retryDelayOnFailover > 0 &&
  648. this.status === "ready") {
  649. this.delayQueue.push("failover", handlers.connectionClosed, {
  650. timeout: this.options.retryDelayOnFailover,
  651. callback: this.refreshSlotsCache.bind(this),
  652. });
  653. }
  654. else {
  655. handlers.defaults();
  656. }
  657. }
  658. getInfoFromNode(redis, callback) {
  659. if (!redis) {
  660. return callback(new Error("Node is disconnected"));
  661. }
  662. // Use a duplication of the connection to avoid
  663. // timeouts when the connection is in the blocking
  664. // mode (e.g. waiting for BLPOP).
  665. const duplicatedConnection = redis.duplicate({
  666. enableOfflineQueue: true,
  667. enableReadyCheck: false,
  668. retryStrategy: null,
  669. connectionName: util_1.getConnectionName("refresher", this.options.redisOptions && this.options.redisOptions.connectionName),
  670. });
  671. // Ignore error events since we will handle
  672. // exceptions for the CLUSTER SLOTS command.
  673. duplicatedConnection.on("error", utils_1.noop);
  674. duplicatedConnection.cluster("slots", utils_2.timeout((err, result) => {
  675. duplicatedConnection.disconnect();
  676. if (err) {
  677. return callback(err);
  678. }
  679. if (this.status === "disconnecting" ||
  680. this.status === "close" ||
  681. this.status === "end") {
  682. debug("ignore CLUSTER.SLOTS results (count: %d) since cluster status is %s", result.length, this.status);
  683. callback();
  684. return;
  685. }
  686. const nodes = [];
  687. debug("cluster slots result count: %d", result.length);
  688. for (let i = 0; i < result.length; ++i) {
  689. const items = result[i];
  690. const slotRangeStart = items[0];
  691. const slotRangeEnd = items[1];
  692. const keys = [];
  693. for (let j = 2; j < items.length; j++) {
  694. if (!items[j][0]) {
  695. continue;
  696. }
  697. items[j] = this.natMapper({ host: items[j][0], port: items[j][1] });
  698. items[j].readOnly = j !== 2;
  699. nodes.push(items[j]);
  700. keys.push(items[j].host + ":" + items[j].port);
  701. }
  702. debug("cluster slots result [%d]: slots %d~%d served by %s", i, slotRangeStart, slotRangeEnd, keys);
  703. for (let slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
  704. this.slots[slot] = keys;
  705. }
  706. }
  707. // Assign to each node keys a numeric value to make autopipeline comparison faster.
  708. this._groupsIds = Object.create(null);
  709. let j = 0;
  710. for (let i = 0; i < 16384; i++) {
  711. const target = (this.slots[i] || []).join(";");
  712. if (!target.length) {
  713. this._groupsBySlot[i] = undefined;
  714. continue;
  715. }
  716. if (!this._groupsIds[target]) {
  717. this._groupsIds[target] = ++j;
  718. }
  719. this._groupsBySlot[i] = this._groupsIds[target];
  720. }
  721. this.connectionPool.reset(nodes);
  722. callback();
  723. }, this.options.slotsRefreshTimeout));
  724. }
  725. invokeReadyDelayedCallbacks(err) {
  726. for (const c of this._readyDelayedCallbacks) {
  727. process.nextTick(c, err);
  728. }
  729. this._readyDelayedCallbacks = [];
  730. }
  731. /**
  732. * Check whether Cluster is able to process commands
  733. *
  734. * @param {Function} callback
  735. * @private
  736. */
  737. readyCheck(callback) {
  738. this.cluster("info", function (err, res) {
  739. if (err) {
  740. return callback(err);
  741. }
  742. if (typeof res !== "string") {
  743. return callback();
  744. }
  745. let state;
  746. const lines = res.split("\r\n");
  747. for (let i = 0; i < lines.length; ++i) {
  748. const parts = lines[i].split(":");
  749. if (parts[0] === "cluster_state") {
  750. state = parts[1];
  751. break;
  752. }
  753. }
  754. if (state === "fail") {
  755. debug("cluster state not ok (%s)", state);
  756. callback(null, state);
  757. }
  758. else {
  759. callback();
  760. }
  761. });
  762. }
  763. resolveSrv(hostname) {
  764. return new Promise((resolve, reject) => {
  765. this.options.resolveSrv(hostname, (err, records) => {
  766. if (err) {
  767. return reject(err);
  768. }
  769. const self = this, groupedRecords = util_1.groupSrvRecords(records), sortedKeys = Object.keys(groupedRecords).sort((a, b) => parseInt(a) - parseInt(b));
  770. function tryFirstOne(err) {
  771. if (!sortedKeys.length) {
  772. return reject(err);
  773. }
  774. const key = sortedKeys[0], group = groupedRecords[key], record = util_1.weightSrvRecords(group);
  775. if (!group.records.length) {
  776. sortedKeys.shift();
  777. }
  778. self.dnsLookup(record.name).then((host) => resolve({
  779. host,
  780. port: record.port,
  781. }), tryFirstOne);
  782. }
  783. tryFirstOne();
  784. });
  785. });
  786. }
  787. dnsLookup(hostname) {
  788. return new Promise((resolve, reject) => {
  789. this.options.dnsLookup(hostname, (err, address) => {
  790. if (err) {
  791. debug("failed to resolve hostname %s to IP: %s", hostname, err.message);
  792. reject(err);
  793. }
  794. else {
  795. debug("resolved hostname %s to IP %s", hostname, address);
  796. resolve(address);
  797. }
  798. });
  799. });
  800. }
  801. /**
  802. * Normalize startup nodes, and resolving hostnames to IPs.
  803. *
  804. * This process happens every time when #connect() is called since
  805. * #startupNodes and DNS records may chanage.
  806. *
  807. * @private
  808. * @returns {Promise<IRedisOptions[]>}
  809. */
  810. resolveStartupNodeHostnames() {
  811. if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
  812. return Promise.reject(new Error("`startupNodes` should contain at least one node."));
  813. }
  814. const startupNodes = util_1.normalizeNodeOptions(this.startupNodes);
  815. const hostnames = util_1.getUniqueHostnamesFromOptions(startupNodes);
  816. if (hostnames.length === 0) {
  817. return Promise.resolve(startupNodes);
  818. }
  819. return Promise.all(hostnames.map((this.options.useSRVRecords ? this.resolveSrv : this.dnsLookup).bind(this))).then((configs) => {
  820. const hostnameToConfig = utils_2.zipMap(hostnames, configs);
  821. return startupNodes.map((node) => {
  822. const config = hostnameToConfig.get(node.host);
  823. if (!config) {
  824. return node;
  825. }
  826. else if (this.options.useSRVRecords) {
  827. return Object.assign({}, node, config);
  828. }
  829. else {
  830. return Object.assign({}, node, { host: config });
  831. }
  832. });
  833. });
  834. }
  835. }
  // Copy every own property of Commander's prototype onto Cluster's
  // prototype, skipping the names Cluster already defines itself.
  for (const name of Object.getOwnPropertyNames(commander_1.default.prototype)) {
    if (Object.prototype.hasOwnProperty.call(Cluster.prototype, name)) {
      continue;
    }
    Cluster.prototype[name] = commander_1.default.prototype[name];
  }
  // Scan commands that get a `<command>Stream` helper on the prototype.
  const scanCommands = [
    "sscan",
    "hscan",
    "zscan",
    "sscanBuffer",
    "hscanBuffer",
    "zscanBuffer",
  ];
  for (const command of scanCommands) {
    /**
     * Create a ScanStream over `key` driven by the given scan command.
     *
     * @param {*} key
     * @param {object} [options] extra stream options; combined with the
     *   values below through `utils.defaults`
     */
    Cluster.prototype[command + "Stream"] = function (key, options) {
      const streamOptions = utils_1.defaults({
        objectMode: true,
        key: key,
        redis: this,
        command: command,
      }, options);
      return new ScanStream_1.default(streamOptions);
    };
  }
  // Let the transaction module extend the prototype, then publish the
  // class as the module's default export.
  require("../transaction").addTransactionSupport(Cluster.prototype);
  exports.default = Cluster;