context.js 8.1 KB


  1. 'use strict';
  2. const Readable = require('stream').Readable;
  3. const path = require('path');
  4. const uuid = require('uuid');
  5. const parse = require('co-busboy');
  6. const sendToWormhole = require('stream-wormhole');
  7. const moment = require('moment');
  8. const fs = require('mz/fs');
  9. const mkdirp = require('mz-modules/mkdirp');
  10. const pump = require('mz-modules/pump');
  11. const rimraf = require('mz-modules/rimraf');
  12. const bytes = require('humanize-bytes');
  13. class EmptyStream extends Readable {
  14. _read() {
  15. this.push(null);
  16. }
  17. }
  18. const HAS_CONSUMED = Symbol('Context#multipartHasConsumed');
  19. function limit(code, message) {
  20. // throw 413 error
  21. const err = new Error(message);
  22. err.code = code;
  23. err.status = 413;
  24. throw err;
  25. }
  26. module.exports = {
  27. /**
  28. * clean up request tmp files helper
  29. * @function Context#cleanupRequestFiles
  30. * @param {Array<String>} [files] - file paths need to clenup, default is `ctx.request.files`.
  31. */
  32. async cleanupRequestFiles(files) {
  33. if (!files || !files.length) {
  34. files = this.request.files;
  35. }
  36. if (Array.isArray(files)) {
  37. for (const file of files) {
  38. try {
  39. await rimraf(file.filepath);
  40. } catch (err) {
  41. // warning log
  42. this.coreLogger.warn('[egg-multipart-cleanupRequestFiles-error] file: %j, error: %s',
  43. file, err);
  44. }
  45. }
  46. }
  47. },
  48. /**
  49. * save request multipart data and files to `ctx.request`
  50. * @function Context#saveRequestFiles
  51. * @param {Object} options
  52. * - {String} options.defCharset
  53. * - {Object} options.limits
  54. * - {Function} options.checkFile
  55. */
  56. async saveRequestFiles(options) {
  57. options = options || {};
  58. const ctx = this;
  59. const multipartOptions = {
  60. autoFields: false,
  61. };
  62. if (options.defCharset) multipartOptions.defCharset = options.defCharset;
  63. if (options.limits) multipartOptions.limits = options.limits;
  64. if (options.checkFile) multipartOptions.checkFile = options.checkFile;
  65. const allowArrayField = ctx.app.config.multipart.allowArrayField;
  66. let storedir;
  67. const requestBody = {};
  68. const requestFiles = [];
  69. const parts = ctx.multipart(multipartOptions);
  70. let part;
  71. do {
  72. try {
  73. part = await parts();
  74. } catch (err) {
  75. await ctx.cleanupRequestFiles(requestFiles);
  76. throw err;
  77. }
  78. if (!part) break;
  79. if (part.length) {
  80. ctx.coreLogger.debug('[egg-multipart:storeMultipart] handle value part: %j', part);
  81. const fieldnameTruncated = part[2];
  82. const valueTruncated = part[3];
  83. if (valueTruncated) {
  84. await ctx.cleanupRequestFiles(requestFiles);
  85. limit('Request_fieldSize_limit', 'Reach fieldSize limit');
  86. }
  87. if (fieldnameTruncated) {
  88. await ctx.cleanupRequestFiles(requestFiles);
  89. limit('Request_fieldNameSize_limit', 'Reach fieldNameSize limit');
  90. }
  91. // arrays are busboy fields
  92. const [ filedName, fieldValue ] = part;
  93. if (!allowArrayField) {
  94. requestBody[filedName] = fieldValue;
  95. } else {
  96. if (!requestBody[filedName]) {
  97. requestBody[filedName] = fieldValue;
  98. } else if (!Array.isArray(requestBody[filedName])) {
  99. requestBody[filedName] = [ requestBody[filedName], fieldValue ];
  100. } else {
  101. requestBody[filedName].push(fieldValue);
  102. }
  103. }
  104. continue;
  105. }
  106. // otherwise, it's a stream
  107. const meta = {
  108. field: part.fieldname,
  109. filename: part.filename,
  110. encoding: part.encoding,
  111. mime: part.mime,
  112. };
  113. // keep same property name as file stream
  114. // https://github.com/cojs/busboy/blob/master/index.js#L114
  115. meta.fieldname = meta.field;
  116. meta.transferEncoding = meta.encoding;
  117. meta.mimeType = meta.mime;
  118. ctx.coreLogger.debug('[egg-multipart:storeMultipart] handle stream part: %j', meta);
  119. // empty part, ignore it
  120. if (!part.filename) {
  121. await sendToWormhole(part);
  122. continue;
  123. }
  124. if (!storedir) {
  125. // ${tmpdir}/YYYY/MM/DD/HH
  126. storedir = path.join(ctx.app.config.multipart.tmpdir, moment().format('YYYY/MM/DD/HH'));
  127. const exists = await fs.exists(storedir);
  128. if (!exists) {
  129. await mkdirp(storedir);
  130. }
  131. }
  132. const filepath = path.join(storedir, uuid.v4() + path.extname(meta.filename));
  133. const target = fs.createWriteStream(filepath);
  134. await pump(part, target);
  135. // https://github.com/mscdex/busboy/blob/master/lib/types/multipart.js#L221
  136. meta.filepath = filepath;
  137. requestFiles.push(meta);
  138. // https://github.com/mscdex/busboy/blob/master/lib/types/multipart.js#L221
  139. if (part.truncated) {
  140. await ctx.cleanupRequestFiles(requestFiles);
  141. limit('Request_fileSize_limit', 'Reach fileSize limit');
  142. }
  143. } while (part != null);
  144. ctx.request.body = requestBody;
  145. ctx.request.files = requestFiles;
  146. },
  147. /**
  148. * create multipart.parts instance, to get separated files.
  149. * @function Context#multipart
  150. * @param {Object} [options] - override default multipart configurations
  151. * - {Boolean} options.autoFields
  152. * - {String} options.defCharset
  153. * - {Object} options.limits
  154. * - {Function} options.checkFile
  155. * @return {Yieldable} parts
  156. */
  157. multipart(options) {
  158. // multipart/form-data
  159. if (!this.is('multipart')) {
  160. this.throw(400, 'Content-Type must be multipart/*');
  161. }
  162. if (this[HAS_CONSUMED]) throw new TypeError('the multipart request can\'t be consumed twice');
  163. this[HAS_CONSUMED] = true;
  164. const parseOptions = Object.assign({}, this.app.config.multipartParseOptions);
  165. options = options || {};
  166. if (typeof options.autoFields === 'boolean') parseOptions.autoFields = options.autoFields;
  167. if (options.defCharset) parseOptions.defCharset = options.defCharset;
  168. if (options.checkFile) parseOptions.checkFile = options.checkFile;
  169. // merge and create a new limits object
  170. if (options.limits) {
  171. const limits = options.limits;
  172. for (const key in limits) {
  173. if (/^\w+Size$/.test(key)) {
  174. limits[key] = bytes(limits[key]);
  175. }
  176. }
  177. parseOptions.limits = Object.assign({}, parseOptions.limits, limits);
  178. }
  179. return parse(this, parseOptions);
  180. },
  181. /**
  182. * get upload file stream
  183. * @example
  184. * ```js
  185. * const stream = await ctx.getFileStream();
  186. * // get other fields
  187. * console.log(stream.fields);
  188. * ```
  189. * @function Context#getFileStream
  190. * @param {Object} options
  191. * - {Boolean} options.requireFile - required file submit, default is true
  192. * - {String} options.defCharset
  193. * - {Object} options.limits
  194. * - {Function} options.checkFile
  195. * @return {ReadStream} stream
  196. * @since 1.0.0
  197. */
  198. async getFileStream(options) {
  199. options = options || {};
  200. const multipartOptions = {
  201. autoFields: true,
  202. };
  203. if (options.defCharset) multipartOptions.defCharset = options.defCharset;
  204. if (options.limits) multipartOptions.limits = options.limits;
  205. if (options.checkFile) multipartOptions.checkFile = options.checkFile;
  206. const parts = this.multipart(multipartOptions);
  207. let stream = await parts();
  208. if (options.requireFile !== false) {
  209. // stream not exists, treat as an exception
  210. if (!stream || !stream.filename) {
  211. this.throw(400, 'Can\'t found upload file');
  212. }
  213. }
  214. if (!stream) {
  215. stream = new EmptyStream();
  216. }
  217. if (stream.truncated) {
  218. limit('Request_fileSize_limit', 'Request file too large, please check multipart config');
  219. }
  220. stream.fields = parts.field;
  221. stream.once('limit', () => {
  222. const err = new Error('Request file too large, please check multipart config');
  223. err.name = 'MultipartFileTooLargeError';
  224. err.status = 413;
  225. err.fields = stream.fields;
  226. err.filename = stream.filename;
  227. if (stream.listenerCount('error') > 0) {
  228. stream.emit('error', err);
  229. this.coreLogger.warn(err);
  230. } else {
  231. this.coreLogger.error(err);
  232. // ignore next error event
  233. stream.on('error', () => { });
  234. }
  235. // ignore all data
  236. stream.resume();
  237. });
  238. return stream;
  239. },
  240. };