Skip to content

Commit

Permalink
Add per-task metrics (JuliaLang#56320)
Browse files Browse the repository at this point in the history
Close JuliaLang#47351 (builds on top of
JuliaLang#48416)

Adds two per-task metrics:
- running time = amount of time the task was actually running (according
to our scheduler). Note: currently inclusive of GC time, but would be
good to be able to separate that out (in a future PR)
- wall time = amount of time between the scheduler becoming aware of
this task and the task entering a terminal state (i.e. done or failed).

We record running time in `wait()`, where the scheduler stops running
the task as well as in `yield(t)`, `yieldto(t)` and `throwto(t)`, which
bypass the scheduler. Other places where a task stops running (for
`Channel`, `ReentrantLock`, `Event`, `Timer` and `Semaphore` are all
implemented in terms of `wait(Condition)`, which in turn calls `wait()`.
`LibuvStream` similarly calls `wait()`.

This should capture everything (albeit, slightly over-counting task CPU
time by including any enqueuing work done before we hit `wait()`).

The various metrics counters could be a separate inlined struct if we
think that's a useful abstraction, but for now i've just put them
directly in `jl_task_t`. They are all atomic, except the
`metrics_enabled` flag itself (which we now have to check on task
start/switch/done even if metrics are not enabled) which is set on task
construction and marked `const` on the julia side.

In future PRs we could add more per-task metrics, e.g. compilation time,
GC time, allocations, potentially a wait-time breakdown (time waiting on
locks, channels, in the scheduler run queue, etc.), potentially the
number of yields.

Perhaps in future there could be ways to enable this on a per-thread and
per-task basis. And potentially in future these same timings could be
used by `@time` (e.g. writing this same timing data to a ScopedValue
like in JuliaLang#55103 but only for tasks
lexically scoped to inside the `@time` block).

Timings are off by default but can be turned on globally via starting
Julia with `--task-metrics=yes` or calling
`Base.Experimental.task_metrics(true)`. Metrics are collected for all
tasks created when metrics are enabled. In other words,
enabling/disabling timings via `Base.Experimental.task_metrics` does not
affect existing `Task`s, only new `Task`s.

The other new APIs are `Base.Experimental.task_running_time_ns(::Task)`
and `Base.Experimental.task_wall_time_ns(::Task)` for retrieving the new
metrics. These are safe to call on any task (including the current task,
or a task running on another thread). All these are in
`Base.Experimental` to give us room to change up the APIs as we add more
metrics in future PRs (without worrying about release timelines).

cc @NHDaly @kpamnany @d-netto

---------

Co-authored-by: Pete Vilter <[email protected]>
Co-authored-by: K Pamnany <[email protected]>
Co-authored-by: Nathan Daly <[email protected]>
Co-authored-by: Valentin Churavy <[email protected]>
  • Loading branch information
5 people committed Dec 7, 2024
1 parent 9344ba8 commit 341e314
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 37 deletions.
28 changes: 23 additions & 5 deletions base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,33 @@
#end

#mutable struct Task
# parent::Task
# next::Any
# queue::Any
# storage::Any
# state::Symbol
# donenotify::Any
# result::Any
# exception::Any
# backtrace::Any
# logstate::Any
# scope::Any
# code::Any
# @atomic _state::UInt8
# sticky::UInt8
# priority::UInt16
# @atomic _isexception::UInt8
# pad00::UInt8
# pad01::UInt8
# pad02::UInt8
# rngState0::UInt64
# rngState1::UInt64
# rngState2::UInt64
# rngState3::UInt64
# rngState4::UInt64
# const metrics_enabled::Bool
# pad10::UInt8
# pad11::UInt8
# pad12::UInt8
# @atomic first_enqueued_at::UInt64
# @atomic last_started_running_at::UInt64
# @atomic running_time_ns::UInt64
# @atomic finished_at::UInt64
#end

export
Expand Down
83 changes: 83 additions & 0 deletions base/experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,87 @@ adding them to the global method table.
"""
:@MethodTable

"""
Base.Experimental.disable_new_worlds()
Mark that no new worlds (methods additions, deletions, etc) are permitted to be created at
any future time, allowing for lower latencies for some operations and slightly lower memory
usage, by eliminating the tracking of those possible invalidation.
"""
disable_new_worlds() = ccall(:jl_disable_new_worlds, Cvoid, ())

### Task metrics

"""
Base.Experimental.task_metrics(::Bool)
Enable or disable the collection of per-task metrics.
A `Task` created when `Base.Experimental.task_metrics(true)` is in effect will have
[`Base.Experimental.task_running_time_ns`](@ref) and [`Base.Experimental.task_wall_time_ns`](@ref)
timing information available.
!!! note
Task metrics can be enabled at start-up via the `--task-metrics=yes` command line option.
"""
function task_metrics(b::Bool)
if b
ccall(:jl_task_metrics_enable, Cvoid, ())
else
ccall(:jl_task_metrics_disable, Cvoid, ())
end
return nothing
end

"""
Base.Experimental.task_running_time_ns(t::Task) -> Union{UInt64, Nothing}
Return the total nanoseconds that the task `t` has spent running.
This metric is only updated when `t` yields or completes unless `t` is the current task, in
which it will be updated continuously.
See also [`Base.Experimental.task_wall_time_ns`](@ref).
Returns `nothing` if task timings are not enabled.
See [`Base.Experimental.task_metrics`](@ref).
!!! note "This metric is from the Julia scheduler"
A task may be running on an OS thread that is descheduled by the OS
scheduler, this time still counts towards the metric.
!!! compat "Julia 1.12"
This method was added in Julia 1.12.
"""
function task_running_time_ns(t::Task=current_task())
t.metrics_enabled || return nothing
if t == current_task()
# These metrics fields can't update while we're running.
# But since we're running we need to include the time since we last started running!
return t.running_time_ns + (time_ns() - t.last_started_running_at)
else
return t.running_time_ns
end
end

"""
Base.Experimental.task_wall_time_ns(t::Task) -> Union{UInt64, Nothing}
Return the total nanoseconds that the task `t` was runnable.
This is the time since the task first entered the run queue until the time at which it
completed, or until the current time if the task has not yet completed.
See also [`Base.Experimental.task_running_time_ns`](@ref).
Returns `nothing` if task timings are not enabled.
See [`Base.Experimental.task_metrics`](@ref).
!!! compat "Julia 1.12"
This method was added in Julia 1.12.
"""
function task_wall_time_ns(t::Task=current_task())
t.metrics_enabled || return nothing
start_at = t.first_enqueued_at
start_at == 0 && return UInt64(0)
end_at = t.finished_at
end_at == 0 && return time_ns() - start_at
return end_at - start_at
end

end # module
1 change: 1 addition & 0 deletions base/options.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct JLOptions
heap_size_hint::UInt64
trace_compile_timing::Int8
safe_crash_log_file::Ptr{UInt8}
task_metrics::Int8
end

# This runs early in the sysimage != is not defined yet
Expand Down
53 changes: 50 additions & 3 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,11 @@ function enq_work(t::Task)
return t
end

schedule(t::Task) = enq_work(t)
function schedule(t::Task)
# [task] created -scheduled-> wait_time
maybe_record_enqueued!(t)
enq_work(t)
end

"""
schedule(t::Task, [val]; error=false)
Expand Down Expand Up @@ -857,6 +861,8 @@ function schedule(t::Task, @nospecialize(arg); error=false)
t.queue === nothing || Base.error("schedule: Task not runnable")
setfield!(t, :result, arg)
end
# [task] created -scheduled-> wait_time
maybe_record_enqueued!(t)
enq_work(t)
return t
end
Expand Down Expand Up @@ -888,9 +894,15 @@ A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
immediately yields to `t` before calling the scheduler.
"""
function yield(t::Task, @nospecialize(x=nothing))
(t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
ct = current_task()
t === ct && throw(ConcurrencyViolationError("Cannot yield to currently running task!"))
(t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable"))
# [task] user_time -yield-> wait_time
record_running_time!(ct)
# [task] created -scheduled-> wait_time
maybe_record_enqueued!(t)
t.result = x
enq_work(current_task())
enq_work(ct)
set_next_task(t)
return try_yieldto(ensure_rescheduled)
end
Expand All @@ -904,13 +916,18 @@ call to `yieldto`. This is a low-level call that only switches tasks, not consid
or scheduling in any way. Its use is discouraged.
"""
function yieldto(t::Task, @nospecialize(x=nothing))
ct = current_task()
# TODO: these are legacy behaviors; these should perhaps be a scheduler
# state error instead.
if t._state === task_state_done
return x
elseif t._state === task_state_failed
throw(t.result)
end
# [task] user_time -yield-> wait_time
record_running_time!(ct)
# [task] created -scheduled-unfairly-> wait_time
maybe_record_enqueued!(t)
t.result = x
set_next_task(t)
return try_yieldto(identity)
Expand All @@ -924,6 +941,10 @@ function try_yieldto(undo)
rethrow()
end
ct = current_task()
# [task] wait_time -(re)started-> user_time
if ct.metrics_enabled
@atomic :monotonic ct.last_started_running_at = time_ns()
end
if ct._isexception
exc = ct.result
ct.result = nothing
Expand All @@ -937,6 +958,11 @@ end

# yield to a task, throwing an exception in it
function throwto(t::Task, @nospecialize exc)
ct = current_task()
# [task] user_time -yield-> wait_time
record_running_time!(ct)
# [task] created -scheduled-unfairly-> wait_time
maybe_record_enqueued!(t)
t.result = exc
t._isexception = true
set_next_task(t)
Expand Down Expand Up @@ -989,6 +1015,9 @@ checktaskempty = Partr.multiq_check_empty
end

function wait()
ct = current_task()
# [task] user_time -yield-or-done-> wait_time
record_running_time!(ct)
GC.safepoint()
W = workqueue_for(Threads.threadid())
poptask(W)
Expand All @@ -1003,3 +1032,21 @@ if Sys.iswindows()
else
pause() = ccall(:pause, Cvoid, ())
end

# update the `running_time_ns` field of `t` to include the time since it last started running.
function record_running_time!(t::Task)
if t.metrics_enabled && !istaskdone(t)
@atomic :monotonic t.running_time_ns += time_ns() - t.last_started_running_at
end
return t
end

# if this is the first time `t` has been added to the run queue
# (or the first time it has been unfairly yielded to without being added to the run queue)
# then set the `first_enqueued_at` field to the current time.
function maybe_record_enqueued!(t::Task)
if t.metrics_enabled && t.first_enqueued_at == 0
@atomic :monotonic t.first_enqueued_at = time_ns()
end
return t
end
8 changes: 8 additions & 0 deletions doc/man/julia.1
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,14 @@ Print precompile statements for methods compiled during execution or save to a p
--trace-compile-timing=
If --trace-compile is enabled show how long each took to compile in ms

.TP
--trace-dispatch={stderr|name}
Print precompile statements for methods dispatched during execution or save to stderr or a path.

.TP
--task-metrics={yes|no*}
Enable the collection of per-task metrics.

.TP
-image-codegen
Force generate code in imaging mode
Expand Down
8 changes: 8 additions & 0 deletions doc/src/base/multi-threading.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,11 @@ These building blocks are used to create the regular synchronization objects.
```@docs
Base.Threads.SpinLock
```

## Task metrics (Experimental)

```@docs
Base.Experimental.task_metrics
Base.Experimental.task_running_time_ns
Base.Experimental.task_wall_time_ns
```
1 change: 1 addition & 0 deletions doc/src/manual/command-line-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ The following is a complete list of command-line switches available when launchi
|`--code-coverage=tracefile.info` |Append coverage information to the LCOV tracefile (filename supports format tokens).|
|`--track-allocation[={none*\|user\|all}]` |Count bytes allocated by each source line (omitting setting is equivalent to "user")|
|`--track-allocation=@<path>` |Count bytes but only in files that fall under the given file path/directory. The `@` prefix is required to select this option. A `@` with no path will track the current directory.|
|`--task-metrics={yes\|no*}` |Enable the collection of per-task metrics|
|`--bug-report=KIND` |Launch a bug report session. It can be used to start a REPL, run a script, or evaluate expressions. It first tries to use BugReporting.jl installed in current environment and falls back to the latest compatible BugReporting.jl if not. For more information, see `--bug-report=help`.|
|`--compile={yes*\|no\|all\|min}` |Enable or disable JIT compiler, or request exhaustive or minimal compilation|
|`--output-o <name>` |Generate an object file (including system image data)|
Expand Down
4 changes: 4 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,10 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel)
#if defined(_COMPILER_GCC_) && __GNUC__ >= 12
#pragma GCC diagnostic ignored "-Wdangling-pointer"
#endif
if (jl_options.task_metrics == JL_OPTIONS_TASK_METRICS_ON) {
// enable before creating the root task so it gets timings too.
jl_atomic_fetch_add(&jl_task_metrics_enabled, 1);
}
// warning: this changes `jl_current_task`, so be careful not to call that from this function
jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
#pragma GCC diagnostic pop
Expand Down
30 changes: 30 additions & 0 deletions src/jlapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,36 @@ JL_DLLEXPORT uint64_t jl_cumulative_recompile_time_ns(void)
return jl_atomic_load_relaxed(&jl_cumulative_recompile_time);
}

/**
* @brief Enable per-task timing.
*/
JL_DLLEXPORT void jl_task_metrics_enable(void)
{
// Increment the flag to allow reentrant callers.
jl_atomic_fetch_add(&jl_task_metrics_enabled, 1);
}

/**
* @brief Disable per-task timing.
*/
JL_DLLEXPORT void jl_task_metrics_disable(void)
{
// Prevent decrementing the counter below zero
uint8_t enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled);
while (enabled > 0) {
if (jl_atomic_cmpswap(&jl_task_metrics_enabled, &enabled, enabled-1))
break;
}
}

/**
* @brief Retrieve floating-point environment constants.
*
* Populates an array with constants related to the floating-point environment,
* such as rounding modes and exception flags.
*
* @param ret An array of integers to be populated with floating-point environment constants.
*/
JL_DLLEXPORT void jl_get_fenv_consts(int *ret)
{
ret[0] = FE_INEXACT;
Expand Down
Loading

0 comments on commit 341e314

Please sign in to comment.