Skip to content

Commit

Permalink
Merge pull request #354 from MG-RAST/develop
Browse files Browse the repository at this point in the history
locks and async actions added
  • Loading branch information
teharrison authored Jun 6, 2018
2 parents a721858 + f6c6051 commit 480b72a
Show file tree
Hide file tree
Showing 32 changed files with 889 additions and 455 deletions.
13 changes: 13 additions & 0 deletions RELEASE_NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# v0.9.26

- remove repeat saves
- fix chunkrecord indexer for fastq and last record
- handle index of empty files gracefully
- overhaul locker, now can lock node, file, index
- if file locked, unable to download file
- if index locked, unable to download using that index
- locks have timeouts and exist in-memory only
- parts nodes merge is async and locks file during merge
- indexing is async and locks index during build
- add /locker /locked/node /locked/file /locked/index resources to view states

# v0.9.25

- add node lock to index and acl actions
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.25
0.9.26
5 changes: 3 additions & 2 deletions shock-server/controller/node/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
e "github.com/MG-RAST/Shock/shock-server/errors"
"github.com/MG-RAST/Shock/shock-server/logger"
"github.com/MG-RAST/Shock/shock-server/node"
"github.com/MG-RAST/Shock/shock-server/node/locker"
"github.com/MG-RAST/Shock/shock-server/request"
"github.com/MG-RAST/Shock/shock-server/responder"
"github.com/MG-RAST/Shock/shock-server/user"
Expand Down Expand Up @@ -138,14 +139,14 @@ func AclTypedRequest(ctx context.Context) {
}

