From f72077aa2a720621740c424229f00ccd5c90b513 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Fri, 18 Feb 2022 14:37:50 +0000 Subject: [PATCH 1/7] Added failing test for cats-io 3.x context propagation. --- build.sbt | 4 +-- .../src/main/resources/reference.conf | 5 +-- .../cats/CatsIOInstrumentationSpec.scala | 34 ++++++++++--------- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/build.sbt b/build.sbt index 111594f17..a8b60805a 100644 --- a/build.sbt +++ b/build.sbt @@ -242,9 +242,9 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io")) kanelaAgent % "provided", { if(scalaBinaryVersion.value == "2.11") - "org.typelevel" %% "cats-effect" % "2.0.0" % "provided" + "org.typelevel" %% "cats-effect" % "3.3.5" % "provided" else - "org.typelevel" %% "cats-effect" % "2.1.2" % "provided" + "org.typelevel" %% "cats-effect" % "3.3.5" % "provided" }, scalatest % "test", logbackClassic % "test" diff --git a/instrumentation/kamon-cats-io/src/main/resources/reference.conf b/instrumentation/kamon-cats-io/src/main/resources/reference.conf index 9d31317c2..30de520ec 100644 --- a/instrumentation/kamon-cats-io/src/main/resources/reference.conf +++ b/instrumentation/kamon-cats-io/src/main/resources/reference.conf @@ -4,7 +4,8 @@ kanela.modules { executor-service { - within += "cats.effect.internals.IOShift\\$Tick" - within += "cats.effect.internals.IOTimer\\$ShiftTick" + within += "cats.effect.unsafe..*" + within += "cats.effect.unsafe.SchedulerCompanionPlatform" + within += "cats.effect.unsafe.$Scheduler" } } \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala index d8298338f..1dfa6f187 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala @@ -1,6 +1,7 @@ package kamon.instrumentation.futures.cats -import cats.effect.{ContextShift, IO} +import cats.effect.unsafe.IORuntime +import cats.effect.{IO, Spawn} import kamon.Kamon import kamon.context.Context import kamon.tag.Lookups.plain @@ -24,13 +25,9 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu "an cats.effect IO created when instrumentation is active" should { "capture the active span available when created" which { "must be available across asynchronous boundaries" in { - implicit val ctxShift: ContextShift[IO] = IO.contextShift(global) - val anotherExecutionContext: ExecutionContext = - ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) + val runtime = IORuntime.global + val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) val context = Context.of("key", "value") - - implicit val timer = IO.timer(global) - val contextTagAfterTransformations = for { scope <- IO { @@ -38,23 +35,28 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu } len <- IO("Hello Kamon!").map(_.length) _ <- IO(len.toString) - _ <- IO.shift(global) - _ <- IO.shift - _ <- IO.sleep(Duration.Zero) - _ <- IO.shift(anotherExecutionContext) + beforeChanging <- getKey() + evalOnGlobalRes <- Spawn[IO].evalOn(IO.sleep(Duration.Zero).flatMap(_ => getKey()), global) + evalOnAnotherEx <- Spawn[IO].evalOn(IO.sleep(Duration.Zero).flatMap(_ => getKey()), anotherExecutionContext) } yield { val tagValue = Kamon.currentContext().getTag(plain("key")) scope.close() - tagValue + (beforeChanging, evalOnGlobalRes,evalOnAnotherEx, tagValue) } - val contextTagFuture = contextTagAfterTransformations.unsafeToFuture() - - + val contextTagFuture = contextTagAfterTransformations.unsafeToFuture()(runtime) eventually(timeout(10 seconds)) { - contextTagFuture.value.get.get shouldBe "value" + val (beforeChanging, evalOnGlobalRes,evalOnAnotherEx, tagValue) = contextTagFuture.value.get.get + withClue("before changing")(beforeChanging shouldBe "value") + withClue("on the global exec context")(evalOnGlobalRes shouldBe "value") + withClue("on a different exec context")(evalOnAnotherEx shouldBe "value") + withClue("final result")(tagValue shouldBe "value") } } } } + + private def getKey(): IO[String] = { + IO.delay(Kamon.currentContext().getTag(plain("key"))) + } } \ No newline at end of file From 484190886464ddba7bf8a85fc3617db38d5d298e Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Tue, 22 Feb 2022 00:32:16 +0000 Subject: [PATCH 2/7] First draft. --- .../src/main/resources/reference.conf | 16 ++++++-- .../cats/IOFiberInstrumentation.scala | 25 ++++++++++++ .../cats/CatsIOInstrumentationSpec.scala | 34 +++++++++------- .../src/test/resources/application.conf | 39 ------------------- .../kamon-jdbc/src/test/resources/logback.xml | 12 ------ 5 files changed, 56 insertions(+), 70 deletions(-) create mode 100644 instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala delete mode 100644 instrumentation/kamon-jdbc/src/test/resources/application.conf delete mode 100644 instrumentation/kamon-jdbc/src/test/resources/logback.xml diff --git a/instrumentation/kamon-cats-io/src/main/resources/reference.conf b/instrumentation/kamon-cats-io/src/main/resources/reference.conf index 30de520ec..30ffac0c8 100644 --- a/instrumentation/kamon-cats-io/src/main/resources/reference.conf +++ b/instrumentation/kamon-cats-io/src/main/resources/reference.conf @@ -1,11 +1,19 @@ ############################################# # Kamon Cats IO Reference Configuration # ############################################# - kanela.modules { executor-service { - within += "cats.effect.unsafe..*" - within += "cats.effect.unsafe.SchedulerCompanionPlatform" - within += "cats.effect.unsafe.$Scheduler" + within += "cats.effect.*" + } + cats-fibers { + name = "Cats-IO Instrumentation" + description = "Provides instrumentation for Cats IO Fibers" + + instrumentations = [ + "kamon.instrumentation.cats.IOFiberInstrumentation" + ] + within = [ + "^cats.effect.*" + ] } } \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala new file mode 100644 index 000000000..c8091c1b0 --- /dev/null +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala @@ -0,0 +1,25 @@ +package kamon.instrumentation.cats + +import kamon.Kamon +import kamon.instrumentation.context.HasContext +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + + +class IOFiberInstrumentation extends InstrumentationBuilder { + onTypes("cats.effect.IOFiber") + .advise(method("run"), IOFiberInstrumentation) +} + +object IOFiberInstrumentation { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any): Unit = { + Kamon.storeContext(fiber.asInstanceOf[HasContext].context) + } + + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any): Unit = { + fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) + Kamon.storeContext(null) + } +} diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala index 1dfa6f187..91cc1c686 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala @@ -5,6 +5,7 @@ import cats.effect.{IO, Spawn} import kamon.Kamon import kamon.context.Context import kamon.tag.Lookups.plain +import kamon.trace.Span import org.scalatest.OptionValues import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} import org.scalatest.matchers.should.Matchers @@ -28,35 +29,38 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu val runtime = IORuntime.global val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) val context = Context.of("key", "value") - val contextTagAfterTransformations = + val test = for { - scope <- IO { - Kamon.storeContext(context) - } + scope <- IO.delay(Kamon.storeContext(context)) len <- IO("Hello Kamon!").map(_.length) _ <- IO(len.toString) beforeChanging <- getKey() evalOnGlobalRes <- Spawn[IO].evalOn(IO.sleep(Duration.Zero).flatMap(_ => getKey()), global) + outerSpanIdBeginning <- IO.delay(Kamon.currentSpan().id.string) + innerSpan <- IO.delay(Kamon.clientSpanBuilder("Foo", "attempt").context(context).start()) + innerSpanId1 <- Spawn[IO].evalOn(IO.delay(Kamon.currentSpan()), anotherExecutionContext) + innerSpanId2 <- IO.delay(Kamon.currentSpan()) + _ <- IO.delay(innerSpan.finish()) + outerSpanIdEnd <- IO.delay(Kamon.currentSpan().id.string) evalOnAnotherEx <- Spawn[IO].evalOn(IO.sleep(Duration.Zero).flatMap(_ => getKey()), anotherExecutionContext) } yield { - val tagValue = Kamon.currentContext().getTag(plain("key")) scope.close() - (beforeChanging, evalOnGlobalRes,evalOnAnotherEx, tagValue) + withClue("before changing")(beforeChanging shouldBe "value") + withClue("on the global exec context")(evalOnGlobalRes shouldBe "value") + withClue("on a different exec context")(evalOnAnotherEx shouldBe "value") + withClue("final result")(evalOnAnotherEx shouldBe "value") + withClue("inner span should be the same on different exec")(innerSpanId1 shouldBe innerSpan) + withClue("inner span should be the same on same exec")(innerSpanId2 shouldBe innerSpan) + withClue("inner and outer should be different")(outerSpanIdBeginning should not equal innerSpan) } - val contextTagFuture = contextTagAfterTransformations.unsafeToFuture()(runtime) - eventually(timeout(10 seconds)) { - val (beforeChanging, evalOnGlobalRes,evalOnAnotherEx, tagValue) = contextTagFuture.value.get.get - withClue("before changing")(beforeChanging shouldBe "value") - withClue("on the global exec context")(evalOnGlobalRes shouldBe "value") - withClue("on a different exec context")(evalOnAnotherEx shouldBe "value") - withClue("final result")(tagValue shouldBe "value") - } + test.unsafeRunSync()(runtime) } } } - private def getKey(): IO[String] = { IO.delay(Kamon.currentContext().getTag(plain("key"))) } + + } \ No newline at end of file diff --git a/instrumentation/kamon-jdbc/src/test/resources/application.conf b/instrumentation/kamon-jdbc/src/test/resources/application.conf deleted file mode 100644 index c0ba6d72e..000000000 --- a/instrumentation/kamon-jdbc/src/test/resources/application.conf +++ /dev/null @@ -1,39 +0,0 @@ -kamon { - - instrumentation.jdbc { - parse-sql-for-operation-name = true - } - - trace { - tick-inteval = 200 milliseconds - hooks.pre-start = [ "kamon.trace.Hooks$PreStart$FromContext" ] - } - - metric.factory.default-settings { - range-sampler.auto-update-interval = 1 millisecond - } - - jdbc { - slow-query-threshold = 400 milliseconds - - # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. - slow-query-processor = kamon.instrumentation.jdbc.NoOpSlowQueryProcessor - - # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. - sql-error-processor = kamon.instrumentation.jdbc.NoOpSqlErrorProcessor - } -} - -slick-h2 = { - url = "jdbc:h2:mem:slick" - driver = org.h2.Driver - connectionPool = disabled - keepAliveConnection = true -} - -slick-h2-with-pool = { - url = "jdbc:h2:mem:slick_with_pool" - driver = org.h2.Driver - connectionPool = "HikariCP" - keepAliveConnection = true -} diff --git a/instrumentation/kamon-jdbc/src/test/resources/logback.xml b/instrumentation/kamon-jdbc/src/test/resources/logback.xml deleted file mode 100644 index c336bbfe4..000000000 --- a/instrumentation/kamon-jdbc/src/test/resources/logback.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - \ No newline at end of file From 8d4163f811a441651050de0edbc1731fb9960fcd Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Tue, 22 Feb 2022 14:15:55 +0000 Subject: [PATCH 3/7] Couple more changes. --- .../cats/IOFiberInstrumentation.scala | 4 +- .../cats/CatsIOInstrumentationSpec.scala | 50 +++++++++++++++++-- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala index c8091c1b0..f8b24de44 100644 --- a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala @@ -8,17 +8,19 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice class IOFiberInstrumentation extends InstrumentationBuilder { onTypes("cats.effect.IOFiber") - .advise(method("run"), IOFiberInstrumentation) + .advise(method("runLoop"), IOFiberInstrumentation) } object IOFiberInstrumentation { @Advice.OnMethodEnter(suppress = classOf[Throwable]) def enter(@Advice.This fiber: Any): Unit = { + //println(s"Entering: ${fiber.asInstanceOf[HasContext].context.tags.get(Lookups.plain("key"))}, ${Thread.currentThread().getName}") Kamon.storeContext(fiber.asInstanceOf[HasContext].context) } @Advice.OnMethodExit(suppress = classOf[Throwable]) def exit(@Advice.This fiber: Any): Unit = { + // println(s"Exiting: ${fiber.asInstanceOf[HasContext].context.tags.get(Lookups.plain("key"))}, ${Thread.currentThread().getName}") fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) Kamon.storeContext(null) } diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala index 91cc1c686..29b5755dc 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala @@ -1,20 +1,21 @@ package kamon.instrumentation.futures.cats import cats.effect.unsafe.IORuntime -import cats.effect.{IO, Spawn} +import cats.effect.{IO, Resource, Spawn} import kamon.Kamon import kamon.context.Context import kamon.tag.Lookups.plain -import kamon.trace.Span import org.scalatest.OptionValues import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import java.util.concurrent.Executors -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.ExecutionContext.global import scala.concurrent.duration._ +import cats.implicits._ +import kamon.trace.Span class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues with Eventually { @@ -26,8 +27,9 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu "an cats.effect IO created when instrumentation is active" should { "capture the active span available when created" which { "must be available across asynchronous boundaries" in { + val runtime = IORuntime.global - val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) + val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) val context = Context.of("key", "value") val test = for { @@ -56,11 +58,51 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu test.unsafeRunSync()(runtime) } + + "must allow complex Span topologies to be created" in { + val context = Context.of("key", "value") + /** + * test + * - nestedLevel0 + * - nestedUpToLevel2 + * - nestedUpToLevel2._2._1 + * - fiftyInParallel + */ + val test = for { + span <- IO.delay(Kamon.currentSpan()) + nestedLevel0 <- meteredWithSpanCapture("level1-A")(IO.sleep(1.seconds)) + nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(IO.sleep(1.seconds))) + fiftyInParallel <- (0 to 50).toList.parTraverse(i => meteredWithSpanCapture(s"operation$i")(IO.sleep(1.seconds))) + } yield { + span.id.string should not be empty + span.id.string shouldBe nestedLevel0._1.parentId.string + span.id.string shouldBe nestedUpToLevel2._1.parentId.string + nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string + fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string) + } + + val runtime = IORuntime.global + (IO.delay(Kamon.storeContext(context)) *> meteredWithSpanCapture("test")(test)).unsafeRunSync()(runtime) + } } } private def getKey(): IO[String] = { IO.delay(Kamon.currentContext().getTag(plain("key"))) } + private def meteredWithSpanCapture[A](operation: String)(io: IO[A]): IO[(Span, A)] = { + Resource + .make{ + for { + ctx <- IO.delay(Kamon.currentContext()) + parentSpan <- IO.delay(Kamon.currentSpan()) + span <- IO.delay(Kamon.spanBuilder(operation).context(ctx).asChildOf(parentSpan).start()) + _ <- IO.delay(Kamon.storeContext(ctx.withEntry(Context.key("span", span), span))) + } yield span + }(span => IO.delay(span.finish())) + .use(_ => (IO.delay(Kamon.currentSpan()), io).parBisequence) + } + + } \ No newline at end of file From c9b0686c13db12a077b89ec0e58d8d8c052eff63 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Tue, 22 Feb 2022 18:27:50 +0000 Subject: [PATCH 4/7] Fixed tests. --- .../cats/CatsIOInstrumentationSpec.scala | 44 +++++++++++++------ .../src/test/resources/application.conf | 39 ++++++++++++++++ .../kamon-jdbc/src/test/resources/logback.xml | 0 3 files changed, 70 insertions(+), 13 deletions(-) create mode 100644 instrumentation/kamon-jdbc/src/test/resources/application.conf create mode 100644 instrumentation/kamon-jdbc/src/test/resources/logback.xml diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala index 29b5755dc..30d3f7c87 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala @@ -10,12 +10,17 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import java.util.concurrent.Executors +import java.util.concurrent.{Executors, ScheduledExecutorService} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.ExecutionContext.global import scala.concurrent.duration._ import cats.implicits._ -import kamon.trace.Span +import kamon.tag.TagSet +import kamon.trace.Identifier.Scheme +import kamon.trace.Tracer.LocalTailSamplerSettings +import kamon.trace.{ConstantSampler, Identifier, Sampler, Span, Trace} + +import java.time.{Clock, Instant} class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues with Eventually { @@ -23,7 +28,6 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu // NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there // is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the // kamon-executors module should take care of all non-JDK Runnable/Callable implementations. - "an cats.effect IO created when instrumentation is active" should { "capture the active span available when created" which { "must be available across asynchronous boundaries" in { @@ -60,7 +64,12 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu } "must allow complex Span topologies to be created" in { - val context = Context.of("key", "value") + val parentSpan = Span.Remote( + Scheme.Single.spanIdFactory.generate(), + Identifier.Empty, + Trace.create(Scheme.Single.traceIdFactory.generate(), Trace.SamplingDecision.Sample) + ) + val context = Context.of(Span.Key, parentSpan) /** * test * - nestedLevel0 @@ -70,19 +79,22 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu */ val test = for { span <- IO.delay(Kamon.currentSpan()) - nestedLevel0 <- meteredWithSpanCapture("level1-A")(IO.sleep(1.seconds)) - nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(IO.sleep(1.seconds))) - fiftyInParallel <- (0 to 50).toList.parTraverse(i => meteredWithSpanCapture(s"operation$i")(IO.sleep(1.seconds))) + nestedLevel0 <- meteredWithSpanCapture("level1-A")(IO.sleep(100.millis)) + nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(IO.sleep(100.millis))) + fiftyInParallel <- (0 to 49).toList.parTraverse(i => meteredWithSpanCapture(s"operation$i")(IO.sleep(100.millis))) + afterEverything <- IO.delay(Kamon.currentSpan()) } yield { span.id.string should not be empty span.id.string shouldBe nestedLevel0._1.parentId.string span.id.string shouldBe nestedUpToLevel2._1.parentId.string nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string) + fiftyInParallel.map(_._1.id.string).toSet should have size 50 + afterEverything.id.string shouldBe span.id.string } val runtime = IORuntime.global - (IO.delay(Kamon.storeContext(context)) *> meteredWithSpanCapture("test")(test)).unsafeRunSync()(runtime) + (IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test).unsafeRunSync()(runtime) } } } @@ -94,12 +106,18 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu Resource .make{ for { - ctx <- IO.delay(Kamon.currentContext()) + initialCtx <- IO.delay(Kamon.currentContext()) parentSpan <- IO.delay(Kamon.currentSpan()) - span <- IO.delay(Kamon.spanBuilder(operation).context(ctx).asChildOf(parentSpan).start()) - _ <- IO.delay(Kamon.storeContext(ctx.withEntry(Context.key("span", span), span))) - } yield span - }(span => IO.delay(span.finish())) + newSpan <- IO.delay(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) + _ <- IO.delay(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) + } yield (initialCtx, newSpan) + }{ + case (initialCtx, span) => + for { + _ <- IO.delay(span.finish()) + _ <- IO.delay(Kamon.storeContext(initialCtx)) + } yield () + } .use(_ => (IO.delay(Kamon.currentSpan()), io).parBisequence) } diff --git a/instrumentation/kamon-jdbc/src/test/resources/application.conf b/instrumentation/kamon-jdbc/src/test/resources/application.conf new file mode 100644 index 000000000..c0ba6d72e --- /dev/null +++ b/instrumentation/kamon-jdbc/src/test/resources/application.conf @@ -0,0 +1,39 @@ +kamon { + + instrumentation.jdbc { + parse-sql-for-operation-name = true + } + + trace { + tick-inteval = 200 milliseconds + hooks.pre-start = [ "kamon.trace.Hooks$PreStart$FromContext" ] + } + + metric.factory.default-settings { + range-sampler.auto-update-interval = 1 millisecond + } + + jdbc { + slow-query-threshold = 400 milliseconds + + # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. + slow-query-processor = kamon.instrumentation.jdbc.NoOpSlowQueryProcessor + + # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. + sql-error-processor = kamon.instrumentation.jdbc.NoOpSqlErrorProcessor + } +} + +slick-h2 = { + url = "jdbc:h2:mem:slick" + driver = org.h2.Driver + connectionPool = disabled + keepAliveConnection = true +} + +slick-h2-with-pool = { + url = "jdbc:h2:mem:slick_with_pool" + driver = org.h2.Driver + connectionPool = "HikariCP" + keepAliveConnection = true +} diff --git a/instrumentation/kamon-jdbc/src/test/resources/logback.xml b/instrumentation/kamon-jdbc/src/test/resources/logback.xml new file mode 100644 index 000000000..e69de29bb From 596dc708d32abe9aee1584a1d854372d1dab5fc4 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Thu, 24 Feb 2022 23:00:27 +0000 Subject: [PATCH 5/7] Small improvement? --- .../cats/IOFiberInstrumentation.scala | 122 +++++++++++++++++- ....scala => CatsIoInstrumentationSpec.scala} | 31 ++--- 2 files changed, 131 insertions(+), 22 deletions(-) rename instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/{CatsIOInstrumentationSpec.scala => CatsIoInstrumentationSpec.scala} (84%) diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala index f8b24de44..456e9edf6 100644 --- a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala @@ -1,6 +1,7 @@ package kamon.instrumentation.cats import kamon.Kamon +import kamon.context.Context import kamon.instrumentation.context.HasContext import kanela.agent.api.instrumentation.InstrumentationBuilder import kanela.agent.libs.net.bytebuddy.asm.Advice @@ -8,20 +9,127 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice class IOFiberInstrumentation extends InstrumentationBuilder { onTypes("cats.effect.IOFiber") - .advise(method("runLoop"), IOFiberInstrumentation) + .advise(method("run"), RestoreContextFromFiber) + .advise(method("run"), SaveCurrentContextOnExit) + + onTypes("cats.effect.IOFiber") + .advise(anyMethods( + "rescheduleFiber", + "scheduleFiber", + "scheduleOnForeignEC", + ), SetContextOnNewFiber) + +/* onTypes("cats.effect.IOFiber") + .advise(anyMethods( + "runLoop", + ), Debug) + + onTypes("cats.effect.unsafe.WorkerThread") + .advise(anyMethods("schedule", "reschedule"), DebugWT)*/ + + /* onTypes("cats.effect.IOFiber") + .advise(method("run"), IOFiberInstrumentationSetOnRun) + .advise(method("run"), IOFiberInstrumentationSaveCurrentOnExit) + + + onTypes("cats.effect.IOFiber") + .advise(anyMethods( + "rescheduleFiber", + "scheduleFiber", + "scheduleOnForeignEC", + ), IOFiberInstrumentationSaveCurrentOnExit) + +*/ + + +} +/*object Helper { + def padTo(obj: Any, len: Int): String = + obj.toString.take(len).padTo(len, " ").mkString("") + } +import Helper._*/ + -object IOFiberInstrumentation { +object RestoreContextFromFiber { @Advice.OnMethodEnter(suppress = classOf[Throwable]) def enter(@Advice.This fiber: Any): Unit = { - //println(s"Entering: ${fiber.asInstanceOf[HasContext].context.tags.get(Lookups.plain("key"))}, ${Thread.currentThread().getName}") - Kamon.storeContext(fiber.asInstanceOf[HasContext].context) + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"RestoreCtx(run)| Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + val ctxFiber = fiber.asInstanceOf[HasContext].context + if(ctxFiber.nonEmpty()){ + Kamon.storeContext(ctxFiber) + } } +} +object SaveCurrentContextOnExit { @Advice.OnMethodExit(suppress = classOf[Throwable]) def exit(@Advice.This fiber: Any): Unit = { - // println(s"Exiting: ${fiber.asInstanceOf[HasContext].context.tags.get(Lookups.plain("key"))}, ${Thread.currentThread().getName}") - fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) - Kamon.storeContext(null) + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"SaveCtx | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + if(currentCtx.nonEmpty()){ + fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) + } + //Kamon.storeContext(null) + } +} + +object SetContextOnNewFiber { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.Argument(1) fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"SetOnFiber | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + if(currentCtx.nonEmpty()){ + fiber.asInstanceOf[HasContext].setContext(currentCtx) + } + + } +} + + + + + + + +/* +object Debug { + + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any, @Advice.Argument(0) io :Any,@Advice.Argument(1) cancellation: Int, @Advice.Argument(2) autoCede: Int): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"RunLoop(Enter) | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(cancellation, 5)} | Autocede: ${autoCede} | Thread ${Thread.currentThread().getName}") + } + + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any, @Advice.Argument(0) io :Any,@Advice.Argument(1) cancellation: Int, @Advice.Argument(2) autoCede: Int): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"RunLoop(Exit) | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(cancellation, 5)} | Autocede: ${autoCede} | Thread ${Thread.currentThread().getName}") } } + +object DebugWT { + + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(0, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + } + + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(0, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + Kamon.storeContext(fiber.asInstanceOf[HasContext].context) + } +}*/ diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala similarity index 84% rename from instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala rename to instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala index 30d3f7c87..48f9def61 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala @@ -10,17 +10,13 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import java.util.concurrent.{Executors, ScheduledExecutorService} -import scala.concurrent.{ExecutionContext, Future} +import java.util.concurrent.Executors +import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.ExecutionContext.global import scala.concurrent.duration._ import cats.implicits._ -import kamon.tag.TagSet import kamon.trace.Identifier.Scheme -import kamon.trace.Tracer.LocalTailSamplerSettings -import kamon.trace.{ConstantSampler, Identifier, Sampler, Span, Trace} - -import java.time.{Clock, Instant} +import kamon.trace.{Identifier, Span, Trace} class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues with Eventually { @@ -70,6 +66,7 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu Trace.create(Scheme.Single.traceIdFactory.generate(), Trace.SamplingDecision.Sample) ) val context = Context.of(Span.Key, parentSpan) + implicit val ec = ExecutionContext.global /** * test * - nestedLevel0 @@ -82,6 +79,7 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu nestedLevel0 <- meteredWithSpanCapture("level1-A")(IO.sleep(100.millis)) nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(IO.sleep(100.millis))) fiftyInParallel <- (0 to 49).toList.parTraverse(i => meteredWithSpanCapture(s"operation$i")(IO.sleep(100.millis))) + afterCede <- meteredWithSpanCapture("cede")(IO.cede *> IO.delay(Kamon.currentSpan())) afterEverything <- IO.delay(Kamon.currentSpan()) } yield { span.id.string should not be empty @@ -90,11 +88,15 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string) fiftyInParallel.map(_._1.id.string).toSet should have size 50 + afterCede._1.id.string shouldBe afterCede._2.id.string //A cede should not cause the span to be lost afterEverything.id.string shouldBe span.id.string } - val runtime = IORuntime.global - (IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test).unsafeRunSync()(runtime) + + val result = scala.concurrent.Future.sequence( + (1 to 100).toList.map(_ => (IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test).unsafeToFuture()(runtime)) + ) + Await.result(result, 10.seconds) } } } @@ -103,13 +105,12 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu } private def meteredWithSpanCapture[A](operation: String)(io: IO[A]): IO[(Span, A)] = { - Resource - .make{ + Resource.make{ for { - initialCtx <- IO.delay(Kamon.currentContext()) - parentSpan <- IO.delay(Kamon.currentSpan()) - newSpan <- IO.delay(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) - _ <- IO.delay(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) + initialCtx <- IO(Kamon.currentContext()) + parentSpan <- IO(Kamon.currentSpan()) + newSpan <- IO(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) + _ <- IO(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) } yield (initialCtx, newSpan) }{ case (initialCtx, span) => From b02bff128852ba6f0eb1dd2f0c27a4ade08cc57e Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Fri, 25 Feb 2022 15:24:04 +0000 Subject: [PATCH 6/7] A few more changes. --- .../cats/IOFiberInstrumentation.scala | 125 +++++++++++------- .../cats/CatsIoInstrumentationSpec.scala | 33 ++++- 2 files changed, 103 insertions(+), 55 deletions(-) diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala index 456e9edf6..185983c1b 100644 --- a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala @@ -8,6 +8,17 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice class IOFiberInstrumentation extends InstrumentationBuilder { + + /**Approach: RunLoop Instrumentation**/ + //onTypes("cats.effect.IOFiber") + // .advise(anyMethods( + // "runLoop", + // ), InstrumentRunLoop) + /**Approach: RunLoop Instrumentation**/ + + + + /**Approach: Instrumenting run() and "forks" **/ onTypes("cats.effect.IOFiber") .advise(method("run"), RestoreContextFromFiber) .advise(method("run"), SaveCurrentContextOnExit) @@ -18,37 +29,35 @@ class IOFiberInstrumentation extends InstrumentationBuilder { "scheduleFiber", "scheduleOnForeignEC", ), SetContextOnNewFiber) + onTypes("cats.effect.unsafe.WorkStealingThreadPool") + .advise(anyMethods("scheduleFiber", "rescheduleFiber", "scheduleExternal"),SetContextOnNewFiberForWSTP) + /**Approach: More efficient solution**/ + + -/* onTypes("cats.effect.IOFiber") + /**Debug: begin**/ + onTypes("cats.effect.IOFiber") .advise(anyMethods( "runLoop", ), Debug) - - onTypes("cats.effect.unsafe.WorkerThread") - .advise(anyMethods("schedule", "reschedule"), DebugWT)*/ - - /* onTypes("cats.effect.IOFiber") - .advise(method("run"), IOFiberInstrumentationSetOnRun) - .advise(method("run"), IOFiberInstrumentationSaveCurrentOnExit) - - - onTypes("cats.effect.IOFiber") - .advise(anyMethods( - "rescheduleFiber", - "scheduleFiber", - "scheduleOnForeignEC", - ), IOFiberInstrumentationSaveCurrentOnExit) - -*/ - + /**Debug: end**/ } -/*object Helper { +object Helper { def padTo(obj: Any, len: Int): String = obj.toString.take(len).padTo(len, " ").mkString("") + def setIfNotEmpty(f: HasContext)(ctx: Context): Unit = + if(ctx.nonEmpty()){ + f.setContext(ctx) + } + + def setCurrentCtxIfNotEmpty(ctx: Context): Unit = + if(ctx.nonEmpty()){ + Kamon.storeContext(ctx) + } } -import Helper._*/ +import Helper._ object RestoreContextFromFiber { @@ -56,11 +65,9 @@ object RestoreContextFromFiber { def enter(@Advice.This fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - //println(s"RestoreCtx(run)| Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val ctxFiber = fiber.asInstanceOf[HasContext].context - if(ctxFiber.nonEmpty()){ - Kamon.storeContext(ctxFiber) - } + setCurrentCtxIfNotEmpty(ctxFiber) } } @@ -69,67 +76,85 @@ object SaveCurrentContextOnExit { def exit(@Advice.This fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - //println(s"SaveCtx | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val currentCtx = Kamon.currentContext() - if(currentCtx.nonEmpty()){ - fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) - } - //Kamon.storeContext(null) + setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) } } + object SetContextOnNewFiber { @Advice.OnMethodEnter(suppress = classOf[Throwable]) - def enter(@Advice.Argument(1) fiber: Any): Unit = { + def enter(@Advice.This currFiber: Any, @Advice.Argument(1) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - //println(s"SetOnFiber | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(currFiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val currentCtx = Kamon.currentContext() - if(currentCtx.nonEmpty()){ - fiber.asInstanceOf[HasContext].setContext(currentCtx) - } - + setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) } } +object SetContextOnNewFiberForWSTP { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.Argument(0) fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("unknown", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) + } +} +object InstrumentRunLoop { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val ctxFiber = fiber.asInstanceOf[HasContext].context + Kamon.storeContext(ctxFiber) + } + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: {${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + fiber.asInstanceOf[HasContext].setContext(currentCtx) + } +} - - - -/* object Debug { @Advice.OnMethodEnter(suppress = classOf[Throwable]) - def enter(@Advice.This fiber: Any, @Advice.Argument(0) io :Any,@Advice.Argument(1) cancellation: Int, @Advice.Argument(2) autoCede: Int): Unit = { + def enter(@Advice.This fiber: Any, @Advice.Argument(0) io :Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"RunLoop(Enter) | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(cancellation, 5)} | Autocede: ${autoCede} | Thread ${Thread.currentThread().getName}") + println(s"runLoop(Enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") } @Advice.OnMethodExit(suppress = classOf[Throwable]) - def exit(@Advice.This fiber: Any, @Advice.Argument(0) io :Any,@Advice.Argument(1) cancellation: Int, @Advice.Argument(2) autoCede: Int): Unit = { + def exit(@Advice.This fiber: Any, @Advice.Argument(0) io :Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"RunLoop(Exit) | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Cancellation: ${padTo(cancellation, 5)} | Autocede: ${autoCede} | Thread ${Thread.currentThread().getName}") + println(s"runLoop(Exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") } } object DebugWT { @Advice.OnMethodEnter(suppress = classOf[Throwable]) - def enter(@Advice.This fiber: Any): Unit = { + def enter(@Advice.Argument(0) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(0, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") + println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") } @Advice.OnMethodExit(suppress = classOf[Throwable]) - def exit(@Advice.This fiber: Any): Unit = { + def exit(@Advice.Argument(0) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | FiberId: ${fiber.hashCode()} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(0, 25)} | Cancellation: ${padTo(0, 5)} | Autocede: ${0} | Thread ${Thread.currentThread().getName}") - Kamon.storeContext(fiber.asInstanceOf[HasContext].context) + println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") } -}*/ +} diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala index 48f9def61..78c9f1edb 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala @@ -1,6 +1,6 @@ package kamon.instrumentation.futures.cats -import cats.effect.unsafe.IORuntime +import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler} import cats.effect.{IO, Resource, Spawn} import kamon.Kamon import kamon.context.Context @@ -26,14 +26,37 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu // kamon-executors module should take care of all non-JDK Runnable/Callable implementations. "an cats.effect IO created when instrumentation is active" should { "capture the active span available when created" which { - "must be available across asynchronous boundaries" in { + "must allow the context to be cleaned" in { val runtime = IORuntime.global val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) val context = Context.of("key", "value") val test = for { - scope <- IO.delay(Kamon.storeContext(context)) + _ <- IO.delay(Kamon.storeContext(Context.Empty)) + _ <- IO.delay(Kamon.storeContext(context)) + _ <- Spawn[IO].evalOn(IO.sleep(0.seconds), anotherExecutionContext) + afterCleaning <- IO.delay(Kamon.currentContext()) + } yield { + afterCleaning shouldBe Context.Empty + } + + test.unsafeRunSync()(runtime) + } + + "must be available across asynchronous boundaries" in { + val runtime = IORuntime.apply( + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), //pool 4 + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), // pool 5 + Scheduler.fromScheduledExecutor(Executors.newSingleThreadScheduledExecutor()), //pool 6 + () => (), + IORuntimeConfig.apply() + ) + val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) //pool 7 + val context = Context.of("key", "value") + val test = + for { + scope <- IO.delay({println("Context will be set"); Kamon.storeContext(context);}) len <- IO("Hello Kamon!").map(_.length) _ <- IO(len.toString) beforeChanging <- getKey() @@ -59,7 +82,7 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu test.unsafeRunSync()(runtime) } - "must allow complex Span topologies to be created" in { + /*"must allow complex Span topologies to be created" in { val parentSpan = Span.Remote( Scheme.Single.spanIdFactory.generate(), Identifier.Empty, @@ -97,7 +120,7 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu (1 to 100).toList.map(_ => (IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test).unsafeToFuture()(runtime)) ) Await.result(result, 10.seconds) - } + }*/ } } private def getKey(): IO[String] = { From f9ddb404917e6ed51fd78488862da51ac55156f6 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Thu, 3 Mar 2022 15:22:26 +0000 Subject: [PATCH 7/7] Fixed misconfigured test. --- .../cats/IOFiberInstrumentation.scala | 21 ++++++++++--------- .../cats/CatsIoInstrumentationSpec.scala | 7 ++++--- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala index 185983c1b..5db08cbdc 100644 --- a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala @@ -1,5 +1,6 @@ package kamon.instrumentation.cats +import cats.effect.{IO, IOLocal} import kamon.Kamon import kamon.context.Context import kamon.instrumentation.context.HasContext @@ -65,7 +66,7 @@ object RestoreContextFromFiber { def enter(@Advice.This fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val ctxFiber = fiber.asInstanceOf[HasContext].context setCurrentCtxIfNotEmpty(ctxFiber) } @@ -76,7 +77,7 @@ object SaveCurrentContextOnExit { def exit(@Advice.This fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val currentCtx = Kamon.currentContext() setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) } @@ -88,7 +89,7 @@ object SetContextOnNewFiber { def enter(@Advice.This currFiber: Any, @Advice.Argument(1) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(currFiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(currFiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val currentCtx = Kamon.currentContext() setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) } @@ -99,7 +100,7 @@ object SetContextOnNewFiberForWSTP { def enter(@Advice.Argument(0) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("unknown", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("unknown", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val currentCtx = Kamon.currentContext() setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) } @@ -110,7 +111,7 @@ object InstrumentRunLoop { def enter(@Advice.This fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val ctxFiber = fiber.asInstanceOf[HasContext].context Kamon.storeContext(ctxFiber) } @@ -119,7 +120,7 @@ object InstrumentRunLoop { def exit(@Advice.This fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: {${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: {${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") val currentCtx = Kamon.currentContext() fiber.asInstanceOf[HasContext].setContext(currentCtx) } @@ -131,14 +132,14 @@ object Debug { def enter(@Advice.This fiber: Any, @Advice.Argument(0) io :Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"runLoop(Enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"runLoop(Enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") } @Advice.OnMethodExit(suppress = classOf[Throwable]) def exit(@Advice.This fiber: Any, @Advice.Argument(0) io :Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"runLoop(Exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"runLoop(Exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") } } @@ -148,13 +149,13 @@ object DebugWT { def enter(@Advice.Argument(0) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") } @Advice.OnMethodExit(suppress = classOf[Throwable]) def exit(@Advice.Argument(0) fiber: Any): Unit = { val field = fiber.getClass.getDeclaredField("resumeTag") field.setAccessible(true) - println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") + //println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") } } diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala index 78c9f1edb..7b6579a57 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala @@ -33,9 +33,10 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu val context = Context.of("key", "value") val test = for { - _ <- IO.delay(Kamon.storeContext(Context.Empty)) _ <- IO.delay(Kamon.storeContext(context)) _ <- Spawn[IO].evalOn(IO.sleep(0.seconds), anotherExecutionContext) + _ <- IO.delay(Kamon.storeContext(Context.Empty)) + _ <- Spawn[IO].evalOn(IO.sleep(0.seconds), anotherExecutionContext) afterCleaning <- IO.delay(Kamon.currentContext()) } yield { afterCleaning shouldBe Context.Empty @@ -82,7 +83,7 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu test.unsafeRunSync()(runtime) } - /*"must allow complex Span topologies to be created" in { + "must allow complex Span topologies to be created" in { val parentSpan = Span.Remote( Scheme.Single.spanIdFactory.generate(), Identifier.Empty, @@ -120,7 +121,7 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu (1 to 100).toList.map(_ => (IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test).unsafeToFuture()(runtime)) ) Await.result(result, 10.seconds) - }*/ + } } } private def getKey(): IO[String] = {