mirror of https://github.com/jkjoy/sunpeiwen.git
178 lines
4.1 KiB
JavaScript
178 lines
4.1 KiB
JavaScript
|
const { isArray } = require('./utils/isArray');
|
||
|
|
||
|
// Alias for this module's `exports` object; every method below is attached to it.
const proto = exports;
|
||
|
|
||
|
/**
 * Run `fn` over the items of `todo` in consecutive batches, waiting for a
 * whole batch to settle (or fail) before starting the next one.
 *
 * Scheduling stops as soon as `this.isCancel()` returns true.
 *
 * @param {Array} todo items to process; a falsy value yields an empty result
 * @param {Number} parallel maximum number of jobs per batch; values <= 0
 *        start no job at all (same guard as `_parallel`)
 * @param {Function} fn job factory, called as `fn(this, item)` or, when
 *        `sourceData` is truthy, `fn(this, item, sourceData)`
 * @param {*} [sourceData] optional extra argument forwarded to `fn`
 * @return {Promise<Array>} errors collected while running; at most one per
 *         batch, because `Promise.all` rejects with the first failure only
 */
proto._parallelNode = async function _parallelNode(todo, parallel, fn, sourceData) {
  const that = this;
  const jobErr = [];

  if (parallel <= 0 || !todo) {
    return jobErr;
  }

  let jobs = [];

  // Wait for the jobs started so far and remember the first failure, if any.
  const flush = async () => {
    const batch = jobs;
    jobs = [];
    try {
      await Promise.all(batch);
    } catch (err) {
      jobErr.push(err);
    }
  };

  const lastIndex = todo.length - 1;
  for (let i = 0; i <= lastIndex; i++) {
    if (that.isCancel()) {
      break;
    }

    jobs.push(sourceData ? fn(that, todo[i], sourceData) : fn(that, todo[i]));

    // `>=` (not `===`) so a fractional `parallel` still closes the batch;
    // the last item always closes the final, possibly short, batch.
    if (jobs.length >= parallel || i === lastIndex) {
      /* eslint no-await-in-loop: [0] */
      await flush();
    }
  }

  // A cancel noticed in the middle of a batch leaves started jobs behind;
  // wait for them so none is left un-awaited.
  if (jobs.length > 0) {
    await flush();
  }

  return jobErr;
};
|
||
|
|
||
|
/**
 * Run `jobPromise(item)` for the items of `todo`, keeping at most `parallel`
 * jobs in flight, and resolve with the list of collected errors.
 *
 * - The first rejected job stops further scheduling and resolves the result
 *   right away with that error (jobs already started are not waited for).
 * - When `this.isCancel()` turns true no new job is started; the result is
 *   resolved when the next job completes, or immediately if none is running.
 *
 * @param {Array} todo items to process; a falsy value resolves to `[]`
 * @param {Number} parallel concurrency limit; values <= 0 resolve to `[]`
 * @param {Function} jobPromise called with one item, must return a promise
 * @return {Promise<Array>} resolves with the errors collected (empty on success)
 */
