akiraly/playing-reactor

Intro to Reactive Programming on the JVM with Reactor

Links, materials and notes about reactive programming with reactor

What is Reactive Programming?

Reactive programming [...] is a subset of asynchronous programming and a paradigm where the availability of new information drives the logic forward rather than having control flow driven by a thread-of-execution. [ref]

Reactive Programming vs Reactive System

They are not the same thing!

A reactive system is an architectural style that allows multiple individual applications to coalesce as a single unit, reacting to its surroundings, while remaining aware of each other—this could manifest as being able to scale up/down, load balancing, and even taking some of these steps proactively. [ref]

We are focusing on Reactive Programming (the coding part) here, not on Reactive Systems.

Reactive Programming on the JVM

  1. Reactive Streams: The API + TCK
  2. RxJava 2: Java 6 (and Android) compatible implementation of Reactive Streams
  3. Reactor: Java 8 based implementation of Reactive Streams. Developed by the Spring Team, to be used in Spring 5 and onwards.
  4. Java 9 Flow API: The Reactive Streams interfaces copied into the java.util.concurrent.Flow class as nested interfaces. More details here

We are focusing on Reactor here.

The Reactive Streams API

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

public interface Subscription {
    void request(long n);
    void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

Publisher, Subscriber and Subscription[ref]
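To see the interfaces cooperate, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow copies of them, so it runs without extra dependencies, together with the JDK's SubmissionPublisher. The class name SignalOrderDemo and the run helper are made up for illustration. The sketch only records the order in which the Subscriber receives its signals.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class SignalOrderDemo {

    /** Publishes the items and returns the signals the subscriber saw, in order. */
    static List<String> run(String... items) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");
                s.request(Long.MAX_VALUE); // unbounded demand: no backpressure applied
            }
            @Override public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }
            @Override public void onError(Throwable t) {
                signals.add("onError");
                done.countDown();
            }
            @Override public void onComplete() {
                signals.add("onComplete");
                done.countDown();
            }
        };

        // close() (called by try-with-resources) sends onComplete after pending items
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        }

        try {
            done.await(); // signals are delivered asynchronously, so wait for the terminal one
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run("Hello,", " world!"));
    }
}
```

The recorded order is always onSubscribe first, then the onNext signals, then exactly one terminal signal.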

Marble diagrams?

Example: example marble diagram[ref]

Backpressure?

the ability for the consumer to signal the producer that the rate of emission is too high for it to keep up [ref]
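In Reactive Streams this signal is Subscription.request(n): the producer may emit only as many elements as were requested. The sketch below is a hand-written illustration using the JDK's Flow interfaces. The names BackpressureDemo, range and run are hypothetical, and the publisher is deliberately simplified (single-threaded, not verified against the TCK). The subscriber asks for a small batch and requests the next batch only after it has handled the previous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureDemo {

    /** Simplified publisher of 0..count-1 that never emits more than was requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 0;
            long demand = 0;
            boolean draining = false;
            boolean done = false;

            @Override public void request(long n) {
                if (done || n <= 0) return;   // sketch: invalid requests are ignored
                demand += n;
                if (draining) return;         // re-entrant call from onNext: outer loop continues
                draining = true;
                while (!done && demand > 0 && next < count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                draining = false;
                if (!done && next == count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with a fixed batch size and returns the observed requests and signals. */
    static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;
            int receivedInBatch = 0;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }
            @Override public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batch) { // batch handled: signal readiness for more
                    receivedInBatch = 0;
                    log.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

A subscriber that stops calling request simply stops receiving elements, which is how a slow consumer throttles a fast producer.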

Flux

Flux marble[ref]

Represents a stream of 0 to N elements. Emits 0..N elements, then an error or completion signal.

It is similar to java.util.stream.Stream but even more similar to java.lang.Iterable, because a Flux is reusable: it can be subscribed to more than once.
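The reuse difference is visible with JDK types alone. The following sketch (ReuseDemo and its method names are made up for illustration) shows that a Stream can be consumed only once, while an Iterable can be traversed repeatedly, which is the behavior a Flux resembles.

```java
import java.util.List;
import java.util.stream.Stream;

final class ReuseDemo {

    /** Returns true if consuming the same Stream twice fails. */
    static boolean streamIsSingleUse() {
        Stream<String> stream = Stream.of("Hello,", " world!");
        stream.forEach(s -> { });          // first terminal operation: fine
        try {
            stream.forEach(s -> { });      // second terminal operation on the same Stream
            return false;
        } catch (IllegalStateException expected) {
            return true;                   // the stream has already been operated upon
        }
    }

    /** Iterates the same Iterable twice and returns how many elements were seen in total. */
    static int iterateTwice() {
        Iterable<String> iterable = List.of("Hello,", " world!");
        int seen = 0;
        for (String ignored : iterable) seen++;
        for (String ignored : iterable) seen++; // a fresh Iterator is created each time
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(streamIsSingleUse()); // true
        System.out.println(iterateTwice());      // 4
    }
}
```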

Example:

Flux<String> flux = Flux.just("Hello,", " world!"); // creates a Flux with a static factory method

flux.subscribe(); // need to subscribe to start the data flow
flux.blockLast(); // subscribes + waits for completion (Flux has blockFirst()/blockLast(), not block())... this operation exits the reactor world.

Mono

Mono marble[ref]

Represents 0 or 1 element. Emits at most one element, then an error or completion signal.

It is similar to java.util.Optional and java.util.concurrent.CompletableFuture. Mono is reusable as well.
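One way to read the comparison: a Mono has three possible outcomes (a value, empty, or an error). Optional models "value or empty", and CompletableFuture models "eventual value or error". The JDK-only sketch below combines the two to cover all three outcomes. The findUser lookup and its hard-coded ids are hypothetical and stand in for a real asynchronous call.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class MonoAnalogyDemo {

    /** Hypothetical lookup: "alice" is found, "error" fails, anything else is absent. */
    static CompletableFuture<Optional<String>> findUser(String id) {
        return CompletableFuture.supplyAsync(() -> {
            if (id.equals("error")) {
                throw new IllegalStateException("lookup failed");
            }
            return id.equals("alice") ? Optional.of("Alice") : Optional.<String>empty();
        });
    }

    /** Blocks for the result and names which of the three outcomes occurred. */
    static String describe(String id) {
        try {
            return findUser(id).join()
                    .map(name -> "value: " + name)
                    .orElse("empty");
        } catch (CompletionException e) {
            // join() wraps the original exception
            return "error: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("alice"));
        System.out.println(describe("bob"));
        System.out.println(describe("error"));
    }
}
```

One difference the analogy hides: supplyAsync starts the work immediately, while a Mono does nothing until it is subscribed to.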

Example:

Mono<String> mono = Mono.just("Hello, world!");

mono.subscribe(); // need to subscribe to start the data flow
mono.block(); // subscribes + waits... this operation exits the reactor world.

Schedulers

Nothing happens asynchronously - unless we say so explicitly.

This is where Schedulers come into the picture: just think of them as thread pools. You use different kinds of thread pools for different kinds of operations.

  • Schedulers.parallel() -> CPU intensive operations
  • Schedulers.elastic() -> Everything that involves waiting, for example db/network/file/ldap reads and writes. (Reactor 3.4 deprecated elastic() in favor of Schedulers.boundedElastic().)

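How this differs from Collection.parallelStream() and CompletableFuture.supplyAsync(): by default both run on the shared ForkJoinPool.commonPool(), so blocking work submitted in one place can starve unrelated tasks elsewhere in the JVM. With Schedulers you choose the pool explicitly per operation.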
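The same idea of choosing the pool explicitly can be shown with the JDK alone. In the sketch below (DedicatedPoolDemo and the thread name "io-worker" are made up for illustration), the one-argument supplyAsync uses the JDK's default executor, normally the common pool, while the two-argument overload runs the task on a dedicated executor reserved for blocking work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DedicatedPoolDemo {

    /** Name of the thread used when no executor is given (normally a common-pool worker). */
    static String defaultPoolThread() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    /** Name of the thread used when a dedicated executor is passed explicitly. */
    static String dedicatedPoolThread() {
        ExecutorService io = Executors.newFixedThreadPool(2, runnable -> {
            Thread t = new Thread(runnable, "io-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    .join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default:   " + defaultPoolThread());
        System.out.println("dedicated: " + dedicatedPoolThread());
    }
}
```

Passing the executor plays roughly the role that publishOn/subscribeOn with a chosen Scheduler plays in Reactor.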

Example:

Flux<Integer> flux = Flux.just(10, 15)
	.log() // built in logging, uses slf4j if on classpath
	.publishOn(Schedulers.parallel()) // or subscribeOn()
	.log()
	.flatMap(i -> Mono.fromSupplier(() -> i * 2).log().publishOn(Schedulers.parallel()))
	.log()
	.publishOn(Schedulers.parallel())
	.log()
	.map(i -> i - 2)
	.log();

flux.blockLast();

Output:

[main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[main] INFO reactor.Flux.PublishOn.2 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.FlatMap.3 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.PublishOn.4 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | request(unbounded)
[main] INFO reactor.Flux.PublishOn.4 - | request(unbounded)
[main] INFO reactor.Flux.FlatMap.3 - request(256)
[main] INFO reactor.Flux.PublishOn.2 - | request(256)
[main] INFO reactor.Flux.Array.1 - | request(256)
[main] INFO reactor.Flux.Array.1 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onNext(15)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onComplete()
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(15)
[parallel-3] INFO reactor.Flux.FlatMap.3 - onNext(20)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(20)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(18)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onNext(30)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(30)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(28)
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onComplete()
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onComplete()

Another example:

Scheduler emitter = Schedulers.newElastic("emitter");
Scheduler transformer = Schedulers.newElastic("transformer");
Scheduler consumer = Schedulers.newParallel("consumer");

Flux<Integer> flux = Flux.<Integer>push(e -> {
  // imagine reading from DB row by row or from file line by line
  for (int fi = 0; fi < 30; fi++) {
	Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // Guava helper
	e.next(fi);
  }
  e.complete(); // without a terminal signal, collectList().block() below never returns
})
	.log()
	.subscribeOn(emitter)
	.log();

flux.flatMap(
	// could be some other IO like reading from a second database
	i -> Mono.fromSupplier(() -> i + " - " + i * 2)
		.log()
		.subscribeOn(transformer)
		.log()
		.publishOn(consumer))
	.log()
	.collectList()
	.log().block();

Another example (what is the use case?):

Flux<LocalDateTime> flux = Flux.<LocalDateTime>create(e -> {
  Schedulers.newSingle("brc", true)
	  .schedulePeriodically(
		  () -> {
			LOGGER.info("calculating...");
			e.next(LocalDateTime.now(ZoneOffset.UTC));
		  },
		  0, 100, TimeUnit.MILLISECONDS);
}, OverflowStrategy.LATEST).cache(1);

// ...

flux.blockFirst();

Things that you typically don't need to use when using Flux/Mono/Schedulers

  • synchronized
  • Lock, ReentrantLock, Semaphore
  • CountDownLatch, CyclicBarrier
  • ThreadPoolExecutor, ScheduledThreadPoolExecutor
  • Future, CompletableFuture

Pitfalls to be aware of

As with any concurrent programming tool, anything (typically a library) that relies on the thread not changing will break once the work runs on multiple threads.

  1. ThreadLocal. A typical programmer should never (or at least very, very rarely) use ThreadLocal directly. However, libraries do use it:
    • Slf4j MDC (Mapped Diagnostic Context).
    • Transaction handling with Spring JDBC, Hibernate or MQs: this is a tricky one. If you have a transaction, you have the following options (as I see it):
      1. Leave the whole operation single-threaded.
      2. Create a single-threaded Scheduler per transaction and push every transactional operation onto that scheduler (including transaction begin and transaction commit/rollback).
      3. Say goodbye to your battle-tested libraries and roll your own transaction handling that does not depend on ThreadLocal. Only do this if you feel invincible.
  2. Make sure you don't overwhelm downstream systems that do not yet support reactive programming.
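The ThreadLocal pitfall, and the single-threaded option from the list above, can be reproduced with the JDK alone. In this sketch the CONTEXT variable stands in for library state such as an MDC entry or a current transaction. The class and method names are hypothetical, and a plain single-thread executor plays the role of a single-threaded Scheduler.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalPitfallDemo {

    // Stand-in for library state such as an MDC entry or a current transaction.
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Set on the caller thread, read on a worker thread: the value is not visible there. */
    static String setHereReadThere() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CONTEXT.set("tx-1");
        try {
            return worker.submit(() -> String.valueOf(CONTEXT.get())).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            worker.shutdown();
        }
    }

    /** Set and read on the same single worker thread: the value survives between tasks. */
    static String setAndReadOnSameWorker() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> CONTEXT.set("tx-1")).get();                  // "begin"
            return worker.submit(() -> String.valueOf(CONTEXT.get())).get(); // later step
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(setHereReadThere());       // null
        System.out.println(setAndReadOnSameWorker()); // tx-1
    }
}
```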

Articles / Links

General

  1. Reactive Programming wikipedia
  2. The Reactive Manifesto
  3. Reactive Streams
  4. Reactive programming vs. Reactive systems
  5. An update on Reactive Streams and what's coming in Java 9 by Viktor Klang
  6. Introduction to Reactive Programming
  7. Servlet vs Reactive Stacks in Five Use Cases - presentation
  8. The Reactive Scrabble benchmarks by akarnokd blog
  9. An Introduction to Functional Reactive Programming
  10. Introduction to Reactive Streams for Java Developers
  11. RxMarbles: Interactive diagrams of Rx Observables
  12. Marble diagrams examples

Reactor

  1. Reactor reference
  2. Reactor javadoc
  3. Project Reactor
  4. Reactor github
  5. Reactor by Example
  6. Lite Rx API Hands-On with Reactor Core 3
  7. Reactive Programming With Spring Reactor
  8. Reactor – Simple Ways to create Flux/Mono
  9. Reactive Programming with Reactor 3 (interview + playground)
  10. Building Reactive Rest APIs with Spring WebFlux and Reactive MongoDB
  11. File Reading in Reactor

RxJava

  1. 5 Things to Know About Reactive Programming
  2. Best resource for learning RxJava
  3. Marble Diagrams - Rxjava operators
  4. Loading files with backpressure - RxJava FAQ
  5. Introduction to Reactive Programming by a core developer of Vert.x (interview + 2 playgrounds)
