Skip to content

Commit

Permalink
Put all publish frames in a single huge buffer and send it once
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Sep 18, 2024
1 parent 55e9101 commit 64fccd0
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions src/amqp-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ export class AMQPChannel {
* @param [immediate] - if the message should be returned if it can't be delivered to a consumer immediately (not supported in RabbitMQ)
* @return - fulfilled when the message is enqueue on the socket, or if publish confirm is enabled when the message is confirmed by the server
*/
async basicPublish(exchange: string, routingKey: string, data: string|Uint8Array|ArrayBuffer|Buffer|null, properties: AMQPProperties = {}, mandatory = false, immediate = false): Promise<number> {
basicPublish(exchange: string, routingKey: string, data: string|Uint8Array|ArrayBuffer|Buffer|null, properties: AMQPProperties = {}, mandatory = false, immediate = false): Promise<number> {
if (this.closed) return this.rejectClosed()
if (this.connection.blocked)
return Promise.reject(new AMQPError(`Connection blocked by server: ${this.connection.blocked}`, this.connection))
Expand All @@ -310,7 +310,7 @@ export class AMQPChannel {

let j = 0
// get a buffer from the pool or create a new, it will later be returned to the pool for reuse
const buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax))
let buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax))
buffer.setUint8(j, 1); j += 1 // type: method
buffer.setUint16(j, this.id); j += 2 // channel
j += 4 // frame size, update later
Expand Down Expand Up @@ -338,19 +338,16 @@ export class AMQPChannel {
buffer.setUint8(j, 206); j += 1 // frame end byte
buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize

let lastFrame
// Send current frames if there's no body to send
if (body.byteLength === 0) {
lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
} else if (j >= buffer.byteLength - 8) {
// Send current frames if a body frame can't fit in the rest of the frame buffer
lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
j = 0
if (buffer.byteLength - j < 8 + body.byteLength) { // the body doesn't fit in the buffer, expand it
const bodyFrameCount = Math.ceil(body.byteLength / (this.connection.frameMax - 8))
const newBuffer = new ArrayBuffer(j + body.byteLength + 8 * bodyFrameCount)
new Uint8Array(newBuffer).set(new Uint8Array(buffer.buffer, 0, j)) // copy the old buffer to the new buffer
buffer = new AMQPView(newBuffer)
}

// split body into multiple frames if body > frameMax
for (let bodyPos = 0; bodyPos < body.byteLength;) {
const frameSize = Math.min(body.byteLength - bodyPos, buffer.byteLength - 8 - j) // frame overhead is 8 bytes
const frameSize = Math.min(body.byteLength - bodyPos, this.connection.frameMax - 8) // frame overhead is 8 bytes
const dataSlice = body.subarray(bodyPos, bodyPos + frameSize)

buffer.setUint8(j, 3); j += 1 // type: body
Expand All @@ -359,19 +356,19 @@ export class AMQPChannel {
const bodyView = new Uint8Array(buffer.buffer, j, frameSize)
bodyView.set(dataSlice); j += frameSize // body content
buffer.setUint8(j, 206); j += 1 // frame end byte
lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
bodyPos += frameSize
j = 0
}
await lastFrame // buffer all frames and only wait for the last as RabbitMQ requires all publish frames to be sent together
const sendFrames = this.connection.send(new Uint8Array(buffer.buffer, 0, j))

this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse
// if publish confirm is enabled, put a promise on a queue if the sends were ok
// the promise on the queue will be fullfilled by the read loop when an ack/nack
// comes from the server
if (this.confirmId) {
return new Promise((resolve, reject) => this.unconfirmedPublishes.push([this.confirmId++, resolve, reject]))
const wait4Confirm = new Promise<number>((resolve, reject) => this.unconfirmedPublishes.push([this.confirmId++, resolve, reject]))
return sendFrames.then(() => wait4Confirm)
} else {
return Promise.resolve(0)
return sendFrames.then(() => 0)
}
}

Expand Down

0 comments on commit 64fccd0

Please sign in to comment.