Skip to content

Commit

Permalink
Use a LinkedBlockingQueue instead of polling.
Browse files Browse the repository at this point in the history
  • Loading branch information
zainab-ali authored and Baccata committed May 15, 2024
1 parent 52916a5 commit 0e0ca96
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
6 changes: 3 additions & 3 deletions modules/framework/jvm/src/main/scala/RunnerCompat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package framework
import org.typelevel.scalaccompat.annotation.unused

import java.io.PrintStream
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }

import scala.concurrent.duration._
Expand All @@ -16,6 +15,7 @@ import cats.effect.{ Ref, Sync, _ }
import cats.syntax.all._

import sbt.testing.{ Task, TaskDef }
import java.util.concurrent.LinkedBlockingQueue

trait RunnerCompat[F[_]] { self: sbt.testing.Runner =>

Expand Down Expand Up @@ -73,7 +73,7 @@ trait RunnerCompat[F[_]] { self: sbt.testing.Runner =>
// dispatching logs through a single logger at a time.
val loggerPermit = new java.util.concurrent.Semaphore(1, true)

val queue = new ConcurrentLinkedQueue[SuiteEvent]()
val queue = new LinkedBlockingQueue[SuiteEvent]()
val broker = new ConcurrentQueueEventBroker(queue)
val startingBlock = unsafeRun.fromFuture {
promise.future.map(_ => ())(ExecutionContext.global)
Expand Down Expand Up @@ -252,7 +252,7 @@ trait RunnerCompat[F[_]] { self: sbt.testing.Runner =>
}

class ConcurrentQueueEventBroker(
concurrentQueue: ConcurrentLinkedQueue[SuiteEvent])
concurrentQueue: LinkedBlockingQueue[SuiteEvent])
extends SuiteEventBroker {
def send(suiteEvent: SuiteEvent): F[Unit] = {
Sync[F].delay(concurrentQueue.add(suiteEvent)).void
Expand Down
5 changes: 3 additions & 2 deletions modules/framework/jvm/src/main/scala/SbtTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import cats.data.Chain

import sbt.testing.{ Event, EventHandler, Logger, Task, TaskDef }
import java.util.concurrent.TimeUnit

private[framework] class SbtTask(
val taskDef: TaskDef,
isDone: AtomicBoolean,
stillRunning: AtomicInteger,
waitForResourcesShutdown: java.util.concurrent.Semaphore,
start: scala.concurrent.Promise[Unit],
queue: java.util.concurrent.ConcurrentLinkedQueue[SuiteEvent],
queue: java.util.concurrent.LinkedBlockingQueue[SuiteEvent],
loggerPermit: java.util.concurrent.Semaphore,
readFailed: () => Chain[(SuiteName, TestOutcome)]
) extends Task {
Expand All @@ -30,7 +31,7 @@ private[framework] class SbtTask(
loggerPermit.acquire()
try {
while (!finished && !isDone.get()) {
val nextEvent = Option(queue.poll())
val nextEvent = Option(queue.poll(1, TimeUnit.SECONDS))

nextEvent.foreach {
case s @ SuiteStarted(_) => log(s)
Expand Down

0 comments on commit 0e0ca96

Please sign in to comment.