-
Notifications
You must be signed in to change notification settings - Fork 147
FSharpx Async Extensions
The FSharpx.Core library implements various extensions for asynchronous programming
using F# asynchronous workflows and F# agents (the MailboxProcessor
type
in the standard F# library). It defines asynchronous sequences that represent
asynchronous operations returning multiple values (such as reading data from
a stream in chunks), several reusable F# agents and numerous extensions.
- Samples that demonstrate how to use most of the extensions can be found in the samples directory
In FSharp.Control
Asynchronous sequences can be used to work with asynchronous computations that return
multiple results. A value of type AsyncSeq<'T>
can be started (just like an asynchronous
workflow) and it eventually returns. The result is either a special value representing
the end of the sequence or a value of type 'T
(head) together with the rest of the
asynchronous sequence (tail) of type AsyncSeq<'T>
.
Unlike IObservable<'T>
, asynchronous sequences are not push-based. The code that
generates the next value of the asynchronous sequence starts only after previous elements
have been processed. This makes it possible to easily write computations that return
results as long as some component is using them.
However, IObservable<'T>
values can
be converted to asynchronous sequences. The AsyncSeq.ofObservable
combinator creates an
asynchronous sequence that discards values produced by the observable while the
asynchronous sequence was blocked. The AsyncSeq.ofObservableBuffered
combinator stores
all produced values in an unbounded buffer and returns the values from the buffer as soon
as the user of asynchronous sequence requestst the next element.
The library defines an F# computation expression for workfing with asynchronous sequences. For example, sequence that emits numbers in 1 second intervals can be defined as follows:
let rec numbers n = asyncSeq {
yield n
do! Async.Sleep(1000)
yield! numbers (n + 1) }
Asynchronous workflows and asynchronous sequences can use the for
construct to iterate
over all elements of an asynchronous sequence. For example:
let rec evenNumbers = asyncSeq {
for n in numbers 0 do
if n%2=0 then yield n }
The library also provides numerous combinators (similar to functions from the Seq
module).
The result of operations that aggregate values of an asynchronous sequence is an asynchronous
workflow that returns a single value:
let rec sumTenEvenSquares =
numbers 0
|> AsyncSeq.filter (fun n -> n%2 = 0)
|> AsyncSeq.map (fun n -> n*n)
|> AsyncSeq.fold (+) 0
let n =
sumTenEvenSquares
|> Async.RunSynchronously
For some examples that use (earlier versions) of asynchronous sequences, see also the following two F# snippets: first and second.
The library implements several reusable agents for building concurrent applications:
-
Agent is a simple type aliast for
MailboxProcessor
that is more convenient to use -
AutoCancelAgent wraps the standard F# agent and adds support for stopping of the agent's body using the
IDisposable
interface (the type automatically creates a cancellation token, uses it to start the underlying agent and cancels it when the agent is disposed). For example, see this F# snippet. -
BatchProcessingAgent can be used to implement batch processing. It creates groups of messages (added using the
Enqueue
method) and emits them using theBatchProduced
event. A group is produced when it reaches the maximal size or after the timeout elapses. -
BlockingQueueAgent implements an asynchronous queue with blocking put and blocking get operations. It can be used to implement the producer-consumer concurrent pattern. The constructor of the agent takes the maximal size of the buffer.
The library implements extensions for using IObservable<'T>
type from F# asynchronous
workflows. An overloaded extension method Async.AwaitObservable
can be used to wait
for an occurrence of an event (or other observable action):
let counter n = async {
printfn "Counting: %d" n
let! _ = form.MouseDown |> Async.AwaitObservable
return! counter (n + 1) }
Overloaded version of the method allows waiting for the first of multiple events. The
method asynchronously returns Choice<'T1, 'T2>
value that can be used to determine
which of the events has occurred.
For examples using this method see Chapter 16 of Real World Functional Programming
(some examples are available in a free excerpt from the chapter). The
Async.AwaitObservable
method should be used instead of Async.AwaitEvent
to avoid
memory leaks (see also related StackOverflow discussion)