Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support putifabsent #8428

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d92b29d
works
ItamarYuran Dec 16, 2024
f18a67c
multi if none mash
ItamarYuran Dec 17, 2024
81de1fc
trimmmin' them
ItamarYuran Dec 17, 2024
7d2a511
yalla
ItamarYuran Dec 17, 2024
768ef8c
only if absent babe
ItamarYuran Dec 22, 2024
b07a5ab
erroring
ItamarYuran Dec 22, 2024
c8a508c
tests v1
ItamarYuran Dec 24, 2024
81c8a79
tests v2
ItamarYuran Dec 24, 2024
5fc97da
test v3
ItamarYuran Dec 24, 2024
1dd06fd
test v4
ItamarYuran Dec 24, 2024
a635bb4
test v5
ItamarYuran Dec 24, 2024
717f224
test yalla
ItamarYuran Dec 24, 2024
d558f57
user metadata
ItamarYuran Dec 25, 2024
150776e
beginning s3 client
ItamarYuran Dec 25, 2024
d7e22f4
beginning s3 client
ItamarYuran Dec 25, 2024
4b1533b
local host
ItamarYuran Dec 25, 2024
1528c90
local host
ItamarYuran Dec 25, 2024
08cc251
amen
ItamarYuran Dec 25, 2024
2873d3a
main/object
ItamarYuran Dec 25, 2024
7e42c88
yalla
ItamarYuran Dec 25, 2024
b46deb3
yalla kadima
ItamarYuran Dec 25, 2024
126144f
debug
ItamarYuran Dec 25, 2024
4a56ace
pront all headers
ItamarYuran Dec 25, 2024
c4387d9
I really think it will work now
ItamarYuran Dec 26, 2024
51d8105
this time bby
ItamarYuran Dec 26, 2024
6f2431a
now is the time
ItamarYuran Dec 26, 2024
047bd48
yalla
ItamarYuran Dec 26, 2024
55a2d72
lets see
ItamarYuran Dec 26, 2024
c620816
maybe now
ItamarYuran Dec 26, 2024
69764b1
with multipart test
ItamarYuran Dec 26, 2024
0b2e2b4
test will now pass
ItamarYuran Dec 26, 2024
8b4d12f
svc to s3
ItamarYuran Dec 26, 2024
67da789
smol change
ItamarYuran Dec 26, 2024
005f174
its gonna work i tell u
ItamarYuran Dec 26, 2024
278a345
nooooow
ItamarYuran Dec 26, 2024
94ed11f
looks like we got it
ItamarYuran Dec 26, 2024
e7508d5
formatted
ItamarYuran Dec 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions esti/s3_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/go-openapi/swag"
"io"
"math/rand"
"net/http"
Expand All @@ -16,6 +13,11 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/middleware"
"github.com/go-openapi/swag"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/tags"
Expand Down Expand Up @@ -182,6 +184,82 @@ func TestS3UploadAndDownload(t *testing.T) {
}
}

// setHTTPHeaders returns an S3 API option that registers a Build-step
// middleware named "AddIfNoneMatchHeader". The middleware sets the
// If-None-Match header of the outgoing request to ifNoneMatch.
//
// An empty ifNoneMatch leaves the request untouched, so callers may pass the
// option unconditionally without sending an empty header on the wire.
func setHTTPHeaders(ifNoneMatch string) func(*middleware.Stack) error {
	return func(stack *middleware.Stack) error {
		addHeader := middleware.BuildMiddlewareFunc("AddIfNoneMatchHeader", func(
			ctx context.Context, in middleware.BuildInput, next middleware.BuildHandler,
		) (middleware.BuildOutput, middleware.Metadata, error) {
			// NOTE(review): the AWS SDK v2 appears to hand the Build step a
			// *smithyhttp.Request (github.com/aws/smithy-go/transport/http)
			// rather than a *net/http.Request. If so, this assertion never
			// matches and the header is silently dropped — confirm and switch
			// the asserted type.
			if req, ok := in.Request.(*http.Request); ok && ifNoneMatch != "" {
				req.Header.Set("If-None-Match", ifNoneMatch)
			}
			// Always hand over to the next middleware in the stack.
			return next.HandleBuild(ctx, in)
		})
		return stack.Build.Add(addHeader, middleware.Before)
	}
}

