var { Transform } = require('stream');
var sysUtil = require('util');
var util = require('./util');

/**
 * Number of bytes at the start of every message block: a 4-byte big-endian
 * total length, a 4-byte big-endian header length, and 4 further bytes that
 * this parser skips without inspecting.
 */
const PRELUDE_LENGTH = 12;

/**
 * Number of bytes that follow the payload of every message block. They are
 * skipped without being inspected (presumably a checksum - not verified here).
 */
const TRAILER_LENGTH = 4;

/**
 * Build an Error from the `:error-code` / `:error-message` headers of a block.
 *
 * @param {Object<string, string>} header - parsed header of the message block
 * @returns {Error} error whose `message` is the error message header and whose
 *   `name` and `code` are the error code header (when one is present)
 */
function createMessageError(header) {
  const err = new Error(header[':error-message']);
  const errCode = header[':error-code'];
  if (errCode) {
    err.name = errCode;
    err.code = errCode;
  }
  return err;
}

/**
 * Transform stream that parses a sequence of binary message blocks.
 *
 * Each block is laid out as: prelude (12 bytes), header (`headerLength`
 * bytes), payload (`totalLength - headerLength - 16` bytes), trailer (4 bytes).
 * Payloads of `Records` events are pushed downstream; every event is also
 * re-emitted as a `message:<event-type>` event on the stream.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} [options] - options forwarded to `stream.Transform`
 */
function SelectStream(options) {
  if (!(this instanceof SelectStream)) return new SelectStream(options);
  Transform.call(this, options);
  Object.assign(this, {
    totalLength: 0, // current message block's total length
    headerLength: 0, // current message block's header length
    payloadRestLength: 0, // current message block's rest payload length
    header: null, // current message block's header
    chunk: Buffer.alloc(0), // the data chunk being parsed
    callback: null, // current _transform function's callback
    payloadParts: [], // buffered payload pieces of the current non-Records block
    payloadDispatched: false, // whether the current block's event was already emitted
  });
}

// Set up inheritance first and then add the methods to the existing prototype,
// so that `SelectStream.prototype.constructor` stays intact.
sysUtil.inherits(SelectStream, Transform);

