Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic implementation of While for resumable code #244

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>
<GenerateProgramFile>false</GenerateProgramFile>
<ApplicationIcon>..\..\assets\TaskSeq.ico</ApplicationIcon>
</PropertyGroup>

<ItemGroup>
<Content Include="..\..\assets\TaskSeq.ico" Link="TaskSeq.ico" />
<Compile Include="AssemblyInfo.fs" />
Expand Down Expand Up @@ -47,9 +44,9 @@
<Compile Include="TaskSeq.StateTransitionBug-delayed.Tests.CE.fs" />
<Compile Include="TaskSeq.PocTests.fs" />
<Compile Include="TaskSeq.Realworld.fs" />
<Compile Include="TaskSeq.Extensions.Tests.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<!-- align test project with minimal required version for TaskSeq -->
<!-- we use 6.0.3 here and not 6.0.2 because TaskResult lib requires it-->
Expand All @@ -67,9 +64,7 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\FSharp.Control.TaskSeq\FSharp.Control.TaskSeq.fsproj" />
</ItemGroup>

</Project>
28 changes: 28 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module TaskSeq.Extenions

open System
open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharp.Control

//
// TaskSeq.except
// TaskSeq.exceptOfSeq
//


// module TaskBuilder =
// open TaskSeq.Tests

// [<Theory; ClassData(typeof<TestImmTaskSeq>)>]
// let ``TaskSeq-existsAsync happy path last item of seq`` variant =
// task {
// let values = Gen.getSeqImmutable variant
// let mutable sum = 0
// for x in values do
// sum <- sum + x
// }
// |> TaskSeq.existsAsync (fun x -> task { return x = 10 })
// |> Task.map (should be True)
96 changes: 96 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

#nowarn "57"

module TaskSeq =
// F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it?
open FSharp.Control.TaskSeqBuilders
Expand Down Expand Up @@ -319,3 +321,97 @@ module TaskSeq =
let fold folder state source = Internal.fold (FolderAction folder) state source

let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source

#nowarn "1204"
#nowarn "3513"


[<AutoOpen>]
module AsyncSeqExtensions =

let rec WhileDynamic
(
sm: byref<TaskStateMachine<'Data>>,
condition: unit -> ValueTask<bool>,
body: TaskCode<'Data, unit>
) : bool =
let vt = condition ()
TaskBuilderBase.BindDynamic(&sm, vt, fun result ->
TaskCode<_,_>(fun sm ->
if result then
if body.Invoke(&sm) then
WhileDynamic(&sm, condition, body)
else
let rf = sm.ResumptionDynamicInfo.ResumptionFunc

sm.ResumptionDynamicInfo.ResumptionFunc <-
(TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf)))

false
else
true
)
)


and WhileBodyDynamicAux
(
sm: byref<TaskStateMachine<'Data>>,
condition: unit -> ValueTask<bool>,
body: TaskCode<'Data, unit>,
rf: TaskResumptionFunc<_>
) : bool =
if rf.Invoke(&sm) then
WhileDynamic(&sm, condition, body)
else
let rf = sm.ResumptionDynamicInfo.ResumptionFunc

sm.ResumptionDynamicInfo.ResumptionFunc <-
(TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf)))

false
open Microsoft.FSharp.Core.CompilerServices

// Add asynchronous for loop to the 'async' computation builder
type Microsoft.FSharp.Control.AsyncBuilder with

member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async<unit>) =
tasksq
|> TaskSeq.iterAsync (action >> Async.StartAsTask)
|> Async.AwaitTask

// Add asynchronous for loop to the 'task' computation builder
type Microsoft.FSharp.Control.TaskBuilder with


member inline this.While(condition : unit -> ValueTask<bool>, body : TaskCode<'TOverall,unit>) =
TaskCode<_,_>(fun sm ->
WhileDynamic(&sm, condition, body)

)



member inline this.For
(
tasksq: IAsyncEnumerable<'T>,
body: 'T -> TaskCode<'TOverall, unit>
) : TaskCode<'TOverall, unit> =
TaskCode<'TOverall, unit>(fun sm ->

this
.Using(
tasksq.GetAsyncEnumerator(CancellationToken()),
(fun e ->
let next () = e.MoveNextAsync()
this.While(next, (fun sm -> (body e.Current).Invoke(&sm))))
)
.Invoke(&sm))

let foo () =
task {
let mutable sum = 0
let xs = taskSeq { 1; 2; 3}
for x in xs do
sum <- sum + x
}
35 changes: 35 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace FSharp.Control

#nowarn "1204"

module TaskSeq =
open System.Collections.Generic
open System.Threading.Tasks
Expand Down Expand Up @@ -562,3 +564,36 @@ module TaskSeq =
/// If the accumulator function <paramref name="folder" /> does not need to be asynchronous, consider using <see cref="TaskSeq.fold" />.
/// </summary>
val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State>



[<AutoOpen>]
module AsyncSeqExtensions =

val WhileDynamic:
sm: byref<TaskStateMachine<'Data>> *
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) *
body: TaskCode<'Data, unit> ->
bool

val WhileBodyDynamicAux:
sm: byref<TaskStateMachine<'Data>> *
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) *
body: TaskCode<'Data, unit> *
rf: TaskResumptionFunc<'Data> ->
bool

type AsyncBuilder with

member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async<unit>) -> Async<unit>

type TaskBuilder with

member inline While:
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) * body: TaskCode<'TOverall, unit> ->
TaskCode<'TOverall, 'a>

member inline For:
tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) ->
TaskCode<'TOverall, unit>