// TestS3IfNoneMatch uploads objects through the S3 gateway with different
// If-None-Match header values and checks which uploads are accepted.
//
// The cases are order-dependent: a path must first be written before a later
// conditional write to the same path can be expected to fail. They therefore
// run sequentially, in declaration order, on the test goroutine (which is also
// required for require.* since it stops the test via FailNow).
func TestS3IfNoneMatch(t *testing.T) {
	ctx, _, repo := setupTest(t)
	defer tearDownTest(repo)

	s3Endpoint := viper.GetString("s3_endpoint")
	s3Client := createS3Client(s3Endpoint, t)

	testCases := []struct {
		Path        string
		Content     string
		IfNoneMatch string
		ExpectError bool
	}{
		// Unconditional write creates object1; a conditional write must then fail.
		{Path: "main/object1", Content: "data", IfNoneMatch: "", ExpectError: false},
		{Path: "main/object1", Content: "data", IfNoneMatch: "*", ExpectError: true},
		// Conditional write to a new path succeeds; overwriting it unconditionally is allowed.
		{Path: "main/object2", Content: "data", IfNoneMatch: "*", ExpectError: false},
		{Path: "main/object2", Content: "data", IfNoneMatch: "", ExpectError: false},
		// Any value other than "*" is expected to be rejected.
		{Path: "main/object3", Content: "data", IfNoneMatch: "hi", ExpectError: true},
	}

	for _, tc := range testCases {
		input := &s3.PutObjectInput{
			Bucket: aws.String(repo),
			Key:    aws.String(tc.Path),
			Body:   strings.NewReader(tc.Content),
		}
		_, err := s3Client.PutObject(ctx, input, s3.WithAPIOptions(setHTTPHeaders(tc.IfNoneMatch)))

		if tc.ExpectError {
			require.Errorf(t, err, "path %q, If-None-Match %q: was expecting an error", tc.Path, tc.IfNoneMatch)
		} else {
			require.NoErrorf(t, err, "path %q, If-None-Match %q: wasn't expecting error", tc.Path, tc.IfNoneMatch)
		}
	}
}

func verifyObjectInfo(t *testing.T, got minio.ObjectInfo, expectedSize int) {
if got.Err != nil {
t.Errorf("%s: %s", got.Key, got.Err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/gateway/operations/operation_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/logging"
)

Expand Down Expand Up @@ -40,7 +41,7 @@ func shouldReplaceMetadata(req *http.Request) bool {
return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE"
}

func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string, allowOverWrite bool) error {
var writeTime time.Time
if mTime == nil {
writeTime = time.Now()
Expand All @@ -59,7 +60,7 @@ func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checks
ContentType(contentType).
Build()

err := o.Catalog.CreateEntry(req.Context(), o.Repository.Name, o.Reference, entry)
err := o.Catalog.CreateEntry(req.Context(), o.Repository.Name, o.Reference, entry, graveler.WithIfAbsent(!allowOverWrite))
if err != nil {
o.Log(req).WithError(err).Error("could not update metadata")
return err
Expand Down
15 changes: 14 additions & 1 deletion pkg/gateway/operations/postobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrInternalError))
return
}
allowOverWrite, err := o.checkIfAbsent(req)
if errors.Is(err, gatewayErrors.ErrPreconditionFailed) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
if errors.Is(err, gatewayErrors.ErrNotImplemented) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented))
return
}
objName := multiPart.PhysicalAddress
req = req.WithContext(logging.AddFields(req.Context(), logging.Fields{logging.PhysicalAddressFieldKey: objName}))
xmlMultipartComplete, err := io.ReadAll(req.Body)
Expand Down Expand Up @@ -124,7 +133,11 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
return
}
checksum := strings.Split(resp.ETag, "-")[0]
err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType, allowOverWrite)
if errors.Is(err, graveler.ErrPreconditionFailed) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
Expand Down
41 changes: 39 additions & 2 deletions pkg/gateway/operations/putobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
Expand All @@ -20,6 +21,7 @@
)

