connection-manager.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. 'use strict';
  2. const { Pool, TimeoutError } = require('sequelize-pool');
  3. const _ = require('lodash');
  4. const semver = require('semver');
  5. const Promise = require('../../promise');
  6. const errors = require('../../errors');
  7. const { logger } = require('../../utils/logger');
  8. const debug = logger.debugContext('pool');
  9. /**
  10. * Abstract Connection Manager
  11. *
  12. * Connection manager which handles pooling & replication.
  13. * Uses sequelize-pool for pooling
  14. *
  15. * @private
  16. */
  17. class ConnectionManager {
  18. constructor(dialect, sequelize) {
  19. const config = _.cloneDeep(sequelize.config);
  20. this.sequelize = sequelize;
  21. this.config = config;
  22. this.dialect = dialect;
  23. this.versionPromise = null;
  24. this.dialectName = this.sequelize.options.dialect;
  25. if (config.pool === false) {
  26. throw new Error('Support for pool:false was removed in v4.0');
  27. }
  28. config.pool = _.defaults(config.pool || {}, {
  29. max: 5,
  30. min: 0,
  31. idle: 10000,
  32. acquire: 60000,
  33. evict: 1000,
  34. validate: this._validate.bind(this)
  35. });
  36. this.initPools();
  37. }
  38. refreshTypeParser(dataTypes) {
  39. _.each(dataTypes, dataType => {
  40. if (Object.prototype.hasOwnProperty.call(dataType, 'parse')) {
  41. if (dataType.types[this.dialectName]) {
  42. this._refreshTypeParser(dataType);
  43. } else {
  44. throw new Error(`Parse function not supported for type ${dataType.key} in dialect ${this.dialectName}`);
  45. }
  46. }
  47. });
  48. }
  49. /**
  50. * Try to load dialect module from various configured options.
  51. * Priority goes like dialectModulePath > dialectModule > require(default)
  52. *
  53. * @param {string} moduleName Name of dialect module to lookup
  54. *
  55. * @private
  56. * @returns {Object}
  57. */
  58. _loadDialectModule(moduleName) {
  59. try {
  60. if (this.sequelize.config.dialectModulePath) {
  61. return require(this.sequelize.config.dialectModulePath);
  62. }
  63. if (this.sequelize.config.dialectModule) {
  64. return this.sequelize.config.dialectModule;
  65. }
  66. // This is needed so that bundlers (like webpack) know which library to include in the bundle
  67. switch (moduleName) {
  68. case 'pg': return require('pg');
  69. case 'mysql2': return require('mysql2');
  70. case 'mariadb': return require('mariadb');
  71. case 'sqlite3': return require('sqlite3');
  72. case 'tedious': return require('tedious');
  73. default: return require(moduleName);
  74. }
  75. } catch (err) {
  76. if (err.code === 'MODULE_NOT_FOUND') {
  77. if (this.sequelize.config.dialectModulePath) {
  78. throw new Error(`Unable to find dialect at ${this.sequelize.config.dialectModulePath}`);
  79. }
  80. throw new Error(`Please install ${moduleName} package manually`);
  81. }
  82. throw err;
  83. }
  84. }
  85. /**
  86. * Handler which executes on process exit or connection manager shutdown
  87. *
  88. * @private
  89. * @returns {Promise}
  90. */
  91. _onProcessExit() {
  92. if (!this.pool) {
  93. return Promise.resolve();
  94. }
  95. return this.pool.drain().then(() => {
  96. debug('connection drain due to process exit');
  97. return this.pool.destroyAllNow();
  98. });
  99. }
  100. /**
  101. * Drain the pool and close it permanently
  102. *
  103. * @returns {Promise}
  104. */
  105. close() {
  106. // Mark close of pool
  107. this.getConnection = function getConnection() {
  108. return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!'));
  109. };
  110. return this._onProcessExit();
  111. }
  112. /**
  113. * Initialize connection pool. By default pool autostart is set to false, so no connection will be
  114. * be created unless `pool.acquire` is called.
  115. */
  116. initPools() {
  117. const config = this.config;
  118. if (!config.replication) {
  119. this.pool = new Pool({
  120. name: 'sequelize',
  121. create: () => this._connect(config),
  122. destroy: connection => {
  123. return this._disconnect(connection)
  124. .tap(() => { debug('connection destroy'); });
  125. },
  126. validate: config.pool.validate,
  127. max: config.pool.max,
  128. min: config.pool.min,
  129. acquireTimeoutMillis: config.pool.acquire,
  130. idleTimeoutMillis: config.pool.idle,
  131. reapIntervalMillis: config.pool.evict
  132. });
  133. debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);
  134. return;
  135. }
  136. if (!Array.isArray(config.replication.read)) {
  137. config.replication.read = [config.replication.read];
  138. }
  139. // Map main connection config
  140. config.replication.write = _.defaults(config.replication.write, _.omit(config, 'replication'));
  141. // Apply defaults to each read config
  142. config.replication.read = config.replication.read.map(readConfig =>
  143. _.defaults(readConfig, _.omit(this.config, 'replication'))
  144. );
  145. // custom pooling for replication (original author @janmeier)
  146. let reads = 0;
  147. this.pool = {
  148. release: client => {
  149. if (client.queryType === 'read') {
  150. this.pool.read.release(client);
  151. } else {
  152. this.pool.write.release(client);
  153. }
  154. },
  155. acquire: (queryType, useMaster) => {
  156. useMaster = useMaster === undefined ? false : useMaster;
  157. if (queryType === 'SELECT' && !useMaster) {
  158. return this.pool.read.acquire();
  159. }
  160. return this.pool.write.acquire();
  161. },
  162. destroy: connection => {
  163. this.pool[connection.queryType].destroy(connection);
  164. debug('connection destroy');
  165. },
  166. destroyAllNow: () => {
  167. return Promise.join(
  168. this.pool.read.destroyAllNow(),
  169. this.pool.write.destroyAllNow()
  170. ).tap(() => { debug('all connections destroyed'); });
  171. },
  172. drain: () => {
  173. return Promise.join(
  174. this.pool.write.drain(),
  175. this.pool.read.drain()
  176. );
  177. },
  178. read: new Pool({
  179. name: 'sequelize:read',
  180. create: () => {
  181. // round robin config
  182. const nextRead = reads++ % config.replication.read.length;
  183. return this._connect(config.replication.read[nextRead]).tap(connection => {
  184. connection.queryType = 'read';
  185. });
  186. },
  187. destroy: connection => this._disconnect(connection),
  188. validate: config.pool.validate,
  189. max: config.pool.max,
  190. min: config.pool.min,
  191. acquireTimeoutMillis: config.pool.acquire,
  192. idleTimeoutMillis: config.pool.idle,
  193. reapIntervalMillis: config.pool.evict
  194. }),
  195. write: new Pool({
  196. name: 'sequelize:write',
  197. create: () => {
  198. return this._connect(config.replication.write).tap(connection => {
  199. connection.queryType = 'write';
  200. });
  201. },
  202. destroy: connection => this._disconnect(connection),
  203. validate: config.pool.validate,
  204. max: config.pool.max,
  205. min: config.pool.min,
  206. acquireTimeoutMillis: config.pool.acquire,
  207. idleTimeoutMillis: config.pool.idle,
  208. reapIntervalMillis: config.pool.evict
  209. })
  210. };
  211. debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, with replication`);
  212. }
  213. /**
  214. * Get connection from pool. It sets database version if it's not already set.
  215. * Call pool.acquire to get a connection
  216. *
  217. * @param {Object} [options] Pool options
  218. * @param {string} [options.type] Set which replica to use. Available options are `read` and `write`
  219. * @param {boolean} [options.useMaster=false] Force master or write replica to get connection from
  220. *
  221. * @returns {Promise<Connection>}
  222. */
  223. getConnection(options) {
  224. options = options || {};
  225. let promise;
  226. if (this.sequelize.options.databaseVersion === 0) {
  227. if (this.versionPromise) {
  228. promise = this.versionPromise;
  229. } else {
  230. promise = this.versionPromise = this._connect(this.config.replication.write || this.config)
  231. .then(connection => {
  232. const _options = {};
  233. _options.transaction = { connection }; // Cheat .query to use our private connection
  234. _options.logging = () => {};
  235. _options.logging.__testLoggingFn = true;
  236. //connection might have set databaseVersion value at initialization,
  237. //avoiding a useless round trip
  238. if (this.sequelize.options.databaseVersion === 0) {
  239. return this.sequelize.databaseVersion(_options).then(version => {
  240. const parsedVersion = _.get(semver.coerce(version), 'version') || version;
  241. this.sequelize.options.databaseVersion = semver.valid(parsedVersion)
  242. ? parsedVersion
  243. : this.defaultVersion;
  244. this.versionPromise = null;
  245. return this._disconnect(connection);
  246. });
  247. }
  248. this.versionPromise = null;
  249. return this._disconnect(connection);
  250. }).catch(err => {
  251. this.versionPromise = null;
  252. throw err;
  253. });
  254. }
  255. } else {
  256. promise = Promise.resolve();
  257. }
  258. return promise.then(() => {
  259. return this.pool.acquire(options.type, options.useMaster)
  260. .catch(error => {
  261. if (error instanceof TimeoutError) throw new errors.ConnectionAcquireTimeoutError(error);
  262. throw error;
  263. });
  264. }).tap(() => { debug('connection acquired'); });
  265. }
  266. /**
  267. * Release a pooled connection so it can be utilized by other connection requests
  268. *
  269. * @param {Connection} connection
  270. *
  271. * @returns {Promise}
  272. */
  273. releaseConnection(connection) {
  274. return Promise.try(() => {
  275. this.pool.release(connection);
  276. debug('connection released');
  277. });
  278. }
  279. /**
  280. * Call dialect library to get connection
  281. *
  282. * @param {*} config Connection config
  283. * @private
  284. * @returns {Promise<Connection>}
  285. */
  286. _connect(config) {
  287. return this.sequelize.runHooks('beforeConnect', config)
  288. .then(() => this.dialect.connectionManager.connect(config))
  289. .then(connection => this.sequelize.runHooks('afterConnect', connection, config).return(connection));
  290. }
  291. /**
  292. * Call dialect library to disconnect a connection
  293. *
  294. * @param {Connection} connection
  295. * @private
  296. * @returns {Promise}
  297. */
  298. _disconnect(connection) {
  299. return this.sequelize.runHooks('beforeDisconnect', connection)
  300. .then(() => this.dialect.connectionManager.disconnect(connection))
  301. .then(() => this.sequelize.runHooks('afterDisconnect', connection));
  302. }
  303. /**
  304. * Determine if a connection is still valid or not
  305. *
  306. * @param {Connection} connection
  307. *
  308. * @returns {boolean}
  309. */
  310. _validate(connection) {
  311. if (!this.dialect.connectionManager.validate) {
  312. return true;
  313. }
  314. return this.dialect.connectionManager.validate(connection);
  315. }
  316. }
// Export the class as the module itself and also under the `ConnectionManager`
// and `default` property names; all three refer to the same class.
module.exports = ConnectionManager;
module.exports.ConnectionManager = ConnectionManager;
module.exports.default = ConnectionManager;