Links, materials and notes about reactive programming with reactor
Reactive programming [...] is a subset of asynchronous programming and a paradigm where the availability of new information drives the logic forward rather than having control flow driven by a thread-of-execution. [ref]
They are not the same thing!
A reactive system is an architectural style that allows multiple individual applications to coalesce as a single unit, reacting to its surroundings, while remaining aware of each other—this could manifest as being able to scale up/down, load balancing, and even taking some of these steps proactively. [ref]
We are focusing on Reactive Programming (the coding part) here not on the System.
- Reactive Streams: The API + TCK
- RxJava 2: Java 6 (and Android) compatible implementation of Reactive Streams
- Reactor: Java 8 based implementation of Reactive Streams. Developed by the Spring Team, to be used in Spring 5 and onwards.
- Java 9 Flow API: The Reactive Streams API copy pasted into java.util.concurrent.Flow class. More details here
We are focusing on Reactor here.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
public interface Subscription {
void request(long n);
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
[ref]
Example: [ref]
the ability for the consumer to signal the producer that the rate of emission is too high for it to keep up [ref]
[ref]
Represents stream of 0 to many elements. Sends 0..N signals + an error or completion signal.
It is similar to java.util.Stream
but even more similar to java.util.Iterable
as a Flux
is reusable.
Example:
Flux<String> flux = Flux.just("Hello,", " world!"); // creates Flux with a static method
flux.subscribe(); // need to subscribe to start the data flow
flux.block(); // subscribes + waits... this operation exits the reactor world.
[ref]
Represents 0 to 1 elements. Sends 0..1 signals + an error or completion signal.
It is similar to java.util.Optional
and java.util.concurrent.CompletableFuture
. Mono
is reusable as well.
Example:
Mono<String> mono = Mono.just("Hello, world!");
mono.subscribe(); // need to subscribe to start the data flow
mono.block(); // subscribes + waits... this operation exits the reactor world.
Nothing happens asynchronously - unless we say so explicitly.
This where Schedulers
come into picture: just think about them as thread pools. You use different kind of thread pools for different kind of operations.
Schedulers.parallel()
-> CPU intensive operationsSchedulers.elastic()
-> Everything that involves waiting, for example db/network/file/ldap write/read.
Differences to Collection.parallelStream()
and CompletableFuture.supplyAsync()
. The problem with default ForkJoinPool
.
Example:
Flux<Integer> flux = Flux.just(10, 15)
.log() // built in logging, uses slf4j if on classpath
.publishOn(Schedulers.parallel()) // or subscribeOn()
.log()
.flatMap(i -> Mono.fromSupplier(() -> i * 2).log().publishOn(Schedulers.parallel()))
.log()
.publishOn(Schedulers.parallel())
.log()
.map(i -> i - 2)
.log();
flux.blockLast();
Output:
[main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[main] INFO reactor.Flux.PublishOn.2 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.FlatMap.3 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.PublishOn.4 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | request(unbounded)
[main] INFO reactor.Flux.PublishOn.4 - | request(unbounded)
[main] INFO reactor.Flux.FlatMap.3 - request(256)
[main] INFO reactor.Flux.PublishOn.2 - | request(256)
[main] INFO reactor.Flux.Array.1 - | request(256)
[main] INFO reactor.Flux.Array.1 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onNext(15)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onComplete()
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(15)
[parallel-3] INFO reactor.Flux.FlatMap.3 - onNext(20)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(20)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(18)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onNext(30)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(30)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(28)
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onComplete()
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onComplete()
Another example:
Scheduler emitter = Schedulers.newElastic("emitter");
Scheduler transformer = Schedulers.newElastic("transformer");
Scheduler consumer = Schedulers.newParallel("consumer");
Flux<Integer> flux = Flux.<Integer>push(e -> {
// imagine reading from DB row by row or from file line by line
for (int fi = 0; fi < 30; fi++) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
e.next(fi);
}
})
.log()
.subscribeOn(emitter)
.log();
flux.flatMap(
// could be some other IO like reading from a second database
i -> Mono.fromSupplier(() -> i + " - " + i * 2)
.log()
.subscribeOn(transformer)
.log()
.publishOn(consumer))
.log()
.collectList()
.log().block();
Another example (what is the use case?):
Flux<LocalDateTime> flux = Flux.<LocalDateTime>create(e -> {
Schedulers.newSingle("brc", true)
.schedulePeriodically(
() -> {
LOGGER.info("calculating...");
e.next(LocalDateTime.now(ZoneOffset.UTC));
},
0, 100, TimeUnit.MILLISECONDS);
}, OverflowStrategy.LATEST).cache(1);
// ...
flux.blockFirst();
synchronized
Lock
,ReentrantLock
,Semaphore
CountDownLatch
,CyclicBarrier
ThreadPoolExecutor
,ScheduledThreadPoolExecutor
Future
,CompletableFuture
As with any concurrent programming tool anything (typicall library) that's relying the thread not changing will break if you do it on multiple threds.
ThreadLocal
. A typical programmer never (or at least very, very rarely) should useThreadLocal
directly. However libraries do use it:- Slf4j MDC (Mapped Diagnostic Context).
- Transaction handling with Spring JDBC or Hibernate or MQ-s: Now this is a tricky one. If you have a transaction then you have the following options (as I see it):
- Leave the whole operation single threaded
- Create a single threaded
Scheduler
/ transaction and push every transactional operation onto that scheduler (including transaction begin and transaction commit/rollback) - You say goodbye to your battle proven libraries and roll your own transaction handling which does not depend on
ThreadLocal
- only do this if you feel invincible.
- Make sure you don't overflow your downstreams (which does not yet support reactive programming)
- Reactive Programming wikipedia
- The Reactive Manifesto
- Reactive Streams
- Reactive programming vs. Reactive systems
- An update on Reactive Streams and what's coming in Java 9 by Viktor Klang
- Introduction to Reactive Programming
- Servlet vs Reactive Stacks in Five Use Cases - presentation
- The Reactive Scrabble benchmarks by akarnokd blog
- An Introduction to Functional Reactive Programming
- Introduction to Reactive Streams for Java Developers
- RxMarbles: Interactive diagrams of Rx Observables
- Marble diagrams examples
- Reactor reference
- Reactor javadoc
- Project Reactor
- Reactor github
- Reactor by Example
- Lite Rx API Hands-On with Reactor Core 3
- Reactive Programming With Spring Reactor
- Reactor – Simple Ways to create Flux/Mono
- Reactive Programming with Reactor 3 (interview + playground)
- Building Reactive Rest APIs with Spring WebFlux and Reactive MongoDB
- File Reading in Reactor