// lock node
err = node.LockMgr.LockNode(nid)
err = locker.NodeLockMgr.LockNode(nid)
if err != nil {
err_msg := "err@node_Acl: (LockNode) id=" + nid + ": " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
defer node.LockMgr.UnlockNode(nid)
defer locker.NodeLockMgr.UnlockNode(nid)

// Users that are not an admin or the node owner can only delete themselves from an ACL.
if n.Acl.Owner != u.Uuid && u.Admin == false {
Expand Down
5 changes: 3 additions & 2 deletions shock-server/controller/node/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/MG-RAST/Shock/shock-server/logger"
"github.com/MG-RAST/Shock/shock-server/node"
"github.com/MG-RAST/Shock/shock-server/node/archive"
"github.com/MG-RAST/Shock/shock-server/node/file"
"github.com/MG-RAST/Shock/shock-server/preauth"
"github.com/MG-RAST/Shock/shock-server/request"
"github.com/MG-RAST/Shock/shock-server/responder"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (cr *NodeController) Create(ctx context.Context) error {
// all POSTed files written to temp dir
params, files, err := request.ParseMultipartForm(ctx.HttpRequest())
// clean up temp dir !!
defer node.RemoveAllFormFiles(files)
defer file.RemoveAllFormFiles(files)
if err != nil {
if err.Error() == "request Content-Type isn't multipart/form-data" {
// If not multipart/form-data it will try to read the Body of the
Expand Down Expand Up @@ -109,7 +110,7 @@ func (cr *NodeController) Create(ctx context.Context) error {
logger.Error("err@node_Create: (download_url) (Authenticate) id=" + id + ": " + e.UnAuth)
return responder.RespondWithError(ctx, http.StatusUnauthorized, e.UnAuth)
}
if n.HasFile() {
if n.HasFile() && !n.HasFileLock() {
nodeIds = append(nodeIds, n.Id)
totalBytes += n.File.Size
}
Expand Down
210 changes: 42 additions & 168 deletions shock-server/controller/node/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"github.com/MG-RAST/Shock/shock-server/logger"
"github.com/MG-RAST/Shock/shock-server/node"
"github.com/MG-RAST/Shock/shock-server/node/file/index"
"github.com/MG-RAST/Shock/shock-server/node/locker"
"github.com/MG-RAST/Shock/shock-server/request"
"github.com/MG-RAST/Shock/shock-server/responder"
"github.com/MG-RAST/Shock/shock-server/user"
"github.com/MG-RAST/golib/stretchr/goweb/context"
mgo "gopkg.in/mgo.v2"
"net/http"
"os"
"strconv"
"time"
)

type getRes struct {
Expand Down Expand Up @@ -110,19 +109,39 @@ func IndexTypedRequest(ctx context.Context) {
}

// lock node
err = node.LockMgr.LockNode(nid)
err = locker.NodeLockMgr.LockNode(nid)
if err != nil {
err_msg := "err@node_Index: (LockNode) id=" + nid + ": " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
defer node.LockMgr.UnlockNode(nid)
defer locker.NodeLockMgr.UnlockNode(nid)

// check for locks and file
if n.HasFileLock() {
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + e.NodeFileLock)
responder.RespondWithError(ctx, http.StatusLocked, e.NodeFileLock)
return
} else if n.HasIndexLock(idxType) {
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + e.NodeIndexLock)
responder.RespondWithError(ctx, http.StatusLocked, e.NodeIndexLock)
return
} else if !n.HasFile() {
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + e.NodeNoFile)
responder.RespondWithError(ctx, http.StatusBadRequest, e.NodeNoFile)
return
} else if idxType == "" {
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + e.InvalidIndex)
responder.RespondWithError(ctx, http.StatusBadRequest, e.InvalidIndex)
return
}

// Gather query params
query := ctx.HttpRequest().URL.Query()
_, forceRebuild := query["force_rebuild"]

// does it already exist
if _, has := n.Indexes[idxType]; has {
if idxType == "size" {
responder.RespondOK(ctx)
Expand All @@ -135,21 +154,10 @@ func IndexTypedRequest(ctx context.Context) {
}
}

if !n.HasFile() {
err_msg := "Node has no file."
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
} else if idxType == "" {
err_msg := "Index create requires type."
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
// check for invalid combinations
if _, ok := index.Indexers[idxType]; !ok && idxType != "bai" && idxType != "subset" && idxType != "column" {
err_msg := fmt.Sprintf("Index type %s unavailable.", idxType)
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
logger.Error("err@node_Index: (node.Indexes) id=" + nid + ": " + e.InvalidIndex)
responder.RespondWithError(ctx, http.StatusBadRequest, e.InvalidIndex)
return
}
if idxType == "size" {
Expand All @@ -158,108 +166,28 @@ func IndexTypedRequest(ctx context.Context) {
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

if conf.LOG_PERF {
logger.Perf("START indexing: " + nid)
}

if idxType == "bai" {
//bam index is created by the command-line tool samtools
if n.Type == "subset" {
err_msg := "subset nodes do not support bam indices"
logger.Error("err@node_Index: (index/bai) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

if ext := n.FileExt(); ext == ".bam" {
if err := index.CreateBamIndex(n.FilePath()); err != nil {
err_msg := "Error while creating bam index."
logger.Error("err@node_Index: (index/bai) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
return
}
responder.RespondOK(ctx)
} else {
if ext := n.FileExt(); ext != ".bam" {
err_msg := "Index type bai requires .bam file."
logger.Error("err@node_Index: (index/bai) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
}
if n.Type == "subset" && (idxType != "chunkrecord" || n.Subset.Parent.IndexName != "record") {
err_msg := "For subset nodes, Shock currently only supports subset and chunkrecord indexes. Also, for a chunkrecord index, the subset node must have been generated from a record index."
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

subsetSize := int64(0)
count := int64(0)
indexFormat := ""
subsetName := ""
if idxType == "subset" {
// Utilizing the multipart form parser since we need to upload a file.
params, files, err := request.ParseMultipartForm(ctx.HttpRequest())
// clean up temp dir !!
defer node.RemoveAllFormFiles(files)
if err != nil {
err_msg := "err@node_Index: (ParseMultipartForm) id=" + nid + ":" + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

parentIndex, hasParent := params["parent_index"]
if !hasParent {
err_msg := "Index type subset requires parent_index param."
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
} else if _, has := n.Indexes[parentIndex]; !has {
err_msg := fmt.Sprintf("Node %s does not have index of type %s.", n.Id, parentIndex)
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

newIndex, hasName := params["index_name"]
if !hasName {
err_msg := "Index type subset requires index_name param."
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
} else if _, reservedName := index.Indexers[newIndex]; reservedName || newIndex == "bai" {
err_msg := fmt.Sprintf("%s is a reserved index name and cannot be used to create a custom subset index.", newIndex)
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
subsetName = newIndex

subsetIndices, hasFile := files["subset_indices"]
if !hasFile {
err_msg := "Index type subset requires subset_indices file."
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

f, _ := os.Open(subsetIndices.Path)
defer f.Close()
idxer := index.NewSubsetIndexer(f)

// we default to "array" index format for backwards compatibility
indexFormat = "array"
if n.Indexes[parentIndex].Format == "array" || n.Indexes[parentIndex].Format == "matrix" {
indexFormat = n.Indexes[parentIndex].Format
}
count, subsetSize, err = index.CreateSubsetIndex(&idxer, n.IndexPath()+"/"+newIndex+".idx", n.IndexPath()+"/"+parentIndex+".idx", indexFormat, n.Indexes[parentIndex].TotalUnits)
if err != nil {
err_msg := "err@node_Index: (index/subset) id=" + nid + ":" + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

} else if idxType == "column" {
// Gather query params
query := ctx.HttpRequest().URL.Query()

var colNum int
if idxType == "column" {
if n.Type == "subset" {
err_msg := "Shock does not support column index creation on subset nodes."
logger.Error("err@node_Index: (index/column) id=" + nid + ": " + err_msg)
Expand All @@ -273,73 +201,22 @@ func IndexTypedRequest(ctx context.Context) {
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

num_str := query.Get("number")
idxType = idxType + num_str
num, err := strconv.Atoi(num_str)
if err != nil || num < 1 {
colNum, err = strconv.Atoi(num_str)
if err != nil || colNum < 1 {
err_msg := "Index type column requires a number parameter in the url of an integer greater than zero."
logger.Error("err@node_Index: (index/column) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

f, _ := os.Open(n.FilePath())
defer f.Close()
idxer := index.NewColumnIndexer(f)
count, indexFormat, err = index.CreateColumnIndex(&idxer, num, n.IndexPath()+"/"+idxType+".idx")
if err != nil {
err_msg := "err@node_Index: (CreateColumnIndex) id=" + nid + ":" + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
} else {
if n.Type == "subset" && (idxType != "chunkrecord" || n.Subset.Parent.IndexName != "record") {
err_msg := "For subset nodes, Shock currently only supports subset and chunkrecord indexes. Also, for a chunkrecord index, the subset node must have been generated from a record index."
logger.Error("err@node_Index: (index/subset) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

newIndexer := index.Indexers[idxType]
f, _ := os.Open(n.FilePath())
defer f.Close()
var idxer index.Indexer
if n.Type == "subset" {
idxer = newIndexer(f, n.Type, n.Subset.Index.Format, n.IndexPath()+"/"+n.Subset.Parent.IndexName+".idx")
} else {
idxer = newIndexer(f, n.Type, "", "")
}
count, indexFormat, err = idxer.Create(n.IndexPath() + "/" + idxType + ".idx")
if err != nil {
err_msg := "err@node_Index: (idxer.Create) id=" + nid + ": " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}
}

if count == 0 {
err_msg := "Index empty."
logger.Error("err@node_Index: (index) id=" + nid + ": " + err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
return
}

idxInfo := node.IdxInfo{
Type: idxType,
TotalUnits: count,
AvgUnitSize: n.File.Size / count,
Format: indexFormat,
CreatedOn: time.Now(),
// lock this index, trigger async indexing, save state
idxInfo := &node.IdxInfo{
Type: idxType,
Locked: locker.IndexLockMgr.Add(nid, idxType),
}

if idxType == "subset" {
idxType = subsetName
idxInfo.AvgUnitSize = subsetSize / count
}

n.SetIndexInfo(idxType, idxInfo)
if err = n.Save(); err != nil {
err_msg := "err@node_Index (node.Save): id=" + nid + ": " + err.Error()
Expand All @@ -348,10 +225,7 @@ func IndexTypedRequest(ctx context.Context) {
return
}

if conf.LOG_PERF {
logger.Perf("END indexing: " + nid)
}

go node.AsyncIndexer(idxType, nid, colNum, ctx)
responder.RespondOK(ctx)

default:
Expand Down
2 changes: 1 addition & 1 deletion shock-server/controller/node/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
var nodeIds []string
var totalBytes int64
for _, n := range nodes {
if n.HasFile() {
if n.HasFile() && !n.HasFileLock() {
nodeIds = append(nodeIds, n.Id)
totalBytes += n.File.Size
}
Expand Down
Loading

0 comments on commit 480b72a

Please sign in to comment.