GODRIVER-3421 Remove the BSON document size validation.
qingyang-hu committed Dec 4, 2024
1 parent acca80b commit 82df60a
Showing 5 changed files with 60 additions and 56 deletions.
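
At a glance, the change removes the per-document size argument (maxDocSize) from the batching entry points, so batches are limited only by the maximum batch count and the total message size. A minimal sketch of the resulting contract, matching the OperationBatches interface introduced in the x/mongo/driver/operation.go hunk below:

// Batching contract after this commit (see the operation.go diff below).
// The maxDocSize parameter is gone; callers now pass only maxCount and
// totalSize, which createWireMessage fills with desc.MaxMessageSize.
type OperationBatches interface {
	AppendBatchSequence(dst []byte, maxCount int, totalSize int) (int, []byte, error)
	AppendBatchArray(dst []byte, maxCount int, totalSize int) (int, []byte, error)
	IsOrdered() *bool
	AdvanceBatches(n int)
	Size() int
}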
50 changes: 26 additions & 24 deletions mongo/client_bulk_write.go
@@ -191,6 +191,8 @@ type modelBatches struct {
writeErrors map[int]WriteError
}

var _ driver.OperationBatches = &modelBatches{}

func (mb *modelBatches) IsOrdered() *bool {
return &mb.ordered
}
@@ -209,7 +211,7 @@ func (mb *modelBatches) Size() int {
return len(mb.models) - mb.offset
}

func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error) {
fn := functionSet{
appendStart: func(dst []byte, identifier string) (int32, []byte) {
var idx int32
@@ -228,10 +230,10 @@ func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, to
return dst
},
}
return mb.appendBatches(fn, dst, maxCount, maxDocSize, totalSize)
return mb.appendBatches(fn, dst, maxCount, totalSize)
}

func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error) {
fn := functionSet{
appendStart: bsoncore.AppendArrayElementStart,
appendDocument: bsoncore.AppendDocumentElement,
@@ -240,7 +242,7 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total
return dst
},
}
return mb.appendBatches(fn, dst, maxCount, maxDocSize, totalSize)
return mb.appendBatches(fn, dst, maxCount, totalSize)
}

type functionSet struct {
@@ -249,7 +251,7 @@ type functionSet struct {
updateLength func([]byte, int32, int32) []byte
}

func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, totalSize int) (int, []byte, error) {
if mb.Size() == 0 {
return 0, dst, io.EOF
}
@@ -269,7 +271,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
}

canRetry := true
checkSize := true
// checkSize := true

l := len(dst)

@@ -291,13 +293,13 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
var err error
switch model := mb.models[i].model.(type) {
case *ClientInsertOneModel:
checkSize = false
// checkSize = false
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendInsertResult)
var id interface{}
id, doc, err = (&clientInsertDoc{
namespace: nsIdx,
document: model.Document,
sizeLimit: maxDocSize,
// sizeLimit: maxDocSize,
}).marshal(mb.client.bsonOpts, mb.client.registry)
if err != nil {
break
@@ -331,7 +333,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
checkDollarKey: true,
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientReplaceOneModel:
checkSize = false
// checkSize = false
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
doc, err = (&clientUpdateDoc{
namespace: nsIdx,
@@ -343,7 +345,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
upsert: model.Upsert,
multi: false,
checkDollarKey: false,
sizeLimit: maxDocSize,
// sizeLimit: maxDocSize,
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientDeleteOneModel:
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
@@ -371,9 +373,9 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
return 0, nil, err
}
length := len(doc)
if maxDocSize > 0 && length > maxDocSize+16*1024 {
return 0, nil, driver.ErrDocumentTooLarge
}
// if maxDocSize > 0 && length > maxDocSize+16*1024 {
// return 0, nil, driver.ErrDocumentTooLarge
// }
if !exists {
length += len(ns)
}
@@ -398,9 +400,9 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
dst = fn.updateLength(dst, opsIdx, int32(len(dst[opsIdx:])))
nsDst = fn.updateLength(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
dst = append(dst, nsDst...)
if checkSize && maxDocSize > 0 && len(dst)-l > maxDocSize+16*1024 {
return 0, nil, driver.ErrDocumentTooLarge
}
// if checkSize && maxDocSize > 0 && len(dst)-l > maxDocSize+16*1024 {
// return 0, nil, driver.ErrDocumentTooLarge
// }

mb.retryMode = driver.RetryNone
if mb.client.retryWrites && canRetry {
@@ -585,7 +587,7 @@ type clientInsertDoc struct {
namespace int
document interface{}

sizeLimit int
// sizeLimit int
}

func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (interface{}, bsoncore.Document, error) {
@@ -596,9 +598,9 @@ func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.
if err != nil {
return nil, nil, err
}
if d.sizeLimit > 0 && len(f) > d.sizeLimit {
return nil, nil, driver.ErrDocumentTooLarge
}
// if d.sizeLimit > 0 && len(f) > d.sizeLimit {
// return nil, nil, driver.ErrDocumentTooLarge
// }
var id interface{}
f, id, err = ensureID(f, bson.NilObjectID, bsonOpts, registry)
if err != nil {
@@ -620,7 +622,7 @@ type clientUpdateDoc struct {
multi bool
checkDollarKey bool

sizeLimit int
// sizeLimit int
}

func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (bsoncore.Document, error) {
@@ -641,9 +643,9 @@ func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.
if err != nil {
return nil, err
}
if d.sizeLimit > 0 && len(u.Data) > d.sizeLimit {
return nil, driver.ErrDocumentTooLarge
}
// if d.sizeLimit > 0 && len(u.Data) > d.sizeLimit {
// return nil, driver.ErrDocumentTooLarge
// }
doc = bsoncore.AppendValueElement(doc, "updateMods", u)
doc = bsoncore.AppendBooleanElement(doc, "multi", d.multi)

2 changes: 1 addition & 1 deletion mongo/client_bulk_write_test.go
@@ -55,7 +55,7 @@ func TestBatches(t *testing.T) {
var n int
const limitBigEnough = 16_000
// test the "maxCount" that truncates the output
n, _, err = batches.AppendBatchSequence(nil, 4, limitBigEnough, limitBigEnough)
n, _, err = batches.AppendBatchSequence(nil, 4, limitBigEnough)
require.NoError(t, err, "AppendBatchSequence error: %v", err)
assert.Equal(t, 3, n, "expected %d appendings, got: %d", 3, n)

22 changes: 12 additions & 10 deletions x/mongo/driver/batches.go
@@ -24,10 +24,12 @@ type Batches struct {
offset int
}

var _ OperationBatches = &Batches{}

// AppendBatchSequence appends dst with document sequence of batches as long as the limits of max count, max
// document size, or total size allows. It returns the number of batches appended, the new appended slice, and
// any error raised. It returns the original input slice if nothing can be appended within the limits.
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) {
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error) {
if b.Size() == 0 {
return 0, dst, io.EOF
}
@@ -44,11 +46,11 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (
break
}
doc := b.Documents[i]
if len(doc) > maxDocSize {
break
}
// if len(doc) > maxDocSize {
// break
// }
size += len(doc)
if size > maxDocSize {
if size > totalSize {
break
}
dst = append(dst, doc...)
@@ -64,7 +66,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (
// AppendBatchArray appends dst with array of batches as long as the limits of max count, max document size, or
// total size allows. It returns the number of batches appended, the new appended slice, and any error raised. It
// returns the original input slice if nothing can be appended within the limits.
func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) {
func (b *Batches) AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error) {
if b.Size() == 0 {
return 0, dst, io.EOF
}
@@ -77,11 +79,11 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int
break
}
doc := b.Documents[i]
if len(doc) > maxDocSize {
break
}
// if len(doc) > maxDocSize {
// break
// }
size += len(doc)
if size > maxDocSize {
if size > totalSize {
break
}
dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc)
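
For readers verifying the new behavior, here is a self-contained toy version of the size accounting that the two loops above now share, under the assumption that only the batch count and the total-size budget are enforced; the splitBatch helper and its test values are invented for illustration and are not driver code.

package main

import "fmt"

// splitBatch mimics the accounting in Batches.AppendBatchSequence and
// AppendBatchArray after this commit: documents are taken until either
// maxCount documents have been appended or the running byte total would
// exceed totalSize. The removed per-document maxDocSize check is simply gone.
func splitBatch(docs [][]byte, maxCount, totalSize int) (n int, out []byte) {
	size := 0
	for _, doc := range docs {
		if n == maxCount {
			break
		}
		size += len(doc)
		if size > totalSize {
			break
		}
		out = append(out, doc...)
		n++
	}
	return n, out
}

func main() {
	docs := [][]byte{make([]byte, 100), make([]byte, 100), make([]byte, 100)}
	n, out := splitBatch(docs, 4, 250)
	fmt.Println(n, len(out)) // 2 200: the third document would push the total past 250 bytes
}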
3 changes: 2 additions & 1 deletion x/mongo/driver/batches_test.go
@@ -11,7 +11,6 @@ import (

"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
)

func newTestBatches(t *testing.T) *Batches {
@@ -32,6 +31,7 @@ func TestAdvancing(t *testing.T) {
assert.Equal(t, 0, size, "expected Size(): %d, got: %d", 1, size)
}

/*
func TestAppendBatchSequence(t *testing.T) {
t.Run("Append 0", func(t *testing.T) {
batches := newTestBatches(t)
@@ -99,3 +99,4 @@ func TestAppendBatchArray(t *testing.T) {
assert.Equal(t, dst, got)
})
}
*/
39 changes: 19 additions & 20 deletions x/mongo/driver/operation.go
@@ -184,6 +184,16 @@ func redactFinishedInformationResponse(info finishedInformation) bson.Raw {
return bson.Raw{}
}

// OperationBatches contains the documents that are split when executing a write command that potentially
// has more documents than can fit in a single command.
type OperationBatches interface {
AppendBatchSequence(dst []byte, maxCount int, totalSize int) (int, []byte, error)
AppendBatchArray(dst []byte, maxCount int, totalSize int) (int, []byte, error)
IsOrdered() *bool
AdvanceBatches(n int)
Size() int
}

// Operation is used to execute an operation. It contains all of the common code required to
// select a server, transform an operation into a command, write the command to a connection from
// the selected server, read a response from that connection, process the response, and potentially
@@ -269,15 +279,8 @@ type Operation struct {

// Batches contains the documents that are split when executing a write command that potentially
// has more documents than can fit in a single command. This should only be specified for
// commands that are batch compatible. For more information, please refer to the definition of
// Batches.
Batches interface {
AppendBatchSequence(dst []byte, maxCount int, maxDocSize int, totalSize int) (int, []byte, error)
AppendBatchArray(dst []byte, maxCount int, maxDocSize int, totalSize int) (int, []byte, error)
IsOrdered() *bool
AdvanceBatches(n int)
Size() int
}
// commands that are batch compatible.
Batches OperationBatches

// Legacy sets the legacy type for this operation. There are only 3 types that require legacy
// support: find, getMore, and killCursors. For more information about LegacyOperationKind,
@@ -1371,7 +1374,7 @@ func (op Operation) createWireMessage(
if err == nil && op.Batches != nil {
batchOffset = len(dst)
info.processedBatches, dst, err = op.Batches.AppendBatchSequence(dst,
int(desc.MaxBatchCount), int(desc.MaxDocumentSize), int(desc.MaxDocumentSize),
int(desc.MaxBatchCount), int(desc.MaxMessageSize),
)
if err != nil {
break
@@ -1383,12 +1386,8 @@
default:
var batches []byte
if op.Batches != nil {
maxDocSize := -1
if unacknowledged {
maxDocSize = int(desc.MaxDocumentSize)
}
info.processedBatches, batches, err = op.Batches.AppendBatchSequence(batches,
int(desc.MaxBatchCount), maxDocSize, int(desc.MaxMessageSize),
int(desc.MaxBatchCount), int(desc.MaxMessageSize),
)
if err != nil {
break
@@ -1443,14 +1442,14 @@ func (op Operation) addEncryptCommandFields(ctx context.Context, dst []byte, des
var n int
if op.Batches != nil {
if maxBatchCount := int(desc.MaxBatchCount); maxBatchCount > 1 {
n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, maxBatchCount, cryptMaxBsonObjectSize, cryptMaxBsonObjectSize)
n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, maxBatchCount, cryptMaxBsonObjectSize)
if err != nil {
return 0, nil, err
}
}
if n == 0 {
maxDocumentSize := int(desc.MaxDocumentSize)
n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, 1, maxDocumentSize, maxDocumentSize)
// maxDocumentSize := int(desc.MaxDocumentSize)
n, cmdDst, err = op.Batches.AppendBatchArray(cmdDst, 1, int(desc.MaxMessageSize))
if err != nil {
return 0, nil, err
}
@@ -1483,8 +1482,8 @@ func (op Operation) addLegacyCommandFields(dst []byte, desc description.Selected
return 0, dst, nil
}
var n int
maxDocumentSize := int(desc.MaxDocumentSize)
n, dst, err = op.Batches.AppendBatchArray(dst, int(desc.MaxBatchCount), maxDocumentSize, maxDocumentSize)
// maxDocumentSize := int(desc.MaxDocumentSize)
n, dst, err = op.Batches.AppendBatchArray(dst, int(desc.MaxBatchCount), int(desc.MaxMessageSize))
if err != nil {
return 0, nil, err
}
