Skip to content

Commit

Permalink
fix: making endings clear and final.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinheidegger committed Feb 1, 2022
1 parent f360866 commit 6bbafd6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
22 changes: 17 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ class WritableState {
this.map = mapWritable || map
this.afterWrite = afterWrite.bind(this)
this.afterUpdateNextTick = updateWriteNT.bind(this)
this.writable = true
}

get ended () {
return (this.stream._duplexState & WRITE_DONE) !== 0
}

push (data) {
this.updateNextTick()
if (this.map !== null) data = this.map(data)

this.buffered += this.byteLength(data)
Expand All @@ -143,9 +145,13 @@ class WritableState {
}

end (data) {
this.writable = false
this.updateNextTick()
if (typeof data === 'function') this.stream.once('finish', data)
else if (data !== undefined && data !== null) this.push(data)
this.stream._duplexState = (this.stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
this.end = throwWriteAfterEnd
this.push = throwWriteAfterEnd
}

autoBatch (data, cb) {
Expand Down Expand Up @@ -250,11 +256,13 @@ class ReadableState {
}

push (data) {
this.updateNextTick()
const stream = this.stream

if (data === null) {
this.highWaterMark = 0
stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
this.push = throwPushAfterEnd
return false
}

Expand Down Expand Up @@ -544,7 +552,7 @@ class Stream extends EventEmitter {
}

get writable () {
return this._writableState !== null ? true : undefined
return this._writableState !== null ? this._writableState.writable : undefined
}

get destroyed () {
Expand Down Expand Up @@ -623,7 +631,6 @@ class Readable extends Stream {
}

push (data) {
this._readableState.updateNextTick()
return this._readableState.push(data)
}

Expand Down Expand Up @@ -782,12 +789,10 @@ class Writable extends Stream {
}

write (data) {
this._writableState.updateNextTick()
return this._writableState.push(data)
}

end (data) {
this._writableState.updateNextTick()
this._writableState.end(data)
return this
}
Expand Down Expand Up @@ -825,7 +830,6 @@ class Duplex extends Readable { // and Writable
}

end (data) {
this._writableState.updateNextTick()
this._writableState.end(data)
return this
}
Expand Down Expand Up @@ -967,6 +971,14 @@ function defaultByteLength (data) {
return isTypedArray(data) ? data.byteLength : 1024
}

function throwPushAfterEnd () {
throw new Error('Push after end.')
}

function throwWriteAfterEnd () {
throw new Error('Write after end.')
}

function noop () {}

function abort () {
Expand Down
16 changes: 16 additions & 0 deletions test/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,19 @@ tape('use mapReadable to map data', async function (t) {
t.deepEquals(obj, { foo: 1 })
}
})

tape('push after push(null) should cause an error', function (t) {
t.plan(4)
const s = new Readable()
s.on('data', function (data) {
t.equals(data, 'a')
})
s.on('close', function () {
t.end()
})
t.equals(s.push('a'), true)
t.equals(s.push(null), false)
t.throws(function () {
s.push('b')
})
})
22 changes: 22 additions & 0 deletions test/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ tape('use mapWritable to map data', function (t) {
r.end()
})

tape('repeat calls to .end() should cause an error', function (t) {
t.plan(5)
const s = new Writable({
write (data, cb) {
t.equals(data, 'a')
cb()
}
})
s.on('close', function () {
t.end()
})
t.equals(s.writable, true)
s.end('a')
t.equals(s.writable, false)
t.throws(function () {
s.end('b')
})
t.throws(function () {
s.write('c')
})
})

tape('many ends', function (t) {
let finals = 0
let finish = 0
Expand Down

0 comments on commit 6bbafd6

Please sign in to comment.