const (
IfNoneMatchHeader = "If-None-Match"
CopySourceHeader = "x-amz-copy-source"
CopySourceRangeHeader = "x-amz-copy-source-range"
QueryParamUploadID = "uploadId"
Expand All @@ -30,7 +32,6 @@

func (controller *PutObject) RequiredPermissions(req *http.Request, repoID, _, destPath string) (permissions.Node, error) {
copySource := req.Header.Get(CopySourceHeader)

if len(copySource) == 0 {
return permissions.Node{
Permission: permissions.Permission{
Expand Down Expand Up @@ -298,6 +299,15 @@
o.Incr("put_object", o.Principal, o.Repository.Name, o.Reference)
storageClass := StorageClassFromHeader(req.Header)
opts := block.PutOpts{StorageClass: storageClass}
allowOverWrite, err := o.checkIfAbsent(req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future generations, please add a comment explaining this (something like, or exactly like, the suggestion below); otherwise it's confusing, since this is not actually the core verification. It's more of an optimization, because there is no atomicity here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
allowOverWrite, err := o.checkIfAbsent(req)
allowOverwrite, err := o.checkIfAbsent(req)

if errors.Is(err, gatewayErrors.ErrPreconditionFailed) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
if errors.Is(err, gatewayErrors.ErrNotImplemented) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented))
return
}
address := o.PathProvider.NewPath()
blob, err := upload.WriteBlob(req.Context(), o.BlockStore, o.Repository.StorageNamespace, address, req.Body, req.ContentLength, opts)
if err != nil {
Expand All @@ -309,7 +319,11 @@
// write metadata
metadata := amzMetaAsMetadata(req)
contentType := req.Header.Get("Content-Type")
err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType, allowOverWrite)
if errors.Is(err, graveler.ErrPreconditionFailed) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
Expand All @@ -325,3 +339,26 @@
o.SetHeader(w, "ETag", httputil.ETag(blob.Checksum))
w.WriteHeader(http.StatusOK)
}

func (o *PathOperation) checkIfAbsent(req *http.Request) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this function confusing because:
It does 3 different things:

  • It checks and validates the the header value
  • Get's the value from catalog.
  • Returns allowOverride indicator for the real code somewhere else that checks if absent.

Now that is confusing because it's a "lie" - it only's only partially checking ifAbsent as optimization, since the real check performed later and also does input validation.

I would prefer something much more explicit like a function that extracts the header and validates it, then inline check if object exist:

// checkIfAbsent sets allowOverwrite and validates the header value if set 
allowOverwrite, err := o.checkIfAbsent(req)
if err != nil { 
    // ...
}
if !allowOverwrite { 
    // first check if object exist as optimization to save resources 
    _, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{}) 
    // hadle if err != nil ...
}

for key, values := range req.Header {
for _, value := range values {
fmt.Printf("HEADER: %s = %s\n", key, value)
Fixed Show fixed Hide fixed
}
}
Header := req.Header.Get(IfNoneMatchHeader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Use lowercase for variable names
  • Be explicit in variable name
Suggested change
Header := req.Header.Get(IfNoneMatchHeader)
headerValue := req.Header.Get(IfNoneMatchHeader)

fmt.Println("HEADER: ", Header)
if Header == "" {
return true, nil
}
if Header == "*" {
_, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{})
if err == nil {
return false, gatewayErrors.ErrPreconditionFailed
}
if !errors.Is(err, graveler.ErrNotFound) {
return false, gatewayErrors.ErrInternalError
}
}
return false, gatewayErrors.ErrNotImplemented
}
Loading