connection.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. // This file was modified by Oracle on June 1, 2021.
  2. // The changes involve new logic to handle an additional ERR Packet sent by
  3. // the MySQL server when the connection is closed unexpectedly.
  4. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  5. // This file was modified by Oracle on June 17, 2021.
  6. // The changes involve logic to ensure the socket connection is closed when
  7. // there is a fatal error.
  8. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  9. 'use strict';
  10. const Net = require('net');
  11. const Tls = require('tls');
  12. const Timers = require('timers');
  13. const EventEmitter = require('events').EventEmitter;
  14. const Readable = require('stream').Readable;
  15. const Queue = require('denque');
  16. const SqlString = require('sqlstring');
  17. const LRU = require('lru-cache');
  18. const PacketParser = require('./packet_parser.js');
  19. const Packets = require('./packets/index.js');
  20. const Commands = require('./commands/index.js');
  21. const ConnectionConfig = require('./connection_config.js');
  22. const CharsetToEncoding = require('./constants/charset_encodings.js');
  23. let _connectionId = 0;
  24. let convertNamedPlaceholders = null;
  25. class Connection extends EventEmitter {
  26. constructor(opts) {
  27. super();
  28. this.config = opts.config;
  29. // TODO: fill defaults
  30. // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
  31. // if host is given, connect to host:3306
  32. // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
  33. // if there is no host/port and no socketPath parameters?
  34. if (!opts.config.stream) {
  35. if (opts.config.socketPath) {
  36. this.stream = Net.connect(opts.config.socketPath);
  37. } else {
  38. this.stream = Net.connect(
  39. opts.config.port,
  40. opts.config.host
  41. );
  42. // Enable keep-alive on the socket. It's disabled by default, but the
  43. // user can enable it and supply an initial delay.
  44. this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
  45. }
  46. // if stream is a function, treat it as "stream agent / factory"
  47. } else if (typeof opts.config.stream === 'function') {
  48. this.stream = opts.config.stream(opts);
  49. } else {
  50. this.stream = opts.config.stream;
  51. }
  52. this._internalId = _connectionId++;
  53. this._commands = new Queue();
  54. this._command = null;
  55. this._paused = false;
  56. this._paused_packets = new Queue();
  57. this._statements = new LRU({
  58. max: this.config.maxPreparedStatements,
  59. dispose: function(key, statement) {
  60. statement.close();
  61. }
  62. });
  63. this.serverCapabilityFlags = 0;
  64. this.authorized = false;
  65. this.sequenceId = 0;
  66. this.compressedSequenceId = 0;
  67. this.threadId = null;
  68. this._handshakePacket = null;
  69. this._fatalError = null;
  70. this._protocolError = null;
  71. this._outOfOrderPackets = [];
  72. this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
  73. this.stream.on('error', this._handleNetworkError.bind(this));
  74. // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
  75. this.packetParser = new PacketParser(p => {
  76. this.handlePacket(p);
  77. });
  78. this.stream.on('data', data => {
  79. if (this.connectTimeout) {
  80. Timers.clearTimeout(this.connectTimeout);
  81. this.connectTimeout = null;
  82. }
  83. this.packetParser.execute(data);
  84. });
  85. this.stream.on('close', () => {
  86. // we need to set this flag everywhere where we want connection to close
  87. if (this._closing) {
  88. return;
  89. }
  90. if (!this._protocolError) {
  91. // no particular error message before disconnect
  92. this._protocolError = new Error(
  93. 'Connection lost: The server closed the connection.'
  94. );
  95. this._protocolError.fatal = true;
  96. this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
  97. }
  98. this._notifyError(this._protocolError);
  99. });
  100. let handshakeCommand;
  101. if (!this.config.isServer) {
  102. handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
  103. handshakeCommand.on('end', () => {
  104. // this happens when handshake finishes early either because there was
  105. // some fatal error or the server sent an error packet instead of
  106. // an hello packet (for example, 'Too many connactions' error)
  107. if (!handshakeCommand.handshake || this._fatalError || this._protocolError) {
  108. return;
  109. }
  110. this._handshakePacket = handshakeCommand.handshake;
  111. this.threadId = handshakeCommand.handshake.connectionId;
  112. this.emit('connect', handshakeCommand.handshake);
  113. });
  114. handshakeCommand.on('error', err => {
  115. this._closing = true;
  116. this._notifyError(err);
  117. });
  118. this.addCommand(handshakeCommand);
  119. }
  120. // in case there was no initiall handshake but we need to read sting, assume it utf-8
  121. // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
  122. // will be overwrittedn with actial encoding value as soon as server handshake packet is received
  123. this.serverEncoding = 'utf8';
  124. if (this.config.connectTimeout) {
  125. const timeoutHandler = this._handleTimeoutError.bind(this);
  126. this.connectTimeout = Timers.setTimeout(
  127. timeoutHandler,
  128. this.config.connectTimeout
  129. );
  130. }
  131. }
  132. promise(promiseImpl) {
  133. const PromiseConnection = require('../promise').PromiseConnection;
  134. return new PromiseConnection(this, promiseImpl);
  135. }
  136. _addCommandClosedState(cmd) {
  137. const err = new Error(
  138. "Can't add new command when connection is in closed state"
  139. );
  140. err.fatal = true;
  141. if (cmd.onResult) {
  142. cmd.onResult(err);
  143. } else {
  144. this.emit('error', err);
  145. }
  146. }
  147. _handleFatalError(err) {
  148. err.fatal = true;
  149. // stop receiving packets
  150. this.stream.removeAllListeners('data');
  151. this.addCommand = this._addCommandClosedState;
  152. this.write = () => {
  153. this.emit('error', new Error("Can't write in closed state"));
  154. };
  155. this._notifyError(err);
  156. this._fatalError = err;
  157. }
  158. _handleNetworkError(err) {
  159. if (this.connectTimeout) {
  160. Timers.clearTimeout(this.connectTimeout);
  161. this.connectTimeout = null;
  162. }
  163. // Do not throw an error when a connection ends with a RST,ACK packet
  164. if (err.errno === 'ECONNRESET' && this._closing) {
  165. return;
  166. }
  167. this._handleFatalError(err);
  168. }
  169. _handleTimeoutError() {
  170. if (this.connectTimeout) {
  171. Timers.clearTimeout(this.connectTimeout);
  172. this.connectTimeout = null;
  173. }
  174. this.stream.destroy && this.stream.destroy();
  175. const err = new Error('connect ETIMEDOUT');
  176. err.errorno = 'ETIMEDOUT';
  177. err.code = 'ETIMEDOUT';
  178. err.syscall = 'connect';
  179. this._handleNetworkError(err);
  180. }
  181. // notify all commands in the queue and bubble error as connection "error"
  182. // called on stream error or unexpected termination
  183. _notifyError(err) {
  184. if (this.connectTimeout) {
  185. Timers.clearTimeout(this.connectTimeout);
  186. this.connectTimeout = null;
  187. }
  188. // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
  189. if (this._fatalError) {
  190. return;
  191. }
  192. let command;
  193. // if there is no active command, notify connection
  194. // if there are commands and all of them have callbacks, pass error via callback
  195. let bubbleErrorToConnection = !this._command;
  196. if (this._command && this._command.onResult) {
  197. this._command.onResult(err);
  198. this._command = null;
  199. // connection handshake is special because we allow it to be implicit
  200. // if error happened during handshake, but there are others commands in queue
  201. // then bubble error to other commands and not to connection
  202. } else if (
  203. !(
  204. this._command &&
  205. this._command.constructor === Commands.ClientHandshake &&
  206. this._commands.length > 0
  207. )
  208. ) {
  209. bubbleErrorToConnection = true;
  210. }
  211. while ((command = this._commands.shift())) {
  212. if (command.onResult) {
  213. command.onResult(err);
  214. } else {
  215. bubbleErrorToConnection = true;
  216. }
  217. }
  218. // notify connection if some comands in the queue did not have callbacks
  219. // or if this is pool connection ( so it can be removed from pool )
  220. if (bubbleErrorToConnection || this._pool) {
  221. this.emit('error', err);
  222. }
  223. // close connection after emitting the event in case of a fatal error
  224. if (err.fatal) {
  225. this.close();
  226. }
  227. }
  228. write(buffer) {
  229. const result = this.stream.write(buffer, err => {
  230. if (err) {
  231. this._handleNetworkError(err);
  232. }
  233. });
  234. if (!result) {
  235. this.stream.emit('pause');
  236. }
  237. }
  238. // http://dev.mysql.com/doc/internals/en/sequence-id.html
  239. //
  240. // The sequence-id is incremented with each packet and may wrap around.
  241. // It starts at 0 and is reset to 0 when a new command
  242. // begins in the Command Phase.
  243. // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
  244. _resetSequenceId() {
  245. this.sequenceId = 0;
  246. this.compressedSequenceId = 0;
  247. }
  248. _bumpCompressedSequenceId(numPackets) {
  249. this.compressedSequenceId += numPackets;
  250. this.compressedSequenceId %= 256;
  251. }
  252. _bumpSequenceId(numPackets) {
  253. this.sequenceId += numPackets;
  254. this.sequenceId %= 256;
  255. }
  256. writePacket(packet) {
  257. const MAX_PACKET_LENGTH = 16777215;
  258. const length = packet.length();
  259. let chunk, offset, header;
  260. if (length < MAX_PACKET_LENGTH) {
  261. packet.writeHeader(this.sequenceId);
  262. if (this.config.debug) {
  263. // eslint-disable-next-line no-console
  264. console.log(
  265. `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
  266. );
  267. // eslint-disable-next-line no-console
  268. console.log(
  269. `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
  270. );
  271. }
  272. this._bumpSequenceId(1);
  273. this.write(packet.buffer);
  274. } else {
  275. if (this.config.debug) {
  276. // eslint-disable-next-line no-console
  277. console.log(
  278. `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
  279. );
  280. // eslint-disable-next-line no-console
  281. console.log(
  282. `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
  283. );
  284. }
  285. for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
  286. chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
  287. if (chunk.length === MAX_PACKET_LENGTH) {
  288. header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
  289. } else {
  290. header = Buffer.from([
  291. chunk.length & 0xff,
  292. (chunk.length >> 8) & 0xff,
  293. (chunk.length >> 16) & 0xff,
  294. this.sequenceId
  295. ]);
  296. }
  297. this._bumpSequenceId(1);
  298. this.write(header);
  299. this.write(chunk);
  300. }
  301. }
  302. }
  303. // 0.11+ environment
  304. startTLS(onSecure) {
  305. if (this.config.debug) {
  306. // eslint-disable-next-line no-console
  307. console.log('Upgrading connection to TLS');
  308. }
  309. const secureContext = Tls.createSecureContext({
  310. ca: this.config.ssl.ca,
  311. cert: this.config.ssl.cert,
  312. ciphers: this.config.ssl.ciphers,
  313. key: this.config.ssl.key,
  314. passphrase: this.config.ssl.passphrase,
  315. minVersion: this.config.ssl.minVersion
  316. });
  317. const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
  318. let secureEstablished = false;
  319. const secureSocket = new Tls.TLSSocket(this.stream, {
  320. rejectUnauthorized: rejectUnauthorized,
  321. requestCert: true,
  322. secureContext: secureContext,
  323. isServer: false
  324. });
  325. // error handler for secure socket
  326. secureSocket.on('_tlsError', err => {
  327. if (secureEstablished) {
  328. this._handleNetworkError(err);
  329. } else {
  330. onSecure(err);
  331. }
  332. });
  333. secureSocket.on('secure', () => {
  334. secureEstablished = true;
  335. onSecure(rejectUnauthorized ? secureSocket.ssl.verifyError() : null);
  336. });
  337. secureSocket.on('data', data => {
  338. this.packetParser.execute(data);
  339. });
  340. this.write = buffer => {
  341. secureSocket.write(buffer);
  342. };
  343. // start TLS communications
  344. secureSocket._start();
  345. }
  346. pipe() {
  347. if (this.stream instanceof Net.Stream) {
  348. this.stream.ondata = (data, start, end) => {
  349. this.packetParser.execute(data, start, end);
  350. };
  351. } else {
  352. this.stream.on('data', data => {
  353. this.packetParser.execute(
  354. data.parent,
  355. data.offset,
  356. data.offset + data.length
  357. );
  358. });
  359. }
  360. }
  361. protocolError(message, code) {
  362. // Starting with MySQL 8.0.24, if the client closes the connection
  363. // unexpectedly, the server will send a last ERR Packet, which we can
  364. // safely ignore.
  365. // https://dev.mysql.com/worklog/task/?id=12999
  366. if (this._closing) {
  367. return;
  368. }
  369. const err = new Error(message);
  370. err.fatal = true;
  371. err.code = code || 'PROTOCOL_ERROR';
  372. this.emit('error', err);
  373. }
  374. handlePacket(packet) {
  375. if (this._paused) {
  376. this._paused_packets.push(packet);
  377. return;
  378. }
  379. if (packet) {
  380. if (this.sequenceId !== packet.sequenceId) {
  381. const err = new Error(
  382. `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
  383. );
  384. err.expected = this.sequenceId;
  385. err.received = packet.sequenceId;
  386. this.emit('warn', err); // REVIEW
  387. // eslint-disable-next-line no-console
  388. console.error(err.message);
  389. }
  390. this._bumpSequenceId(packet.numPackets);
  391. }
  392. if (this.config.debug) {
  393. if (packet) {
  394. // eslint-disable-next-line no-console
  395. console.log(
  396. ` raw: ${packet.buffer
  397. .slice(packet.offset, packet.offset + packet.length())
  398. .toString('hex')}`
  399. );
  400. // eslint-disable-next-line no-console
  401. console.trace();
  402. const commandName = this._command
  403. ? this._command._commandName
  404. : '(no command)';
  405. const stateName = this._command
  406. ? this._command.stateName()
  407. : '(no command)';
  408. // eslint-disable-next-line no-console
  409. console.log(
  410. `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
  411. );
  412. }
  413. }
  414. if (!this._command) {
  415. const marker = packet.peekByte();
  416. // If it's an Err Packet, we should use it.
  417. if (marker === 0xff) {
  418. const error = Packets.Error.fromPacket(packet);
  419. this.protocolError(error.message, error.code);
  420. } else {
  421. // Otherwise, it means it's some other unexpected packet.
  422. this.protocolError(
  423. 'Unexpected packet while no commands in the queue',
  424. 'PROTOCOL_UNEXPECTED_PACKET'
  425. );
  426. }
  427. this.close();
  428. return;
  429. }
  430. const done = this._command.execute(packet, this);
  431. if (done) {
  432. this._command = this._commands.shift();
  433. if (this._command) {
  434. this.sequenceId = 0;
  435. this.compressedSequenceId = 0;
  436. this.handlePacket();
  437. }
  438. }
  439. }
  440. addCommand(cmd) {
  441. // this.compressedSequenceId = 0;
  442. // this.sequenceId = 0;
  443. if (this.config.debug) {
  444. const commandName = cmd.constructor.name;
  445. // eslint-disable-next-line no-console
  446. console.log(`Add command: ${commandName}`);
  447. cmd._commandName = commandName;
  448. }
  449. if (!this._command) {
  450. this._command = cmd;
  451. this.handlePacket();
  452. } else {
  453. this._commands.push(cmd);
  454. }
  455. return cmd;
  456. }
  457. format(sql, values) {
  458. if (typeof this.config.queryFormat === 'function') {
  459. return this.config.queryFormat.call(
  460. this,
  461. sql,
  462. values,
  463. this.config.timezone
  464. );
  465. }
  466. const opts = {
  467. sql: sql,
  468. values: values
  469. };
  470. this._resolveNamedPlaceholders(opts);
  471. return SqlString.format(
  472. opts.sql,
  473. opts.values,
  474. this.config.stringifyObjects,
  475. this.config.timezone
  476. );
  477. }
  478. escape(value) {
  479. return SqlString.escape(value, false, this.config.timezone);
  480. }
  481. escapeId(value) {
  482. return SqlString.escapeId(value, false);
  483. }
  484. raw(sql) {
  485. return SqlString.raw(sql);
  486. }
  487. _resolveNamedPlaceholders(options) {
  488. let unnamed;
  489. if (this.config.namedPlaceholders || options.namedPlaceholders) {
  490. if (Array.isArray(options.values)) {
  491. // if an array is provided as the values, assume the conversion is not necessary.
  492. // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled.
  493. return
  494. }
  495. if (convertNamedPlaceholders === null) {
  496. convertNamedPlaceholders = require('named-placeholders')();
  497. }
  498. unnamed = convertNamedPlaceholders(options.sql, options.values);
  499. options.sql = unnamed[0];
  500. options.values = unnamed[1];
  501. }
  502. }
  503. query(sql, values, cb) {
  504. let cmdQuery;
  505. if (sql.constructor === Commands.Query) {
  506. cmdQuery = sql;
  507. } else {
  508. cmdQuery = Connection.createQuery(sql, values, cb, this.config);
  509. }
  510. this._resolveNamedPlaceholders(cmdQuery);
  511. const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []);
  512. cmdQuery.sql = rawSql;
  513. return this.addCommand(cmdQuery);
  514. }
  515. pause() {
  516. this._paused = true;
  517. this.stream.pause();
  518. }
  519. resume() {
  520. let packet;
  521. this._paused = false;
  522. while ((packet = this._paused_packets.shift())) {
  523. this.handlePacket(packet);
  524. // don't resume if packet hander paused connection
  525. if (this._paused) {
  526. return;
  527. }
  528. }
  529. this.stream.resume();
  530. }
  531. // TODO: named placeholders support
  532. prepare(options, cb) {
  533. if (typeof options === 'string') {
  534. options = { sql: options };
  535. }
  536. return this.addCommand(new Commands.Prepare(options, cb));
  537. }
  538. unprepare(sql) {
  539. let options = {};
  540. if (typeof sql === 'object') {
  541. options = sql;
  542. } else {
  543. options.sql = sql;
  544. }
  545. const key = Connection.statementKey(options);
  546. const stmt = this._statements.get(key);
  547. if (stmt) {
  548. this._statements.del(key);
  549. stmt.close();
  550. }
  551. return stmt;
  552. }
  553. execute(sql, values, cb) {
  554. let options = {};
  555. if (typeof sql === 'object') {
  556. // execute(options, cb)
  557. options = sql;
  558. if (typeof values === 'function') {
  559. cb = values;
  560. } else {
  561. options.values = options.values || values;
  562. }
  563. } else if (typeof values === 'function') {
  564. // execute(sql, cb)
  565. cb = values;
  566. options.sql = sql;
  567. options.values = undefined;
  568. } else {
  569. // execute(sql, values, cb)
  570. options.sql = sql;
  571. options.values = values;
  572. }
  573. this._resolveNamedPlaceholders(options);
  574. // check for values containing undefined
  575. if (options.values) {
  576. //If namedPlaceholder is not enabled and object is passed as bind parameters
  577. if (!Array.isArray(options.values)) {
  578. throw new TypeError(
  579. 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
  580. );
  581. }
  582. options.values.forEach(val => {
  583. //If namedPlaceholder is not enabled and object is passed as bind parameters
  584. if (!Array.isArray(options.values)) {
  585. throw new TypeError(
  586. 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
  587. );
  588. }
  589. if (val === undefined) {
  590. throw new TypeError(
  591. 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
  592. );
  593. }
  594. if (typeof val === 'function') {
  595. throw new TypeError(
  596. 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
  597. );
  598. }
  599. });
  600. }
  601. const executeCommand = new Commands.Execute(options, cb);
  602. const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
  603. if (err) {
  604. // skip execute command if prepare failed, we have main
  605. // combined callback here
  606. executeCommand.start = function() {
  607. return null;
  608. };
  609. if (cb) {
  610. cb(err);
  611. } else {
  612. executeCommand.emit('error', err);
  613. }
  614. executeCommand.emit('end');
  615. return;
  616. }
  617. executeCommand.statement = stmt;
  618. });
  619. this.addCommand(prepareCommand);
  620. this.addCommand(executeCommand);
  621. return executeCommand;
  622. }
  623. changeUser(options, callback) {
  624. if (!callback && typeof options === 'function') {
  625. callback = options;
  626. options = {};
  627. }
  628. const charsetNumber = options.charset
  629. ? ConnectionConfig.getCharsetNumber(options.charset)
  630. : this.config.charsetNumber;
  631. return this.addCommand(
  632. new Commands.ChangeUser(
  633. {
  634. user: options.user || this.config.user,
  635. password: options.password || this.config.password,
  636. passwordSha1: options.passwordSha1 || this.config.passwordSha1,
  637. database: options.database || this.config.database,
  638. timeout: options.timeout,
  639. charsetNumber: charsetNumber,
  640. currentConfig: this.config
  641. },
  642. err => {
  643. if (err) {
  644. err.fatal = true;
  645. }
  646. if (callback) {
  647. callback(err);
  648. }
  649. }
  650. )
  651. );
  652. }
  653. // transaction helpers
  654. beginTransaction(cb) {
  655. return this.query('START TRANSACTION', cb);
  656. }
  657. commit(cb) {
  658. return this.query('COMMIT', cb);
  659. }
  660. rollback(cb) {
  661. return this.query('ROLLBACK', cb);
  662. }
  663. ping(cb) {
  664. return this.addCommand(new Commands.Ping(cb));
  665. }
  666. _registerSlave(opts, cb) {
  667. return this.addCommand(new Commands.RegisterSlave(opts, cb));
  668. }
  669. _binlogDump(opts, cb) {
  670. return this.addCommand(new Commands.BinlogDump(opts, cb));
  671. }
  672. // currently just alias to close
  673. destroy() {
  674. this.close();
  675. }
  676. close() {
  677. if (this.connectTimeout) {
  678. Timers.clearTimeout(this.connectTimeout);
  679. this.connectTimeout = null;
  680. }
  681. this._closing = true;
  682. this.stream.end();
  683. this.addCommand = this._addCommandClosedState;
  684. }
  685. createBinlogStream(opts) {
  686. // TODO: create proper stream class
  687. // TODO: use through2
  688. let test = 1;
  689. const stream = new Readable({ objectMode: true });
  690. stream._read = function() {
  691. return {
  692. data: test++
  693. };
  694. };
  695. this._registerSlave(opts, () => {
  696. const dumpCmd = this._binlogDump(opts);
  697. dumpCmd.on('event', ev => {
  698. stream.push(ev);
  699. });
  700. dumpCmd.on('eof', () => {
  701. stream.push(null);
  702. // if non-blocking, then close stream to prevent errors
  703. if (opts.flags && opts.flags & 0x01) {
  704. this.close();
  705. }
  706. });
  707. // TODO: pipe errors as well
  708. });
  709. return stream;
  710. }
  711. connect(cb) {
  712. if (!cb) {
  713. return;
  714. }
  715. if (this._fatalError || this._protocolError) {
  716. return cb(this._fatalError || this._protocolError);
  717. }
  718. if (this._handshakePacket) {
  719. return cb(null, this);
  720. }
  721. let connectCalled = 0;
  722. function callbackOnce(isErrorHandler) {
  723. return function(param) {
  724. if (!connectCalled) {
  725. if (isErrorHandler) {
  726. cb(param);
  727. } else {
  728. cb(null, param);
  729. }
  730. }
  731. connectCalled = 1;
  732. };
  733. }
  734. this.once('error', callbackOnce(true));
  735. this.once('connect', callbackOnce(false));
  736. }
  737. // ===================================
  738. // outgoing server connection methods
  739. // ===================================
  740. writeColumns(columns) {
  741. this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
  742. columns.forEach(column => {
  743. this.writePacket(
  744. Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
  745. );
  746. });
  747. this.writeEof();
  748. }
  749. // row is array of columns, not hash
  750. writeTextRow(column) {
  751. this.writePacket(
  752. Packets.TextRow.toPacket(column, this.serverConfig.encoding)
  753. );
  754. }
  755. writeTextResult(rows, columns) {
  756. this.writeColumns(columns);
  757. rows.forEach(row => {
  758. const arrayRow = new Array(columns.length);
  759. columns.forEach(column => {
  760. arrayRow.push(row[column.name]);
  761. });
  762. this.writeTextRow(arrayRow);
  763. });
  764. this.writeEof();
  765. }
  766. writeEof(warnings, statusFlags) {
  767. this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
  768. }
  769. writeOk(args) {
  770. if (!args) {
  771. args = { affectedRows: 0 };
  772. }
  773. this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
  774. }
  775. writeError(args) {
  776. // if we want to send error before initial hello was sent, use default encoding
  777. const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
  778. this.writePacket(Packets.Error.toPacket(args, encoding));
  779. }
  780. serverHandshake(args) {
  781. this.serverConfig = args;
  782. this.serverConfig.encoding =
  783. CharsetToEncoding[this.serverConfig.characterSet];
  784. return this.addCommand(new Commands.ServerHandshake(args));
  785. }
  786. // ===============================================================
  787. end(callback) {
  788. if (this.config.isServer) {
  789. this._closing = true;
  790. const quitCmd = new EventEmitter();
  791. setImmediate(() => {
  792. this.stream.end();
  793. quitCmd.emit('end');
  794. });
  795. return quitCmd;
  796. }
  797. // trigger error if more commands enqueued after end command
  798. const quitCmd = this.addCommand(new Commands.Quit(callback));
  799. this.addCommand = this._addCommandClosedState;
  800. return quitCmd;
  801. }
  802. static createQuery(sql, values, cb, config) {
  803. let options = {
  804. rowsAsArray: config.rowsAsArray
  805. };
  806. if (typeof sql === 'object') {
  807. // query(options, cb)
  808. options = sql;
  809. if (typeof values === 'function') {
  810. cb = values;
  811. } else if (values !== undefined) {
  812. options.values = values;
  813. }
  814. } else if (typeof values === 'function') {
  815. // query(sql, cb)
  816. cb = values;
  817. options.sql = sql;
  818. options.values = undefined;
  819. } else {
  820. // query(sql, values, cb)
  821. options.sql = sql;
  822. options.values = values;
  823. }
  824. return new Commands.Query(options, cb);
  825. }
  826. static statementKey(options) {
  827. return (
  828. `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`
  829. );
  830. }
  831. }
  832. if (Tls.TLSSocket) {
  833. // not supported
  834. } else {
  835. Connection.prototype.startTLS = function _startTLS(onSecure) {
  836. if (this.config.debug) {
  837. // eslint-disable-next-line no-console
  838. console.log('Upgrading connection to TLS');
  839. }
  840. const crypto = require('crypto');
  841. const config = this.config;
  842. const stream = this.stream;
  843. const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
  844. const credentials = crypto.createCredentials({
  845. key: config.ssl.key,
  846. cert: config.ssl.cert,
  847. passphrase: config.ssl.passphrase,
  848. ca: config.ssl.ca,
  849. ciphers: config.ssl.ciphers
  850. });
  851. const securePair = Tls.createSecurePair(
  852. credentials,
  853. false,
  854. true,
  855. rejectUnauthorized
  856. );
  857. if (stream.ondata) {
  858. stream.ondata = null;
  859. }
  860. stream.removeAllListeners('data');
  861. stream.pipe(securePair.encrypted);
  862. securePair.encrypted.pipe(stream);
  863. securePair.cleartext.on('data', data => {
  864. this.packetParser.execute(data);
  865. });
  866. this.write = function(buffer) {
  867. securePair.cleartext.write(buffer);
  868. };
  869. securePair.on('secure', () => {
  870. onSecure(rejectUnauthorized ? securePair.ssl.verifyError() : null);
  871. });
  872. };
  873. }
  874. module.exports = Connection;