Skip to content

Commit

Permalink
FMWK-600-reading-in-order
Browse files Browse the repository at this point in the history
- refactoring
  • Loading branch information
filkeith committed Dec 23, 2024
1 parent 9966b35 commit cd0e1a3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 62 deletions.
106 changes: 44 additions & 62 deletions io/aws/s3/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func NewReader(
return nil, fmt.Errorf("path is required, use WithDir(path string) or WithFile(path string) to set")
}

if r.sort != "" && r.sort != SortASC && r.sort != SortDESC {
return nil, fmt.Errorf("unknown sorting type %s", r.sort)
}

// Check if the bucket exists and we have permissions.
if _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
Bucket: aws.String(bucketName),
Expand Down Expand Up @@ -120,27 +124,15 @@ func (r *Reader) streamDirectory(
// start serving goroutines.
var wg sync.WaitGroup

objectsToOpen := make(chan *string, bufferSize)

wg.Add(1)

objectsToSort := make(chan *string, bufferSize)

go func() {
defer wg.Done()
r.openObjects(ctx, objectsToOpen, readersCh, errorsCh)
r.sortObjects(ctx, objectsToSort, readersCh, errorsCh)
}()

var objectsToSort chan *string
if r.sort != "" {
objectsToSort = make(chan *string, bufferSize)

wg.Add(1)

go func() {
defer wg.Done()
r.sortObjects(ctx, objectsToSort, objectsToOpen)
}()
}

var continuationToken *string

for {
Expand Down Expand Up @@ -171,37 +163,35 @@ func (r *Reader) streamDirectory(
}
}

switch {
case r.sort != "":
objectsToSort <- p.Key
default:
objectsToOpen <- p.Key
}
objectsToSort <- p.Key
}

continuationToken = listResponse.NextContinuationToken
if continuationToken == nil {
if r.sort != "" {
// Don't defer this line. We must close this channel, to stop goroutine and release a wait group.
close(objectsToSort)
}

break
}
}
// Close only if we are not sorting. If we sort, this channel will be closed in another goroutine.
if r.sort == "" {
// Don't defer this line. We must close this channel, to stop goroutine and release a wait group.
close(objectsToOpen)
}

close(objectsToSort)
wg.Wait()
}

// sortObjects receives keys, sorts them, and then passes them on to be opened.
func (r *Reader) sortObjects(ctx context.Context, objectsToSort <-chan *string, objectsToOpen chan<- *string) {
defer close(objectsToOpen)
func (r *Reader) sortObjects(
ctx context.Context,
objectsToSort <-chan *string,
readersCh chan<- io.ReadCloser,
errorsCh chan<- error,
) {
// If we don't need to sort objects, open them.
if r.sort == "" {
for path := range objectsToSort {
r.openObject(ctx, path, readersCh, errorsCh)
}

return
}
// If we need to sort.
keys := make([]string, 0)

for key := range objectsToSort {
Expand All @@ -222,46 +212,38 @@ func (r *Reader) sortObjects(ctx context.Context, objectsToSort <-chan *string,
}

for _, k := range keys {
objectsToOpen <- &k
r.openObject(ctx, &k, readersCh, errorsCh)
}
}

// openObjects creates object readers and sends them to readersCh.
func (r *Reader) openObjects(
ctx context.Context,
objectsToOpen <-chan *string,
func (r *Reader) openObject(ctx context.Context,
path *string,
readersCh chan<- io.ReadCloser,
errorsCh chan<- error,
) {
for key := range objectsToOpen {
if ctx.Err() != nil {
return
}

object, err := r.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &r.bucketName,
Key: key,
})
object, err := r.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &r.bucketName,
Key: path,
})

if err != nil {
// Skip 404 not found error.
var opErr *smithy.OperationError
if errors.As(err, &opErr) {
var httpErr *awsHttp.ResponseError
if errors.As(opErr.Err, &httpErr) && httpErr.HTTPStatusCode() == http.StatusNotFound {
continue
}
if err != nil {
// Skip 404 not found error.
var opErr *smithy.OperationError
if errors.As(err, &opErr) {
var httpErr *awsHttp.ResponseError
if errors.As(opErr.Err, &httpErr) && httpErr.HTTPStatusCode() == http.StatusNotFound {
return
}
}

// We check *p.Key == nil in the beginning.
errorsCh <- fmt.Errorf("failed to open directory file %s: %w", *key, err)
// We check *p.Key == nil in the beginning.
errorsCh <- fmt.Errorf("failed to open directory file %s: %w", *path, err)

return
}
return
}

if object != nil {
readersCh <- object.Body
}
if object != nil {
readersCh <- object.Body
}
}

Expand Down
4 changes: 4 additions & 0 deletions io/local/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func NewReader(opts ...Opt) (*Reader, error) {
return nil, fmt.Errorf("path is required, use WithDir(path string) or WithFile(path string) to set")
}

if r.sort != "" && r.sort != SortASC && r.sort != SortDESC {
return nil, fmt.Errorf("unknown sorting type %s", r.sort)
}

return r, nil
}

Expand Down

0 comments on commit cd0e1a3

Please sign in to comment.