index.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. "use strict";
  2. var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
  3. function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
  4. return new (P || (P = Promise))(function (resolve, reject) {
  5. function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
  6. function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
  7. function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
  8. step((generator = generator.apply(thisArg, _arguments || [])).next());
  9. });
  10. };
  11. Object.defineProperty(exports, "__esModule", { value: true });
  12. const net_1 = require("net");
  13. const utils_1 = require("../../utils");
  14. const tls_1 = require("tls");
  15. const StandaloneConnector_1 = require("../StandaloneConnector");
  16. const SentinelIterator_1 = require("./SentinelIterator");
  17. exports.SentinelIterator = SentinelIterator_1.default;
  18. const AbstractConnector_1 = require("../AbstractConnector");
  19. const redis_1 = require("../../redis");
  20. const FailoverDetector_1 = require("./FailoverDetector");
  21. const debug = utils_1.Debug("SentinelConnector");
  22. class SentinelConnector extends AbstractConnector_1.default {
  23. constructor(options) {
  24. super(options.disconnectTimeout);
  25. this.options = options;
  26. this.failoverDetector = null;
  27. this.emitter = null;
  28. if (!this.options.sentinels.length) {
  29. throw new Error("Requires at least one sentinel to connect to.");
  30. }
  31. if (!this.options.name) {
  32. throw new Error("Requires the name of master.");
  33. }
  34. this.sentinelIterator = new SentinelIterator_1.default(this.options.sentinels);
  35. }
  36. check(info) {
  37. const roleMatches = !info.role || this.options.role === info.role;
  38. if (!roleMatches) {
  39. debug("role invalid, expected %s, but got %s", this.options.role, info.role);
  40. // Start from the next item.
  41. // Note that `reset` will move the cursor to the previous element,
  42. // so we advance two steps here.
  43. this.sentinelIterator.next();
  44. this.sentinelIterator.next();
  45. this.sentinelIterator.reset(true);
  46. }
  47. return roleMatches;
  48. }
  49. disconnect() {
  50. super.disconnect();
  51. if (this.failoverDetector) {
  52. this.failoverDetector.cleanup();
  53. }
  54. }
  55. connect(eventEmitter) {
  56. this.connecting = true;
  57. this.retryAttempts = 0;
  58. let lastError;
  59. const connectToNext = () => __awaiter(this, void 0, void 0, function* () {
  60. const endpoint = this.sentinelIterator.next();
  61. if (endpoint.done) {
  62. this.sentinelIterator.reset(false);
  63. const retryDelay = typeof this.options.sentinelRetryStrategy === "function"
  64. ? this.options.sentinelRetryStrategy(++this.retryAttempts)
  65. : null;
  66. let errorMsg = typeof retryDelay !== "number"
  67. ? "All sentinels are unreachable and retry is disabled."
  68. : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.`;
  69. if (lastError) {
  70. errorMsg += ` Last error: ${lastError.message}`;
  71. }
  72. debug(errorMsg);
  73. const error = new Error(errorMsg);
  74. if (typeof retryDelay === "number") {
  75. eventEmitter("error", error);
  76. yield new Promise((resolve) => setTimeout(resolve, retryDelay));
  77. return connectToNext();
  78. }
  79. else {
  80. throw error;
  81. }
  82. }
  83. let resolved = null;
  84. let err = null;
  85. try {
  86. resolved = yield this.resolve(endpoint.value);
  87. }
  88. catch (error) {
  89. err = error;
  90. }
  91. if (!this.connecting) {
  92. throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
  93. }
  94. const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
  95. if (resolved) {
  96. debug("resolved: %s:%s from sentinel %s", resolved.host, resolved.port, endpointAddress);
  97. if (this.options.enableTLSForSentinelMode && this.options.tls) {
  98. Object.assign(resolved, this.options.tls);
  99. this.stream = tls_1.connect(resolved);
  100. }
  101. else {
  102. this.stream = net_1.createConnection(resolved);
  103. }
  104. this.stream.once("connect", () => this.initFailoverDetector());
  105. this.stream.once("error", (err) => {
  106. this.firstError = err;
  107. });
  108. return this.stream;
  109. }
  110. else {
  111. const errorMsg = err
  112. ? "failed to connect to sentinel " +
  113. endpointAddress +
  114. " because " +
  115. err.message
  116. : "connected to sentinel " +
  117. endpointAddress +
  118. " successfully, but got an invalid reply: " +
  119. resolved;
  120. debug(errorMsg);
  121. eventEmitter("sentinelError", new Error(errorMsg));
  122. if (err) {
  123. lastError = err;
  124. }
  125. return connectToNext();
  126. }
  127. });
  128. return connectToNext();
  129. }
  130. updateSentinels(client) {
  131. return __awaiter(this, void 0, void 0, function* () {
  132. if (!this.options.updateSentinels) {
  133. return;
  134. }
  135. const result = yield client.sentinel("sentinels", this.options.name);
  136. if (!Array.isArray(result)) {
  137. return;
  138. }
  139. result
  140. .map(utils_1.packObject)
  141. .forEach((sentinel) => {
  142. const flags = sentinel.flags ? sentinel.flags.split(",") : [];
  143. if (flags.indexOf("disconnected") === -1 &&
  144. sentinel.ip &&
  145. sentinel.port) {
  146. const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel));
  147. if (this.sentinelIterator.add(endpoint)) {
  148. debug("adding sentinel %s:%s", endpoint.host, endpoint.port);
  149. }
  150. }
  151. });
  152. debug("Updated internal sentinels: %s", this.sentinelIterator);
  153. });
  154. }
  155. resolveMaster(client) {
  156. return __awaiter(this, void 0, void 0, function* () {
  157. const result = yield client.sentinel("get-master-addr-by-name", this.options.name);
  158. yield this.updateSentinels(client);
  159. return this.sentinelNatResolve(Array.isArray(result)
  160. ? { host: result[0], port: Number(result[1]) }
  161. : null);
  162. });
  163. }
  164. resolveSlave(client) {
  165. return __awaiter(this, void 0, void 0, function* () {
  166. const result = yield client.sentinel("slaves", this.options.name);
  167. if (!Array.isArray(result)) {
  168. return null;
  169. }
  170. const availableSlaves = result
  171. .map(utils_1.packObject)
  172. .filter((slave) => slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/));
  173. return this.sentinelNatResolve(selectPreferredSentinel(availableSlaves, this.options.preferredSlaves));
  174. });
  175. }
  176. sentinelNatResolve(item) {
  177. if (!item || !this.options.natMap)
  178. return item;
  179. return this.options.natMap[`${item.host}:${item.port}`] || item;
  180. }
  181. connectToSentinel(endpoint, options) {
  182. return new redis_1.default(Object.assign({ port: endpoint.port || 26379, host: endpoint.host, username: this.options.sentinelUsername || null, password: this.options.sentinelPassword || null, family: endpoint.family ||
  183. (StandaloneConnector_1.isIIpcConnectionOptions(this.options)
  184. ? undefined
  185. : this.options.family), tls: this.options.sentinelTLS, retryStrategy: null, enableReadyCheck: false, connectTimeout: this.options.connectTimeout, commandTimeout: this.options.sentinelCommandTimeout, dropBufferSupport: true }, options));
  186. }
  187. resolve(endpoint) {
  188. return __awaiter(this, void 0, void 0, function* () {
  189. const client = this.connectToSentinel(endpoint);
  190. // ignore the errors since resolve* methods will handle them
  191. client.on("error", noop);
  192. try {
  193. if (this.options.role === "slave") {
  194. return yield this.resolveSlave(client);
  195. }
  196. else {
  197. return yield this.resolveMaster(client);
  198. }
  199. }
  200. finally {
  201. client.disconnect();
  202. }
  203. });
  204. }
  205. initFailoverDetector() {
  206. var _a;
  207. return __awaiter(this, void 0, void 0, function* () {
  208. if (!this.options.failoverDetector) {
  209. return;
  210. }
  211. // Move the current sentinel to the first position
  212. this.sentinelIterator.reset(true);
  213. const sentinels = [];
  214. // In case of a large amount of sentinels, limit the number of concurrent connections
  215. while (sentinels.length < this.options.sentinelMaxConnections) {
  216. const { done, value } = this.sentinelIterator.next();
  217. if (done) {
  218. break;
  219. }
  220. const client = this.connectToSentinel(value, {
  221. lazyConnect: true,
  222. retryStrategy: this.options.sentinelReconnectStrategy,
  223. });
  224. client.on("reconnecting", () => {
  225. var _a;
  226. // Tests listen to this event
  227. (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("sentinelReconnecting");
  228. });
  229. sentinels.push({ address: value, client });
  230. }
  231. this.sentinelIterator.reset(false);
  232. if (this.failoverDetector) {
  233. // Clean up previous detector
  234. this.failoverDetector.cleanup();
  235. }
  236. this.failoverDetector = new FailoverDetector_1.FailoverDetector(this, sentinels);
  237. yield this.failoverDetector.subscribe();
  238. // Tests listen to this event
  239. (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("failoverSubscribed");
  240. });
  241. }
  242. }
  243. exports.default = SentinelConnector;
  244. function selectPreferredSentinel(availableSlaves, preferredSlaves) {
  245. if (availableSlaves.length === 0) {
  246. return null;
  247. }
  248. let selectedSlave;
  249. if (typeof preferredSlaves === "function") {
  250. selectedSlave = preferredSlaves(availableSlaves);
  251. }
  252. else if (preferredSlaves !== null && typeof preferredSlaves === "object") {
  253. const preferredSlavesArray = Array.isArray(preferredSlaves)
  254. ? preferredSlaves
  255. : [preferredSlaves];
  256. // sort by priority
  257. preferredSlavesArray.sort((a, b) => {
  258. // default the priority to 1
  259. if (!a.prio) {
  260. a.prio = 1;
  261. }
  262. if (!b.prio) {
  263. b.prio = 1;
  264. }
  265. // lowest priority first
  266. if (a.prio < b.prio) {
  267. return -1;
  268. }
  269. if (a.prio > b.prio) {
  270. return 1;
  271. }
  272. return 0;
  273. });
  274. // loop over preferred slaves and return the first match
  275. for (let p = 0; p < preferredSlavesArray.length; p++) {
  276. for (let a = 0; a < availableSlaves.length; a++) {
  277. const slave = availableSlaves[a];
  278. if (slave.ip === preferredSlavesArray[p].ip) {
  279. if (slave.port === preferredSlavesArray[p].port) {
  280. selectedSlave = slave;
  281. break;
  282. }
  283. }
  284. }
  285. if (selectedSlave) {
  286. break;
  287. }
  288. }
  289. }
  290. // if none of the preferred slaves are available, a random available slave is returned
  291. if (!selectedSlave) {
  292. selectedSlave = utils_1.sample(availableSlaves);
  293. }
  294. return addressResponseToAddress(selectedSlave);
  295. }
  296. function addressResponseToAddress(input) {
  297. return { host: input.ip, port: Number(input.port) };
  298. }
  299. function noop() { }