Skip to content

Commit

Permalink
feat: support the flattening syntax for supervising (#1386)
Browse files Browse the repository at this point in the history
* feat: support the flattening syntax for supervising

* update javadsl

* fix compile issue

* rename method

* chore: rename onAnyFailure

* fix: mima filter excludes

* chore: fmt and excludes

* avoid mima filter
  • Loading branch information
Roiocam authored Jul 27, 2024
1 parent 0930982 commit 9885a58
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ public void supervision() {
// #restart-limit

// #multiple
Behaviors.supervise(
Behaviors.supervise(behavior)
.onFailure(IllegalStateException.class, SupervisorStrategy.restart()))
Behaviors.supervise(behavior)
.onFailure(IllegalStateException.class, SupervisorStrategy.restart())
.onFailure(IllegalArgumentException.class, SupervisorStrategy.stop());
// #multiple

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,8 @@ public Behavior<MyMsg> receive(TypedActorContext<MyMsg> context, MyMsg message)
SupervisorStrategy strategy7 = strategy6.withResetBackoffAfter(Duration.ofSeconds(2));

Behavior<MyMsg> behv =
Behaviors.supervise(
Behaviors.supervise(Behaviors.<MyMsg>ignore())
.onFailure(IllegalStateException.class, strategy6))
Behaviors.supervise(Behaviors.<MyMsg>ignore())
.onFailure(IllegalStateException.class, strategy6)
.onFailure(RuntimeException.class, strategy1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ object SupervisionCompileOnly {

// #multiple
Behaviors
.supervise(Behaviors.supervise(behavior).onFailure[IllegalStateException](SupervisorStrategy.restart))
.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restart)
.onFailure[IllegalArgumentException](SupervisorStrategy.stop)
// #multiple

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,8 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
"support nesting to handle different exceptions" in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors
.supervise(Behaviors.supervise(targetBehavior(probe.ref)).onFailure[Exc2](SupervisorStrategy.resume))
.supervise(targetBehavior(probe.ref))
.onFailure[Exc2](SupervisorStrategy.resume)
.onFailure[Exc3](SupervisorStrategy.restart)
val ref = spawn(behv)
ref ! IncrementState
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Change the return type of `Behaviors.supervise` to support flattened supervision
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.actor.typed.javadsl.Behaviors#Supervise.onFailure")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.actor.typed.scaladsl.Behaviors#Supervise.onFailure")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.actor.typed.scaladsl.Behaviors#Supervise.onFailure$extension")
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import scala.reflect.ClassTag

import org.apache.pekko
import pekko.actor.InvalidMessageException
import pekko.actor.typed.internal.BehaviorImpl
import pekko.actor.typed.internal.{ BehaviorImpl, BehaviorTags, InterceptorImpl, Supervisor }
import pekko.actor.typed.internal.BehaviorImpl.DeferredBehavior
import pekko.actor.typed.internal.BehaviorImpl.StoppedBehavior
import pekko.actor.typed.internal.BehaviorTags
import pekko.actor.typed.internal.InterceptorImpl
import pekko.annotation.DoNotInherit
import pekko.annotation.InternalApi

Expand Down Expand Up @@ -71,6 +69,33 @@ abstract class Behavior[T](private[pekko] val _tag: Int) { behavior =>

}

/**
* INTERNAL API
* A behavior type that could be supervised, Not for user extension.
*/
@InternalApi
class SuperviseBehavior[T] private[pekko] (
val wrapped: Behavior[T]) extends Behavior[T](BehaviorTags.SuperviseBehavior) {
private final val ThrowableClassTag = ClassTag(classOf[Throwable])

/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
def onFailure[Thr <: Throwable](strategy: SupervisorStrategy)(
implicit tag: ClassTag[Thr] = ThrowableClassTag): SuperviseBehavior[T] = {
val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
new SuperviseBehavior[T](Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag))
}

/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws.
*
* Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior.
*/
def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] =
onFailure(strategy)(ClassTag(clazz))

private[pekko] def unwrap: Behavior[T] = wrapped
}

/**
* Extension point for implementing custom behaviors in addition to the existing
* set of behaviors available through the DSLs in [[pekko.actor.typed.scaladsl.Behaviors]] and [[pekko.actor.typed.javadsl.Behaviors]]
Expand Down Expand Up @@ -180,7 +205,8 @@ object Behavior {
val startedInner = start(wrapped.nestedBehavior, ctx.asInstanceOf[TypedActorContext[Any]])
if (startedInner eq wrapped.nestedBehavior) wrapped
else wrapped.replaceNested(startedInner)
case _ => behavior
case supervise: SuperviseBehavior[T] => start(supervise.unwrap, ctx)
case _ => behavior
}
}

Expand Down Expand Up @@ -266,6 +292,8 @@ object Behavior {
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
case BehaviorTags.DeferredBehavior =>
throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter")
case BehaviorTags.SuperviseBehavior =>
throw new IllegalArgumentException(s"supervise [$behavior] should not be passed to interpreter")
case BehaviorTags.IgnoreBehavior =>
BehaviorImpl.same[T]
case BehaviorTags.StoppedBehavior =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private[pekko] object BehaviorTags {
final val SameBehavior = 6
final val FailedBehavior = 7
final val StoppedBehavior = 8
final val SuperviseBehavior = 9

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@ import scala.reflect.ClassTag

import org.apache.pekko
import pekko.actor.typed._
import pekko.actor.typed.internal.{
BehaviorImpl,
StashBufferImpl,
Supervisor,
TimerSchedulerImpl,
WithMdcBehaviorInterceptor
}
import pekko.actor.typed.internal.{ BehaviorImpl, StashBufferImpl, TimerSchedulerImpl, WithMdcBehaviorInterceptor }
import pekko.japi.function.{ Effect, Function2 => JapiFunction2 }
import pekko.japi.pf.PFBuilder
import pekko.util.ccompat.JavaConverters._
Expand Down Expand Up @@ -271,16 +265,16 @@ object Behaviors {
*
* Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior.
*/
def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): Behavior[T] =
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz))
def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): SuperviseBehavior[T] =
new SuperviseBehavior[T](wrapped).onFailure(clazz, strategy)

/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws.
*
* All non-fatal (see [[scala.util.control.NonFatal]]) exceptions types will be handled using the given strategy.
*/
def onFailure(strategy: SupervisorStrategy): Behavior[T] =
onFailure(classOf[Exception], strategy)
new SuperviseBehavior[T](wrapped).onFailure(strategy)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ package org.apache.pekko.actor.typed
package scaladsl

import scala.reflect.ClassTag

import org.apache.pekko
import org.apache.pekko.actor.typed.SuperviseBehavior
import pekko.actor.typed.internal._
import pekko.annotation.{ DoNotInherit, InternalApi }

Expand Down Expand Up @@ -233,9 +233,8 @@ object Behaviors {

/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
def onFailure[Thr <: Throwable](strategy: SupervisorStrategy)(
implicit tag: ClassTag[Thr] = ThrowableClassTag): Behavior[T] = {
val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
implicit tag: ClassTag[Thr] = ThrowableClassTag): SuperviseBehavior[T] = {
new SuperviseBehavior[T](wrapped).onFailure(strategy)(tag)
}
}

Expand Down

0 comments on commit 9885a58

Please sign in to comment.