diff --git a/Project.toml b/Project.toml
index 10b50a3d..31580fcb 100644
--- a/Project.toml
+++ b/Project.toml
@@ -46,18 +46,4 @@ ScientificTypes = "3"
 StatisticalTraits = "3"
 StatsBase = "0.32, 0.33"
 Tables = "0.2, 1.0"
-julia = "1.6"
-
-[extras]
-DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
-DecisionTree = "7806a523-6efd-50cb-b5f6-3fa6f1930dbb"
-Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
-Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
-MultivariateStats = "6f286f6a-111f-5878-ab1e-185364afe411"
-NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
-StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
-Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
-TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
-
-[targets]
-test = ["DataFrames", "DecisionTree", "Distances", "Logging", "MultivariateStats", "NearestNeighbors", "StableRNGs", "Test", "TypedTables"]
+julia = "1.6"
\ No newline at end of file
diff --git a/src/composition/learning_networks/machines.jl b/src/composition/learning_networks/machines.jl
index 26c4a2a1..8e15d86d 100644
--- a/src/composition/learning_networks/machines.jl
+++ b/src/composition/learning_networks/machines.jl
@@ -328,7 +328,7 @@ end
 
 """
-    return!(mach::Machine{<:Surrogate}, model, verbosity)
+    return!(mach::Machine{<:Surrogate}, model, verbosity; acceleration=CPU1())
 
 The last call in custom code defining the `MLJBase.fit` method for a
 new composite model type. Here `model` is the instance of the new type
@@ -345,7 +345,7 @@ the following:
   handles smart updating (namely, an `MLJBase.update` fallback for
   composite models).
 
-- Calls `fit!(mach, verbosity=verbosity)`.
+- Calls `fit!(mach, verbosity=verbosity, acceleration=acceleration)`.
 
 - Moves any data in source nodes of the learning network into `cache`
   (for data-anonymization purposes).
@@ -388,11 +388,12 @@ end
 """
 function return!(mach::Machine{<:Surrogate},
                  model::Union{Model,Nothing},
-                 verbosity)
+                 verbosity;
+                 acceleration=CPU1())
 
     network_model_names_ = network_model_names(model, mach)
 
-    verbosity isa Nothing || fit!(mach, verbosity=verbosity)
+    verbosity isa Nothing || fit!(mach, verbosity=verbosity, acceleration=acceleration)
     setfield!(mach.fitresult, :network_model_names, network_model_names_)
 
     # anonymize the data
diff --git a/src/composition/learning_networks/nodes.jl b/src/composition/learning_networks/nodes.jl
index c6690b6a..ab4d4601 100644
--- a/src/composition/learning_networks/nodes.jl
+++ b/src/composition/learning_networks/nodes.jl
@@ -201,14 +201,17 @@ end
         acceleration=CPU1())
 
 Train all machines required to call the node `N`, in an appropriate
-order. These machines are those returned by `machines(N)`.
+order, parallelizing where possible using the specified `acceleration`
+mode. These machines are those returned by `machines(N)`.
+
+Supported modes of `acceleration`: `CPU1()`, `CPUThreads()`.
 
 """
 fit!(y::Node; acceleration=CPU1(), kwargs...) =
     fit!(y::Node, acceleration; kwargs...)
 
 fit!(y::Node, ::AbstractResource; kwargs...) =
-    error("Only `acceleration=CPU1()` currently supported")
+    error("Only `acceleration=CPU1()` and `acceleration=CPUThreads()` currently supported")
 
 function fit!(y::Node, ::CPU1; kwargs...)
 
@@ -230,6 +233,27 @@ function fit!(y::Node, ::CPU1; kwargs...)
     return y
 end
 
+
+function fit!(y::Node, ::CPUThreads; kwargs...)
+    _machines = machines(y)
+
+    # flush the fit_okay channels:
+    for mach in _machines
+        flush!(mach.fit_okay)
+    end
+
+    # fit the machines in multithreaded mode:
+    @sync for mach in _machines
+        Threads.@spawn fit_only!(mach, true; kwargs...)
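+        # the positional `true` asks `fit_only!` to first wait on the
+        # `fit_okay` channels of each machine's upstream machines, so
+        # dependency order is respected despite concurrent scheduling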
+    end
+
+    return y
+
+end
+
 fit!(S::Source; args...) = S
 
 # allow arguments of `Nodes` and `Machine`s to appear
diff --git a/src/composition/models/stacking.jl b/src/composition/models/stacking.jl
index 6221aabf..c4d8aac3 100644
--- a/src/composition/models/stacking.jl
+++ b/src/composition/models/stacking.jl
@@ -31,9 +31,11 @@ mutable struct DeterministicStack{modelnames, inp_scitype, tg_scitype} <: Determ
     metalearner::Deterministic
     resampling
     measures::Union{Nothing,AbstractVector}
-    function DeterministicStack(modelnames, models, metalearner, resampling, measures)
+    cache::Bool
+    acceleration::AbstractResource
+    function DeterministicStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
         inp_scitype, tg_scitype = input_target_scitypes(models, metalearner)
-        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures)
+        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures, cache, acceleration)
     end
 end
 
@@ -42,9 +44,11 @@ mutable struct ProbabilisticStack{modelnames, inp_scitype, tg_scitype} <: Probab
     metalearner::Probabilistic
     resampling
     measures::Union{Nothing,AbstractVector}
-    function ProbabilisticStack(modelnames, models, metalearner, resampling, measures)
+    cache::Bool
+    acceleration::AbstractResource
+    function ProbabilisticStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
         inp_scitype, tg_scitype = input_target_scitypes(models, metalearner)
-        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures)
+        return new{modelnames, inp_scitype, tg_scitype}(models, metalearner, resampling, measures, cache, acceleration)
     end
 end
 
@@ -54,7 +58,7 @@ const Stack{modelnames, inp_scitype, tg_scitype} =
     ProbabilisticStack{modelnames, inp_scitype, tg_scitype}}
 
 """
-    Stack(;metalearner=nothing, resampling=CV(), name1=model1, name2=model2, ...)
+    Stack(; metalearner=nothing, name1=model1, name2=model2, ..., keyword_options...)
 
 Implements the two-layer generalized stack algorithm introduced by
 [Wolpert
@@ -89,12 +93,17 @@ When training a machine bound to such an instance:
   model will optimize the squared error.
 
 - `resampling`: The resampling strategy used
-  to prepare out-of-sample predictions of the base learners. 
+  to prepare out-of-sample predictions of the base learners.
 
-- `measures`: A measure or iterable over measures, to perform an internal 
+- `measures`: A measure or iterable over measures, to perform an internal
   evaluation of the learners in the Stack while training. This is not for
   the evaluation of the Stack itself.
 
+- `cache`: Whether machines created in the learning network will cache data.
+
+- `acceleration`: A supported `AbstractResource` defining the training
+  parallelization mode of the stack.
+
 - `name1=model1, name2=model2, ...`: the `Supervised` model instances
   to be used as base learners.  The provided names become properties
   of the instance created to allow hyper-parameter access
@@ -139,7 +148,7 @@ evaluate!(mach; resampling=Holdout(), measure=rmse)
 
 ```
 
-The internal evaluation report can be accessed like this 
+The internal evaluation report can be accessed like this, and provides a `PerformanceEvaluation` object for each model:
 
 ```julia
@@ -147,7 +156,7 @@ report(mach).cv_report
 ```
 
 """
-function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=measure, named_models...)
+function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=measure, cache=true, acceleration=CPU1(), named_models...)
 
     metalearner === nothing &&
         throw(ArgumentError("No metalearner specified. Use Stack(metalearner=...)"))
 
@@ -159,9 +168,9 @@ function Stack(;metalearner=nothing, resampling=CV(), measure=nothing, measures=
     end
 
     if metalearner isa Deterministic
-        stack = DeterministicStack(modelnames, models, metalearner, resampling, measures)
+        stack = DeterministicStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
     elseif metalearner isa Probabilistic
-        stack = ProbabilisticStack(modelnames, models, metalearner, resampling, measures)
+        stack = ProbabilisticStack(modelnames, models, metalearner, resampling, measures, cache, acceleration)
     else
         throw(ArgumentError("The metalearner should be a subtype
             of $(Union{Deterministic, Probabilistic})"))
@@ -202,13 +211,16 @@ function MMI.clean!(stack::Stack{modelnames, inp_scitype, tg_scitype}) where {mo
 end
 
 
-Base.propertynames(::Stack{modelnames}) where modelnames = tuple(:resampling, :metalearner, modelnames...)
+Base.propertynames(::Stack{modelnames}) where modelnames =
+    tuple(:metalearner, :resampling, :measures, :cache, :acceleration, modelnames...)
 
 
 function Base.getproperty(stack::Stack{modelnames}, name::Symbol) where modelnames
     name === :metalearner && return getfield(stack, :metalearner)
     name === :resampling && return getfield(stack, :resampling)
     name == :measures && return getfield(stack, :measures)
+    name === :cache && return getfield(stack, :cache)
+    name === :acceleration && return getfield(stack, :acceleration)
     models = getfield(stack, :models)
     for j in eachindex(modelnames)
         name === modelnames[j] && return models[j]
@@ -221,6 +233,8 @@ function Base.setproperty!(stack::Stack{modelnames}, _name::Symbol, val) where m
     _name === :metalearner && return setfield!(stack, :metalearner, val)
     _name === :resampling && return setfield!(stack, :resampling, val)
     _name === :measures && return setfield!(stack, :measures, val)
+    _name === :cache && return setfield!(stack, :cache, val)
+    _name === :acceleration && return setfield!(stack, :acceleration, val)
     idx = findfirst(==(_name), modelnames)
     idx isa Nothing || return getfield(stack, :models)[idx] = val
     error("type Stack has no property $name")
@@ -272,7 +286,7 @@ internal_stack_report(m::Stack, verbosity::Int, tt_pairs, folds_evaluations::Var
 """
     internal_stack_report(m::Stack, verbosity::Int, y::AbstractNode, folds_evaluations::Vararg{AbstractNode})
 
-When measure/measures is provided, the folds_evaluation will have been filled by `store_for_evaluation`. This function is 
+When measure/measures is provided, the folds_evaluation will have been filled by `store_for_evaluation`. This function is
 not doing any heavy work (not constructing nodes corresponding to measures) but just unpacking all the
 folds_evaluations in a single node that can be evaluated later.
 """
@@ -304,10 +318,10 @@ function internal_stack_report(stack::Stack{modelnames,}, verbosity::Int, tt_pai
             fitted_params_per_fold=[],
             report_per_fold=[],
             train_test_pairs=tt_pairs
-        )
+            )
         for model in getfield(stack, :models)]
     )
-    
+
     # Update the results
     index = 1
     for foldid in 1:nfolds
@@ -330,7 +344,7 @@ function internal_stack_report(stack::Stack{modelnames,}, verbosity::Int, tt_pai
             end
 
             # Update per_fold
-            model_results.per_fold[i][foldid] = 
+            model_results.per_fold[i][foldid] =
                 reports_each_observation(measure) ?
                    MLJBase.aggregate(loss, measure) : loss
             end
             index += 1
@@ -366,7 +380,7 @@ end
     oos_set(m::Stack, folds::AbstractNode, Xs::Source, ys::Source)
 
 This function is building the out-of-sample dataset that is later used by the `judge`
-for its own training. It also returns the folds_evaluations object if internal 
+for its own training. It also returns the folds_evaluations object if internal
 cross-validation results are requested.
 """
 function oos_set(m::Stack, Xs::Source, ys::Source, tt_pairs)
@@ -384,7 +398,7 @@ function oos_set(m::Stack, Xs::Source, ys::Source, tt_pairs)
         # predictions are subsequently used as an input to the metalearner
         Zfold = []
         for model in getfield(m, :models)
-            mach = machine(model, Xtrain, ytrain)
+            mach = machine(model, Xtrain, ytrain, cache=m.cache)
             ypred = predict(mach, Xtest)
             # Internal evaluation on the fold if required
             push!(folds_evaluations, store_for_evaluation(mach, Xtest, ytest, m.measures))
@@ -417,15 +431,15 @@ function fit(m::Stack, verbosity::Int, X, y)
 
     Xs = source(X)
     ys = source(y)
-    
+
     Zval, yval, folds_evaluations = oos_set(m, Xs, ys, tt_pairs)
 
-    metamach = machine(m.metalearner, Zval, yval)
+    metamach = machine(m.metalearner, Zval, yval, cache=m.cache)
 
     # Each model is retrained on the original full training set
     Zpred = []
     for model in getfield(m, :models)
-        mach = machine(model, Xs, ys)
+        mach = machine(model, Xs, ys, cache=m.cache)
         ypred = predict(mach, Xs)
         ypred = pre_judge_transform(ypred, typeof(model), target_scitype(model))
         push!(Zpred, ypred)
@@ -438,6 +452,6 @@ function fit(m::Stack, verbosity::Int, X, y)
 
     # We can infer the Surrogate by two calls to supertype
     mach = machine(supertype(supertype(typeof(m)))(), Xs, ys; predict=ŷ, internal_report...)
-    
-    return!(mach, m, verbosity)
+
+    return!(mach, m, verbosity, acceleration=m.acceleration)
 end
diff --git a/test/Project.toml b/test/Project.toml
new file mode 100644
index 00000000..ace6054b
--- /dev/null
+++ b/test/Project.toml
@@ -0,0 +1,26 @@
+[deps]
+CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597"
+ComputationalResources = "ed09eef8-17a6-5b46-8889-db040fac31e3"
+DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
+DecisionTree = "7806a523-6efd-50cb-b5f6-3fa6f1930dbb"
+Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
+Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
+Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
+LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
+Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
+LossFunctions = "30fc2ffe-d236-52d8-8643-a9d8f7c094a7"
+MLJModelInterface = "e80e1ace-859a-464e-9ed9-23947d8ae3ea"
+MultivariateStats = "6f286f6a-111f-5878-ab1e-185364afe411"
+NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
+OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
+Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a"
+ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
+Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
+ScientificTypes = "321657f4-b219-11e9-178b-2701a2544e81"
+Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
+StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
+Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
+StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
+Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
+Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
diff --git a/test/composition/learning_networks/machines.jl b/test/composition/learning_networks/machines.jl
index 13412d9d..85c962d6 100644
--- a/test/composition/learning_networks/machines.jl
+++ b/test/composition/learning_networks/machines.jl
@@ -155,9 +155,9 @@ zhat = inverse_transform(standM, uhat)
 yhat = exp(zhat)
 
 enode = @node mae(ys, yhat)
 
-@testset "replace method for learning network machines" begin
+@testset "replace method for learning network machines, acceleration: $(typeof(accel))" for accel in (CPU1(), CPUThreads())
 
-    fit!(yhat, verbosity=0)
+    fit!(yhat, verbosity=0, acceleration=accel)
 
     # test nested reporting:
     r = MLJBase.report(yhat)
@@ -199,7 +199,7 @@ enode = @node mae(ys, yhat)
     knnM2 = machines(yhat2, knn) |> first
     hotM2 = machines(yhat2, hot) |> first
 
-    @test_mach_sequence(fit!(yhat2, force=true),
+    @test_mach_sequence(fit!(yhat2, force=true, acceleration=accel),
                         [(:train, standM2), (:train, hotM2),
                          (:train, knnM2), (:train, oakM2)],
                         [(:train, hotM2), (:train, standM2),
@@ -218,7 +218,7 @@ enode = @node mae(ys, yhat)
     # this change should trigger retraining of all machines except the
     # univariate standardizer:
     hot2.drop_last = true
-    @test_mach_sequence(fit!(yhat2),
+    @test_mach_sequence(fit!(yhat2, acceleration=accel),
                         [(:skip, standM2), (:update, hotM2),
                          (:train, knnM2), (:train, oakM2)],
                         [(:update, hotM2), (:skip, standM2),
diff --git a/test/composition/models/stacking.jl b/test/composition/models/stacking.jl
index 66190ea5..3c985ed0 100644
--- a/test/composition/models/stacking.jl
+++ b/test/composition/models/stacking.jl
@@ -74,8 +74,8 @@ end
                     resampling=CV(;nfolds=3),
                     models...)
     # Testing attribute access of the stack
-    @test propertynames(mystack) == (:resampling, :metalearner, :constant,
-                                     :decisiontree, :ridge_lambda, :ridge)
+    @test propertynames(mystack) == (:metalearner, :resampling, :measures, :cache, :acceleration,
+                                     :constant, :decisiontree, :ridge_lambda, :ridge)
 
     @test mystack.decisiontree isa DecisionTreeRegressor
 
@@ -190,8 +190,10 @@ end
     models = [DeterministicConstantRegressor(), FooBarRegressor(;lambda=0)]
     metalearner = DeterministicConstantRegressor()
     resampling = CV()
+    cache = true
+    acceleration = CPU1()
 
-    MLJBase.DeterministicStack(modelnames, models, metalearner, resampling, nothing)
+    MLJBase.DeterministicStack(modelnames, models, metalearner, resampling, nothing, cache, acceleration)
 
     # Test input_target_scitypes with non matching target_scitypes
     models = [KNNRegressor()]
@@ -528,5 +530,63 @@ end
     end
 end
 
+@testset "Test cache is forwarded to submodels" begin
+    X, y = make_regression(100, 3; rng=rng)
+    constant = ConstantRegressor()
+    ridge = FooBarRegressor()
+    mystack = Stack(;metalearner=FooBarRegressor(),
+                    cache=false,
+                    ridge=ridge,
+                    constant=constant)
+    mach = machine(mystack, X, y)
+    fit!(mach, verbosity=0)
+    # The data and resampled_data have not been populated
+    for mach in fitted_params(mach).machines
+        @test !isdefined(mach, :data)
+        @test !isdefined(mach, :resampled_data)
+    end
+end
+
+# a regression `Stack` which has `model` as one of the base models:
+function _stack(model, resource)
+    models = (constant=DeterministicConstantRegressor(),
+              ridge_lambda=FooBarRegressor(;lambda=0.1),
+              model=model)
+    Stack(;
+        metalearner=FooBarRegressor(;lambda=0.05),
+        resampling=CV(;nfolds=3),
+        acceleration=resource,
+        models...
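+        # `resource` is `CPU1()` or `CPUThreads()` (see the testsets below)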
+    )
+end
+
+# return a nested stack in which `model` appears at two levels, with
+# both layers accelerated using `resource`:
+_double_stack(model, resource) =
+    _stack(_stack(model, resource), resource)
+
+@testset "Test multithreaded version" begin
+    X, y = make_regression(100, 5; rng=StableRNG(1234))
+
+    stack = _double_stack(FooBarRegressor(;lambda=0.07), CPU1())
+
+    mach = machine(stack, X, y)
+    fit!(mach, verbosity=0)
+    cpu_fp = fitted_params(mach)
+    cpu_ypred = predict(mach)
+
+    stack = _double_stack(FooBarRegressor(;lambda=0.07), CPUThreads())
+
+    mach = machine(stack, X, y)
+    fit!(mach, verbosity=0)
+    thread_fp = fitted_params(mach)
+    thread_ypred = predict(mach)
+
+    @test cpu_ypred ≈ thread_ypred
+    @test cpu_fp.metalearner ≈ thread_fp.metalearner
+    @test cpu_fp.ridge_lambda ≈ thread_fp.ridge_lambda
+end
+
 end
 true
diff --git a/test/test_utilities.jl b/test/test_utilities.jl
index eef0f8a8..0846a91d 100644
--- a/test/test_utilities.jl
+++ b/test/test_utilities.jl
@@ -45,6 +45,13 @@ function testset_accelerated(name::String, var, ex; exclude=[])
     return esc(final_ex)
 end
 
+"""
+    sedate!(fit_ex)
+
+Here `fit_ex` is a fit expression, such as `fit!(mach, kws...)`. This
+function throws an error if the expression already specifies a verbosity
+level, and otherwise sets the verbosity level to -5000.
+"""
 function sedate!(fit_ex)
     kwarg_exs = filter(fit_ex.args) do arg
         arg isa Expr && arg.head == :kw
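For reference, a minimal usage sketch of the two options this diff adds to `Stack` (`cache` and `acceleration`). It is not part of the patch; it borrows the test-suite models appearing above (`FooBarRegressor`, `DeterministicConstantRegressor`), and any `Deterministic` regressors would do. Multithreaded fitting assumes Julia was started with more than one thread (e.g. `julia --threads=4`).

```julia
using MLJBase, ComputationalResources

X, y = make_regression(100, 3)

# `acceleration=CPUThreads()` fits the base learners concurrently;
# `cache=false` is forwarded to every machine in the learning network:
stack = Stack(metalearner=FooBarRegressor(lambda=0.05),
              resampling=CV(nfolds=3),
              measures=rmse,
              cache=false,
              acceleration=CPUThreads(),
              constant=DeterministicConstantRegressor(),
              ridge=FooBarRegressor(lambda=0.1))

mach = machine(stack, X, y)
fit!(mach)

# internal out-of-sample evaluation of each base learner:
report(mach).cv_report
```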