Skip to content

Commit

Permalink
Add test for virtual thread support (#1436)
Browse files Browse the repository at this point in the history
* feat: Add virtual thread support

* use public lookup

* try to add test

* Update VirtualThreadPoolDispatcherSpec.scala

* Update VirtualThreadPoolDispatcherSpec.scala

* Apply suggestions from code review

Co-authored-by: Andy(Jingzhang)Chen <[email protected]>

---------

Co-authored-by: He-Pin <[email protected]>
Co-authored-by: Andy(Jingzhang)Chen <[email protected]>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent d737c44 commit 2219453
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import com.typesafe.config.ConfigFactory

import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }

object VirtualThreadPoolDispatcherSpec {
val config = ConfigFactory.parseString("""
|virtual-thread-dispatcher {
| executor = virtual-thread-executor
|}
""".stripMargin)

class InnocentActor extends Actor {

override def receive = {
case "ping" =>
sender() ! "All fine"
}
}

}

class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
import VirtualThreadPoolDispatcherSpec._

val Iterations = 1000

"VirtualThreadPool support" must {

"handle simple dispatch" in {
val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher"))
innocentActor ! "ping"
expectMsg("All fine")
}

}
}
14 changes: 14 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ pekko {
# Valid options:
# - "default-executor" requires a "default-executor" section
# - "fork-join-executor" requires a "fork-join-executor" section
# - "virtual-thread-executor" requires a "virtual-thread-executor" section
# - "thread-pool-executor" requires a "thread-pool-executor" section
# - "affinity-pool-executor" requires an "affinity-pool-executor" section
# - A FQCN of a class extending ExecutorServiceConfigurator
Expand Down Expand Up @@ -539,6 +540,19 @@ pekko {
allow-core-timeout = on
}

# This will be used if you have set "executor = "virtual-thread-executor"
# This executor will execute the every task on a new virtual thread.
# Underlying thread pool implementation is java.util.concurrent.ForkJoinPool for JDK <= 22
# If the current runtime does not support virtual thread,
# then the executor configured in "fallback" will be used.
virtual-thread-executor {
#Please set the the underlying pool with system properties below:
#jdk.virtualThreadScheduler.parallelism
#jdk.virtualThreadScheduler.maxPoolSize
#jdk.virtualThreadScheduler.minRunnable
#jdk.unparker.maxPoolSize
fallback = "fork-join-executor"
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ package org.apache.pekko.dispatch
import java.{ util => ju }
import java.util.concurrent._

import scala.annotation.tailrec
import scala.annotation.{ nowarn, tailrec }
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.control.NonFatal

import scala.annotation.nowarn
import com.typesafe.config.Config

import org.apache.pekko
import pekko.actor._
import pekko.annotation.InternalStableApi
Expand All @@ -33,6 +30,8 @@ import pekko.event.EventStream
import pekko.event.Logging.{ Debug, Error, LogEventException }
import pekko.util.{ unused, Index, Unsafe }

import com.typesafe.config.Config

final case class Envelope private (message: Any, sender: ActorRef) {

def copy(message: Any = message, sender: ActorRef = sender) = {
Expand Down Expand Up @@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
def dispatcher(): MessageDispatcher

def configureExecutor(): ExecutorServiceConfigurator = {
@tailrec
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" =>
new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "virtual-thread-executor" =>
if (VirtualThreadSupport.isSupported) {
new VirtualThreadExecutorConfigurator(config.getConfig("virtual-thread-executor"), prerequisites)
} else {
configurator(config.getString("virtual-thread-executor.fallback"))
}
case "thread-pool-executor" =>
new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case "affinity-pool-executor" =>
Expand Down Expand Up @@ -401,6 +407,32 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
}
}

final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {

override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
import VirtualThreadSupport._
val tf: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
new ThreadFactory {
private val vtFactory = newVirtualThreadFactory(name)

override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
vt.setUncaughtExceptionHandler(exceptionHandler)
contextClassLoader.foreach(vt.setContextClassLoader)
vt
}
}
case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name);
}
new ExecutorServiceFactory {
import VirtualThreadSupport._
override def createExecutorService: ExecutorService = newThreadPerTaskExecutor(tf)
}
}
}

class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import org.apache.pekko.annotation.InternalApi
import org.apache.pekko.util.JavaVersion

import java.lang.invoke.{ MethodHandles, MethodType }
import java.util.concurrent.{ ExecutorService, ThreadFactory }
import scala.util.control.NonFatal

@InternalApi
private[dispatch] object VirtualThreadSupport {
private val lookup = MethodHandles.publicLookup()

/**
* Is virtual thread supported
*/
val isSupported: Boolean = JavaVersion.majorVersion >= 21

/**
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
* virtual thread.
*/
def newVirtualThreadFactory(prefix: String): ThreadFactory = {
require(isSupported, "Virtual thread is not supported.")
try {
val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass))
var builder = ofVirtualMethod.invoke()
val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
// TODO support replace scheduler when we drop Java 8 support
val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory]))
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L)
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
} catch {
case NonFatal(e) =>
// --add-opens java.base/java.lang=ALL-UNNAMED
throw new UnsupportedOperationException("Failed to create virtual thread factory", e)
}
}

def newThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = {
require(threadFactory != null, "threadFactory should not be null.")
try {
val executorsClazz = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors")
val newThreadPerTaskExecutorMethod = lookup.findStatic(
executorsClazz,
"newThreadPerTaskExecutor",
MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory]))
newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService]
} catch {
case NonFatal(e) =>
throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e)
}
}

}
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ lazy val actor = pekkoModule("actor")
.enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9)

lazy val actorTests = pekkoModule("actor-tests")
.configs(Jdk9.TestJdk9)
.dependsOn(testkit % "compile->compile;test->test", actor)
.settings(Dependencies.actorTests)
.enablePlugins(NoPublish)
.enablePlugins(NoPublish, Jdk9)
.disablePlugins(MimaPlugin)

lazy val pekkoScalaNightly = pekkoModule("scala-nightly")
Expand Down

0 comments on commit 2219453

Please sign in to comment.