const fs = require('fs');
const is = require('is-type-of');
const util = require('util');
const path = require('path');
const mime = require('mime');
const { Readable } = require('stream');
const { isFile } = require('./common/utils/isFile');
const { isArray } = require('./common/utils/isArray');
const { isBuffer } = require('./common/utils/isBuffer');
const { retry } = require('./common/utils/retry');

const proto = exports;

/**
 * Multipart operations
 */

/**
 * Upload a file to OSS using multipart uploads.
 *
 * Files smaller than 100 KiB are sent with a single `this.put()` call;
 * larger files are split into parts and uploaded via `_resumeMultipart`.
 * If `options.checkpoint.uploadId` is set, the upload is resumed from that
 * checkpoint and `file`/`name` are taken from the checkpoint instead.
 *
 * Note: `options` is mutated (`mime`, `headers`, and, for small files,
 * `contentLength` are filled in).
 *
 * @param {String} name object name
 * @param {String|File|Buffer} file local path, Web File, or Buffer
 * @param {Object} [options]
 *        {Object} options.checkpoint checkpoint of a previous upload to resume
 *        {Number} options.partSize integer part size in bytes, at least 100 KiB
 *        {Number} options.parallel number of parts uploaded concurrently (default 5)
 *        {Function} options.progress called as progress(fraction, checkpoint, res)
 *        {String} options.mime content type; derived from the file extension when omitted
 *        {Object} options.meta passed to `_convertMetaToHeaders` together with options.headers
 *        {Object} options.headers extra request headers
 *        {Object} options.callback The callback parameter is composed of a JSON string encoded in Base64
 *        {String} options.callback.url the OSS sends a callback request to this URL
 *        {String} options.callback.host The host header value for initiating callback requests
 *        {String} options.callback.body The value of the request body when a callback is initiated
 *        {String} options.callback.contentType The Content-Type of the callback requests initiated
 *        {Object} options.callback.customValue Custom parameters are a map of key-values, e.g:
 *                  customValue = {
 *                    key1: 'value1',
 *                    key2: 'value2'
 *                  }
 * @return {Promise<Object>} for small files `{ res, bucket, name, etag[, data] }`,
 *         otherwise whatever `completeMultipartUpload` resolves with
 * @throws {Error} when `options.partSize` is not an integer or is below 100 KiB
 */
proto.multipartUpload = async function multipartUpload(name, file, options) {
  this.resetCancelFlag();
  options = options || {};
  if (options.checkpoint && options.checkpoint.uploadId) {
    return await this._resumeMultipart(options.checkpoint, options);
  }

  const minPartSize = 100 * 1024;

  if (!options.mime) {
    if (isFile(file)) {
      options.mime = mime.getType(path.extname(file.name));
    } else if (isBuffer(file)) {
      options.mime = '';
    } else {
      options.mime = mime.getType(path.extname(file));
    }
  }

  options.headers = options.headers || {};
  this._convertMetaToHeaders(options.meta, options.headers);

  const fileSize = await this._getFileSize(file);

  // Small files skip the multipart protocol entirely.
  if (fileSize < minPartSize) {
    options.contentLength = fileSize;
    const result = await this.put(name, file, options);
    if (options.progress) {
      await options.progress(1);
    }

    const ret = {
      res: result.res,
      bucket: this.options.bucket,
      name,
      etag: result.res.headers.etag
    };

    // The response body is only surfaced when an upload callback was requested.
    if ((options.headers && options.headers['x-oss-callback']) || options.callback) {
      ret.data = result.data;
    }

    return ret;
  }

  if (options.partSize && !(parseInt(options.partSize, 10) === options.partSize)) {
    throw new Error('partSize must be int number');
  }

  if (options.partSize && options.partSize < minPartSize) {
    throw new Error(`partSize must not be smaller than ${minPartSize}`);
  }

  const initResult = await this.initMultipartUpload(name, options);
  const { uploadId } = initResult;
  const partSize = this._getPartSize(fileSize, options.partSize);

  const checkpoint = {
    file,
    name,
    fileSize,
    partSize,
    uploadId,
    doneParts: []
  };

  if (options.progress) {
    await options.progress(0, checkpoint, initResult.res);
  }

  return await this._resumeMultipart(checkpoint, options);
};

/*
 * Resume multipart upload from checkpoint. The checkpoint will be
 * updated after each successful part upload.
 *
 * Parts already listed in `checkpoint.doneParts` are skipped. Remaining
 * parts are uploaded sequentially (IE10, or `options.parallel === 1`) or
 * through `this._parallel` with `options.parallel` (default 5) workers.
 *
 * @param {Object} checkpoint the checkpoint
 *        ({ file, name, fileSize, partSize, uploadId, doneParts })
 * @param {Object} options
 * @return {Promise<Object>} result of `completeMultipartUpload`
 * @throws cancel event when the upload was cancelled, abort event when a
 *         part upload answered 404, or the first part error otherwise
 */
proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }
  const { file, fileSize, partSize, uploadId, doneParts, name } = checkpoint;

  const partOffs = this._divideParts(fileSize, partSize);
  const numParts = partOffs.length;

  // Uploads one part (1-based `partNo`). Written as a plain async function so
  // that any failure, including one raised while handling another failure,
  // rejects the returned promise instead of leaving it pending forever.
  const uploadPart = async (self, partNo) => {
    try {
      if (self.isCancel()) {
        return;
      }

      const pi = partOffs[partNo - 1];
      const stream = await self._createStream(file, pi.start, pi.end);
      const data = {
        stream,
        size: pi.end - pi.start
      };

      // Track the in-flight stream on the client.
      if (isArray(self.multipartUploadStreams)) {
        self.multipartUploadStreams.push(data.stream);
      } else {
        self.multipartUploadStreams = [data.stream];
      }

      const removeStreamFromMultipartUploadStreams = () => {
        if (!stream.destroyed) {
          stream.destroy();
        }
        const index = self.multipartUploadStreams.indexOf(stream);
        if (index !== -1) {
          self.multipartUploadStreams.splice(index, 1);
        }
      };

      stream.on('close', removeStreamFromMultipartUploadStreams);
      stream.on('error', removeStreamFromMultipartUploadStreams);

      let result;
      try {
        result = await self._uploadPart(name, uploadId, partNo, data, options);
      } catch (error) {
        removeStreamFromMultipartUploadStreams();
        // A 404 on a part upload is reported to the caller as an abort event.
        if (error.status === 404) {
          throw self._makeAbortEvent();
        }
        throw error;
      }

      if (!self.isCancel()) {
        doneParts.push({
          number: partNo,
          etag: result.res.headers.etag
        });
        checkpoint.doneParts = doneParts;

        if (options.progress) {
          // numParts + 1: the final fraction is left for the complete step.
          await options.progress(doneParts.length / (numParts + 1), checkpoint, result.res);
        }
      }
    } catch (err) {
      // Tag the failure with its part number; guard against non-object rejections.
      if (err && typeof err === 'object') {
        err.partNum = partNo;
      }
      throw err;
    }
  };

  const uploadPartJob = retry(uploadPart, this.options.retryMax, {
    errorHandler: err => {
      // Only statuses -1 / -2 are retried (presumably client-side request
      // failures; confirm against the request layer), and only if the
      // optional user hook agrees.
      const isRetryableStatus = [-1, -2].includes(err.status);
      const requestErrorRetryHandle = this.options.requestErrorRetryHandle || (() => true);
      return isRetryableStatus && !!requestErrorRetryHandle(err);
    }
  });

  const done = new Set(doneParts.map(p => p.number));
  const todo = Array.from({ length: numParts }, (_, i) => i + 1).filter(p => !done.has(p));

  const defaultParallel = 5;
  const parallel = options.parallel || defaultParallel;

  if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
    for (let i = 0; i < todo.length; i++) {
      if (this.isCancel()) {
        throw this._makeCancelEvent();
      }
      /* eslint no-await-in-loop: [0] */
      await uploadPartJob(this, todo[i]);
    }
  } else {
    // upload in parallel; `_parallel` resolves with the list of job errors
    const jobErr = await this._parallel(todo, parallel, value => uploadPartJob(this, value));

    const abortEvent = jobErr.find(err => err.name === 'abort');
    if (abortEvent) throw abortEvent;

    if (this.isCancel()) {
      throw this._makeCancelEvent();
    }

    if (jobErr.length > 0) {
      jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${jobErr[0].partNum}`;
      throw jobErr[0];
    }
  }

  return await this.completeMultipartUpload(name, uploadId, doneParts, options);
};

/**
 * Get file size
 * @param {Buffer|File|String} file Buffer, Web File, or local file path
 * @return {Promise<Number>} size in bytes
 * @throws {Error} when `file` is none of the supported types
 */
proto._getFileSize = async function _getFileSize(file) {
  if (isBuffer(file)) {
    return file.length;
  } else if (isFile(file)) {
    return file.size;
  } else if (is.string(file)) {
    const stat = await this._statFile(file);
    return stat.size;
  }

  throw new Error('_getFileSize requires Buffer/File/String.');
};

/*
 * Readable stream for Web File
 *
 * Reads the whole File/Blob into memory with a FileReader on the first
 * `_read` call and then pushes it downstream in `size`-byte chunks.
 * May be called with or without `new`.
 *
 * @param {File|Blob} file
 * @param {Object} [options] forwarded to the Readable constructor
 */
function WebFileReadStream(file, options) {
  if (!(this instanceof WebFileReadStream)) {
    return new WebFileReadStream(file, options);
  }

  Readable.call(this, options);

  this.file = file;
  this.reader = new FileReader();
  this.start = 0;
  this.finish = false;
  this.fileBuffer = null;
}
util.inherits(WebFileReadStream, Readable);

/*
 * Push buffered file content downstream in chunks of at most `size` bytes
 * until the buffer is exhausted or `push()` signals back-pressure.
 * @param {Number} size chunk size in bytes
 */
WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
  if (this.fileBuffer) {
    let pushRet = true;
    while (pushRet && this.fileBuffer && this.start < this.fileBuffer.length) {
      const { start } = this;
      let end = start + size;
      end = end > this.fileBuffer.length ? this.fileBuffer.length : end;
      this.start = end;
      pushRet = this.push(this.fileBuffer.slice(start, end));
    }
  }
};

/*
 * Readable#_read implementation.
 * Ends the stream once everything was pushed; otherwise loads the file on
 * the first call and pushes buffered chunks on subsequent calls.
 * @param {Number} [size] requested chunk size, defaults to 16 KiB
 */
WebFileReadStream.prototype._read = function _read(size) {
  if (
    (this.file && this.start >= this.file.size) ||
    (this.fileBuffer && this.start >= this.fileBuffer.length) ||
    this.finish ||
    (this.start === 0 && !this.file)
  ) {
    if (!this.finish) {
      this.fileBuffer = null;
      this.finish = true;
    }
    this.push(null);
    return;
  }

  const defaultReadSize = 16 * 1024;
  size = size || defaultReadSize;

  const that = this;
  this.reader.onload = function onload(e) {
    that.fileBuffer = Buffer.from(new Uint8Array(e.target.result));
    that.file = null;
    that.readFileAndPush(size);
  };
  this.reader.onerror = function onerror(e) {
    // Throwing from a FileReader callback cannot be caught by the stream
    // consumer and would leave the stream hanging; fail the stream instead.
    const error =
      (e && e.srcElement && e.srcElement.error) ||
      (e && e.target && e.target.error) ||
      new Error('WebFileReadStream failed to read the file.');
    if (typeof that.destroy === 'function') {
      that.destroy(error);
    } else {
      that.emit('error', error);
    }
  };

  if (this.start === 0) {
    this.reader.readAsArrayBuffer(this.file);
  } else {
    this.readFileAndPush(size);
  }
};

/*
 * Create a readable stream over bytes [start, end) of `file`.
 * A readable stream passed as `file` is returned as-is (start/end ignored).
 * @param {Stream|File|Buffer|String} file
 * @param {Number} start inclusive start offset
 * @param {Number} end exclusive end offset
 * @return {Readable}
 * @throws {Error} when `file` is none of the supported types
 */
proto._createStream = function _createStream(file, start, end) {
  if (is.readableStream(file)) {
    return file;
  } else if (isFile(file)) {
    return new WebFileReadStream(file.slice(start, end));
  } else if (isBuffer(file)) {
    const iterable = file.subarray(start, end);
    // we can't use Readable.from() since it is only support in Node v10
    return new Readable({
      read() {
        this.push(iterable);
        this.push(null);
      }
    });
  } else if (is.string(file)) {
    // fs.createReadStream treats `end` as inclusive, hence end - 1.
    return fs.createReadStream(file, {
      start,
      end: end - 1
    });
  }
  throw new Error('_createStream requires Buffer/File/String.');
};

/*
 * Choose the part size: the requested size (default 1 MiB), raised if
 * necessary so that the upload needs no more than 10,000 parts.
 * @param {Number} fileSize total size in bytes
 * @param {Number} [partSize] requested part size in bytes
 * @return {Number} part size to use
 */
proto._getPartSize = function _getPartSize(fileSize, partSize) {
  const maxNumParts = 10 * 1000;
  const defaultPartSize = 1 * 1024 * 1024;

  const requestedSize = partSize || defaultPartSize;
  const safeSize = Math.ceil(fileSize / maxNumParts);

  if (requestedSize < safeSize) {
    console.warn(
      `partSize has been set to ${safeSize}, because the partSize you provided causes partNumber to be greater than 10,000`
    );
    return safeSize;
  }
  return requestedSize;
};

/*
 * Split `fileSize` bytes into consecutive `{ start, end }` ranges of
 * `partSize` bytes (end exclusive); the last range may be shorter.
 * @param {Number} fileSize
 * @param {Number} partSize
 * @return {Array<{start: Number, end: Number}>}
 */
proto._divideParts = function _divideParts(fileSize, partSize) {
  const numParts = Math.ceil(fileSize / partSize);

  const partOffs = [];
  for (let i = 0; i < numParts; i++) {
    const start = partSize * i;
    const end = Math.min(start + partSize, fileSize);

    partOffs.push({
      start,
      end
    });
  }

  return partOffs;
};