Skip to content

Commit

Permalink
Merge pull request #326 from teharrison/master
Browse files Browse the repository at this point in the history
update preauth, add ability to download multiple node files in archive format
  • Loading branch information
teharrison authored Apr 13, 2017
2 parents a5e8cb0 + c65e077 commit c3f162f
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 112 deletions.
6 changes: 6 additions & 0 deletions RELEASE_NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# v0.9.21

- graceful error handling of missing .bson file
- add more to preauth return: file size, options used
- added ability to download multiple files (.tar or .zip format) from a query

# v0.9.20

- add priority field to node along with index and set option
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.20
0.9.21
56 changes: 54 additions & 2 deletions shock-server/controller/node/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
e "github.com/MG-RAST/Shock/shock-server/errors"
"github.com/MG-RAST/Shock/shock-server/logger"
"github.com/MG-RAST/Shock/shock-server/node"
"github.com/MG-RAST/Shock/shock-server/node/archive"
"github.com/MG-RAST/Shock/shock-server/preauth"
"github.com/MG-RAST/Shock/shock-server/request"
"github.com/MG-RAST/Shock/shock-server/responder"
"github.com/MG-RAST/Shock/shock-server/user"
Expand Down Expand Up @@ -72,7 +74,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {

// Gather params to make db query. Do not include the following list.
if _, ok := query["query"]; ok {
paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1}
paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "download_url": 1, "file_name": 1, "archive": 1}
for key := range query {
if _, found := paramlist[key]; !found {
keyStr := fmt.Sprintf("attributes.%s", key)
Expand All @@ -86,7 +88,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
}
}
} else if _, ok := query["querynode"]; ok {
paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1}
paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "download_url": 1, "file_name": 1, "archive": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1}
for key := range query {
if _, found := paramlist[key]; !found {
for _, value := range query[key] {
Expand Down Expand Up @@ -197,6 +199,56 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}

// process preauth url request, requires archive option
if _, ok := query["download_url"]; ok {
// add options - set defaults first
options := map[string]string{}
options["archive"] = "zip" // default is zip
if _, ok := query["archive"]; ok {
if archive.IsValidToArchive(query.Get("archive")) {
options["archive"] = query.Get("archive")
}
}
preauthId := util.RandString(20)
if _, ok := query["file_name"]; ok {
options["filename"] = query.Get("file_name")
} else {
options["filename"] = preauthId
}
if !strings.HasSuffix(options["filename"], options["archive"]) {
options["filename"] = options["filename"] + "." + options["archive"]
}
// get valid nodes
var nodeIds []string
var totalBytes int64
for _, n := range nodes {
if n.HasFile() {
nodeIds = append(nodeIds, n.Id)
totalBytes += n.File.Size
}
}
if len(nodeIds) == 0 {
return responder.RespondWithError(ctx, http.StatusBadRequest, "err:@node_ReadMany download url: no available files found")
}
// set preauth
if p, err := preauth.New(preauthId, "download", nodeIds, options); err != nil {
err_msg := "err:@node_ReadMany download_url: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
} else {
data := preauth.PreAuthResponse{
Url: util.ApiUrl(ctx) + "/preauth/" + p.Id,
ValidTill: p.ValidTill.Format(time.ANSIC),
Format: options["archive"],
Filename: options["filename"],
Files: len(nodeIds),
Size: totalBytes,
}
return responder.RespondWithPaginatedData(ctx, data, limit, offset, count)
}
}

return responder.RespondWithPaginatedData(ctx, nodes, limit, offset, count)
}

Expand Down
18 changes: 14 additions & 4 deletions shock-server/controller/node/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
var fFunc filter.FilterFunc = nil
var compressionFormat string = ""
// use query params if exist
if _, ok := query["filename"]; ok {
filename = query.Get("filename")
if _, ok := query["file_name"]; ok {
filename = query.Get("file_name")
}
if _, ok := query["filter"]; ok {
if filter.Has(query.Get("filter")) {
Expand Down Expand Up @@ -378,6 +378,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
if !n.HasFile() {
return responder.RespondWithError(ctx, http.StatusBadRequest, e.NodeNoFile)
} else {
preauthFilename := filename
// add options
options := map[string]string{}
options["filename"] = filename
Expand All @@ -386,14 +387,23 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
}
if compressionFormat != "" {
options["compression"] = compressionFormat
preauthFilename = preauthFilename + "." + compressionFormat
}
// set preauth
if p, err := preauth.New(util.RandString(20), "download", n.Id, options); err != nil {
if p, err := preauth.New(util.RandString(20), "download", []string{n.Id}, options); err != nil {
err_msg := "err:@node_Read download_url: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
} else {
return responder.RespondWithData(ctx, util.UrlResponse{Url: util.ApiUrl(ctx) + "/preauth/" + p.Id, ValidTill: p.ValidTill.Format(time.ANSIC)})
data := preauth.PreAuthResponse{
Url: util.ApiUrl(ctx) + "/preauth/" + p.Id,
ValidTill: p.ValidTill.Format(time.ANSIC),
Format: options["compression"],
Filename: preauthFilename,
Files: 1,
Size: n.File.Size,
}
return responder.RespondWithData(ctx, data)
}
}
} else if _, ok := query["download_post"]; ok {
Expand Down
143 changes: 94 additions & 49 deletions shock-server/controller/preauth/preauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package preauth
import (
"github.com/MG-RAST/Shock/shock-server/logger"
"github.com/MG-RAST/Shock/shock-server/node"
"github.com/MG-RAST/Shock/shock-server/node/archive"
"github.com/MG-RAST/Shock/shock-server/node/file"
"github.com/MG-RAST/Shock/shock-server/node/file/index"
"github.com/MG-RAST/Shock/shock-server/node/filter"
Expand All @@ -24,37 +23,25 @@ func PreAuthRequest(ctx context.Context) {
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
} else {
if n, err := node.Load(p.NodeId); err == nil {
switch p.Type {
case "download":
streamDownload(ctx, n, p.Options)
preauth.Delete(id)
default:
responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type)
}
} else {
err_msg := "err:@preAuth loadnode: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
switch p.Type {
case "download":
streamDownload(ctx, id, p.Nodes, p.Options)
preauth.Delete(id)
default:
responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type)
}
}
return
}

// handle download and its options
func streamDownload(ctx context.Context, n *node.Node, options map[string]string) {
nf, err := n.FileReader()
defer nf.Close()
if err != nil {
err_msg := "err:@preAuth node.FileReader: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
return
}
// set defaults
filename := n.Id
func streamDownload(ctx context.Context, pid string, nodes []string, options map[string]string) {
// get defaults
filename := pid
var filterFunc filter.FilterFunc = nil
var compressionFormat string = ""
var archiveFormat string = ""

// use options if exist
if fn, has := options["filename"]; has {
filename = fn
Expand All @@ -65,35 +52,93 @@ func streamDownload(ctx context.Context, n *node.Node, options map[string]string
}
}
if cp, has := options["compression"]; has {
if archive.IsValidCompress(cp) {
compressionFormat = cp
}
compressionFormat = cp
}
// stream it
var s *request.Streamer
if n.Type == "subset" {
s = &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
if n.File.Size == 0 {
// handle empty subset file
s.R = append(s.R, nf)
} else {
idx := index.New()
fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits)
if err != nil {
responder.RespondWithError(ctx, http.StatusInternalServerError, err.Error())
return
}
for _, rec := range recSlice {
s.R = append(s.R, io.NewSectionReader(nf, rec[0], rec[1]))
if ar, has := options["archive"]; has {
archiveFormat = ar
}
var files []*file.FileInfo

// process nodes
for _, nid := range nodes {
// get node
n, err := node.Load(nid)
if (err != nil) || !n.HasFile() {
continue
}
// get filereader
nf, err := n.FileReader()
if err != nil {
nf.Close()
continue
}
// add to file array
var fileInfo file.FileInfo
if n.Type == "subset" {
if n.File.Size == 0 {
// handle empty subset file
fileInfo.R = append(fileInfo.R, nf)
} else {
idx := index.New()
fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits)
if err != nil {
nf.Close()
continue
}
for _, rec := range recSlice {
fileInfo.R = append(fileInfo.R, io.NewSectionReader(nf, rec[0], rec[1]))
}
}
} else {
fileInfo.R = append(fileInfo.R, nf)
}
} else {
s = &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
defer nf.Close()
// add to file info
fileInfo.Name = n.File.Name
fileInfo.Size = n.File.Size
fileInfo.ModTime = n.File.CreatedOn
if _, ok := n.File.Checksum["md5"]; ok {
fileInfo.Checksum = n.File.Checksum["md5"]
}
files = append(files, &fileInfo)
}
if err = s.Stream(false); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@preAuth: s.stream: " + err.Error()

if (len(nodes) == 1) && (len(files) == 1) {
// create single node / file streamer
s := &request.Streamer{
R: files[0].R,
W: ctx.HttpResponseWriter(),
ContentType: "application/octet-stream",
Filename: filename,
Size: files[0].Size,
Filter: filterFunc,
Compression: compressionFormat,
}
if err := s.Stream(false); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@preAuth: s.stream: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
}
} else if (len(files) > 1) && (archiveFormat != "") {
// create multi node / file streamer, must have archive format
m := &request.MultiStreamer{
Files: files,
W: ctx.HttpResponseWriter(),
ContentType: "application/octet-stream",
Filename: filename,
Archive: archiveFormat,
}
if err := m.MultiStream(); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@preAuth: m.multistream: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
}
} else {
// something broke
err_msg := "err:@preAuth: no files available to download for given combination of options"
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
Expand Down
Loading

0 comments on commit c3f162f

Please sign in to comment.