2023-10-03 11:14:36 +08:00
'use strict' ;
var _Object$setPrototypeO ;
function _defineProperty ( obj , key , value ) { key = _toPropertyKey ( key ) ; if ( key in obj ) { Object . defineProperty ( obj , key , { value : value , enumerable : true , configurable : true , writable : true } ) ; } else { obj [ key ] = value ; } return obj ; }
function _toPropertyKey ( arg ) { var key = _toPrimitive ( arg , "string" ) ; return typeof key === "symbol" ? key : String ( key ) ; }
function _toPrimitive ( input , hint ) { if ( typeof input !== "object" || input === null ) return input ; var prim = input [ Symbol . toPrimitive ] ; if ( prim !== undefined ) { var res = prim . call ( input , hint || "default" ) ; if ( typeof res !== "object" ) return res ; throw new TypeError ( "@@toPrimitive must return a primitive value." ) ; } return ( hint === "string" ? String : Number ) ( input ) ; }
var finished = require ( './end-of-stream' ) ;
var kLastResolve = Symbol ( 'lastResolve' ) ;
var kLastReject = Symbol ( 'lastReject' ) ;
var kError = Symbol ( 'error' ) ;
var kEnded = Symbol ( 'ended' ) ;
var kLastPromise = Symbol ( 'lastPromise' ) ;
var kHandlePromise = Symbol ( 'handlePromise' ) ;
var kStream = Symbol ( 'stream' ) ;
function createIterResult ( value , done ) {
return {
value : value ,
done : done
} ;
}
function readAndResolve ( iter ) {
var resolve = iter [ kLastResolve ] ;
if ( resolve !== null ) {
var data = iter [ kStream ] . read ( ) ;
// we defer if data is null
// we can be expecting either 'end' or
// 'error'
if ( data !== null ) {
iter [ kLastPromise ] = null ;
iter [ kLastResolve ] = null ;
iter [ kLastReject ] = null ;
resolve ( createIterResult ( data , false ) ) ;
}
}
}
function onReadable ( iter ) {
// we wait for the next tick, because it might
// emit an error with process.nextTick
process . nextTick ( readAndResolve , iter ) ;
}
function wrapForNext ( lastPromise , iter ) {
return function ( resolve , reject ) {
lastPromise . then ( function ( ) {
if ( iter [ kEnded ] ) {
resolve ( createIterResult ( undefined , true ) ) ;
return ;
}
iter [ kHandlePromise ] ( resolve , reject ) ;
} , reject ) ;
} ;
}
var AsyncIteratorPrototype = Object . getPrototypeOf ( function ( ) { } ) ;
var ReadableStreamAsyncIteratorPrototype = Object . setPrototypeOf ( ( _Object$setPrototypeO = {
get stream ( ) {
return this [ kStream ] ;
} ,
next : function next ( ) {
var _this = this ;
// if we have detected an error in the meanwhile
// reject straight away
var error = this [ kError ] ;
if ( error !== null ) {
return Promise . reject ( error ) ;
}
if ( this [ kEnded ] ) {
return Promise . resolve ( createIterResult ( undefined , true ) ) ;
}
if ( this [ kStream ] . destroyed ) {
// We need to defer via nextTick because if .destroy(err) is
// called, the error will be emitted via nextTick, and
// we cannot guarantee that there is no error lingering around
// waiting to be emitted.
return new Promise ( function ( resolve , reject ) {
process . nextTick ( function ( ) {
if ( _this [ kError ] ) {
reject ( _this [ kError ] ) ;
} else {
resolve ( createIterResult ( undefined , true ) ) ;
}
} ) ;
} ) ;
}
// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
var lastPromise = this [ kLastPromise ] ;
var promise ;
if ( lastPromise ) {
promise = new Promise ( wrapForNext ( lastPromise , this ) ) ;
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
var data = this [ kStream ] . read ( ) ;
if ( data !== null ) {
return Promise . resolve ( createIterResult ( data , false ) ) ;
}
promise = new Promise ( this [ kHandlePromise ] ) ;
}
this [ kLastPromise ] = promise ;
return promise ;
}
} , _defineProperty ( _Object$setPrototypeO , Symbol . asyncIterator , function ( ) {
return this ;
} ) , _defineProperty ( _Object$setPrototypeO , "return" , function _return ( ) {
var _this2 = this ;
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise ( function ( resolve , reject ) {
_this2 [ kStream ] . destroy ( null , function ( err ) {
if ( err ) {
reject ( err ) ;
return ;
}
resolve ( createIterResult ( undefined , true ) ) ;
} ) ;
} ) ;
} ) , _Object$setPrototypeO ) , AsyncIteratorPrototype ) ;
var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator ( stream ) {
var _Object$create ;
var iterator = Object . create ( ReadableStreamAsyncIteratorPrototype , ( _Object$create = { } , _defineProperty ( _Object$create , kStream , {
value : stream ,
writable : true
} ) , _defineProperty ( _Object$create , kLastResolve , {
value : null ,
writable : true
} ) , _defineProperty ( _Object$create , kLastReject , {
value : null ,
writable : true
} ) , _defineProperty ( _Object$create , kError , {
value : null ,
writable : true
} ) , _defineProperty ( _Object$create , kEnded , {
value : stream . _readableState . endEmitted ,
writable : true
} ) , _defineProperty ( _Object$create , kHandlePromise , {
value : function value ( resolve , reject ) {
var data = iterator [ kStream ] . read ( ) ;
if ( data ) {
iterator [ kLastPromise ] = null ;
iterator [ kLastResolve ] = null ;
iterator [ kLastReject ] = null ;
resolve ( createIterResult ( data , false ) ) ;
} else {
iterator [ kLastResolve ] = resolve ;
iterator [ kLastReject ] = reject ;
}
} ,
writable : true
} ) , _Object$create ) ) ;
iterator [ kLastPromise ] = null ;
finished ( stream , function ( err ) {
if ( err && err . code !== 'ERR_STREAM_PREMATURE_CLOSE' ) {
var reject = iterator [ kLastReject ] ;
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if ( reject !== null ) {
iterator [ kLastPromise ] = null ;
iterator [ kLastResolve ] = null ;
iterator [ kLastReject ] = null ;
reject ( err ) ;
}
iterator [ kError ] = err ;
return ;
}
var resolve = iterator [ kLastResolve ] ;
if ( resolve !== null ) {
iterator [ kLastPromise ] = null ;
iterator [ kLastResolve ] = null ;
iterator [ kLastReject ] = null ;
resolve ( createIterResult ( undefined , true ) ) ;
}
iterator [ kEnded ] = true ;
} ) ;
stream . on ( 'readable' , onReadable . bind ( null , iterator ) ) ;
return iterator ;
} ;
2023-09-25 15:58:56 +08:00
module . exports = createReadableStreamAsyncIterator ;