promise.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. 'use strict';
  2. const core = require('./index.js');
  3. const EventEmitter = require('events').EventEmitter;
  4. function makeDoneCb(resolve, reject, localErr) {
  5. return function (err, rows, fields) {
  6. if (err) {
  7. localErr.message = err.message;
  8. localErr.code = err.code;
  9. localErr.errno = err.errno;
  10. localErr.sql = err.sql;
  11. localErr.sqlState = err.sqlState;
  12. localErr.sqlMessage = err.sqlMessage;
  13. reject(localErr);
  14. } else {
  15. resolve([rows, fields]);
  16. }
  17. };
  18. }
  19. function inheritEvents(source, target, events) {
  20. const listeners = {};
  21. target
  22. .on('newListener', eventName => {
  23. if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) {
  24. source.on(
  25. eventName,
  26. (listeners[eventName] = function () {
  27. const args = [].slice.call(arguments);
  28. args.unshift(eventName);
  29. target.emit.apply(target, args);
  30. })
  31. );
  32. }
  33. })
  34. .on('removeListener', eventName => {
  35. if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) {
  36. source.removeListener(eventName, listeners[eventName]);
  37. delete listeners[eventName];
  38. }
  39. });
  40. }
  41. class PromisePreparedStatementInfo {
  42. constructor(statement, promiseImpl) {
  43. this.statement = statement;
  44. this.Promise = promiseImpl;
  45. }
  46. execute(parameters) {
  47. const s = this.statement;
  48. const localErr = new Error();
  49. return new this.Promise((resolve, reject) => {
  50. const done = makeDoneCb(resolve, reject, localErr);
  51. if (parameters) {
  52. s.execute(parameters, done);
  53. } else {
  54. s.execute(done);
  55. }
  56. });
  57. }
  58. close() {
  59. return new this.Promise(resolve => {
  60. this.statement.close();
  61. resolve();
  62. });
  63. }
  64. }
  65. class PromiseConnection extends EventEmitter {
  66. constructor(connection, promiseImpl) {
  67. super();
  68. this.connection = connection;
  69. this.Promise = promiseImpl || Promise;
  70. inheritEvents(connection, this, [
  71. 'error',
  72. 'drain',
  73. 'connect',
  74. 'end',
  75. 'enqueue'
  76. ]);
  77. }
  78. release() {
  79. this.connection.release();
  80. }
  81. query(query, params) {
  82. const c = this.connection;
  83. const localErr = new Error();
  84. if (typeof params === 'function') {
  85. throw new Error(
  86. 'Callback function is not available with promise clients.'
  87. );
  88. }
  89. return new this.Promise((resolve, reject) => {
  90. const done = makeDoneCb(resolve, reject, localErr);
  91. if (params !== undefined) {
  92. c.query(query, params, done);
  93. } else {
  94. c.query(query, done);
  95. }
  96. });
  97. }
  98. execute(query, params) {
  99. const c = this.connection;
  100. const localErr = new Error();
  101. if (typeof params === 'function') {
  102. throw new Error(
  103. 'Callback function is not available with promise clients.'
  104. );
  105. }
  106. return new this.Promise((resolve, reject) => {
  107. const done = makeDoneCb(resolve, reject, localErr);
  108. if (params !== undefined) {
  109. c.execute(query, params, done);
  110. } else {
  111. c.execute(query, done);
  112. }
  113. });
  114. }
  115. end() {
  116. return new this.Promise(resolve => {
  117. this.connection.end(resolve);
  118. });
  119. }
  120. beginTransaction() {
  121. const c = this.connection;
  122. const localErr = new Error();
  123. return new this.Promise((resolve, reject) => {
  124. const done = makeDoneCb(resolve, reject, localErr);
  125. c.beginTransaction(done);
  126. });
  127. }
  128. commit() {
  129. const c = this.connection;
  130. const localErr = new Error();
  131. return new this.Promise((resolve, reject) => {
  132. const done = makeDoneCb(resolve, reject, localErr);
  133. c.commit(done);
  134. });
  135. }
  136. rollback() {
  137. const c = this.connection;
  138. const localErr = new Error();
  139. return new this.Promise((resolve, reject) => {
  140. const done = makeDoneCb(resolve, reject, localErr);
  141. c.rollback(done);
  142. });
  143. }
  144. ping() {
  145. const c = this.connection;
  146. const localErr = new Error();
  147. return new this.Promise((resolve, reject) => {
  148. const done = makeDoneCb(resolve, reject, localErr);
  149. c.ping(done);
  150. });
  151. }
  152. connect() {
  153. const c = this.connection;
  154. const localErr = new Error();
  155. return new this.Promise((resolve, reject) => {
  156. c.connect((err, param) => {
  157. if (err) {
  158. localErr.message = err.message;
  159. localErr.code = err.code;
  160. localErr.errno = err.errno;
  161. localErr.sqlState = err.sqlState;
  162. localErr.sqlMessage = err.sqlMessage;
  163. reject(localErr);
  164. } else {
  165. resolve(param);
  166. }
  167. });
  168. });
  169. }
  170. prepare(options) {
  171. const c = this.connection;
  172. const promiseImpl = this.Promise;
  173. const localErr = new Error();
  174. return new this.Promise((resolve, reject) => {
  175. c.prepare(options, (err, statement) => {
  176. if (err) {
  177. localErr.message = err.message;
  178. localErr.code = err.code;
  179. localErr.errno = err.errno;
  180. localErr.sqlState = err.sqlState;
  181. localErr.sqlMessage = err.sqlMessage;
  182. reject(localErr);
  183. } else {
  184. const wrappedStatement = new PromisePreparedStatementInfo(
  185. statement,
  186. promiseImpl
  187. );
  188. resolve(wrappedStatement);
  189. }
  190. });
  191. });
  192. }
  193. changeUser(options) {
  194. const c = this.connection;
  195. const localErr = new Error();
  196. return new this.Promise((resolve, reject) => {
  197. c.changeUser(options, err => {
  198. if (err) {
  199. localErr.message = err.message;
  200. localErr.code = err.code;
  201. localErr.errno = err.errno;
  202. localErr.sqlState = err.sqlState;
  203. localErr.sqlMessage = err.sqlMessage;
  204. reject(localErr);
  205. } else {
  206. resolve();
  207. }
  208. });
  209. });
  210. }
  211. get config() {
  212. return this.connection.config;
  213. }
  214. get threadId() {
  215. return this.connection.threadId;
  216. }
  217. }
  218. function createConnection(opts) {
  219. const coreConnection = core.createConnection(opts);
  220. const createConnectionErr = new Error();
  221. const thePromise = opts.Promise || Promise;
  222. if (!thePromise) {
  223. throw new Error(
  224. 'no Promise implementation available.' +
  225. 'Use promise-enabled node version or pass userland Promise' +
  226. " implementation as parameter, for example: { Promise: require('bluebird') }"
  227. );
  228. }
  229. return new thePromise((resolve, reject) => {
  230. coreConnection.once('connect', () => {
  231. resolve(new PromiseConnection(coreConnection, thePromise));
  232. });
  233. coreConnection.once('error', err => {
  234. createConnectionErr.message = err.message;
  235. createConnectionErr.code = err.code;
  236. createConnectionErr.errno = err.errno;
  237. createConnectionErr.sqlState = err.sqlState;
  238. reject(createConnectionErr);
  239. });
  240. });
  241. }
  242. // note: the callback of "changeUser" is not called on success
  243. // hence there is no possibility to call "resolve"
  244. // patching PromiseConnection
  245. // create facade functions for prototype functions on "Connection" that are not yet
  246. // implemented with PromiseConnection
  247. // proxy synchronous functions only
  248. (function (functionsToWrap) {
  249. for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
  250. const func = functionsToWrap[i];
  251. if (
  252. typeof core.Connection.prototype[func] === 'function' &&
  253. PromiseConnection.prototype[func] === undefined
  254. ) {
  255. PromiseConnection.prototype[func] = (function factory(funcName) {
  256. return function () {
  257. return core.Connection.prototype[funcName].apply(
  258. this.connection,
  259. arguments
  260. );
  261. };
  262. })(func);
  263. }
  264. }
  265. })([
  266. // synchronous functions
  267. 'close',
  268. 'createBinlogStream',
  269. 'destroy',
  270. 'escape',
  271. 'escapeId',
  272. 'format',
  273. 'pause',
  274. 'pipe',
  275. 'resume',
  276. 'unprepare'
  277. ]);
  278. class PromisePoolConnection extends PromiseConnection {
  279. constructor(connection, promiseImpl) {
  280. super(connection, promiseImpl);
  281. }
  282. destroy() {
  283. return core.PoolConnection.prototype.destroy.apply(
  284. this.connection,
  285. arguments
  286. );
  287. }
  288. }
  289. class PromisePool extends EventEmitter {
  290. constructor(pool, thePromise) {
  291. super();
  292. this.pool = pool;
  293. this.Promise = thePromise || Promise;
  294. inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']);
  295. }
  296. getConnection() {
  297. const corePool = this.pool;
  298. return new this.Promise((resolve, reject) => {
  299. corePool.getConnection((err, coreConnection) => {
  300. if (err) {
  301. reject(err);
  302. } else {
  303. resolve(new PromisePoolConnection(coreConnection, this.Promise));
  304. }
  305. });
  306. });
  307. }
  308. query(sql, args) {
  309. const corePool = this.pool;
  310. const localErr = new Error();
  311. if (typeof args === 'function') {
  312. throw new Error(
  313. 'Callback function is not available with promise clients.'
  314. );
  315. }
  316. return new this.Promise((resolve, reject) => {
  317. const done = makeDoneCb(resolve, reject, localErr);
  318. if (args !== undefined) {
  319. corePool.query(sql, args, done);
  320. } else {
  321. corePool.query(sql, done);
  322. }
  323. });
  324. }
  325. execute(sql, args) {
  326. const corePool = this.pool;
  327. const localErr = new Error();
  328. if (typeof args === 'function') {
  329. throw new Error(
  330. 'Callback function is not available with promise clients.'
  331. );
  332. }
  333. return new this.Promise((resolve, reject) => {
  334. const done = makeDoneCb(resolve, reject, localErr);
  335. if (args) {
  336. corePool.execute(sql, args, done);
  337. } else {
  338. corePool.execute(sql, done);
  339. }
  340. });
  341. }
  342. end() {
  343. const corePool = this.pool;
  344. const localErr = new Error();
  345. return new this.Promise((resolve, reject) => {
  346. corePool.end(err => {
  347. if (err) {
  348. localErr.message = err.message;
  349. localErr.code = err.code;
  350. localErr.errno = err.errno;
  351. localErr.sqlState = err.sqlState;
  352. localErr.sqlMessage = err.sqlMessage;
  353. reject(localErr);
  354. } else {
  355. resolve();
  356. }
  357. });
  358. });
  359. }
  360. }
  361. function createPool(opts) {
  362. const corePool = core.createPool(opts);
  363. const thePromise = opts.Promise || Promise;
  364. if (!thePromise) {
  365. throw new Error(
  366. 'no Promise implementation available.' +
  367. 'Use promise-enabled node version or pass userland Promise' +
  368. " implementation as parameter, for example: { Promise: require('bluebird') }"
  369. );
  370. }
  371. return new PromisePool(corePool, thePromise);
  372. }
  373. (function (functionsToWrap) {
  374. for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
  375. const func = functionsToWrap[i];
  376. if (
  377. typeof core.Pool.prototype[func] === 'function' &&
  378. PromisePool.prototype[func] === undefined
  379. ) {
  380. PromisePool.prototype[func] = (function factory(funcName) {
  381. return function () {
  382. return core.Pool.prototype[funcName].apply(this.pool, arguments);
  383. };
  384. })(func);
  385. }
  386. }
  387. })([
  388. // synchronous functions
  389. 'escape',
  390. 'escapeId',
  391. 'format'
  392. ]);
  393. class PromisePoolCluster extends EventEmitter {
  394. constructor(poolCluster, thePromise) {
  395. super();
  396. this.poolCluster = poolCluster;
  397. this.Promise = thePromise || Promise;
  398. inheritEvents(poolCluster, this, ['acquire', 'connection', 'enqueue', 'release']);
  399. }
  400. getConnection() {
  401. const corePoolCluster = this.poolCluster;
  402. return new this.Promise((resolve, reject) => {
  403. corePoolCluster.getConnection((err, coreConnection) => {
  404. if (err) {
  405. reject(err);
  406. } else {
  407. resolve(new PromisePoolConnection(coreConnection, this.Promise));
  408. }
  409. });
  410. });
  411. }
  412. query(sql, args) {
  413. const corePoolCluster = this.poolCluster;
  414. const localErr = new Error();
  415. if (typeof args === 'function') {
  416. throw new Error(
  417. 'Callback function is not available with promise clients.'
  418. );
  419. }
  420. return new this.Promise((resolve, reject) => {
  421. const done = makeDoneCb(resolve, reject, localErr);
  422. corePoolCluster.query(sql, args, done);
  423. });
  424. }
  425. execute(sql, args) {
  426. const corePoolCluster = this.poolCluster;
  427. const localErr = new Error();
  428. if (typeof args === 'function') {
  429. throw new Error(
  430. 'Callback function is not available with promise clients.'
  431. );
  432. }
  433. return new this.Promise((resolve, reject) => {
  434. const done = makeDoneCb(resolve, reject, localErr);
  435. corePoolCluster.execute(sql, args, done);
  436. });
  437. }
  438. of(pattern, selector) {
  439. return new PromisePoolCluster(
  440. this.poolCluster.of(pattern, selector),
  441. this.Promise
  442. );
  443. }
  444. end() {
  445. const corePoolCluster = this.poolCluster;
  446. const localErr = new Error();
  447. return new this.Promise((resolve, reject) => {
  448. corePoolCluster.end(err => {
  449. if (err) {
  450. localErr.message = err.message;
  451. localErr.code = err.code;
  452. localErr.errno = err.errno;
  453. localErr.sqlState = err.sqlState;
  454. localErr.sqlMessage = err.sqlMessage;
  455. reject(localErr);
  456. } else {
  457. resolve();
  458. }
  459. });
  460. });
  461. }
  462. }
  463. /**
  464. * proxy poolCluster synchronous functions
  465. */
  466. (function (functionsToWrap) {
  467. for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
  468. const func = functionsToWrap[i];
  469. if (
  470. typeof core.PoolCluster.prototype[func] === 'function' &&
  471. PromisePoolCluster.prototype[func] === undefined
  472. ) {
  473. PromisePoolCluster.prototype[func] = (function factory(funcName) {
  474. return function () {
  475. return core.PoolCluster.prototype[funcName].apply(this.poolCluster, arguments);
  476. };
  477. })(func);
  478. }
  479. }
  480. })([
  481. 'add'
  482. ]);
  483. function createPoolCluster(opts) {
  484. const corePoolCluster = core.createPoolCluster(opts);
  485. const thePromise = (opts && opts.Promise) || Promise;
  486. if (!thePromise) {
  487. throw new Error(
  488. 'no Promise implementation available.' +
  489. 'Use promise-enabled node version or pass userland Promise' +
  490. " implementation as parameter, for example: { Promise: require('bluebird') }"
  491. );
  492. }
  493. return new PromisePoolCluster(corePoolCluster, thePromise);
  494. }
  495. exports.createConnection = createConnection;
  496. exports.createPool = createPool;
  497. exports.createPoolCluster = createPoolCluster;
  498. exports.escape = core.escape;
  499. exports.escapeId = core.escapeId;
  500. exports.format = core.format;
  501. exports.raw = core.raw;
  502. exports.PromisePool = PromisePool;
  503. exports.PromiseConnection = PromiseConnection;
  504. exports.PromisePoolConnection = PromisePoolConnection;