Skip to content

Commit

Permalink
Put all publish frames in a single huge buffer and send it once (#123)
Browse files Browse the repository at this point in the history
* Always buffer all publish framed together

Even if the user doesn't await basicPublish all framed beloging to one
publish should be published together. So intead of waiting for a
potentially blocked socket to be drain enqueue all data and only await
for the last sent frame.

Fixes #49
  • Loading branch information
carlhoerberg authored Oct 26, 2024
1 parent a97d020 commit 7a67774
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions src/amqp-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export class AMQPChannel {

let j = 0
// get a buffer from the pool or create a new, it will later be returned to the pool for reuse
const buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax))
let buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax))
buffer.setUint8(j, 1); j += 1 // type: method
buffer.setUint16(j, this.id); j += 2 // channel
j += 4 // frame size, update later
Expand Down Expand Up @@ -338,38 +338,40 @@ export class AMQPChannel {
buffer.setUint8(j, 206); j += 1 // frame end byte
buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize

// Send current frames if there's no body to send
if (body.byteLength === 0) {
await this.connection.send(new Uint8Array(buffer.buffer, 0, j))
} else if (j >= buffer.byteLength - 8) {
// Send current frames if a body frame can't fit in the rest of the frame buffer
await this.connection.send(new Uint8Array(buffer.buffer, 0, j))
j = 0
let bufferView = new Uint8Array(buffer.buffer)
const bodyFrameCount = Math.ceil(body.byteLength / (this.connection.frameMax - 8))
const bufferSize = j + body.byteLength + 8 * bodyFrameCount
if (buffer.byteLength < bufferSize) { // the body doesn't fit in the buffer, expand it
const newBuffer = new ArrayBuffer(bufferSize)
const newBufferView = new Uint8Array(newBuffer)
newBufferView.set(bufferView.subarray(0, j))
buffer = new AMQPView(newBuffer)
bufferView = newBufferView
}

// split body into multiple frames if body > frameMax
for (let bodyPos = 0; bodyPos < body.byteLength;) {
const frameSize = Math.min(body.byteLength - bodyPos, buffer.byteLength - 8 - j) // frame overhead is 8 bytes
const frameSize = Math.min(body.byteLength - bodyPos, this.connection.frameMax - 8) // frame overhead is 8 bytes
const dataSlice = body.subarray(bodyPos, bodyPos + frameSize)

buffer.setUint8(j, 3); j += 1 // type: body
buffer.setUint16(j, this.id); j += 2 // channel
buffer.setUint32(j, frameSize); j += 4 // frameSize
const bodyView = new Uint8Array(buffer.buffer, j, frameSize)
bodyView.set(dataSlice); j += frameSize // body content
bufferView.set(dataSlice, j); j += frameSize // body content
buffer.setUint8(j, 206); j += 1 // frame end byte
await this.connection.send(new Uint8Array(buffer.buffer, 0, j))
bodyPos += frameSize
j = 0
}
const sendFrames = this.connection.send(bufferView.subarray(0, j))

this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse
// if publish confirm is enabled, put a promise on a queue if the sends were ok
// the promise on the queue will be fullfilled by the read loop when an ack/nack
// comes from the server
if (this.confirmId) {
return new Promise((resolve, reject) => this.unconfirmedPublishes.push([this.confirmId++, resolve, reject]))
const wait4Confirm = new Promise<number>((resolve, reject) => this.unconfirmedPublishes.push([this.confirmId++, resolve, reject]))
return sendFrames.then(() => wait4Confirm)
} else {
return Promise.resolve(0)
return sendFrames.then(() => 0)
}
}

Expand Down

0 comments on commit 7a67774

Please sign in to comment.