We have two different approach to expose our REST endpoints. First, we can use annotation based controller similar to Spring MVC and the other with functional endpoints using Router Function and Handler Function. In this demo, we will learn both the approach.
You can follow this article to add basic authentication in this example and this example to add JWT authentication.
What is Reactive Programming
Reactive programming is a declarative programming paradigm used to create non-blocking applications that are asynchronous and event-driven and allows an application to react on any changes. A key aspect of reactive applications is the concept of backpressure which is a mechanism to ensure producers don’t overwhelm consumers.
Reactive Programming Features
Responsive - It should respond to any kind of changes which is achieved through Resilient.
Event Driven - Reactive system should be event-driven to facilitate loose coupling. An event propagates data and it is propagated to everyone listening to that event. It is like producer and consumer i.e. observer pattern
Imperative vs Reactive Programming
Below is an example of Imperative programming model. The value of b does not change at line 6 although the value of a has changed at line 5.
int a = 10;
int b = a * 10;
System.out.println(b);
a=3;
System.out.println(b); // line 6
Ideally, variable b should react on the change of variable a in a reactive programming model.
Reactive Programming Model
Reactive programming should be non-blocking, asynchronous and functional or declarative.
Non-blocking : Non-blocking systems basically works on event loop model. In this model, we have limited no of threads handling all the requests. When a request is handled by a thread, the request is divided into multiple events in the event queue and each of these smaller events is executed by different threads in order to complete the whole request. In the meantime, if we have an event in the event queue that has to perform bulkier tasks such as reading from a file system which may take more time to process the request then that execution is again handled by other worker thread and upon the completion of that process or task it emits an event as operation completed and the result can be processed again by the event of event queue.
Servlet API is a Blocking API whereas Servlet API 3.1+ is a non-blocking API.
Image Source : o7planning
Asynchronous : Asynchronous programming is a form of parallel programming where one process does not wait for the completion of another process. This can be realized with an Ajax call.
$("button").click(function(){ $("p").hide("slow", function(){ alert("The paragraph is now hidden"); }); });
To achieve this behavior in Java we can use thread pools, parallel streams and completable future. We have to very careful while using these approach in Java as this may lead to a blocking execution if not used efficiently.
Functional/Declarative Progamming : Functional programming in Java is implemented through Lambda expressions and Immutability. Below is a usage example of the stream. It connects to a source and pulls values from the source and process it. The stream values can only be consumed once by the subscriber.
public static void main(String [] args){ List<Integer> a = Arrays.asList(1, 2, 3, 4, 5); a.stream() .map(num -> process(num)) .forEach(System.out :: println); } private static String process(Integer num) { return "p" + num; }
Reactive Programming with Reactor
The standard library that process streams in an asynchronous way in a non-blocking manner with back-pressure capability is called reactive streams. In this case, the value is actually pushed to the subscriber by the producer rather then the subscriber pulling it from the producer.
a.stream() .map(num -> process(num)) .subscribe(System.out :: println);
The stream API connects to a source and pulls values from the source and the values can be used only once whereas in a reactive streams values are not pulled. They are pushed when the subsriber subsribes to the publisher and hence the back pressure mechanism can be applied. This back pressure is only applicable in reactive stream.
Reactive Streams
Reactive streams provides a contract for asynchronous non-blocking processing of the stream of data with backpressure. There are 3 main abstractions - Publisher, Subscriber and Subscription.
First, the subscriber subscribes to the publisher and the method subscribe() method is called on the publisher and then the subscription object is created and then the onSubscriber() method of Subscriber is called passing the subscription object. The Subscriber must call the request method to signify how much elements a subscriber can process to maintain the back pressure. Now, the subscriber starts receiving elements by onNext().
Now, let us learn how this concept is implemented in Project reactor.
Project Reactor
Reactor is the default programming language of spring webflux but it also supports the use of RxJava at the application level. Reactor provides 2 implementations of the publisher interface provided by reactive streams - Mono and Flux.
Mono to publish 0..1 element and Flux to publish 0..N element. Mono is like the Optional type provided in Java 8. We use Mono to return a single object or VOid and Flux is used to return lists.
Mono ExampleMono.just("A") .log() .subscribe(System.out :: println);Flux Example
Flux.just("A", "B", "C")
.log()
.subscribe(System.out :: println);
Reactor provides many versions of Subscribe method. The data starts flowing from publisher to subscriber once this method is called. Below are the signtures of these methods:
subscribe() subscribe(Consumer super T> consumer, Consumer super Throwable> errorConsumer) subscribe(Consumer super T> consumer, Consumer super Throwable> errorConsumer, Runnable completeConsumer) subscribe(Consumer super T> consumer, Consumer super Throwable> errorConsumer, Runnable completeConsumer, Consumer super Subscription> subscriptionConsumer)
Reactive Operators
There are many reactive operators. We will be discussing about map, flatMap, concat, merge and zip.Map
Map transforms the elements emitted by a Flux to other value by applying a synchronous function to each item.
Flux.just(1, 5, 10) .map(num -> num * 10) .subscribe(System.out :: println);
FlatMap
FlatMap transform the elements emitted by the Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging.
Flux.just(1, 5, 10) .flatMap(num -> Flux.just(num * 10)) .subscribe(System.out :: println);
Concat
Below is 2 different flux that produces elements with a predefined delay. This process runs parallelly and hence we have a Thread.sleep() to view the final result of concat.
Flux<Integer> flux1 = Flux.just(1, 5, 10) .delayElements(Duration.ofMillis(200)); Flux<Integer> flux2 = Flux.just(15, 20, 25) .delayElements(Duration.ofMillis(400)); Mono<Integer> mono1 = Mono.just(1); /*flux1.concat(flux2).concat(mono1) .subscribe(System.out :: println);*/ flux1.concatWith(flux2).concatWith(mono1) .subscribe(System.out :: println); Thread.sleep(4000);
concatWith() returns a Flux whereas concat() returns Mono. Hence, with the commented line of code, you will see only 3 elements in the output.
mergeWith() merge data from a Flux and a Publisher into an interleaved merged sequence whereas concatWith() merge adds with no interleave.
ZIP
It zips given Flux with another Publisher source, that is to say, wait for both to emit one element and combine these elements once into a Tuple
Flux<Integer> flux1 = Flux.just(1, 5, 10) .delayElements(Duration.ofMillis(200)); Flux<Integer> flux2 = Flux.just(15, 20, 25) .delayElements(Duration.ofMillis(400)); Mono<Integer> mono1 = Mono.just(1); flux1.zipWith(flux2)Output
[1,15] [5,20] [10,25]
Spring WebFlux
Spring WebFlux is the new reactive web framework that comes with spring 5. Spring WebFlux is not a replacement of Spring MVC, rather it offers a reactive programming model in spring 5. It is fully non-blocking, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
We have 2 different web stack in Spring 5 - spring-web-mvc and spring-web-reactive. Each module is optional. Applications can use one or the other module or, in some cases, both?—?for example, Spring MVC controllers with the reactive WebClient.
Features:
Supports the same annotation such as @RequestMapping similar to spring web mvc
Works on functional programming model.
The only difference between spring-web-mvc and spring-web-reactive is how the reactive request and response is handled in the reactive application.
In spring WebFLux we have a ServerWebExchange class that acts as a container for HTTP request and response. Similar to HTTPRequest and HTTPResponse we have ServerHttpRequest and ServerHTTPResponse as a reactive version in Spring WebFlux.
Now, let us start building our Spring WebFlux REST APIs that performs CRUD operation on a user entity.
Creating Spring WebFlux Reactive App
Spring WebFlux Project Setup
Head over to start.spring.io and download a sample project with spring reactive web and spring data reactive mongoDB artifact. The spring reactive web provides reactive feature to our applications and reactive mongoDB includes reactive DB driver to perform DB operations asynchronously.
Spring WebFlux Maven Dependency
Below is the pom.xml that includes all the dependencies required for this project.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Project Structure
Below is our final project structure.
Reactive MongoDB Configuration
We already have reactive streams MongoDB driver on our classpath to perform non-blocking DB operations. Now, we need to provide our configuration properties. We have defined these in our application.properties file.
application.propertiesspring.data.mongodb.uri=mongodb://root:root@localhost:27017/test_db
Actually, there are many ways to configure MongoDB with spring boot. You can follow this article to find out different ways to configure MongoDB with Spring Boot.
Model Class Implementation
We have user class defined annotated with @Document
annotation to identify a domain object to be persisted to MongoDB.
@Document public class User { @Id private String id; private String name; private int age; private double salary; public User(){ } public User(String name, int age, double salary){ this.name = name; this.age = age; this.salary = salary; } }
Spring WebFlux REST API Annotation Style
Now, let us implement our APIs in a spring controller class that performs the CRUD operation. Here, UserRepository is injected that extends ReactiveMongoRepository which is an interface with reactive support.
Create Operation
Create operation is a POST request and takes User model object as a request parameter. It makes a call to save() method defined in ReactiveCrudRepository to save the user. The save() method emits the saved entity. Spring internally calls the subscribe() method and hence no need to subscribe() again to get the saved entity.
@PostMapping public Mono<ResponseEntity<User>> create(@RequestBody User user){ return userRepository.save(user) .map(savedUser -> ResponseEntity.ok(savedUser)); }
Get Operation
This operation lists all the users in the DB and wraps the result into ResponseEntity. As discussed above, Flux is more 1..N elements.
@GetMapping public Flux<User> listUsers(){ return userRepository.findAll(); }
Update Operation
Update operation is a PUT request. First, it pulls the user details from the DB by user id, sets the updated parameter and updates user object to DB. It emits bad request exception if the user object is not found in the DB.
@PutMapping("/{userId}") public Mono<ResponseEntity<User>> getUserById(@PathVariable String userId, @RequestBody User user){ return userRepository.findById(userId) .flatMap(dbUser -> { dbUser.setAge(user.getAge()); dbUser.setSalary(user.getSalary()); return userRepository.save(dbUser); }) .map(updatedUser -> ResponseEntity.ok(updatedUser)) .defaultIfEmpty(ResponseEntity.badRequest().build()); }
Delete Operation
The implementation is very similar to update user. As delete()
method returns a Mono of Void, we have to explicitly wrap it into ResponseEntity.
@DeleteMapping("/{userId}") public Mono<ResponseEntity<Void>> deleteUserById(@PathVariable String userId){ return userRepository.findById(userId) .flatMap(existingUser -> userRepository.delete(existingUser) .then(Mono.just(ResponseEntity.ok().<Void>build())) ) .defaultIfEmpty(ResponseEntity.notFound().build()); }
Spring WebFlux Repository Implementation
The UserRepository extends ReactiveMongoRepository which is a Mongo specific interface wih reactive support. We have not defined any custom method for our use. We are using pre-defined methods in ReactiveCrudRepository. Spring Boot automatically plugs in an implementation of this interface called SimpleReactiveMongoRepository at runtime.
You can visit this article to write dynamic MongoDB queries using Spring Data MongoDB.
UserRepository.javapublic interface UserRepository extends ReactiveMongoRepository{ }
Server Side Events
Server-Sent Events (SSE) is a server push technology enabling a browser to receive automatic updates from a server via HTTP connection. In a server-sent event, a web page automatically gets updates from a server and the server produces the response in a text/event-stream format.
Below is our implementation that generates event in a duration of a second and this would be very useful in streaming server side events to browsers.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<UserEvent> emitEvents(){ return Flux.interval(Duration.ofSeconds(1)) .map(val -> new UserEvent("" + val, "User Event")); }
To view this in reality, after deploying the app, you can hit localhost:8080/users/events.
Putting It All Together
@RestController @RequestMapping("/users") public class UserController { @Autowired private UserRepository userRepository; @PostMapping public Mono<ResponseEntity<User>> create(@RequestBody User user){ return userRepository.save(user) .map(savedUser -> ResponseEntity.ok(savedUser)); } @GetMapping public Flux<User> listUsers(){ return userRepository.findAll(); } @GetMapping("/{userId}") public Mono<ResponseEntity<User>> getUserById(@PathVariable String userId){ return userRepository.findById(userId) .map(user -> ResponseEntity.ok(user)) .defaultIfEmpty(ResponseEntity.notFound().build()); } @PutMapping("/{userId}") public Mono<ResponseEntity<User>> getUserById(@PathVariable String userId, @RequestBody User user){ return userRepository.findById(userId) .flatMap(dbUser -> { dbUser.setAge(user.getAge()); dbUser.setSalary(user.getSalary()); return userRepository.save(dbUser); }) .map(updatedUser -> ResponseEntity.ok(updatedUser)) .defaultIfEmpty(ResponseEntity.badRequest().build()); } @DeleteMapping("/{userId}") public Mono<ResponseEntity<Void>> deleteUserById(@PathVariable String userId){ return userRepository.findById(userId) .flatMap(existingUser -> userRepository.delete(existingUser) .then(Mono.just(ResponseEntity.ok().<Void>build())) ) .defaultIfEmpty(ResponseEntity.notFound().build()); } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<UserEvent> emitEvents(){ return Flux.interval(Duration.ofSeconds(1)) .map(val -> new UserEvent("" + val, "Devglan User Event")); } }
Spring WebFlux Functional Endpoints
With this approach, we basically use RouterFunction and HandlerFunction to expose our REST endpoints. Router function has all the route mappings for our APIs for different HTTP methods similar to @RequestMapping and upon the match of a route, the request is then forwarded to a matching handler function that handles the request for further processing.
Router Function
Router Function represents a function that routes the request to a Handler function. It is a function that takes a ServerRequest as input and returns a Mono
Below is our implementation of Router function. This is the same implementation in the functional style of the APIs that we exposed above using @RequestMapping annotation.
@Configuration public class BeanConfig { @Autowired private UserRepository userRepository; @Bean public RouterFunction<ServerResponse> route() { UserHandler userHandler = new UserHandler(userRepository); return RouterFunctions .route(POST("/users").and(contentType(APPLICATION_JSON)), userHandler::createUser) .andRoute(GET("/users").and(accept(APPLICATION_JSON)), userHandler::listUser) .andRoute(GET("/users/{userId}").and(accept(APPLICATION_JSON)), userHandler::getUserById) .andRoute(PUT("/users").and(accept(APPLICATION_JSON)), userHandler :: createUser) .andRoute(DELETE("/users/userId").and(accept(APPLICATION_JSON)), userHandler :: deleteUser) .andRoute(GET("/users/events/stream").and(accept(APPLICATION_JSON)), userHandler :: streamEvents); } }
On the match of any route, the request is handled by the corresponding Handler function. Below is our handler function.
HandlerFunction is a function that takes a ServerRequest and returns a Mono
public class UserHandler { private UserRepository userRepository; public UserHandler(UserRepository userRepository){ this.userRepository = userRepository; } public Mono<ServerResponse> createUser(ServerRequest request) { Mono<User> userMono = request.bodyToMono(User.class).flatMap(user -> userRepository.save(user)); return ServerResponse.ok().contentType(APPLICATION_JSON).body(userMono, User.class); } public Mono<ServerResponse> listUser(ServerRequest request) { Flux<User> user = userRepository.findAll(); return ServerResponse.ok().contentType(APPLICATION_JSON).body(user, User.class); } public Mono<ServerResponse> getUserById(ServerRequest request) { String userId = request.pathVariable("userId"); Mono<ServerResponse> notFound = ServerResponse.notFound().build(); Mono<User> userMono = userRepository.findById(userId); return userMono.flatMap(user -> ServerResponse.ok() .contentType(APPLICATION_JSON) .body(BodyInserters.fromObject(user))) .switchIfEmpty(notFound); } public Mono<ServerResponse> deleteUser(ServerRequest request) { String userId = request.pathVariable("userId"); userRepository.deleteById(userId); return ServerResponse.ok().build(); } public Mono<ServerResponse> streamEvents(ServerRequest serverRequest){ return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(Flux.interval(Duration.ofSeconds(1)) .map(val -> new UserEvent("" + val, "Devglan User Event")), UserEvent.class); } }
Enable CORS in Spring WebFlux
Suppose, your server is running at 8080 port and any browser client app is running at 3000 port, then it is very obvious to get this error as Response to preflight request doesn't pass access control check: No 'Access-Control-Allow-Origin' header is present on the requested resource. Origin 'http://127.0.0.1:3000' is therefore not allowed access.
This is to stop executing cross browser scripts. To prevent this, we can addCorsMappings that will set the header Access-Control-Allow-Origin as http://127.0.0.1:3000 so that the browser is happy or else we can make * during development phase.
@Configuration @EnableWebFlux public class CORSConfig implements WebFluxConfigurer { @Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping("/**") .allowedOrigins("*") .allowedMethods("GET, POST, PUT, DELETE") .allowedHeaders("*"); } }
Populating Dummy Data using CommandLineRunner
Let us see the capabilities of Reactive programming to populate dummy user records in the DB during application startup. It actually cleans the DB everytime you run this app and creates new entry. With this, you can also see the operators that we discussed above in action.
@SpringBootApplication public class ReactiveApiDemoApplication { public static void main(String[] args) { SpringApplication.run(ReactiveApiDemoApplication.class, args); } @Bean CommandLineRunner run(UserRepository userRepository) { return args -> { userRepository.deleteAll() .thenMany(Flux.just( new User("Dhiraj", 23, 3456), new User("Mike", 34, 3421), new User("Hary", 21, 8974) ) .flatMap(userRepository::save)) .thenMany(userRepository.findAll()) .subscribe(System.out::println); }; } }
Testing the App
GET Request: As we have a command line runner implementation that saves user document in the MongoDB at application start, let us first use the GET operation to list the users from DB.
GET Request to Fetch By Id:
Update Request:
Conclusion
In this tutorial, we learnt about reactive programming in Spring WebFlux and created REST endpoints to perform different CRUD operations. Next, you can follow this article to add basic authentication in this app using spring webflux reactive security.