Merge pull request #785 from JuliaAI/stack_cache_and_acceleration_rebased

Stack cache and acceleration (rebased)
ablaom authored Jun 14, 2022
2 parents adb341f + e02f6af commit 1b17082
Showing 8 changed files with 165 additions and 51 deletions.
16 changes: 1 addition & 15 deletions Project.toml
@@ -46,18 +46,4 @@ ScientificTypes = "3"
 StatisticalTraits = "3"
 StatsBase = "0.32, 0.33"
 Tables = "0.2, 1.0"
-julia = "1.6"
-
-[extras]
-DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
-DecisionTree = "7806a523-6efd-50cb-b5f6-3fa6f1930dbb"
-Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
-Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
-MultivariateStats = "6f286f6a-111f-5878-ab1e-185364afe411"
-NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
-StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
-Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
-TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
-
-[targets]
-test = ["DataFrames", "DecisionTree", "Distances", "Logging", "MultivariateStats", "NearestNeighbors", "StableRNGs", "Test", "TypedTables"]
+julia = "1.6"
9 changes: 5 additions & 4 deletions src/composition/learning_networks/machines.jl
@@ -328,7 +328,7 @@ end
 
 """
-    return!(mach::Machine{<:Surrogate}, model, verbosity)
+    return!(mach::Machine{<:Surrogate}, model, verbosity; acceleration=CPU1())
 
 The last call in custom code defining the `MLJBase.fit` method for a
 new composite model type. Here `model` is the instance of the new type
@@ -345,7 +345,7 @@ the following:
   handles smart updating (namely, an `MLJBase.update` fallback for
   composite models).
 
-- Calls `fit!(mach, verbosity=verbosity)`.
+- Calls `fit!(mach, verbosity=verbosity, acceleration=acceleration)`.
 
 - Moves any data in source nodes of the learning network into `cache`
   (for data-anonymization purposes).
@@ -388,11 +388,12 @@ end
 """
 function return!(mach::Machine{<:Surrogate},
                  model::Union{Model,Nothing},
-                 verbosity)
+                 verbosity;
+                 acceleration=CPU1())
 
     network_model_names_ = network_model_names(model, mach)
 
-    verbosity isa Nothing || fit!(mach, verbosity=verbosity)
+    verbosity isa Nothing || fit!(mach, verbosity=verbosity, acceleration=acceleration)
     setfield!(mach.fitresult, :network_model_names, network_model_names_)
 
     # anonymize the data
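For orientation, `return!` is the final call in a hand-written `MLJBase.fit` for a composite model. A minimal sketch of such a method using the extended signature follows; the `MyWrapper` type and its `regressor` field are hypothetical illustrations, not code from this commit:

```julia
using MLJBase

# hypothetical composite model wrapping a single deterministic regressor:
mutable struct MyWrapper <: DeterministicComposite
    regressor
end

function MLJBase.fit(model::MyWrapper, verbosity, X, y)
    Xs = source(X)
    ys = source(y)

    # learning network: one machine and one prediction node
    mach = machine(model.regressor, Xs, ys)
    yhat = predict(mach, Xs)

    # wrap the network in a surrogate machine; `return!` then fits the
    # network, here requesting the new threaded resource:
    network_mach = machine(Deterministic(), Xs, ys; predict=yhat)
    return!(network_mach, model, verbosity; acceleration=CPUThreads())
end
```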
25 changes: 23 additions & 2 deletions src/composition/learning_networks/nodes.jl
@@ -201,14 +201,17 @@ end
         acceleration=CPU1())
 
 Train all machines required to call the node `N`, in an appropriate
-order. These machines are those returned by `machines(N)`.
+order, but parallelizing where possible using specified `acceleration`
+mode. These machines are those returned by `machines(N)`.
+
+Supported modes of `acceleration`: `CPU1()`, `CPUThreads()`.
 
 """
 fit!(y::Node; acceleration=CPU1(), kwargs...) =
     fit!(y::Node, acceleration; kwargs...)
 
 fit!(y::Node, ::AbstractResource; kwargs...) =
-    error("Only `acceleration=CPU1()` currently supported")
+    error("Only `acceleration=CPU1()` and `acceleration=CPUThreads()` currently supported")
 
 function fit!(y::Node, ::CPU1; kwargs...)
 
@@ -230,6 +233,24 @@ function fit!(y::Node, ::CPU1; kwargs...)
 
     return y
 end
+
+function fit!(y::Node, ::CPUThreads; kwargs...)
+    _machines = machines(y)
+
+    # flush the fit_okay channels:
+    for mach in _machines
+        flush!(mach.fit_okay)
+    end
+
+    # fit the machines in Multithreading mode
+    @sync for mach in _machines
+        Threads.@spawn fit_only!(mach, true; kwargs...)
+    end
+
+    return y
+
+end
+
 fit!(S::Source; args...) = S
 
 # allow arguments of `Nodes` and `Machine`s to appear
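The node-level API can be exercised directly. A minimal sketch, assuming `model1` and `model2` are any two already-instantiated `Deterministic` regressors (hypothetical names):

```julia
using MLJBase

X = (x1 = rand(100), x2 = rand(100))
y = rand(100)

Xs = source(X)
ys = source(y)

# model1, model2: any pre-instantiated Deterministic regressors (hypothetical)
# two independent machines feeding a single blending node:
mach1 = machine(model1, Xs, ys)
mach2 = machine(model2, Xs, ys)
yhat = 0.5*predict(mach1, Xs) + 0.5*predict(mach2, Xs)

# `machines(yhat)` returns both machines; as neither depends on the
# other, `CPUThreads()` can train them on separate threads:
fit!(yhat, acceleration=CPUThreads())
```

With `CPU1()` the same machines are simply fitted in sequence, as in the first `fit!` method above.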
60 changes: 37 additions & 23 deletions src/composition/models/stacking.jl
@@ -31,9 +31,11 @@ mutable struct DeterministicStack{modelnames, inp_scitype, tg_scitype} <: Determ
     metalearner::Deterministic
     resampling
     measures::Union{Nothing,AbstractVector}
-    function DeterministicStack(modelnames, models, metalearner, resampling, measures)
+    cache::Bool
+    acceleration::AbstractResource
+    function DeterministicStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
         inp_scitype, tg_scitype = input_target_scitypes(models, metalearner)
-        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures)
+        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures, cache, acceleration)
     end
 end
 
@@ -42,9 +44,11 @@ mutable struct ProbabilisticStack{modelnames, inp_scitype, tg_scitype} <: Probab
     metalearner::Probabilistic
     resampling
     measures::Union{Nothing,AbstractVector}
-    function ProbabilisticStack(modelnames, models, metalearner, resampling, measures)
+    cache::Bool
+    acceleration::AbstractResource
+    function ProbabilisticStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
         inp_scitype, tg_scitype = input_target_scitypes(models, metalearner)
-        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures)
+        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures, cache, acceleration)
     end
 end
 
@@ -54,7 +58,7 @@ const Stack{modelnames, inp_scitype, tg_scitype} =
                 ProbabilisticStack{modelnames, inp_scitype, tg_scitype}}
 
 """
-    Stack(;metalearner=nothing, resampling=CV(), name1=model1, name2=model2, ...)
+    Stack(; metalearner=nothing, name1=model1, name2=model2, ..., keyword_options...)
 
 Implements the two-layer generalized stack algorithm introduced by
 [Wolpert
@@ -89,12 +93,17 @@ When training a machine bound to such an instance:
   model will optimize the squared error.
 
 - `resampling`: The resampling strategy used
-  to prepare out-of-sample predictions of the base learners.
+  to prepare out-of-sample predictions of the base learners.
 
-- `measures`: A measure or iterable over measures, to perform an internal
+- `measures`: A measure or iterable over measures, to perform an internal
   evaluation of the learners in the Stack while training. This is not for the
   evaluation of the Stack itself.
 
+- `cache`: Whether machines created in the learning network will cache data or not.
+
+- `acceleration`: A supported `AbstractResource` to define the training parallelization
+  mode of the stack.
+
 - `name1=model1, name2=model2, ...`: the `Supervised` model instances
   to be used as base learners. The provided names become properties
   of the instance created to allow hyper-parameter access
@@ -139,15 +148,15 @@ evaluate!(mach; resampling=Holdout(), measure=rmse)
 ```
 
-The internal evaluation report can be accessed like this
+The internal evaluation report can be accessed like this
 and provides a PerformanceEvaluation object for each model:
 
 ```julia
 report(mach).cv_report
 ```
 
 """
-function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=measure, named_models...)
+function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=measure, cache=true, acceleration=CPU1(), named_models...)
     metalearner === nothing &&
         throw(ArgumentError("No metalearner specified. Use Stack(metalearner=...)"))
 
@@ -159,9 +168,9 @@ function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=
     end
 
     if metalearner isa Deterministic
-        stack = DeterministicStack(modelnames, models, metalearner, resampling, measures)
+        stack = DeterministicStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
     elseif metalearner isa Probabilistic
-        stack = ProbabilisticStack(modelnames, models, metalearner, resampling, measures)
+        stack = ProbabilisticStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
     else
         throw(ArgumentError("The metalearner should be a subtype
                             of $(Union{Deterministic, Probabilistic})"))
@@ -202,13 +211,16 @@ function MMI.clean!(stack::Stack{modelnames, inp_scitype, tg_scitype}) where {mo
 end
 
 
-Base.propertynames(::Stack{modelnames}) where modelnames = tuple(:resampling, :metalearner, modelnames...)
+Base.propertynames(::Stack{modelnames}) where modelnames =
+    tuple(:metalearner, :resampling, :measures, :cache, :acceleration, modelnames...)
 
 
 function Base.getproperty(stack::Stack{modelnames}, name::Symbol) where modelnames
     name === :metalearner && return getfield(stack, :metalearner)
     name === :resampling && return getfield(stack, :resampling)
     name == :measures && return getfield(stack, :measures)
+    name === :cache && return getfield(stack, :cache)
+    name == :acceleration && return getfield(stack, :acceleration)
     models = getfield(stack, :models)
     for j in eachindex(modelnames)
         name === modelnames[j] && return models[j]
@@ -221,6 +233,8 @@ function Base.setproperty!(stack::Stack{modelnames}, _name::Symbol, val) where m
     _name === :metalearner && return setfield!(stack, :metalearner, val)
     _name === :resampling && return setfield!(stack, :resampling, val)
     _name === :measures && return setfield!(stack, :measures, val)
+    _name === :cache && return setfield!(stack, :cache, val)
+    _name === :acceleration && return setfield!(stack, :acceleration, val)
     idx = findfirst(==(_name), modelnames)
     idx isa Nothing || return getfield(stack, :models)[idx] = val
     error("type Stack has no property $name")
@@ -272,7 +286,7 @@ internal_stack_report(m::Stack, verbosity::Int, tt_pairs, folds_evaluations::Var
 """
     internal_stack_report(m::Stack, verbosity::Int, y::AbstractNode, folds_evaluations::Vararg{AbstractNode})
 
-When measure/measures is provided, the folds_evaluation will have been filled by `store_for_evaluation`. This function is
+When measure/measures is provided, the folds_evaluation will have been filled by `store_for_evaluation`. This function is
 not doing any heavy work (not constructing nodes corresponding to measures) but just unpacking all the folds_evaluations in a single node that
 can be evaluated later.
 """
@@ -304,10 +318,10 @@ function internal_stack_report(stack::Stack{modelnames,}, verbosity::Int, tt_pai
                 fitted_params_per_fold=[],
                 report_per_fold=[],
                 train_test_pairs=tt_pairs
-                )
+            )
             for model in getfield(stack, :models)]
     )
 
     # Update the results
     index = 1
     for foldid in 1:nfolds
@@ -330,7 +344,7 @@ end
         end
 
         # Update per_fold
-        model_results.per_fold[i][foldid] =
+        model_results.per_fold[i][foldid] =
             reports_each_observation(measure) ? MLJBase.aggregate(loss, measure) : loss
     end
     index += 1
@@ -366,7 +380,7 @@ end
     oos_set(m::Stack, folds::AbstractNode, Xs::Source, ys::Source)
 
 This function is building the out-of-sample dataset that is later used by the `judge`
-for its own training. It also returns the folds_evaluations object if internal
+for its own training. It also returns the folds_evaluations object if internal
 cross-validation results are requested.
 """
 function oos_set(m::Stack, Xs::Source, ys::Source, tt_pairs)
@@ -384,7 +398,7 @@ function oos_set(m::Stack, Xs::Source, ys::Source, tt_pairs)
         # predictions are subsequently used as an input to the metalearner
         Zfold = []
         for model in getfield(m, :models)
-            mach = machine(model, Xtrain, ytrain)
+            mach = machine(model, Xtrain, ytrain, cache=m.cache)
            ypred = predict(mach, Xtest)
            # Internal evaluation on the fold if required
            push!(folds_evaluations, store_for_evaluation(mach, Xtest, ytest, m.measures))
@@ -417,15 +431,15 @@ function fit(m::Stack, verbosity::Int, X, y)
 
     Xs = source(X)
     ys = source(y)
 
     Zval, yval, folds_evaluations = oos_set(m, Xs, ys, tt_pairs)
 
-    metamach = machine(m.metalearner, Zval, yval)
+    metamach = machine(m.metalearner, Zval, yval, cache=m.cache)
 
     # Each model is retrained on the original full training set
     Zpred = []
     for model in getfield(m, :models)
-        mach = machine(model, Xs, ys)
+        mach = machine(model, Xs, ys, cache=m.cache)
         ypred = predict(mach, Xs)
         ypred = pre_judge_transform(ypred, typeof(model), target_scitype(model))
         push!(Zpred, ypred)
@@ -438,6 +452,6 @@ function fit(m::Stack, verbosity::Int, X, y)
 
     # We can infer the Surrogate by two calls to supertype
     mach = machine(supertype(supertype(typeof(m)))(), Xs, ys; predict=ŷ, internal_report...)
-    return!(mach, m, verbosity)
+    return!(mach, m, verbosity, acceleration=m.acceleration)
 
 end
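Putting the two new hyper-parameters together, a stack can now be constructed along these lines (`ridge` and `knn` are stand-ins for any `Supervised` instances; a usage sketch, not code from this commit):

```julia
using MLJBase

# ridge, knn: any pre-instantiated Supervised models (hypothetical names)
stack = Stack(metalearner = ridge,
              resampling = CV(nfolds=3),
              measures = rmse,
              cache = false,                # network machines won't cache data
              acceleration = CPUThreads(),  # base learners trained in parallel
              ridge_model = ridge,
              knn_model = knn)

# both options are ordinary properties of the stack, so they can be
# inspected and mutated like any other hyper-parameter:
stack.cache                  # false
stack.acceleration = CPU1()
```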
26 changes: 26 additions & 0 deletions test/Project.toml
@@ -0,0 +1,26 @@
+[deps]
+CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597"
+ComputationalResources = "ed09eef8-17a6-5b46-8889-db040fac31e3"
+DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
+DecisionTree = "7806a523-6efd-50cb-b5f6-3fa6f1930dbb"
+Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
+Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
+Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
+LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
+Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
+LossFunctions = "30fc2ffe-d236-52d8-8643-a9d8f7c094a7"
+MLJModelInterface = "e80e1ace-859a-464e-9ed9-23947d8ae3ea"
+MultivariateStats = "6f286f6a-111f-5878-ab1e-185364afe411"
+NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
+OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
+Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a"
+ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
+Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
+ScientificTypes = "321657f4-b219-11e9-178b-2701a2544e81"
+Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
+StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
+Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
+StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
+Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
+Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
8 changes: 4 additions & 4 deletions test/composition/learning_networks/machines.jl
@@ -155,9 +155,9 @@ zhat = inverse_transform(standM, uhat)
 yhat = exp(zhat)
 enode = @node mae(ys, yhat)
 
-@testset "replace method for learning network machines" begin
+@testset "replace method for learning network machines, acceleration: $(typeof(accel))" for accel in (CPU1(), CPUThreads())
 
-    fit!(yhat, verbosity=0)
+    fit!(yhat, verbosity=0, acceleration=accel)
 
     # test nested reporting:
     r = MLJBase.report(yhat)
@@ -199,7 +199,7 @@ enode = @node mae(ys, yhat)
     knnM2 = machines(yhat2, knn) |> first
     hotM2 = machines(yhat2, hot) |> first
 
-    @test_mach_sequence(fit!(yhat2, force=true),
+    @test_mach_sequence(fit!(yhat2, force=true, acceleration=accel),
                         [(:train, standM2), (:train, hotM2),
                          (:train, knnM2), (:train, oakM2)],
                         [(:train, hotM2), (:train, standM2),
@@ -218,7 +218,7 @@ enode = @node mae(ys, yhat)
     # this change should trigger retraining of all machines except the
    # univariate standardizer:
     hot2.drop_last = true
-    @test_mach_sequence(fit!(yhat2),
+    @test_mach_sequence(fit!(yhat2, acceleration=accel),
                         [(:skip, standM2), (:update, hotM2),
                          (:train, knnM2), (:train, oakM2)],
                         [(:update, hotM2), (:skip, standM2),
[Diffs for the remaining two changed files not shown.]