Skip to content

Commit

Permalink
feat: Add isCompleted method to BoundedSourceQueue (#1374)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Jun 18, 2024
1 parent 979964e commit 1d87923
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
}

queue.complete()
queue.isCompleted shouldBe true

val subIt = Iterator.continually(sub.requestNext())
subIt.zip(elements.iterator).foreach {
Expand Down Expand Up @@ -81,6 +82,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
queue.isCompleted shouldBe true
assertThrows[IllegalStateException](queue.complete())
}

Expand All @@ -89,6 +91,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
queue.isCompleted shouldBe true
assertThrows[IllegalStateException](queue.fail(ex))
}

Expand All @@ -98,6 +101,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()

queue.complete()
queue.isCompleted shouldBe true
queue.offer(1) should be(QueueOfferResult.QueueClosed)
sub.expectSubscriptionAndComplete()
}
Expand All @@ -108,6 +112,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()

queue.fail(ex)
queue.isCompleted shouldBe true
queue.offer(1) should be(QueueOfferResult.Failure(ex))
sub.request(1)
sub.expectError(ex)
Expand Down Expand Up @@ -180,6 +185,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
// where enqueueing an element concurrently with Done reaching the stage can lead to Enqueued being returned
// but the element dropped (no guarantee of entering stream as documented in BoundedSourceQueue.offer
queue.complete()
queue.isCompleted shouldBe true

result.futureValue should be(counter.get())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add BoundedSourceQueue.isCompleted method
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.BoundedSourceQueue.isCompleted")
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ trait BoundedSourceQueue[T] {
*/
def complete(): Unit

/**
* Returns true if the stream has been completed, either normally or with failure.
*
* @since 1.1.0
*/
def isCompleted: Boolean

/**
* Completes the stream with a failure.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou
}

object Mat extends BoundedSourceQueue[T] {
override def offer(elem: T): QueueOfferResult = state.get() match {
final override def offer(elem: T): QueueOfferResult = state.get() match {
case Running | NeedsActivation =>
if (queue.add(elem)) {
// need to query state again because stage might have switched from Running -> NeedsActivation only after
Expand All @@ -130,21 +130,23 @@ import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, Ou
case Done(result) => result
}

override def complete(): Unit = {
final override def complete(): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.QueueClosed)))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}

override def fail(ex: Throwable): Unit = {
final override def isCompleted: Boolean = state.get().isInstanceOf[Done]

final override def fail(ex: Throwable): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.Failure(ex))))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}

override def size(): Int = queue.size()
final override def size(): Int = queue.size()
}

// some state transition helpers
Expand Down

0 comments on commit 1d87923

Please sign in to comment.