Skip to content

Commit

Permalink
Merge pull request #53 from PhilippMDoerner/docs/#52-expand-channel-docs
Browse files Browse the repository at this point in the history
#52 Expand channel documentation
  • Loading branch information
Araq authored Dec 14, 2023
2 parents 06505e8 + 82b72f6 commit 73e9dcc
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This Channel implementation is a shared memory concurrent queue using
# This Channel implementation is a shared memory, fixed-size, concurrent queue using
# a circular buffer for data. Based on channels implementation[1]_ by
# Mamy André-Ratsimbazafy (@mratsim), which is a C to Nim translation of the
# original[2]_ by Andreas Prell (@aprell)
Expand All @@ -20,14 +20,18 @@
##
## This module implements multi-producer multi-consumer channels - a concurrency
## primitive with a high-level interface intended for communication and
## synchronization between threads. It allows sending and receiving typed data,
## enabling safe and efficient concurrency.
## synchronization between threads. It allows sending and receiving typed, isolated
## data, enabling safe and efficient concurrency.
##
## The `Chan` type represents a generic channel object that internally manages
## The `Chan` type represents a generic fixed-size channel object that internally manages
## the underlying resources and synchronization. It has to be initialized using
## the `newChan` proc. Sending and receiving operations are provided by the
## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv`
## procs.
## procs. Send operations add messages to the channel, receiving operations
## remove them.
##
## See also:
## * [std/isolation](https://nim-lang.org/docs/isolation.html)
##
## The following is a simple example of two different ways to use channels:
## blocking and non-blocking.
Expand Down Expand Up @@ -266,22 +270,37 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) =
dest.d = src.d

proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
## Sends item to the channel (non-blocking).
## Tries to send a message to a channel.
##
## The memory `src` is moved, not copied. Doesn't block.
##
## Returns `false` if the message was not sent because the number of pending
## items in the channel exceeded its capacity.
var data = src.extract
result = channelSend(c.d, data.unsafeAddr, sizeof(T), false)
if result:
wasMoved(data)

template trySend*[T](c: Chan[T], src: T): bool =
## Helper template for `trySend`.
## Helper template for `trySend <#trySend,Chan[T],sinkIsolated[T]>`_.
trySend(c, isolate(src))

proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} =
## Receives item from the channel (non-blocking).
## Tries to receive a message from the channel `c` and fill `dst` with its value.
## This returns immediately even if no message is found. Doesn't block.
##
## This can fail for all sort of reasons, including a lack of messages in the channel
## to receive or contention.
##
## If it fails it returns `false`. Otherwise it returns `true`.
channelReceive(c.d, dst.addr, sizeof(T), false)

proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
## Sends item to the channel (blocking).
## Sends item to the channel.
## This blocks the sending thread until the item was successfully sent.
##
## If the channel is already full with items this will block the thread until
## items from the channel are removed.
var data = src.extract
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
Expand All @@ -293,24 +312,35 @@ template send*[T](c: Chan[T]; src: T) =
send(c, isolate(src))

proc recv*[T](c: Chan[T], dst: var T) {.inline.} =
## Receives item from the channel (blocking).
## Receives an item from the channel.
## Fills `dist` with the item.
## This blocks the receiving thread until an item was successfully received.
##
## If the channel does not contain any items this will block the thread until
## items get sent to the channel.
discard channelReceive(c.d, dst.addr, sizeof(T), true)

proc recv*[T](c: Chan[T]): T {.inline.} =
## Receives item from the channel (blocking).
## Receives an item from the channel.
## A version of `recv`_ that returns the item.
discard channelReceive(c.d, result.addr, sizeof(result), true)

proc recvIso*[T](c: Chan[T]): Isolated[T] {.inline.} =
## Receives an item from the channel.
## A version of `recv`_ that returns the item and isolates it.
var dst: T
discard channelReceive(c.d, dst.addr, sizeof(T), true)
result = isolate(dst)

func peek*[T](c: Chan[T]): int {.inline.} =
## Returns an estimation of current number of items held by the channel.
## Returns an estimation of the current number of items held by the channel.
numItems(c.d)

proc newChan*[T](elements: Positive = 30): Chan[T] =
## An initialization procedure, necessary for acquiring resources and
## initializing internal state of the channel.
## initializing internal state of the channel.
##
## `elements` is the capacity of the channel and thus how many items it can hold
## before it refuses to accept any further items.
assert elements >= 1, "Elements must be positive!"
result = Chan[T](d: allocChannel(sizeof(T), elements))

0 comments on commit 73e9dcc

Please sign in to comment.