Links, materials and notes about reactive programming with reactor
Reactive programming [...] is a subset of asynchronous programming and a paradigm where the availability of new information drives the logic forward rather than having control flow driven by a thread-of-execution. [ref]
They are not the same thing!
A reactive system is an architectural style that allows multiple individual applications to coalesce as a single unit, reacting to its surroundings, while remaining aware of each other—this could manifest as being able to scale up/down, load balancing, and even taking some of these steps proactively. [ref]
We are focusing on Reactive Programming (the coding part) here not on the System.
- Reactive Streams: The API + TCK
- RxJava 2: Java 6 (and Android) compatible implementation of Reactive Streams
- Reactor: Java 8 based implementation of Reactive Streams. Developed by the Spring Team, to be used in Spring 5 and onwards.
- Java 9 Flow API: The Reactive Streams API copy pasted into java.util.concurrent.Flow class. More details here
We are focusing on Reactor here.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
public interface Subscription {
void request(long n);
void cancel();
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Example: [ref]
the ability for the consumer to signal the producer that the rate of emission is too high for it to keep up [ref]
Represents stream of 0 to many elements. Sends 0..N signals + an error or completion signal.
It is similar to java.util.Stream
but even more similar to java.util.Iterable
as a Flux
is reusable.
Flux<String> flux = Flux.just("Hello,", " world!"); // creates Flux with a static method
flux.subscribe(); // need to subscribe to start the data flow
flux.block(); // subscribes + waits... this operation exits the reactor world.
Represents 0 to 1 elements. Sends 0..1 signals + an error or completion signal.
It is similar to java.util.Optional
and java.util.concurrent.CompletableFuture
. Mono
is reusable as well.
Mono<String> mono = Mono.just("Hello, world!");
mono.subscribe(); // need to subscribe to start the data flow
mono.block(); // subscribes + waits... this operation exits the reactor world.
Nothing happens asynchronously - unless we say so explicitly.
This where Schedulers
come into picture: just think about them as thread pools. You use different kind of thread pools for different kind of operations.
-> CPU intensive operationsSchedulers.elastic()
-> Everything that involves waiting, for example db/network/file/ldap write/read.
Differences to Collection.parallelStream()
and CompletableFuture.supplyAsync()
. The problem with default ForkJoinPool
Flux<Integer> flux = Flux.just(10, 15)
.log() // built in logging, uses slf4j if on classpath
.publishOn(Schedulers.parallel()) // or subscribeOn()
.flatMap(i -> Mono.fromSupplier(() -> i * 2).log().publishOn(Schedulers.parallel()))
.map(i -> i - 2)
[main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[main] INFO reactor.Flux.PublishOn.2 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.FlatMap.3 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.PublishOn.4 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | request(unbounded)
[main] INFO reactor.Flux.PublishOn.4 - | request(unbounded)
[main] INFO reactor.Flux.FlatMap.3 - request(256)
[main] INFO reactor.Flux.PublishOn.2 - | request(256)
[main] INFO reactor.Flux.Array.1 - | request(256)
[main] INFO reactor.Flux.Array.1 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onNext(15)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onComplete()
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(15)
[parallel-3] INFO reactor.Flux.FlatMap.3 - onNext(20)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(20)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(18)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onNext(30)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(30)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(28)
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onComplete()
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onComplete()
Another example:
Scheduler emitter = Schedulers.newElastic("emitter");
Scheduler transformer = Schedulers.newElastic("transformer");
Scheduler consumer = Schedulers.newParallel("consumer");
Flux<Integer> flux = Flux.<Integer>push(e -> {
// imagine reading from DB row by row or from file line by line
for (int fi = 0; fi < 30; fi++) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);;
// could be some other IO like reading from a second database
i -> Mono.fromSupplier(() -> i + " - " + i * 2)
Another example (what is the use case?):
Flux<LocalDateTime> flux = Flux.<LocalDateTime>create(e -> {
Schedulers.newSingle("brc", true)
() -> {"calculating...");;
0, 100, TimeUnit.MILLISECONDS);
}, OverflowStrategy.LATEST).cache(1);
// ...
As with any concurrent programming tool anything (typicall library) that's relying the thread not changing will break if you do it on multiple threds.
. A typical programmer never (or at least very, very rarely) should useThreadLocal
directly. However libraries do use it:- Slf4j MDC (Mapped Diagnostic Context).
- Transaction handling with Spring JDBC or Hibernate or MQ-s: Now this is a tricky one. If you have a transaction then you have the following options (as I see it):
- Leave the whole operation single threaded
- Create a single threaded
/ transaction and push every transactional operation onto that scheduler (including transaction begin and transaction commit/rollback) - You say goodbye to your battle proven libraries and roll your own transaction handling which does not depend on
- only do this if you feel invincible.
- Make sure you don't overflow your downstreams (which does not yet support reactive programming)
