Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/dependency #145

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/calloc/CmdArgParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
FlagExcludes string
FlagGetUserEnv bool
FlagExport string
FlagDependency string

FlagConfigFilePath string
FlagDebugLevel string
Expand Down Expand Up @@ -85,4 +86,5 @@ func init() {
RootCmd.Flags().StringVarP(&FlagExcludes, "exclude", "x", "", "Exclude specific nodes from allocating (commas separated list)")
RootCmd.Flags().BoolVar(&FlagGetUserEnv, "get-user-env", false, "Load login environment variables of the user")
RootCmd.Flags().StringVar(&FlagExport, "export", "", "Propagate environment variables")
RootCmd.Flags().StringVarP(&FlagDependency, "dependency", "d", "", "Conditions for job to execute")
}
8 changes: 8 additions & 0 deletions internal/calloc/calloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,14 @@ func MainCalloc(cmd *cobra.Command, args []string) util.CraneCmdError {
task.Env["CRANE_EXPORT_ENV"] = FlagExport
}
util.SetPropagatedEnviron(task)

if FlagDependency != "" {
err := util.SetTaskDependencies(task, FlagDependency)
if err != nil {
log.Fatal(err)
}
}

task.Resources.AllocatableRes.CpuCoreLimit = task.CpusPerTask * float64(task.NtasksPerNode)
if task.Resources.AllocatableRes.CpuCoreLimit > 1e6 {
log.Errorf("Request too many CPUs: %v", task.Resources.AllocatableRes.CpuCoreLimit)
Expand Down
4 changes: 2 additions & 2 deletions internal/cbatch/CmdArgParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (

FlagConfigFilePath string
FlagJson bool
FlagDependency string

RootCmd = &cobra.Command{
Use: "cbatch [flags] file",
Expand Down Expand Up @@ -130,7 +131,6 @@ func init() {
RootCmd.Flags().StringVarP(&FlagStdoutPath, "output", "o", "", "Redirection path of standard output of the script")
RootCmd.Flags().StringVarP(&FlagStderrPath, "error", "e", "", "Redirection path of standard error of the script")
RootCmd.Flags().StringVar(&FlagExtraAttr, "extra-attr", "", "Extra attributes of the job (in JSON format)")
RootCmd.Flags().StringVar(&FlagMailType, "mail-type", "", "Notify user by mail when certain events occur, supported values: NONE, BEGIN, END, FAIL, ALL (default is NONE)")
RootCmd.Flags().StringVar(&FlagMailUser, "mail-user", "", "Mail address of the notification receiver")
RootCmd.Flags().BoolVar(&FlagJson, "json", false, "Output in JSON format")
RootCmd.Flags().StringVarP(&FlagDependency, "dependency", "d", "", "Conditions for job to execute")
}
27 changes: 27 additions & 0 deletions internal/cbatch/cbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func ProcessCbatchArgs(cmd *cobra.Command, args []CbatchArg) (bool, *protos.Task
return false, nil
}
task.ExtraAttr = extra
case "--dependency", "-d":
err := util.SetTaskDependencies(task, arg.val)
if err != nil {
log.Error(err)
return false, nil
}
default:
log.Errorf("Invalid parameter '%s' given in the script file.\n", arg.name)
return false, nil
Expand Down Expand Up @@ -252,6 +258,13 @@ func ProcessCbatchArgs(cmd *cobra.Command, args []CbatchArg) (bool, *protos.Task
}
task.ExtraAttr = extra
}
if FlagDependency != "" {
err := util.SetTaskDependencies(task, FlagDependency)
if err != nil {
log.Error(err)
return false, nil
}
}

// Check the validity of the parameters

Expand Down Expand Up @@ -300,6 +313,20 @@ func ProcessCbatchArgs(cmd *cobra.Command, args []CbatchArg) (bool, *protos.Task
return false, nil
}
}
if task.Dependencies != nil {
taskIds := make(map[uint32]bool)
for _, dep := range task.Dependencies.Dependencies {
if taskIds[dep.TaskId] {
log.Errorf("Duplicate task #%d in dependencies\n", dep.TaskId)
return false, nil
}
taskIds[dep.TaskId] = true
}
}
if len(task.Name) > 30 {
task.Name = task.Name[:30]
log.Warnf("Job name exceeds 30 characters, trimmed to %v.\n", task.Name)
}

task.Resources.AllocatableRes.CpuCoreLimit = task.CpusPerTask * float64(task.NtasksPerNode)
if task.Resources.AllocatableRes.CpuCoreLimit > 1e6 {
Expand Down
2 changes: 2 additions & 0 deletions internal/crun/CmdArgParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
FlagGetUserEnv bool
FlagExport string
FlagGres string
FlagDependency string

FlagConfigFilePath string
FlagDebugLevel string
Expand Down Expand Up @@ -84,4 +85,5 @@ func init() {
RootCmd.Flags().StringVarP(&FlagExcludes, "exclude", "x", "", "Exclude specific nodes from allocating (commas separated list)")
RootCmd.Flags().BoolVar(&FlagGetUserEnv, "get-user-env", false, "Load login environment variables of the user")
RootCmd.Flags().StringVar(&FlagExport, "export", "", "Propagate environment variables")
RootCmd.Flags().StringVarP(&FlagDependency, "dependency", "d", "", "Conditions for job to execute")
}
8 changes: 8 additions & 0 deletions internal/crun/crun.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,14 @@ func MainCrun(cmd *cobra.Command, args []string) util.CraneCmdError {
task.Env["CRANE_EXPORT_ENV"] = FlagExport
}
util.SetPropagatedEnviron(task)

if FlagDependency != "" {
err := util.SetTaskDependencies(task, FlagDependency)
if err != nil {
log.Fatal(err)
}
}

task.Resources.AllocatableRes.CpuCoreLimit = task.CpusPerTask * float64(task.NtasksPerNode)
if task.Resources.AllocatableRes.CpuCoreLimit > 1e6 {
log.Errorf("Request too many cpus: %f", task.Resources.AllocatableRes.CpuCoreLimit)
Expand Down
55 changes: 55 additions & 0 deletions internal/util/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,58 @@ func ParseGres(gres string) *protos.DeviceMap {

return result
}

func SetTaskDependencies(task *protos.TaskToCtld, depStr string) error {
if strings.Contains(depStr, ",") && strings.Contains(depStr, "?") {
return fmt.Errorf("cannot use both ',' and '?' in the dependency string")
}
sep := ","
if strings.Contains(depStr, "?") {
sep = "?"
}
depend_all := (sep == ",")
if task.Dependencies != nil && depend_all != task.Dependencies.DependAll {
return fmt.Errorf("cannot merge dependency with different dependency types(, and ?)")
}
if task.Dependencies == nil {
task.Dependencies = &protos.Dependencies{
DependAll: depend_all,
}
}

depStr = strings.TrimSpace(depStr)
depStrList := strings.Split(depStr, sep)
for _, subDepStr := range depStrList {
dependencies := strings.Split(subDepStr, ":")
if len(dependencies) < 2 {
return fmt.Errorf("unrecognized dependency string: %s", subDepStr)
}
condition := new(protos.DependencyType)
switch dependencies[0] {
case "after":
*condition = protos.DependencyType_AFTER
case "afterok":
*condition = protos.DependencyType_AFTER_OK
case "afternotok":
*condition = protos.DependencyType_AFTER_NOT_OK
case "afterany":
*condition = protos.DependencyType_AFTER_ANY
default:
return fmt.Errorf("unrecognized dependency type: %s", dependencies[0])
}
for _, dep := range dependencies[1:] {
taskId, err := strconv.ParseUint(dep, 10, 32)
if err != nil {
return fmt.Errorf("invalid task id: %s", dep)
}
task.Dependencies.Dependencies = append(task.Dependencies.Dependencies, &protos.DependencyCondition{
TaskId: uint32(taskId),
Type: *condition,
})
}
}
if len(task.Dependencies.Dependencies) > 50 {
return fmt.Errorf("dependency count should be no more than 50")
}
return nil
}
9 changes: 4 additions & 5 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -544,12 +544,11 @@ message StreamCtldReply {
}
}

message QueryActualGresRequest{
}
message QueryActualDresRequest{}

message QueryActualGresReply{
message QueryActualDresReply{
bool ok = 1;
DedicatedResource dedicated_resource = 2;
DedicatedResourceInNode dres_in_node = 2;
}

message StreamCrunRequest{
Expand Down Expand Up @@ -740,7 +739,7 @@ service Craned {
rpc CreateCgroupForTasks(CreateCgroupForTasksRequest) returns(CreateCgroupForTasksReply);
rpc ReleaseCgroupForTasks(ReleaseCgroupForTasksRequest) returns(ReleaseCgroupForTasksReply);

rpc QueryActualGres(QueryActualGresRequest) returns(QueryActualGresReply);
rpc QueryActualDres(QueryActualDresRequest) returns(QueryActualDresReply);
/*
If the task is an interactive task, the resource uuid is also revoked.
If there's no process in this interactive task, just deallocate all the resources.
Expand Down
51 changes: 27 additions & 24 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,11 @@ message ResourceV2 {
map <string /*craned id*/, ResourceInNode> each_node_res = 1;
}

// Old implementation
message DedicatedResource {
map <string /*craned id*/, DedicatedResourceInNode> each_node_gres = 1;
}

message ResourceView {
AllocatableResource allocatable_res = 1;
DeviceMap device_map = 2;
}

// Old implementation
message Resources {
AllocatableResource allocatable_resource = 1;

oneof dedicated_resource {
// After the task is scheduled, some missing device
// specification is fulfilled by the scheduler and
// a detailed specification of dedicated resources is generated.
// Such a detailed form is sent to Craned for actual device allocation.
DedicatedResource actual_dedicated_resource = 2;

// The form of dedicated resource presented by user
// is abstracted by DeviceMap.
// DeviceMap is used between front end and CraneCtld.
DeviceMap dedicated_resource_req = 3;
}
}

enum PartitionState {
PARTITION_UP = 0;
PARTITION_DOWN = 1;
Expand Down Expand Up @@ -132,6 +109,23 @@ enum InteractiveTaskType {
Crun = 1;
}

enum DependencyType {
AFTER = 0;
AFTER_ANY = 1;
AFTER_OK = 2;
AFTER_NOT_OK = 3;
}

message DependencyCondition {
uint32 task_id = 1;
DependencyType type = 2;
}

message Dependencies{
repeated DependencyCondition dependencies = 1;
bool depend_all = 2;
}

message TaskToCtld {
/* -------- Fields that are set at the submission time. ------- */
google.protobuf.Duration time_limit = 1;
Expand All @@ -151,6 +145,8 @@ message TaskToCtld {

bool requeue_if_failed = 12;
bool get_user_env = 13;

Dependencies dependencies = 14;

oneof payload {
BatchTaskAdditionalMeta batch_meta = 21;
Expand Down Expand Up @@ -183,13 +179,19 @@ message RuntimeAttrOfTask {
int32 requeue_count = 11;
repeated string craned_ids = 12;
TaskStatus status = 13;
uint32 exit_code = 14;
uint32 exit_code = 14;;

google.protobuf.Timestamp submit_time = 15;
google.protobuf.Timestamp start_time = 16;
google.protobuf.Timestamp end_time = 17;

bool held = 18;
ResourceV2 resources = 19;
bool dependency_ok = 20;
// If this task depends all dependencies, store satisfied dependencies.
// If this task depends any dependency, store unsatisfied dependencies.
// TaskId must be stored in order to restore.
repeated uint32 dependency_ids = 21;
}

message TaskToD {
Expand Down Expand Up @@ -271,6 +273,7 @@ message TaskInfo {
string extra_attr = 20;

// Dynamic task information
uint32 dependency_state = 29;
bool held = 30;
TaskStatus status = 31;

Expand Down