proto._parallel = function _parallel(todo, parallel, jobPromise) {
  const that = this;
  return new Promise(resolve => {
    const _jobErr = [];
    if (parallel <= 0 || !todo) {
      resolve(_jobErr);
      return;
    }

    // Wrap a callback so that a second invocation throws instead of
    // corrupting the `running` counter.
    function onlyOnce(fn) {
      return function (...args) {
        if (fn === null) throw new Error('Callback was already called.');
        const callFn = fn;
        fn = null;
        callFn.apply(this, args);
      };
    }

    // Hands out `{ value, key }` for each item in order; yields `null` once
    // the items are exhausted or the operation has been cancelled.
    function createArrayIterator(coll) {
      let i = -1;
      const len = coll.length;
      return function next() {
        return ++i < len && !that.isCancel() ? { value: coll[i], key: i } : null;
      };
    }

    const nextElem = createArrayIterator(todo);
    let done = false; // no further job will be started
    let running = 0; // jobs currently in flight
    let looping = false; // true while replenish() is scheduling

    // Completion handler shared by all jobs.
    function iterateeCallback(err) {
      running -= 1;
      if (err) {
        done = true;
        _jobErr.push(err);
        resolve(_jobErr);
      } else if (done && running <= 0) {
        done = true;
        resolve(_jobErr);
      } else if (!looping) {
        /* eslint no-use-before-define: [0] */
        if (that.isCancel()) {
          resolve(_jobErr);
        } else {
          replenish();
        }
      }
    }

    // Adapt the promise returned by the job to a node-style callback.
    function iteratee(value, callback) {
      jobPromise(value)
        .then(result => {
          callback(null, result);
        })
        .catch(err => {
          callback(err);
        });
    }

    // Start jobs until the concurrency limit is reached or nothing is left.
    function replenish() {
      looping = true;
      while (running < parallel && !done && !that.isCancel()) {
        const elem = nextElem();
        if (elem === null || _jobErr.length > 0) {
          done = true;
          if (running <= 0) {
            resolve(_jobErr);
          }
          return;
        }
        running += 1;
        iteratee(elem.value, onlyOnce(iterateeCallback));
      }
      looping = false;

      // The loop can end with nothing in flight (e.g. the operation was
      // already cancelled before the first job started). No completion
      // callback will ever fire in that case, so settle here instead of
      // leaving the promise pending forever.
      if (running <= 0) {
        resolve(_jobErr);
      }
    }

    replenish();
  });
};
|
||
|
|
||
|
/**
 * Cancel the running operation; currently usable with multipartUpload.
 *
 * Sets the cancel flag, destroys every tracked multipart upload stream whose
 * `destroyed` property is exactly `false`, empties the stream list and, when
 * `abort` is supplied, requests an abort of that multipart upload.
 *
 * @param {Object} [abort] describes the upload to abort
 *        {String} abort.name object key
 *        {String} abort.uploadId upload id
 *        {*} abort.options passed through to abortMultipartUpload (e.g. timeout)
 */
proto.cancel = function cancel(abort) {
  this.options.cancelFlag = true;

  const streams = this.multipartUploadStreams;
  if (isArray(streams)) {
    streams.forEach(stream => {
      if (stream.destroyed !== false) {
        return;
      }
      // the reason handed to destroy() is a plain object, not an Error instance
      stream.destroy({
        name: 'cancel',
        message: 'cancel'
      });
    });
  }
  this.multipartUploadStreams = [];

  if (!abort) {
    return;
  }
  const { name, uploadId, options } = abort;
  // NOTE(review): the result of abortMultipartUpload is neither awaited nor
  // returned - presumably fire-and-forget; confirm failures are handled elsewhere.
  this.abortMultipartUpload(name, uploadId, options);
};
|
||
|
|
||
|
/**
 * Report the current value of the cancel flag stored on `this.options`.
 *
 * @return {*} `options.cancelFlag` as stored (not coerced to a boolean)
 */
proto.isCancel = function isCancel() {
  const { cancelFlag } = this.options;
  return cancelFlag;
};
|
||
|
|
||
|
/**
 * Clear the cancel flag on `this.options`.
 */
proto.resetCancelFlag = function resetCancelFlag() {
  const { options } = this;
  options.cancelFlag = false;
};
|
||
|
|
||
|
/**
 * Raise the cancel flag on `this.options` without touching any stream or
 * pending upload (compare `cancel`, which also destroys streams).
 */
proto._stop = function _stop() {
  const { options } = this;
  options.cancelFlag = true;
};
|
||
|
|
||
|
/**
 * Build the object used to signal a cancellation. A cancel is not an error,
 * so a plain object is created rather than an Error instance.
 *
 * @return {Object} `{ status: 0, name: 'cancel' }`
 */
proto._makeCancelEvent = function _makeCancelEvent() {
  return { status: 0, name: 'cancel' };
};
|
||
|
|
||
|
/**
 * Build the object used to signal an aborted upload. An abort is not an
 * error, so a plain object is created rather than an Error instance.
 *
 * @return {Object} object with `status`, `name` and `message` fields
 */
proto._makeAbortEvent = function _makeAbortEvent() {
  return {
    status: 0,
    name: 'abort',
    message: 'upload task has been abort'
  };
};
|