Skip to content

Commit

Permalink
Merge pull request #768 from dotnet/fixes
Browse files Browse the repository at this point in the history
Fix node.js stream blockage in mxstream channels
  • Loading branch information
AArnott authored Aug 23, 2024
2 parents 1e38407 + 82b8983 commit 0584d18
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 5 deletions.
11 changes: 10 additions & 1 deletion src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,20 @@ export class ChannelClass extends Channel {
}

public onContent(buffer: Buffer | null) {
const priorReadableFlowing = this._duplex.readableFlowing

this._duplex.push(buffer)

// Large buffer pushes can switch a stream from flowing to non-flowing
// when it meets or exceeds the highWaterMark. We need to resume the stream
// in this case so that the user can continue to receive data.
if (priorReadableFlowing && this._duplex.readableFlowing === false) {
this._duplex.resume()
}

// We should find a way to detect when we *actually* share the received buffer with the Channel's user
// and only report consumption when they receive the buffer from us so that we effectively apply
// backpressure to the remote party based on our user's actual consumption rather than keep allocating memory.
// backpressure to the remote party based on our user's actual consumption rather than continually allocating memory.
if (this._multiplexingStream.backpressureSupportEnabled && buffer) {
this._multiplexingStream.localContentExamined(this, buffer.length)
}
Expand Down
8 changes: 4 additions & 4 deletions src/nerdbank-streams/src/Utilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ export async function getBufferFrom(
throw new Error('Stream terminated before required bytes were read.')
}

// Returns what has been read so far
// Returns what has been read so far.
if (readBuffer === null) {
return null
}

// we need trim extra spaces
// We need to trim the trailing space.
return readBuffer.subarray(0, index)
}

Expand All @@ -116,11 +116,11 @@ export async function getBufferFrom(

if (readBuffer === null) {
if (availableSize === size || newBuffer.length < availableSize) {
// in the fast pass, we read the entire data once, and donot allocate an extra array.
// In the fast pass, we read the entire data once, and do not allocate an extra array.
return newBuffer
}

// if we read partial data, we need allocate a buffer to join all data together.
// If we read partial data, we need to allocate a buffer to join all data together.
readBuffer = Buffer.alloc(size)
}

Expand Down
53 changes: 53 additions & 0 deletions src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,59 @@ import { Channel } from '../Channel'
import CancellationToken from 'cancellationtoken'
import * as assert from 'assert'
import { nextTick } from 'process'
import { Duplex } from 'stream'

it('highWatermark threshold does not clog', async () => {
// Brokered service
let bytesToReceive = 0
let receivedAllBytes = new Deferred()
function receiver(pipe: Duplex) {
let lengths: number[] = []
pipe.on('data', (data: Buffer) => {
lengths.push(data.length)

bytesToReceive -= data.length
// console.log(`recv ${data.length}. ${bytesToReceive} remaining`)
if (bytesToReceive <= 0) {
receivedAllBytes.resolve(undefined)
}
})
}

// IServiceBroker
const { first: localServicePipe, second: servicePipe } = FullDuplexStream.CreatePair()
receiver(localServicePipe)

// MultiplexingStreamServiceBroker
const simulatedMxStream = FullDuplexStream.CreatePair()
const [mx1, mx2] = await Promise.all([MultiplexingStream.CreateAsync(simulatedMxStream.first), MultiplexingStream.CreateAsync(simulatedMxStream.second)])
const [local, remote] = await Promise.all([mx1.offerChannelAsync(''), mx2.acceptChannelAsync('')])
servicePipe.pipe(local.stream)
local.stream.pipe(servicePipe)

global.test_servicePipe = servicePipe
global.test_d = local.stream
global.test_localServicePipe = localServicePipe

// brokered service client
function writeHelper(buffer: Buffer): boolean {
bytesToReceive += buffer.length
const result = remote.stream.write(buffer)
// console.log('written', buffer.length, result)
return result
}
for (let i = 15; i < 20; i++) {
const buffer = Buffer.alloc(i * 1024)
writeHelper(buffer)
await nextTickAsync()
writeHelper(Buffer.alloc(10))
await nextTickAsync()
}

if (bytesToReceive > 0) {
await receivedAllBytes.promise
}
})
;[1, 2, 3].forEach(protocolMajorVersion => {
describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
let mx1: MultiplexingStream
Expand Down

0 comments on commit 0584d18

Please sign in to comment.