diff --git a/mongo/client_bulk_write.go b/mongo/client_bulk_write.go index ed29a74289..61ec64eea1 100644 --- a/mongo/client_bulk_write.go +++ b/mongo/client_bulk_write.go @@ -191,6 +191,8 @@ type modelBatches struct { writeErrors map[int]WriteError } +var _ driver.OperationBatches = &modelBatches{} + func (mb *modelBatches) IsOrdered() *bool { return &mb.ordered } @@ -209,7 +211,7 @@ func (mb *modelBatches) Size() int { return len(mb.models) - mb.offset } -func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) { +func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error) { fn := functionSet{ appendStart: func(dst []byte, identifier string) (int32, []byte) { var idx int32 @@ -228,10 +230,10 @@ func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, to return dst }, } - return mb.appendBatches(fn, dst, maxCount, maxDocSize, totalSize) + return mb.appendBatches(fn, dst, maxCount, totalSize) } -func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) { +func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error) { fn := functionSet{ appendStart: bsoncore.AppendArrayElementStart, appendDocument: bsoncore.AppendDocumentElement, @@ -240,7 +242,7 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total return dst }, } - return mb.appendBatches(fn, dst, maxCount, maxDocSize, totalSize) + return mb.appendBatches(fn, dst, maxCount, totalSize) } type functionSet struct { @@ -249,7 +251,7 @@ type functionSet struct { updateLength func([]byte, int32, int32) []byte } -func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) { +func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, totalSize int) (int, []byte, error) { if mb.Size() == 0 { return 0, dst, io.EOF } @@ -269,7 +271,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD } canRetry := true - checkSize := true + // checkSize := true l := len(dst) @@ -291,13 +293,13 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD var err error switch model := mb.models[i].model.(type) { case *ClientInsertOneModel: - checkSize = false + // checkSize = false mb.cursorHandlers = append(mb.cursorHandlers, mb.appendInsertResult) var id interface{} id, doc, err = (&clientInsertDoc{ namespace: nsIdx, document: model.Document, - sizeLimit: maxDocSize, + // sizeLimit: maxDocSize, }).marshal(mb.client.bsonOpts, mb.client.registry) if err != nil { break @@ -331,7 +333,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD checkDollarKey: true, }).marshal(mb.client.bsonOpts, mb.client.registry) case *ClientReplaceOneModel: - checkSize = false + // checkSize = false mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult) doc, err = (&clientUpdateDoc{ namespace: nsIdx, @@ -343,7 +345,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD upsert: model.Upsert, multi: false, checkDollarKey: false, - sizeLimit: maxDocSize, + // sizeLimit: maxDocSize, }).marshal(mb.client.bsonOpts, mb.client.registry) case *ClientDeleteOneModel: mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult) @@ -371,9 +373,9 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD return 0, nil, err } length := len(doc) - if maxDocSize > 0 && length > maxDocSize+16*1024 { - return 0, nil, driver.ErrDocumentTooLarge - } + // if maxDocSize > 0 && length > maxDocSize+16*1024 { + // return 0, nil, driver.ErrDocumentTooLarge + // } if !exists { length += len(ns) } @@ -398,9 +400,9 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD dst = fn.updateLength(dst, opsIdx, int32(len(dst[opsIdx:]))) nsDst = fn.updateLength(nsDst, nsIdx, int32(len(nsDst[nsIdx:]))) dst = append(dst, nsDst...) - if checkSize && maxDocSize > 0 && len(dst)-l > maxDocSize+16*1024 { - return 0, nil, driver.ErrDocumentTooLarge - } + // if checkSize && maxDocSize > 0 && len(dst)-l > maxDocSize+16*1024 { + // return 0, nil, driver.ErrDocumentTooLarge + // } mb.retryMode = driver.RetryNone if mb.client.retryWrites && canRetry { @@ -585,7 +587,7 @@ type clientInsertDoc struct { namespace int document interface{} - sizeLimit int + // sizeLimit int } func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (interface{}, bsoncore.Document, error) { @@ -596,9 +598,9 @@ func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bson. if err != nil { return nil, nil, err } - if d.sizeLimit > 0 && len(f) > d.sizeLimit { - return nil, nil, driver.ErrDocumentTooLarge - } + // if d.sizeLimit > 0 && len(f) > d.sizeLimit { + // return nil, nil, driver.ErrDocumentTooLarge + // } var id interface{} f, id, err = ensureID(f, bson.NilObjectID, bsonOpts, registry) if err != nil { @@ -620,7 +622,7 @@ type clientUpdateDoc struct { multi bool checkDollarKey bool - sizeLimit int + // sizeLimit int } func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (bsoncore.Document, error) { @@ -641,9 +643,9 @@ func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson. if err != nil { return nil, err } - if d.sizeLimit > 0 && len(u.Data) > d.sizeLimit { - return nil, driver.ErrDocumentTooLarge - } + // if d.sizeLimit > 0 && len(u.Data) > d.sizeLimit { + // return nil, driver.ErrDocumentTooLarge + // } doc = bsoncore.AppendValueElement(doc, "updateMods", u) doc = bsoncore.AppendBooleanElement(doc, "multi", d.multi) diff --git a/mongo/client_bulk_write_test.go b/mongo/client_bulk_write_test.go index b5191a2861..c650dfa9f8 100644 --- a/mongo/client_bulk_write_test.go +++ b/mongo/client_bulk_write_test.go @@ -55,7 +55,7 @@ func TestBatches(t *testing.T) { var n int const limitBigEnough = 16_000 // test the "maxCount" that truncates the output - n, _, err = batches.AppendBatchSequence(nil, 4, limitBigEnough, limitBigEnough) + n, _, err = batches.AppendBatchSequence(nil, 4, limitBigEnough) require.NoError(t, err, "AppendBatchSequence error: %v", err) assert.Equal(t, 3, n, "expected %d appendings, got: %d", 3, n) diff --git a/x/mongo/driver/batches.go b/x/mongo/driver/batches.go index 4f096616a7..e5baf6bf3d 100644 --- a/x/mongo/driver/batches.go +++ b/x/mongo/driver/batches.go @@ -24,10 +24,12 @@ type Batches struct { offset int } +var _ OperationBatches = &Batches{} + // AppendBatchSequence appends dst with document sequence of batches as long as the limits of max count, max // document size, or total size allows. It returns the number of batches appended, the new appended slice, and // any error raised. It returns the origenal input slice if nothing can be appends within the limits. -func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) { +func (b *Batches) AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error) { if b.Size() == 0 { return 0, dst, io.EOF } @@ -44,11 +46,11 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) ( break } doc := b.Documents[i] - if len(doc) > maxDocSize { - break - } + // if len(doc) > maxDocSize { + // break + // } size += len(doc) - if size > maxDocSize { + if size > totalSize { break } dst = append(dst, doc...) @@ -64,7 +66,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) ( // AppendBatchArray appends dst with array of batches as long as the limits of max count, max document size, or // total size allows. It returns the number of batches appended, the new appended slice, and any error raised. It // returns the origenal input slice if nothing can be appends within the limits. -func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) { +func (b *Batches) AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error) { if b.Size() == 0 { return 0, dst, io.EOF } @@ -77,11 +79,11 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int break } doc := b.Documents[i] - if len(doc) > maxDocSize { - break - } + // if len(doc) > maxDocSize { + // break + // } size += len(doc) - if size > maxDocSize { + if size > totalSize { break } dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc) diff --git a/x/mongo/driver/batches_test.go b/x/mongo/driver/batches_test.go index 95cdb674de..4b3a89880e 100644 --- a/x/mongo/driver/batches_test.go +++ b/x/mongo/driver/batches_test.go @@ -11,7 +11,6 @@ import ( "go.mongodb.org/mongo-driver/v2/internal/assert" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage" ) func newTestBatches(t *testing.T) *Batches { @@ -32,6 +31,7 @@ func TestAdvancing(t *testing.T) { assert.Equal(t, 0, size, "expected Size(): %d, got: %d", 1, size) } +/* func TestAppendBatchSequence(t *testing.T) { t.Run("Append 0", func(t *testing.T) { batches := newTestBatches(t) @@ -99,3 +99,4 @@ func TestAppendBatchArray(t *testing.T) { assert.Equal(t, dst, got) }) } +*/ diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index fdf8732f21..4b01d50427 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -184,6 +184,16 @@ func redactFinishedInformationResponse(info finishedInformation) bson.Raw { return bson.Raw{} } +// OperationBatches contains the documents that are split when executing a write command that potentially +// has more documents than can fit in a single command. +type OperationBatches interface { + AppendBatchSequence(dst []byte, maxCount int, totalSize int) (int, []byte, error) + AppendBatchArray(dst []byte, maxCount int, totalSize int) (int, []byte, error) + IsOrdered() *bool + AdvanceBatches(n int) + Size() int +} + // Operation is used to execute an operation. It contains all of the common code required to // select a server, transform an operation into a command, write the command to a connection from // the selected server, read a response from that connection, process the response, and potentially @@ -269,15 +279,8 @@ type Operation struct { // Batches contains the documents that are split when executing a write command that potentially // has more documents than can fit in a single command. This should only be specified for - // commands that are batch compatible. For more information, please refer to the definition of - // Batches. - Batches interface { - AppendBatchSequence(dst []byte, maxCount int, maxDocSize int, totalSize int) (int, []byte, error) - AppendBatchArray(dst []byte, maxCount int, maxDocSize int, totalSize int) (int, []byte, error) - IsOrdered() *bool - AdvanceBatches(n int) - Size() int - } + // commands that are batch compatible. + Batches OperationBatches // Legacy sets the legacy type for this operation. There are only 3 types that require legacy // support: find, getMore, and killCursors. For more information about LegacyOperationKind, @@ -1371,7 +1374,7 @@ func (op Operation) createWireMessage( if err == nil && op.Batches != nil { batchOffset = len(dst) info.processedBatches, dst, err = op.Batches.AppendBatchSequence(dst, - int(desc.MaxBatchCount), int(desc.MaxDocumentSize), int(desc.MaxDocumentSize), + int(desc.MaxBatchCount), int(desc.MaxMessageSize), ) if err != nil { break @@ -1383,12 +1386,8 @@ func (op Operation) createWireMessage( default: var batches []byte if op.Batches != nil { - maxDocSize := -1 - if unacknowledged { - maxDocSize = int(desc.MaxDocumentSize) - } info.processedBatches, batches, err = op.Batches.AppendBatchSequence(batches, - int(desc.MaxBatchCount), maxDocSize, int(desc.MaxMessageSize), + int(desc.MaxBatchCount), int(desc.MaxMessageSize), ) if err != nil { break @@ -1443,14 +1442,14 @@ func (op Operation) addEncryptCommandFields(ctx context.Context, dst []byte, des var n int if op.Batches != nil { if maxBatchCount := int(desc.MaxBatchCount); maxBatchCount > 1 { - n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, maxBatchCount, cryptMaxBsonObjectSize, cryptMaxBsonObjectSize) + n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, maxBatchCount, cryptMaxBsonObjectSize) if err != nil { return 0, nil, err } } if n == 0 { - maxDocumentSize := int(desc.MaxDocumentSize) - n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, 1, maxDocumentSize, maxDocumentSize) + // maxDocumentSize := int(desc.MaxDocumentSize) + n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, 1, int(desc.MaxMessageSize)) if err != nil { return 0, nil, err } @@ -1483,8 +1482,8 @@ func (op Operation) addLegacyCommandFields(dst []byte, desc description.Selected return 0, dst, nil } var n int - maxDocumentSize := int(desc.MaxDocumentSize) - n, dst, err = op.Batches.AppendBatchArray(dst, int(desc.MaxBatchCount), maxDocumentSize, maxDocumentSize) + // maxDocumentSize := int(desc.MaxDocumentSize) + n, dst, err = op.Batches.AppendBatchArray(dst, int(desc.MaxBatchCount), int(desc.MaxMessageSize)) if err != nil { return 0, nil, err }