Object.assign(SelectStream.prototype, {
  /**
   * Process a data chunk: append it to the unparsed data, then try in turn to
   * parse the current block's lengths, its header and its payload. Each step
   * is a no-op once the pending callback has been consumed by an earlier step.
   *
   * @param {Buffer} chunk - newly received data (may be empty)
   * @param {string} encoding - encoding passed by the stream machinery
   * @param {Function} callback - callback to invoke once the data is handled
   */
  processChunk(chunk, encoding, callback) {
    Object.assign(this, {
      chunk: Buffer.concat([this.chunk, chunk], this.chunk.length + chunk.length),
      encoding,
      callback,
    });
    this.parseLength();
    this.parseHeader();
    this.parsePayload();
  },

  /**
   * Invoke the pending callback exactly once.
   *
   * The callback is detached BEFORE it is called: calling it can synchronously
   * start the next buffered write, which re-enters `processChunk` and installs
   * a new callback. Clearing the field after the call would wipe that new
   * callback and stall the stream.
   *
   * @param {Error} [err] - error to report, if any
   */
  finishTransform(err) {
    const callback = this.callback;
    this.callback = null;
    if (err) {
      callback(err);
    } else {
      callback();
    }
  },

  /**
   * Try to parse the current message block's totalLength and headerLength.
   * Waits for more data when fewer than 12 bytes are available and reports an
   * error when the lengths cannot describe a block this parser can handle.
   */
  parseLength() {
    if (!this.callback) {
      return;
    }
    if (this.totalLength && this.headerLength) {
      return; // prelude of the current block was already parsed
    }
    if (this.chunk.length < PRELUDE_LENGTH) {
      this.finishTransform();
      return;
    }

    const totalLength = this.chunk.readInt32BE(0);
    const headerLength = this.chunk.readInt32BE(4);
    const payloadLength = totalLength - headerLength - PRELUDE_LENGTH - TRAILER_LENGTH;
    // Without a header the event type is unknown, and a negative payload
    // length would corrupt all later slicing, so fail instead of guessing.
    if (headerLength <= 0 || payloadLength < 0) {
      this.finishTransform(
        new Error(`Invalid message prelude (totalLength=${totalLength}, headerLength=${headerLength})`),
      );
      return;
    }

    this.totalLength = totalLength;
    this.headerLength = headerLength;
    this.payloadRestLength = payloadLength;
    this.chunk = this.chunk.slice(PRELUDE_LENGTH);
  },

  /**
   * Try to parse the current message block's header into a name -> value map.
   * If header[':message-type'] is error, the error is reported through the
   * callback (see checkErrorHeader).
   */
  parseHeader() {
    if (!this.callback) {
      return;
    }
    if (!this.headerLength || this.header) {
      return;
    }
    if (this.chunk.length < this.headerLength) {
      this.finishTransform();
      return;
    }

    const header = {};
    let offset = 0;
    while (offset < this.headerLength) {
      // Entry layout: 1-byte name length, name, 1 skipped byte (presumably a
      // value-type tag), 2-byte value length, value.
      const nameStart = offset + 1;
      const nameEnd = nameStart + this.chunk[offset];
      const headerName = this.chunk.toString('ascii', nameStart, nameEnd);
      // Read as unsigned so a corrupt length can never move the cursor backwards.
      const valueLength = this.chunk.readUInt16BE(nameEnd + 1);
      const valueStart = nameEnd + 3;
      // NOTE(review): values are decoded as ASCII; confirm whether the
      // service can send non-ASCII (e.g. UTF-8) error messages.
      header[headerName] = this.chunk.toString('ascii', valueStart, valueStart + valueLength);
      offset = valueStart + valueLength;
    }

    this.header = header;
    this.chunk = this.chunk.slice(this.headerLength);
    this.checkErrorHeader();
  },

  /**
   * Try to parse the current message block's payload.
   *
   * Consumes as many payload bytes as are available, emits the block's event
   * once the payload is complete, and skips the 4-byte trailer once it has
   * fully arrived. If a further block may already be buffered, parsing
   * continues on the next tick; otherwise the pending callback is invoked.
   */
  parsePayload() {
    if (!this.callback) {
      return;
    }

    const available = Math.min(this.chunk.length, this.payloadRestLength);
    if (available > 0) {
      this.pushData(this.chunk.slice(0, available));
      this.chunk = this.chunk.slice(available);
      this.payloadRestLength -= available;
    }

    if (this.payloadRestLength === 0) {
      this.dispatchMessage();
      if (this.chunk.length >= TRAILER_LENGTH) {
        this.chunk = this.chunk.slice(TRAILER_LENGTH);
        this.resetMessage();
      }
    }

    // `header` is null only right after a block was completed above. Fewer
    // than 12 leftover bytes cannot start a new block, so just wait for more.
    if (this.header === null && this.chunk.length >= PRELUDE_LENGTH) {
      process.nextTick(() => {
        this.processChunk(Buffer.alloc(0), this.encoding, this.callback);
      });
    } else {
      this.finishTransform();
    }
  },

  /**
   * Handle one non-empty piece of the current block's payload.
   * If header[':event-type'] is Records, the piece is pushed to the next
   * stream and emitted as `message:records`; for every other event type it is
   * buffered until the payload is complete.
   *
   * @param {Buffer} content - payload bytes
   */
  pushData(content) {
    if (this.header[':event-type'] === 'Records') {
      this.push(content);
      this.emit('message:records', content);
    } else {
      this.payloadParts.push(content);
    }
  },

  /**
   * Emit the event of the current block once its payload is complete.
   * Runs at most once per block, no matter how the block was split across
   * incoming chunks. Records blocks emit nothing here because their payload
   * was already streamed piece by piece.
   */
  dispatchMessage() {
    if (this.payloadDispatched) {
      return;
    }
    this.payloadDispatched = true;

    const eventType = this.header[':event-type'];
    if (eventType === 'Records') {
      return;
    }

    const content = Buffer.concat(this.payloadParts);
    this.payloadParts = [];

    if (eventType === 'Progress') {
      this.emit('message:progress', util.xml2json(content.toString()).Progress);
    } else if (eventType === 'Stats') {
      this.emit('message:stats', util.xml2json(content.toString()).Stats);
    } else if (eventType === 'error') {
      this.emit('message:error', createMessageError(this.header));
    } else {
      // 'Continuation', 'End'
      this.emit('message:' + eventType.toLowerCase());
    }
  },

  /**
   * Forget the completed message block so the next prelude can be parsed.
   */
  resetMessage() {
    this.totalLength = 0;
    this.headerLength = 0;
    this.payloadRestLength = 0;
    this.header = null;
    this.payloadParts = [];
    this.payloadDispatched = false;
  },

  /**
   * If header[':message-type'] is error, report it through the callback so
   * the stream emits 'error'. The reported value is an Error that also
   * carries every header field as an own property.
   */
  checkErrorHeader() {
    if (this.header[':message-type'] === 'error') {
      this.finishTransform(Object.assign(createMessageError(this.header), this.header));
    }
  },

  /**
   * Transform Stream's implementations
   */
  _transform(chunk, encoding, callback) {
    this.processChunk(chunk, encoding, callback);
  },
  _flush(callback) {
    this.processChunk(Buffer.alloc(0), this.encoding, callback);
  },
});

module.exports = SelectStream;