To understand what Flux and Mono are, context is important. This context has layers, and in our case, there are four.
Layer 1 -> The Reactive Manifesto
The Flux and Mono concepts trace back to the Reactive Manifesto. The manifesto's goal is to create more resilient, robust, and flexible systems, which it calls Reactive Systems.
Reactive Systems are defined by four tenets:
- Responsive
- Resilient
- Elastic
- Message Driven
To read more, see The Reactive Manifesto.
Layer 2 -> Reactive Streams API and JDK 9 Flow API
The Reactive Manifesto addresses why systems should be Reactive and the benefits that follow. The Reactive Streams API and the JDK 9 Flow API both address the how: how will a system become Reactive? The first step is to create a shared understanding, a contract for how a system can adhere to the four tenets of the Reactive Manifesto.
Reactive Streams API lays out contracts for a framework, in four parts:
Publisher<T>
Subscriber<T>
Subscription
Processor<T, R>
The JDK 9 Flow API lays out very similar (read: identical) contracts, also in four parts:
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription
Flow.Processor<T, R>
The JDK 9 Flow API, as the name suggests, is only available in Java 9 and above, whereas the Reactive Streams JVM API supports Java 6 and above, which allows much wider adoption.
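To make the contract concrete, here is a minimal sketch that uses only the JDK's Flow types, so it needs no extra dependency. The names CollectingSubscriber and collectViaFlow are hypothetical and exist only for this illustration; SubmissionPublisher is the Flow.Publisher implementation that ships with the JDK. Processor is left out, since it simply combines the Subscriber and Publisher roles.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Flow
import java.util.concurrent.SubmissionPublisher
import java.util.concurrent.TimeUnit

// Hypothetical subscriber for illustration: stores every item it receives.
class CollectingSubscriber<T> : Flow.Subscriber<T> {
    val items = mutableListOf<T>()
    val finished = CountDownLatch(1)
    private lateinit var subscription: Flow.Subscription

    override fun onSubscribe(subscription: Flow.Subscription) {
        this.subscription = subscription
        subscription.request(1) // ask the publisher for the first item
    }

    override fun onNext(item: T) {
        items.add(item)
        subscription.request(1) // ask for the next item only after handling this one
    }

    override fun onError(throwable: Throwable) = finished.countDown()

    override fun onComplete() = finished.countDown()
}

// Hypothetical helper: pushes the values through a publisher and returns what arrived.
fun collectViaFlow(values: List<Int>): List<Int> {
    val subscriber = CollectingSubscriber<Int>()
    val publisher = SubmissionPublisher<Int>() // the JDK's own Flow.Publisher
    publisher.subscribe(subscriber)
    values.forEach { publisher.submit(it) }
    publisher.close() // signals onComplete once the submitted items are delivered
    subscriber.finished.await(5, TimeUnit.SECONDS) // delivery is asynchronous
    return subscriber.items.toList()
}
```

Calling collectViaFlow(listOf(1, 2, 3)) should hand back the same three values in order. The same four callbacks (onSubscribe, onNext, onError, onComplete) appear on the Reactive Streams Subscriber, which is why the two APIs are treated as interchangeable here.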
Layer 3 -> Reactive Streams Implementations
An API by itself is just a contract, so who fulfills it? There are a few implementations in the JVM ecosystem.
There is RxJava, short for Reactive Extensions for Java. RxJava is part of the ReactiveX set of projects, which focus on creating asynchronous observable streams. RxJava adopts the Reactive Streams API, but ReactiveX also has a rich API definition of its own, which RxJava adheres to. Since version 2 of RxJava, the maintainers boast of a single dependency in the project: Reactive Streams.
There is also Project Reactor, an implementation of the Reactive Streams API for the JVM. Maintained by Pivotal as an open source project, it is now integrated into the Spring 5 ecosystem through projects such as Spring WebFlux, Spring Cloud Stream, and others.
The Flux and Mono concepts were created in Project Reactor's implementation of the Reactive Streams API. But what are they?
Layer 4 -> Flux and Mono are Publishers
Rewinding to the Reactive Streams API, one of the contracts it defines is the concept of a Publisher of a defined type. To put it simply, a Publisher is a stream of objects that can potentially go on forever (continuous) or be limited to a certain number of objects (discrete). A publisher is inherently lazy, which means you have to subscribe to the stream before it pushes objects to the subscriber. Under the hood, a Subscription is created when a publisher is subscribed to.
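The laziness described above can be observed directly. The following is a small sketch, assuming Reactor is on the classpath; demonstrateLaziness and the counter are hypothetical names used only for this illustration. Mono.fromCallable wraps a piece of work that Reactor runs only when someone subscribes.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: reports how often the work ran before and after subscribing.
fun demonstrateLaziness(): List<Int> {
    val timesExecuted = AtomicInteger(0)

    // Declaring the Mono does not run the lambda yet
    val lazyMono: Mono<Int> = Mono.fromCallable { timesExecuted.incrementAndGet() }
    val beforeSubscribe = timesExecuted.get()

    // Subscribing triggers the work and pushes the result to the consumer
    lazyMono.subscribe { value -> println("received $value") }
    val afterSubscribe = timesExecuted.get()

    return listOf(beforeSubscribe, afterSubscribe)
}
```

Because no scheduler is involved, the work runs on the calling thread during subscribe, so the counter reads 0 before the subscription and 1 after it.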
Mono
A Mono is a type of Publisher that publishes at most one object: either one object or none.
Examples
The examples that follow are written in Kotlin.
A Mono of one integer
import reactor.core.publisher.Mono
val monoOfInteger: Mono<Int> = Mono.just(123)
// subscribe to Mono since it does not push unless subscribed to
monoOfInteger.subscribe { intInStream -> println(intInStream) }
A Mono of no integers (no objects in the stream)
import reactor.core.publisher.Mono
val streamOfNoIntegers: Mono<Int> = Mono.empty()
// subscribe to the Mono; the consumer is never called since the Mono is empty and simply completes
streamOfNoIntegers.subscribe { intInStream -> println(intInStream) }
Flux
A Flux is a type of Publisher that can publish anywhere from zero to many objects (potentially endless).
Examples
A Flux of a discrete number of integers
import reactor.core.publisher.Flux
val fluxOfIntegers: Flux<Int> = Flux.just(9, 8, 7, 6, 5, 4, 3, 2, 1)
fluxOfIntegers.subscribe { intInStream -> println(intInStream) }
A Flux of no integers (no objects in the stream)
import reactor.core.publisher.Flux
val fluxOfNoIntegers: Flux<Int> = Flux.empty()
// subscribe to the Flux; the consumer is never called since the Flux is empty and simply completes
fluxOfNoIntegers.subscribe { intInStream -> println(intInStream) }
A Flux of continuous data (endless), with a delay of 300ms between each publish event
import reactor.core.publisher.Flux
import java.time.Duration
import kotlin.random.Random.Default.nextInt
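// generate() calls the lambda once per requested element, so the stream never ends
val fluxOfIntegers: Flux<Int> = Flux.generate { sink -> sink.next(nextInt()) }
// delayElements() emits on Reactor's parallel scheduler, so subscribe() returns immediately;
// in a plain main function, keep the main thread alive (e.g. Thread.sleep) to see output
fluxOfIntegers
    .delayElements(Duration.ofMillis(300))
    .subscribe { intInStream -> println(intInStream) }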
Flux and Mono are publishers equipped with the ability to react to how subscribers accept the data. The Reactive Streams API introduces the concept of backpressure, which lets subscribers dictate how much traffic they can handle and at what pace, so a subscriber is never overloaded by the number of objects the publisher produces. When a Subscription is established between a Publisher and a Subscriber, the subscriber requests data through the subscription, and the publisher honors that demand. The Subscription contract is what establishes control over the flow of data.
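One way to see this demand-driven flow is a subscriber that asks for a single element at a time. The sketch below assumes Reactor is on the classpath and builds on its BaseSubscriber class; OneAtATimeSubscriber, its limit parameter, and takeThreeWithBackpressure are hypothetical names used only for this illustration.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Hypothetical subscriber: requests one element at a time and stops after `limit` elements.
class OneAtATimeSubscriber(private val limit: Int) : BaseSubscriber<Int>() {
    val received = mutableListOf<Int>()

    override fun hookOnSubscribe(subscription: Subscription) {
        request(1) // initial demand: a single element
    }

    override fun hookOnNext(value: Int) {
        received.add(value)
        if (received.size < limit) {
            request(1) // ready for one more
        } else {
            cancel() // no further demand, so the publisher stops emitting
        }
    }
}

// Hypothetical helper: the publisher could emit ten values, the subscriber takes three.
fun takeThreeWithBackpressure(): List<Int> {
    val subscriber = OneAtATimeSubscriber(limit = 3)
    Flux.range(1, 10).subscribe(subscriber)
    return subscriber.received
}
```

Although Flux.range(1, 10) is able to emit ten values, the subscriber only ever receives 1, 2, and 3, because the publisher emits no more than what has been requested. The lambda-based subscribe used in the earlier examples requests an unbounded amount instead, which is why those examples print everything.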