connection-manager.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. 'use strict';
  2. const _ = require('lodash');
  3. const AbstractConnectionManager = require('../abstract/connection-manager');
  4. const { logger } = require('../../utils/logger');
  5. const debug = logger.debugContext('connection:pg');
  6. const Promise = require('../../promise');
  7. const sequelizeErrors = require('../../errors');
  8. const semver = require('semver');
  9. const dataTypes = require('../../data-types');
  10. const moment = require('moment-timezone');
  11. class ConnectionManager extends AbstractConnectionManager {
  12. constructor(dialect, sequelize) {
  13. sequelize.config.port = sequelize.config.port || 5432;
  14. super(dialect, sequelize);
  15. const pgLib = this._loadDialectModule('pg');
  16. this.lib = this.sequelize.config.native ? pgLib.native : pgLib;
  17. this._clearDynamicOIDs();
  18. this._clearTypeParser();
  19. this.refreshTypeParser(dataTypes.postgres);
  20. }
  21. // Expose this as a method so that the parsing may be updated when the user has added additional, custom types
  22. _refreshTypeParser(dataType) {
  23. const arrayParserBuilder = parser => {
  24. return value => this.lib.types.arrayParser.create(value, parser).parse();
  25. };
  26. const rangeParserBuilder = parser => {
  27. return value => dataType.parse(value, { parser });
  28. };
  29. // Set range parsers
  30. if (dataType.key.toLowerCase() === 'range') {
  31. for (const name in this.nameOidMap) {
  32. const entry = this.nameOidMap[name];
  33. if (! entry.rangeOid) continue;
  34. const rangeParser = rangeParserBuilder(this.getTypeParser(entry.oid));
  35. const arrayRangeParser = arrayParserBuilder(rangeParser);
  36. this.oidParserMap.set(entry.rangeOid, rangeParser);
  37. if (! entry.arrayRangeOid) continue;
  38. this.oidParserMap.set(entry.arrayRangeOid, arrayRangeParser);
  39. }
  40. return;
  41. }
  42. // Create parsers for normal or enum data types
  43. const parser = value => dataType.parse(value);
  44. const arrayParser = arrayParserBuilder(parser);
  45. // Set enum parsers
  46. if (dataType.key.toLowerCase() === 'enum') {
  47. this.enumOids.oids.forEach(oid => {
  48. this.oidParserMap.set(oid, parser);
  49. });
  50. this.enumOids.arrayOids.forEach(arrayOid => {
  51. this.oidParserMap.set(arrayOid, arrayParser);
  52. });
  53. return;
  54. }
  55. // Set parsers for normal data types
  56. dataType.types.postgres.forEach(name => {
  57. if (! this.nameOidMap[name]) return;
  58. this.oidParserMap.set(this.nameOidMap[name].oid, parser);
  59. if (! this.nameOidMap[name].arrayOid) return;
  60. this.oidParserMap.set(this.nameOidMap[name].arrayOid, arrayParser);
  61. });
  62. }
  63. _clearTypeParser() {
  64. this.oidParserMap = new Map();
  65. }
  66. getTypeParser(oid, ...args) {
  67. if (this.oidParserMap.get(oid)) return this.oidParserMap.get(oid);
  68. return this.lib.types.getTypeParser(oid, ...args);
  69. }
  70. connect(config) {
  71. config.user = config.username;
  72. const connectionConfig = _.pick(config, [
  73. 'user', 'password', 'host', 'database', 'port'
  74. ]);
  75. connectionConfig.types = {
  76. getTypeParser: ConnectionManager.prototype.getTypeParser.bind(this)
  77. };
  78. if (config.dialectOptions) {
  79. _.merge(connectionConfig,
  80. _.pick(config.dialectOptions, [
  81. // see [http://www.postgresql.org/docs/9.3/static/runtime-config-logging.html#GUC-APPLICATION-NAME]
  82. 'application_name',
  83. // choose the SSL mode with the PGSSLMODE environment variable
  84. // object format: [https://github.com/brianc/node-postgres/blob/master/lib/connection.js#L79]
  85. // see also [http://www.postgresql.org/docs/9.3/static/libpq-ssl.html]
  86. 'ssl',
  87. // In addition to the values accepted by the corresponding server,
  88. // you can use "auto" to determine the right encoding from the
  89. // current locale in the client (LC_CTYPE environment variable on Unix systems)
  90. 'client_encoding',
  91. // !! DO NOT SET THIS TO TRUE !!
  92. // (unless you know what you're doing)
  93. // see [http://www.postgresql.org/message-id/flat/bc9549a50706040852u27633f41ib1e6b09f8339d845@mail.gmail.com#bc9549a50706040852u27633f41ib1e6b09f8339d845@mail.gmail.com]
  94. 'binary',
  95. // This should help with backends incorrectly considering idle clients to be dead and prematurely disconnecting them.
  96. // this feature has been added in pg module v6.0.0, check pg/CHANGELOG.md
  97. 'keepAlive',
  98. // Times out queries after a set time in milliseconds. Added in pg v7.3
  99. 'statement_timeout'
  100. ]));
  101. }
  102. return new Promise((resolve, reject) => {
  103. let responded = false;
  104. const connection = new this.lib.Client(connectionConfig);
  105. const parameterHandler = message => {
  106. switch (message.parameterName) {
  107. case 'server_version':
  108. if (this.sequelize.options.databaseVersion === 0) {
  109. const version = semver.coerce(message.parameterValue).version;
  110. this.sequelize.options.databaseVersion = semver.valid(version)
  111. ? version
  112. : this.defaultVersion;
  113. }
  114. break;
  115. case 'standard_conforming_strings':
  116. connection['standard_conforming_strings'] = message.parameterValue;
  117. break;
  118. }
  119. };
  120. const endHandler = () => {
  121. debug('connection timeout');
  122. if (!responded) {
  123. reject(new sequelizeErrors.ConnectionTimedOutError(new Error('Connection timed out')));
  124. }
  125. };
  126. // If we didn't ever hear from the client.connect() callback the connection timeout
  127. // node-postgres does not treat this as an error since no active query was ever emitted
  128. connection.once('end', endHandler);
  129. if (!this.sequelize.config.native) {
  130. // Receive various server parameters for further configuration
  131. connection.connection.on('parameterStatus', parameterHandler);
  132. }
  133. connection.connect(err => {
  134. responded = true;
  135. if (!this.sequelize.config.native) {
  136. // remove parameter handler
  137. connection.connection.removeListener('parameterStatus', parameterHandler);
  138. }
  139. if (err) {
  140. if (err.code) {
  141. switch (err.code) {
  142. case 'ECONNREFUSED':
  143. reject(new sequelizeErrors.ConnectionRefusedError(err));
  144. break;
  145. case 'ENOTFOUND':
  146. reject(new sequelizeErrors.HostNotFoundError(err));
  147. break;
  148. case 'EHOSTUNREACH':
  149. reject(new sequelizeErrors.HostNotReachableError(err));
  150. break;
  151. case 'EINVAL':
  152. reject(new sequelizeErrors.InvalidConnectionError(err));
  153. break;
  154. default:
  155. reject(new sequelizeErrors.ConnectionError(err));
  156. break;
  157. }
  158. } else {
  159. reject(new sequelizeErrors.ConnectionError(err));
  160. }
  161. } else {
  162. debug('connection acquired');
  163. connection.removeListener('end', endHandler);
  164. resolve(connection);
  165. }
  166. });
  167. }).tap(connection => {
  168. let query = '';
  169. if (this.sequelize.options.standardConformingStrings !== false && connection['standard_conforming_strings'] !== 'on') {
  170. // Disable escape characters in strings
  171. // see https://github.com/sequelize/sequelize/issues/3545 (security issue)
  172. // see https://www.postgresql.org/docs/current/static/runtime-config-compatible.html#GUC-STANDARD-CONFORMING-STRINGS
  173. query += 'SET standard_conforming_strings=on;';
  174. }
  175. if (this.sequelize.options.clientMinMessages !== false) {
  176. query += `SET client_min_messages TO ${this.sequelize.options.clientMinMessages};`;
  177. }
  178. if (!this.sequelize.config.keepDefaultTimezone) {
  179. const isZone = !!moment.tz.zone(this.sequelize.options.timezone);
  180. if (isZone) {
  181. query += `SET TIME ZONE '${this.sequelize.options.timezone}';`;
  182. } else {
  183. query += `SET TIME ZONE INTERVAL '${this.sequelize.options.timezone}' HOUR TO MINUTE;`;
  184. }
  185. }
  186. if (query) {
  187. return connection.query(query);
  188. }
  189. }).tap(connection => {
  190. if (Object.keys(this.nameOidMap).length === 0 &&
  191. this.enumOids.oids.length === 0 &&
  192. this.enumOids.arrayOids.length === 0) {
  193. return this._refreshDynamicOIDs(connection);
  194. }
  195. }).tap(connection => {
  196. // Don't let a Postgres restart (or error) to take down the whole app
  197. connection.on('error', error => {
  198. connection._invalid = true;
  199. debug(`connection error ${error.code || error.message}`);
  200. this.pool.destroy(connection);
  201. });
  202. });
  203. }
  204. disconnect(connection) {
  205. if (connection._ending) {
  206. debug('connection tried to disconnect but was already at ENDING state');
  207. return Promise.resolve();
  208. }
  209. return Promise.fromCallback(callback => connection.end(callback));
  210. }
  211. validate(connection) {
  212. return !connection._invalid && !connection._ending;
  213. }
  214. _refreshDynamicOIDs(connection) {
  215. const databaseVersion = this.sequelize.options.databaseVersion;
  216. const supportedVersion = '8.3.0';
  217. // Check for supported version
  218. if ( (databaseVersion && semver.gte(databaseVersion, supportedVersion)) === false) {
  219. return Promise.resolve();
  220. }
  221. // Refresh dynamic OIDs for some types
  222. // These include Geometry / Geography / HStore / Enum / Citext / Range
  223. return (connection || this.sequelize).query(
  224. 'WITH ranges AS (' +
  225. ' SELECT pg_range.rngtypid, pg_type.typname AS rngtypname,' +
  226. ' pg_type.typarray AS rngtyparray, pg_range.rngsubtype' +
  227. ' FROM pg_range LEFT OUTER JOIN pg_type ON pg_type.oid = pg_range.rngtypid' +
  228. ')' +
  229. 'SELECT pg_type.typname, pg_type.typtype, pg_type.oid, pg_type.typarray,' +
  230. ' ranges.rngtypname, ranges.rngtypid, ranges.rngtyparray' +
  231. ' FROM pg_type LEFT OUTER JOIN ranges ON pg_type.oid = ranges.rngsubtype' +
  232. ' WHERE (pg_type.typtype IN(\'b\', \'e\'));'
  233. ).then(results => {
  234. let result = Array.isArray(results) ? results.pop() : results;
  235. // When searchPath is prepended then two statements are executed and the result is
  236. // an array of those two statements. First one is the SET search_path and second is
  237. // the SELECT query result.
  238. if (Array.isArray(result)) {
  239. if (result[0].command === 'SET') {
  240. result = result.pop();
  241. }
  242. }
  243. const newNameOidMap = {};
  244. const newEnumOids = { oids: [], arrayOids: [] };
  245. for (const row of result.rows) {
  246. // Mapping enums, handled separatedly
  247. if (row.typtype === 'e') {
  248. newEnumOids.oids.push(row.oid);
  249. if (row.typarray) newEnumOids.arrayOids.push(row.typarray);
  250. continue;
  251. }
  252. // Mapping base types and their arrays
  253. newNameOidMap[row.typname] = { oid: row.oid };
  254. if (row.typarray) newNameOidMap[row.typname].arrayOid = row.typarray;
  255. // Mapping ranges(of base types) and their arrays
  256. if (row.rngtypid) {
  257. newNameOidMap[row.typname].rangeOid = row.rngtypid;
  258. if (row.rngtyparray) newNameOidMap[row.typname].arrayRangeOid = row.rngtyparray;
  259. }
  260. }
  261. // Replace all OID mappings. Avoids temporary empty OID mappings.
  262. this.nameOidMap = newNameOidMap;
  263. this.enumOids = newEnumOids;
  264. this.refreshTypeParser(dataTypes.postgres);
  265. });
  266. }
  267. _clearDynamicOIDs() {
  268. this.nameOidMap = {};
  269. this.enumOids = { oids: [], arrayOids: [] };
  270. }
  271. }
  272. module.exports = ConnectionManager;
  273. module.exports.ConnectionManager = ConnectionManager;
  274. module.exports.default = ConnectionManager;